import { create } from "@bufbuild/protobuf";
import { decodeAddress, encodeAddress, encodeSignature } from "@thru/helpers";
import { defineEventStream, t } from "@thru/indexer";
import { FilterSchema, FilterParamValueSchema, type Event } from "@thru/replay";
import { TokenEvent } from "./abi/thru/program/token/types";
const TOKEN_PROGRAM = process.env.TOKEN_PROGRAM_ID!;
const tokenTransfers = defineEventStream({
name: "token-transfers",
description: "Transfer events emitted by the token program",
schema: {
id: t.text().primaryKey(),
slot: t.bigint().notNull().index(),
txnSignature: t.text().notNull(),
source: t.text().notNull().index(),
dest: t.text().notNull().index(),
amount: t.bigint().notNull(),
indexedAt: t.timestamp().notNull().defaultNow(),
},
filterFactory: () => {
const programBytes = new Uint8Array(decodeAddress(TOKEN_PROGRAM));
return create(FilterSchema, {
expression: "event.program.value == params.address",
params: {
address: create(FilterParamValueSchema, {
kind: { case: "bytesValue", value: programBytes },
}),
},
});
},
parse: (event: Event) => {
if (!event.payload || event.slot === undefined) return null;
const envelope = TokenEvent.from_array(event.payload);
const transfer = envelope?.payload()?.asTransfer();
if (!transfer) return null;
return {
id: event.eventId,
slot: event.slot,
txnSignature: encodeSignature(event.transactionSignature?.value ?? new Uint8Array()),
source: encodeAddress(new Uint8Array(transfer.source.get_bytes())),
dest: encodeAddress(new Uint8Array(transfer.dest.get_bytes())),
amount: transfer.amount,
indexedAt: new Date(),
};
},
api: { filters: ["source", "dest"] },
});
export const tokenTransferEvents = tokenTransfers.table;
export default tokenTransfers;