Use this page when you want the smallest working indexing setup before designing a larger backend.

Install

pnpm add @thru/indexer @thru/replay @thru/helpers @thru/token-program postgres drizzle-orm hono @hono/zod-openapi
pnpm add -D drizzle-kit tsx typescript
This quickstart assumes:
  • a Node and TypeScript backend
  • PostgreSQL
  • Drizzle for schema and migrations
  • Hono only if you want generated API routes
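The steps below read configuration from environment variables. A sketch of the expected `.env`: `TOKEN_PROGRAM_ID` and `CHAIN_RPC_URL` come from the code in the steps below, while `DATABASE_URL` is an assumed name for whatever your database module reads.

```shell
# Program whose events and accounts are indexed (Steps 1 and 2)
TOKEN_PROGRAM_ID=...
# RPC endpoint for the chain client (Step 4)
CHAIN_RPC_URL=...
# Postgres connection string (assumed name; match your db module)
DATABASE_URL=postgres://user:pass@localhost:5432/indexer
```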

Step 1: Define A Token Transfer Event Stream

Use a generated token ABI type to decode the token program event payload, then return one row per matching transfer.
import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterSchema, FilterParamValueSchema, type Event } from "@thru/replay";
import { TokenEvent } from "./abi/thru/program/token/types";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;

const tokenTransfers = defineEventStream({
  name: "token-transfers",
  description: "Transfer events emitted by the token program",

  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    indexedAt: t.timestamp().notNull().defaultNow(),
  },

  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(TOKEN_PROGRAM));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },

  parse: (event: Event) => {
    if (!event.payload || event.slot === undefined) return null;
    // Skip events without a transaction signature rather than encoding an empty one.
    if (!event.transactionSignature?.value) return null;

    const envelope = TokenEvent.from_array(event.payload);
    const transfer = envelope?.payload()?.asTransfer();
    if (!transfer) return null;

    return {
      id: event.eventId,
      slot: event.slot,
      txnSignature: encodeSignature(event.transactionSignature.value),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
      indexedAt: new Date(),
    };
  },

  api: { filters: ["source", "dest"] },
});

export const tokenTransferEvents = tokenTransfers.table;
export default tokenTransfers;

Step 2: Define A Token Account Stream

For token account state inside the indexer runtime, decode the raw account bytes directly with the generated token ABI type.
import { decodeAddress, encodeAddress } from "@thru/helpers";
import { defineAccountStream, t } from "@thru/indexer";
import { TokenAccount } from "./abi/thru/program/token/types";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;
const TOKEN_ACCOUNT_SIZE = 73;

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  description: "Latest token account balances by address",

  ownerProgramFactory: () => new Uint8Array(decodeAddress(TOKEN_PROGRAM)),
  expectedSize: TOKEN_ACCOUNT_SIZE,

  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    isFrozen: t.boolean().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
    updatedAt: t.timestamp().notNull().defaultNow(),
  },

  parse: (account) => {
    if (account.data.length !== TOKEN_ACCOUNT_SIZE) return null;

    const parsed = TokenAccount.from_array(account.data);
    if (!parsed) return null;

    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      isFrozen: parsed.is_frozen !== 0,
      slot: account.slot,
      seq: account.seq,
      updatedAt: new Date(),
    };
  },

  api: { filters: ["mint", "owner"], idField: "address" },
});

export const tokenAccountsTable = tokenAccounts.table;
export default tokenAccounts;

Step 3: Export The Tables For Drizzle

Re-export every stream table from one schema module so Drizzle and drizzle-kit can see them alongside the indexer's checkpoint table.

export { checkpointTable } from "@thru/indexer";
export { tokenTransferEvents } from "./streams/token-transfers";
export { tokenAccountsTable } from "./account-streams/token-accounts";
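
To run migrations against these tables, point drizzle-kit at the schema module above. A minimal sketch, assuming the exports live in `./src/schema.ts` and a `DATABASE_URL` environment variable:

```typescript
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  dialect: "postgresql",
  // Path to the Step 3 module that re-exports every stream table.
  schema: "./src/schema.ts",
  out: "./drizzle",
  dbCredentials: {
    // Assumed variable name; use whatever your environment provides.
    url: process.env.DATABASE_URL!,
  },
});
```

With this in place, `pnpm drizzle-kit generate` emits SQL migrations and `pnpm drizzle-kit migrate` applies them.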

Step 4: Create The Indexer Runtime

Wire both streams into a single runtime that reads from the chain and writes to Postgres.

import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";
import tokenAccounts from "./account-streams/token-accounts";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  defaultStartSlot: 0n,
  safetyMargin: 64,
  pageSize: 512,
  logLevel: "info",
});

await indexer.start();
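
Step 4 imports a `db` instance from `./db`, which this quickstart never defines. A minimal sketch using the `postgres` driver with Drizzle, again assuming a `DATABASE_URL` environment variable:

```typescript
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";

// Lazy connection pool; no connection opens until the first query runs.
const client = postgres(process.env.DATABASE_URL!);

// Pass your Step 3 schema module as { schema } here if you also want
// Drizzle's typed relational query API.
export const db = drizzle(client);
```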

Step 5: Add Routes Later If You Need Them

If you want generated read APIs for the indexed tables, add Querying Indexed Data next.

What You Get

After this setup:
  • token_transfer_events stores immutable token transfer rows
  • token_accounts stores the latest token account state by address
  • indexer_checkpoints tracks resumable progress per stream

Next Step

Open Streams next if you need to customize filters, schemas, parsing, or API metadata.