Use this page once you understand the indexer runtime and need to design or extend the streams themselves.

Event Streams Vs Account Streams

| Stream type | Best for | Data model |
| --- | --- | --- |
| Event stream | Immutable logs such as transfers, mints, fills, or lifecycle events | Append-only rows keyed by event identity |
| Account stream | Current on-chain account state such as balances, configuration accounts, or inventory | Current-state rows keyed by account identity |

Use an event stream when the chain emits the thing you want directly. Use an account stream when the important answer is “what is the latest state of this account now?”
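
The difference between the two data models can be sketched with in-memory tables. This is an illustration only: `appendEvent`, `upsertAccount`, and the row types are hypothetical names, the maps stand in for database tables, and the slot comparison used to reject stale account updates is an assumption rather than documented indexer behavior.

```typescript
// Hypothetical row shapes; a real stream derives these from its schema.
type TransferRow = { id: string; slot: bigint; amount: bigint };
type BalanceRow = { address: string; slot: bigint; amount: bigint };

// Event stream model: append-only, keyed by event identity.
// Seeing the same event again leaves the table unchanged.
function appendEvent(table: Map<string, TransferRow>, row: TransferRow | null): void {
  if (row === null) return; // parse returned null: nothing to write
  if (!table.has(row.id)) table.set(row.id, row);
}

// Account stream model: one current-state row per account identity.
// A newer update overwrites the row (slot ordering is an assumption of this sketch).
function upsertAccount(table: Map<string, BalanceRow>, row: BalanceRow | null): void {
  if (row === null) return; // ignored update: the existing row is left as-is
  const existing = table.get(row.address);
  if (existing === undefined || row.slot >= existing.slot) table.set(row.address, row);
}

const transfers = new Map<string, TransferRow>();
appendEvent(transfers, { id: "evt-1", slot: 10n, amount: 5n });
appendEvent(transfers, { id: "evt-2", slot: 11n, amount: 7n });
appendEvent(transfers, { id: "evt-1", slot: 10n, amount: 5n }); // replayed event

const balances = new Map<string, BalanceRow>();
upsertAccount(balances, { address: "acct-A", slot: 10n, amount: 5n });
upsertAccount(balances, { address: "acct-A", slot: 11n, amount: 12n }); // newer state wins
upsertAccount(balances, { address: "acct-A", slot: 9n, amount: 1n }); // stale, skipped
upsertAccount(balances, null); // ignored, row survives

console.log(transfers.size, balances.size, balances.get("acct-A")?.amount);
```

The event table grows by one row per distinct event, while the account table keeps a single row per address no matter how many updates arrive.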

Event Stream Shape

An event stream needs:
  • name
  • schema
  • filter or filterFactory
  • parse(event)
Token transfer example:
const tokenTransfers = defineEventStream({
  name: "token-transfers",
  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
  },
  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },
  parse: (event) => {
    if (!event.payload || event.slot === undefined) return null;
    const tokenEvent = TokenEvent.from_array(event.payload);
    const transfer = tokenEvent?.payload()?.asTransfer();
    if (!transfer) return null;

    return {
      id: event.eventId,
      slot: event.slot,
      txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
    };
  },
});

Account Stream Shape

An account stream needs:
  • name
  • schema
  • ownerProgram or ownerProgramFactory
  • optional expectedSize or dataSizes
  • parse(account)
Token account example:
import { decodeAddress, encodeAddress } from "@thru/helpers";
import { TokenAccount } from "./abi/thru/program/token/types";

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  ownerProgramFactory: () => new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!)),
  expectedSize: 73,
  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    isFrozen: t.boolean().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
  },
  parse: (account) => {
    if (account.data.length !== 73) return null;
    const parsed = TokenAccount.from_array(account.data);
    if (!parsed) return null;
    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      isFrozen: parsed.is_frozen !== 0,
      slot: account.slot,
      seq: account.seq,
    };
  },
});
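
The size of 73 used above is consistent with a packed layout of two 32-byte addresses, a 64-bit amount, and a one-byte frozen flag (32 + 32 + 8 + 1 = 73). The sketch below decodes such a buffer by hand to show why a size check comes first. The field order, the offsets, the little-endian encoding, and the `decodeTokenAccount` name are all assumptions made for illustration; the generated `TokenAccount` class remains the source of truth for the real layout.

```typescript
// Assumed layout, NOT confirmed by the ABI:
// mint[32] | owner[32] | amount (u64, little-endian) | is_frozen (u8)
const MINT_OFFSET = 0;
const OWNER_OFFSET = 32;
const AMOUNT_OFFSET = 64;
const FROZEN_OFFSET = 72;
const TOKEN_ACCOUNT_SIZE = 73;

interface DecodedTokenAccount {
  mint: Uint8Array;
  owner: Uint8Array;
  amount: bigint;
  isFrozen: boolean;
}

function decodeTokenAccount(data: Uint8Array): DecodedTokenAccount | null {
  // Reject anything that is not exactly the fixed size before reading offsets.
  if (data.length !== TOKEN_ACCOUNT_SIZE) return null;
  const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
  return {
    mint: data.slice(MINT_OFFSET, OWNER_OFFSET),
    owner: data.slice(OWNER_OFFSET, AMOUNT_OFFSET),
    amount: view.getBigUint64(AMOUNT_OFFSET, true),
    isFrozen: data[FROZEN_OFFSET] !== 0,
  };
}

// Build a synthetic 73-byte buffer and decode it.
const raw = new Uint8Array(TOKEN_ACCOUNT_SIZE);
raw.fill(1, MINT_OFFSET, OWNER_OFFSET);
raw.fill(2, OWNER_OFFSET, AMOUNT_OFFSET);
new DataView(raw.buffer).setBigUint64(AMOUNT_OFFSET, 1000n, true);
raw[FROZEN_OFFSET] = 1;

const decoded = decodeTokenAccount(raw);
const tooShort = decodeTokenAccount(new Uint8Array(72));
console.log(decoded?.amount, decoded?.isFrozen, tooShort);
```

Checking the length up front means every later offset read is known to be in bounds, which is the same reason the stream declares a fixed size and the parser returns null on a mismatch.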

Practical Rules

  • Use filterFactory and ownerProgramFactory when values come from environment or config. The factory defers reading them until the stream runs, so migration tooling can import the schema files safely even when those values are not set.
  • Use expectedSize when the account layout is fixed and size mismatches should be skipped early.
  • Return null from parse when the event or account update should be ignored. Returning null only skips that update; it does not delete an existing row.
  • Export the generated .table from every stream so Drizzle can include it in migrations.
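
The first rule comes down to when the configuration value is read. The following is a minimal sketch of that timing difference, not the indexer's implementation: `decodeOrThrow`, `defineEager`, and `defineLazy` are hypothetical stand-ins, and the environment is passed in as a plain object so the example runs without any real configuration.

```typescript
type Env = Record<string, string | undefined>;

// Hypothetical stand-in for an address decoder: it only needs to fail on missing input.
function decodeOrThrow(value: string | undefined): Uint8Array {
  if (!value) throw new Error("TOKEN_PROGRAM_ID is not set");
  return new TextEncoder().encode(value);
}

// Eager: the value is decoded while the definition is being built,
// so merely loading the file fails when the variable is missing.
function defineEager(env: Env) {
  return { name: "token-accounts", ownerProgram: decodeOrThrow(env.TOKEN_PROGRAM_ID) };
}

// Lazy: the definition only stores a function; decoding happens when it is called.
function defineLazy(env: Env) {
  return {
    name: "token-accounts",
    ownerProgramFactory: () => decodeOrThrow(env.TOKEN_PROGRAM_ID),
  };
}

const emptyEnv: Env = {}; // what a migration tool might see

let eagerFailed = false;
try {
  defineEager(emptyEnv);
} catch {
  eagerFailed = true; // definition could not even be constructed
}

const lazyStream = defineLazy(emptyEnv); // constructing the definition succeeds
const configured = defineLazy({ TOKEN_PROGRAM_ID: "abc" });
const ownerBytes = configured.ownerProgramFactory(); // resolved only when invoked

console.log(eagerFailed, lazyStream.name, ownerBytes.length);
```

With the lazy form, a tool that only needs the table definition never touches the factory, so a missing variable surfaces when the indexer runs rather than when the file is imported.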

Next Step

Move to Running the Indexer once the stream definitions exist.