Documentation Index
Fetch the complete documentation index at: https://docs.thru.org/llms.txt
Use this file to discover all available pages before exploring further.
Use this page when the indexer runtime is clear but you need to design or extend the actual streams.
Event Streams Vs Account Streams
| Stream type | Best for | Data model |
|---|
| Event stream | Immutable logs such as transfers, mints, fills, or lifecycle events | Append-only rows keyed by event identity |
| Account stream | Current on-chain account state such as balances, configuration accounts, or inventory | Current-state rows keyed by account identity |
Use an event stream when the chain emits the thing you want directly.
Use an account stream when the important answer is “what is the latest state of this account now?”
Event Stream Shape
An event stream needs:
name
schema
filter or filterFactory
parse(event)
Token transfer example:
const tokenTransfers = defineEventStream({
name: "token-transfers",
schema: {
id: t.text().primaryKey(),
slot: t.bigint().notNull().index(),
txnSignature: t.text().notNull(),
source: t.text().notNull().index(),
dest: t.text().notNull().index(),
amount: t.bigint().notNull(),
},
filterFactory: () => {
const programBytes = new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!));
return create(FilterSchema, {
expression: "event.program.value == params.address",
params: {
address: create(FilterParamValueSchema, {
kind: { case: "bytesValue", value: programBytes },
}),
},
});
},
parse: (event) => {
if (!event.payload || event.slot === undefined) return null;
const tokenEvent = TokenEvent.from_array(event.payload);
const transfer = tokenEvent?.payload()?.asTransfer();
if (!transfer) return null;
return {
id: event.eventId,
slot: event.slot,
txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()),
source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
amount: transfer.amount,
};
},
});
Account Stream Shape
An account stream needs:
name
schema
ownerProgram or ownerProgramFactory
- optional
expectedSize or dataSizes
parse(account)
Token account example:
import { encodeAddress } from "@thru/helpers";
import { TokenAccount } from "./abi/thru/program/token/types";
const tokenAccounts = defineAccountStream({
name: "token-accounts",
ownerProgramFactory: () => new Uint8Array(decodeAddress(process.env.TOKEN_PROGRAM_ID!)),
expectedSize: 73,
schema: {
address: t.text().primaryKey(),
mint: t.text().notNull().index(),
owner: t.text().notNull().index(),
amount: t.bigint().notNull(),
isFrozen: t.boolean().notNull(),
slot: t.bigint().notNull(),
seq: t.bigint().notNull(),
},
parse: (account) => {
if (account.data.length !== 73) return null;
const parsed = TokenAccount.from_array(account.data);
if (!parsed) return null;
return {
address: encodeAddress(account.address),
mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
amount: parsed.amount,
isFrozen: parsed.is_frozen !== 0,
slot: account.slot,
seq: account.seq,
};
},
});
Practical Rules
- Use
filterFactory and ownerProgramFactory when values come from environment or config so migration tooling can still import the schema files safely.
- Use
expectedSize when the account layout is fixed and size mismatches should be skipped early.
- Return
null from parse when the event or account update should be ignored. It does not delete an existing row.
- Export the generated
.table from every stream so Drizzle can include it in migrations.
Next Steps