Use this guide to go from zero to a production-oriented Thru indexer. It starts with the smallest working setup and then walks through the concerns you will need for a real backend service.

Prerequisites

  • Node 20+ and TypeScript
  • PostgreSQL
  • Drizzle for schema and migrations
  • Hono (optional, for generated API routes)

Install

pnpm add @thru/indexer @thru/replay @thru/helpers @thru/token-program postgres drizzle-orm hono @hono/zod-openapi
pnpm add -D drizzle-kit tsx typescript

Step 1: Define A Token Transfer Event Stream

Use a generated token ABI type to decode the token program event payload, then return one row per matching transfer.
import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterSchema, FilterParamValueSchema, type Event } from "@thru/replay";
import { TokenEvent } from "./abi/thru/program/token/types";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;

const tokenTransfers = defineEventStream({
  name: "token-transfers",
  description: "Transfer events emitted by the token program",

  schema: {
    id: t.text().primaryKey(),
    slot: t.bigint().notNull().index(),
    txnSignature: t.text().notNull(),
    source: t.text().notNull().index(),
    dest: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    indexedAt: t.timestamp().notNull().defaultNow(),
  },

  filterFactory: () => {
    const programBytes = new Uint8Array(decodeAddress(TOKEN_PROGRAM));
    return create(FilterSchema, {
      expression: "event.program.value == params.address",
      params: {
        address: create(FilterParamValueSchema, {
          kind: { case: "bytesValue", value: programBytes },
        }),
      },
    });
  },

  parse: (event: Event) => {
    if (!event.payload || event.slot === undefined) return null;
    // Skip events without a signature instead of storing an empty one;
    // txnSignature is declared notNull above.
    if (!event.transactionSignature?.value) return null;

    const envelope = TokenEvent.from_array(event.payload);
    const transfer = envelope?.payload()?.asTransfer();
    if (!transfer) return null;

    return {
      id: event.eventId,
      slot: event.slot,
      txnSignature: encodeSignature(event.transactionSignature.value),
      source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
      dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
      amount: transfer.amount,
      indexedAt: new Date(),
    };
  },

  api: { filters: ["source", "dest"] },
});

export const tokenTransferEvents = tokenTransfers.table;
export default tokenTransfers;

Step 2: Define A Token Account Stream

For token account state inside the indexer runtime, decode the raw account bytes directly with the generated token ABI type.
import { decodeAddress, encodeAddress } from "@thru/helpers";
import { defineAccountStream, t } from "@thru/indexer";
import { TokenAccount } from "./abi/thru/program/token/types";

const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;
const TOKEN_ACCOUNT_SIZE = 73;

const tokenAccounts = defineAccountStream({
  name: "token-accounts",
  description: "Latest token account balances by address",

  ownerProgramFactory: () => new Uint8Array(decodeAddress(TOKEN_PROGRAM)),
  expectedSize: TOKEN_ACCOUNT_SIZE,

  schema: {
    address: t.text().primaryKey(),
    mint: t.text().notNull().index(),
    owner: t.text().notNull().index(),
    amount: t.bigint().notNull(),
    isFrozen: t.boolean().notNull(),
    slot: t.bigint().notNull(),
    seq: t.bigint().notNull(),
    updatedAt: t.timestamp().notNull().defaultNow(),
  },

  parse: (account) => {
    if (account.data.length !== TOKEN_ACCOUNT_SIZE) return null;

    const parsed = TokenAccount.from_array(account.data);
    if (!parsed) return null;

    return {
      address: encodeAddress(account.address),
      mint: encodeAddress(new Uint8Array(parsed.mint.get_bytes())),
      owner: encodeAddress(new Uint8Array(parsed.owner.get_bytes())),
      amount: parsed.amount,
      isFrozen: parsed.is_frozen !== 0,
      slot: account.slot,
      seq: account.seq,
      updatedAt: new Date(),
    };
  },

  api: { filters: ["mint", "owner"], idField: "address" },
});

export const tokenAccountsTable = tokenAccounts.table;
export default tokenAccounts;
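The 73-byte expectedSize follows from the fields above: a 32-byte mint, a 32-byte owner, an 8-byte amount, and a 1-byte frozen flag. A hand-rolled decode under that assumed layout looks like this; the generated TokenAccount ABI type is authoritative, and the field order and endianness here are assumptions made for illustration only.

```typescript
// Decode sketch for the assumed 73-byte layout:
//   bytes 0-31  mint, bytes 32-63 owner,
//   bytes 64-71 amount (assumed little-endian u64), byte 72 frozen flag.
// The generated TokenAccount ABI type is the source of truth; this only
// illustrates why expectedSize is 73.
export function decodeTokenAccountBytes(data: Uint8Array) {
  if (data.length !== 73) return null;
  const view = new DataView(data.buffer, data.byteOffset, data.byteLength);
  return {
    mint: data.slice(0, 32),
    owner: data.slice(32, 64),
    amount: view.getBigUint64(64, true),
    isFrozen: data[72] !== 0,
  };
}
```

The same size check appears in the stream's parse function above: rejecting any account whose data length is not exactly 73 bytes filters out unrelated accounts owned by the same program.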

Step 3: Export The Tables For Drizzle

export { checkpointTable } from "@thru/indexer";
export { tokenTransferEvents } from "./streams/token-transfers";
export { tokenAccountsTable } from "./account-streams/token-accounts";
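A minimal drizzle-kit config to go with these exports. The paths and the DATABASE_URL environment variable are assumptions; adjust them to your layout.

```typescript
// drizzle.config.ts -- a minimal sketch; adjust paths to your project layout.
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  dialect: "postgresql",
  schema: "./src/schema.ts",
  out: "./drizzle",
  dbCredentials: { url: process.env.DATABASE_URL! },
});
```

Generate and apply migrations (for example with pnpm drizzle-kit generate followed by pnpm drizzle-kit migrate) before starting the worker so the stream tables and indexer_checkpoints exist.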

Step 4: Create The Indexer Runtime

import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";
import tokenAccounts from "./account-streams/token-accounts";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  defaultStartSlot: 0n,
  safetyMargin: 64,
  pageSize: 512,
  logLevel: "info",
});

await indexer.start();

Step 5: Add Routes Later If You Need Them

If you want generated read APIs for the indexed tables, continue with the Querying Indexed Data guide.

What You Get

After this setup:
  • token_transfer_events stores immutable token transfer rows
  • token_accounts stores the latest token account state by address
  • indexer_checkpoints tracks resumable progress per stream

Production Concerns

The steps above give you a working indexer. The sections below cover what you need when you move that indexer into a real backend service.

Separate Worker And API Processes

In production, run the indexer worker and the API server as separate processes. This keeps write-heavy backfill work from competing with read traffic, and lets you scale or restart each side independently. A common layout:
src/
  streams/          # shared stream definitions
  account-streams/  # shared account stream definitions
  schema.ts         # shared Drizzle schema exports
  db.ts             # shared database client
  worker.ts         # indexer process — runs Indexer.start()
  api.ts            # API process — mounts routes and serves HTTP
The worker process owns the Indexer instance and calls indexer.start(). It writes rows and updates checkpoints.
// worker.ts
import { Indexer } from "@thru/indexer";
import { ChainClient } from "@thru/replay";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";
import tokenAccounts from "./account-streams/token-accounts";

const indexer = new Indexer({
  db,
  clientFactory: () => new ChainClient({ baseUrl: process.env.CHAIN_RPC_URL! }),
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
  defaultStartSlot: 0n,
  safetyMargin: 64,
  pageSize: 512,
  logLevel: "info",
});

process.on("SIGTERM", () => indexer.stop());
process.on("SIGINT", () => indexer.stop());

await indexer.start();
The API process mounts generated or custom routes and serves HTTP. It reads from the same database but never runs the indexer.
// api.ts
import { serve } from "@hono/node-server";
import { OpenAPIHono } from "@hono/zod-openapi";
import { mountStreamRoutes } from "@thru/indexer";
import { db } from "./db";
import tokenTransfers from "./streams/token-transfers";
import tokenAccounts from "./account-streams/token-accounts";

const app = new OpenAPIHono();

mountStreamRoutes(app, {
  db,
  pathPrefix: "/api/v1",
  eventStreams: [tokenTransfers],
  accountStreams: [tokenAccounts],
});

serve({ fetch: app.fetch, port: 3001 });
Both processes share the stream definitions and the Drizzle schema, but only the worker writes. You can deploy each as its own container, service, or process manager entry.

Resumability

@thru/indexer stores a checkpoint row for each stream in the indexer_checkpoints table. When the worker restarts, it reads the last committed slot from the checkpoint and resumes from there. To make this work:
  1. Export checkpointTable from your Drizzle schema (see Step 3).
  2. Run migrations so the indexer_checkpoints table exists before starting the worker.
  3. Set defaultStartSlot to the slot you want first-time runs to begin from. Subsequent runs ignore this value and use the checkpoint.
The safetyMargin option controls how far behind the live chain tip the indexer stays during the backfill-to-live switchover. A margin of 64 slots is a safe default that prevents writing rows that could be affected by short-lived forks.
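The resume and safety-margin rules above can be sketched as two small functions. This is a paraphrase of the behavior described, not the actual @thru/indexer implementation.

```typescript
// Sketch of the resume rule: a checkpoint means every slot through
// checkpointSlot is committed, so resume at the next slot; first-time runs
// fall back to defaultStartSlot.
export function resolveStartSlot(
  checkpointSlot: bigint | undefined,
  defaultStartSlot: bigint,
): bigint {
  return checkpointSlot !== undefined ? checkpointSlot + 1n : defaultStartSlot;
}

// Sketch of the safety margin: never process past tip - margin, so rows from
// slots that a short-lived fork could still rewrite are not written.
export function safeTargetSlot(chainTip: bigint, safetyMargin: bigint): bigint {
  const target = chainTip - safetyMargin;
  return target > 0n ? target : 0n;
}
```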

Indexer Status Surface

Expose a health or status endpoint that reads the checkpoint table so you can monitor indexer progress.
import { getAllCheckpoints } from "@thru/indexer";
import { db } from "./db";

// Inside your API app
app.get("/status/indexer", async (c) => {
  const checkpoints = await getAllCheckpoints(db);
  return c.json({
    streams: checkpoints.map((cp) => ({
      name: cp.streamName,
      lastSlot: cp.lastSlot.toString(),
      updatedAt: cp.updatedAt,
    })),
  });
});
This gives you a single endpoint to check whether the indexer is keeping up. Wire it into your monitoring or alerting system by comparing lastSlot against the current chain tip from ChainClient.
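The lag math for that comparison is simple; how you fetch the tip depends on your ChainClient version, so the tip lookup is left to the caller in this sketch.

```typescript
// Slot lag between the chain tip and a stream's last checkpoint. The tip
// value comes from ChainClient (check the @thru/replay API for the exact
// call); only the comparison is sketched here.
export function slotLag(chainTip: bigint, lastCheckpointSlot: bigint): bigint {
  const lag = chainTip - lastCheckpointSlot;
  return lag > 0n ? lag : 0n;
}

// Names of streams more than `threshold` slots behind the tip.
// The 256-slot default is a hypothetical threshold; tune it to your chain.
export function laggingStreams(
  chainTip: bigint,
  checkpoints: { name: string; lastSlot: bigint }[],
  threshold = 256n,
): string[] {
  return checkpoints
    .filter((cp) => slotLag(chainTip, cp.lastSlot) > threshold)
    .map((cp) => cp.name);
}
```

Feed the status endpoint's checkpoint list into laggingStreams on each monitoring cycle and alert when the result is non-empty.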

Custom SQL Over Indexed Tables

Generated routes cover filtered lists and single-row lookups. For anything more complex, query the indexed tables directly with Drizzle. Common patterns:
import { sql, desc, eq, gte } from "drizzle-orm";
import { db } from "./db";
import { tokenTransferEvents, tokenAccountsTable } from "./schema";

// Assumed to be in scope: the current chain tip and the owner being queried
declare const latestSlot: bigint;
declare const ownerAddress: string;

// Aggregate: total transferred volume per source in the last 1000 slots
const volumeBySource = await db
  .select({
    source: tokenTransferEvents.source,
    totalAmount: sql<bigint>`sum(${tokenTransferEvents.amount})`,
  })
  .from(tokenTransferEvents)
  .where(gte(tokenTransferEvents.slot, latestSlot - 1000n))
  .groupBy(tokenTransferEvents.source)
  .orderBy(desc(sql`sum(${tokenTransferEvents.amount})`))
  .limit(50);

// Join: token accounts with their most recent transfer
const accountsWithLastTransfer = await db
  .select()
  .from(tokenAccountsTable)
  .leftJoin(
    tokenTransferEvents,
    eq(tokenAccountsTable.address, tokenTransferEvents.dest),
  )
  .where(eq(tokenAccountsTable.owner, ownerAddress))
  .orderBy(desc(tokenTransferEvents.slot))
  .limit(20);
Because the indexer writes standard Drizzle tables, you can also use raw SQL through db.execute(sql…) for one-off analytics or reporting queries. See Querying Indexed Data for more on when to use direct queries vs generated routes.

Live Validation Against A Deployed Program

When your indexer parses events or accounts from a program you control, validate that the indexed data matches on-chain state. This catches ABI drift, parsing bugs, or missed events before they become stale data in production. A straightforward approach is to periodically sample a row from the indexed table and compare it against a live RPC read:
import { ChainClient } from "@thru/replay";
import { desc } from "drizzle-orm";
import { TokenAccount } from "./abi/thru/program/token/types";
import { db } from "./db";
import { tokenAccountsTable } from "./schema";

async function validateSample(client: ChainClient) {
  // Pick a recently updated row
  const [row] = await db
    .select()
    .from(tokenAccountsTable)
    .orderBy(desc(tokenAccountsTable.slot))
    .limit(1);

  if (!row) return;

  // Fetch the same account from the chain
  const liveAccount = await client.getAccount(row.address);
  if (!liveAccount?.data) return;

  const parsed = TokenAccount.from_array(liveAccount.data);
  if (!parsed) {
    console.error(`Validation failed: could not parse live account ${row.address}`);
    return;
  }

  const liveAmount = parsed.amount;
  if (liveAmount !== row.amount) {
    console.error(
      `Validation mismatch for ${row.address}: indexed=${row.amount}, live=${liveAmount}`,
    );
  }
}
Run this check on a timer or as part of your status surface. You do not need to cover every row. Sampling a handful of recent rows per cycle is enough to catch systemic issues early.
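A simple way to run it on a timer is a loop that skips a tick while the previous cycle is still in flight, so a slow RPC round-trip cannot stack overlapping checks. This is a sketch; swap in your scheduler of choice.

```typescript
// Run an async check every intervalMs, skipping a tick if the previous
// cycle has not finished. Returns a function that stops the loop.
export function startValidationLoop(
  run: () => Promise<void>,
  intervalMs = 60_000,
): () => void {
  let inFlight = false;
  const timer = setInterval(() => {
    if (inFlight) return; // previous cycle still running; skip this tick
    inFlight = true;
    run()
      .catch((err) => console.error("validation cycle failed", err))
      .finally(() => {
        inFlight = false;
      });
  }, intervalMs);
  return () => clearInterval(timer);
}
```

Call the returned stop function in your SIGTERM handler alongside indexer.stop() so shutdown stays clean.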

Next Steps