
Interface: IEventStreamConsumer

Defined in: src/types/interfaces/event-stream.interface.ts:202

Event stream consumer interface for firehose subscription.

Remarks

Provides a subscription to the AT Protocol firehose for real-time indexing.

Implementation notes:

  • Uses WebSocket connection to relay
  • Implements exponential backoff for reconnection
  • Persists cursor in database for resumption
  • Filters events by collection NSID
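The reconnection note above can be illustrated with a small delay calculation. This is a minimal sketch, not the consumer's actual implementation: `BackoffOptions`, `DEFAULT_BACKOFF`, `backoffDelayMs`, `jitteredDelayMs`, and the default values are all hypothetical names and numbers chosen for illustration.

```typescript
// Hypothetical helper types and defaults; not part of IEventStreamConsumer.
interface BackoffOptions {
  baseMs: number; // delay before the first retry
  maxMs: number; // upper bound on any single delay
  factor: number; // multiplier applied per failed attempt
}

const DEFAULT_BACKOFF: BackoffOptions = { baseMs: 1000, maxMs: 30000, factor: 2 };

// Delay for a zero-based attempt number: baseMs * factor^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, opts: BackoffOptions = DEFAULT_BACKOFF): number {
  const raw = opts.baseMs * Math.pow(opts.factor, attempt);
  return Math.min(raw, opts.maxMs);
}

// Randomized variant ("full jitter"): a value in [0, capped delay).
// The random source is injectable so the function can be tested deterministically.
function jitteredDelayMs(
  attempt: number,
  opts: BackoffOptions = DEFAULT_BACKOFF,
  random: () => number = Math.random
): number {
  return Math.floor(random() * backoffDelayMs(attempt, opts));
}
```

With these assumed defaults the delays grow as 1 s, 2 s, 4 s, 8 s and stop increasing at 30 s. Jitter is a common addition so that many clients do not reconnect to the relay at the same instant.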

Methods

disconnect()

disconnect(): Promise<void>

Defined in: src/types/interfaces/event-stream.interface.ts:299

Disconnects from firehose.

Returns

Promise<void>

Promise resolving when disconnected

Remarks

Gracefully closes WebSocket connection and stops event iteration.

Example

await consumer.disconnect();
console.log('Disconnected from firehose');

getCurrentCursor()

getCurrentCursor(): Promise<null | number>

Defined in: src/types/interfaces/event-stream.interface.ts:259

Gets the current cursor position.

Returns

Promise<null | number>

Sequence number or null if never subscribed

Remarks

Returns the last successfully processed sequence number. Use this to resume subscription after restart.

Example

const cursor = await consumer.getCurrentCursor();
// Compare against null explicitly: a cursor of 0 is falsy but still valid.
if (cursor !== null) {
  console.log('Resuming from sequence:', cursor);
}

saveCursor()

saveCursor(cursor): Promise<void>

Defined in: src/types/interfaces/event-stream.interface.ts:281

Saves cursor position for resumption.

Parameters

cursor

number

Sequence number to save

Returns

Promise<void>

Promise resolving when saved

Remarks

Persists the cursor position once an event has been processed successfully. Call it after each event, or once per batch to reduce the number of writes.

Example

for await (const event of events) {
  await processEvent(event);
  await consumer.saveCursor(event.seq);
}
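The batching option mentioned in the remarks could look like the following. This is a sketch under stated assumptions: `CursorSaver` and `BatchedCursorSaver` are hypothetical names, and the only consumer method relied on is `saveCursor()` as documented above.

```typescript
// Minimal shape needed from the consumer; mirrors saveCursor() as documented.
interface CursorSaver {
  saveCursor(cursor: number): Promise<void>;
}

// Hypothetical helper: persists the cursor once every `batchSize` events.
class BatchedCursorSaver {
  private readonly saver: CursorSaver;
  private readonly batchSize: number;
  private pending: number | null = null; // latest sequence number not yet saved
  private sinceFlush = 0; // events recorded since the last save

  constructor(saver: CursorSaver, batchSize: number) {
    this.saver = saver;
    this.batchSize = batchSize;
  }

  // Record a processed sequence number; save when the batch is full.
  async record(seq: number): Promise<void> {
    this.pending = seq;
    this.sinceFlush += 1;
    if (this.sinceFlush >= this.batchSize) {
      await this.flush();
    }
  }

  // Save the latest recorded sequence number, if there is one.
  async flush(): Promise<void> {
    if (this.pending === null) return;
    await this.saver.saveCursor(this.pending);
    this.pending = null;
    this.sinceFlush = 0;
  }
}
```

In the loop above, `await batched.record(event.seq)` would replace the direct `saveCursor()` call, with a final `await batched.flush()` before disconnecting. The trade-off is that after a crash up to `batchSize - 1` events are delivered again on resume, so event processing should be idempotent.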

subscribe()

subscribe(options): AsyncIterable<RepoEvent>

Defined in: src/types/interfaces/event-stream.interface.ts:238

Subscribes to firehose events.

Parameters

options

SubscriptionOptions

Subscription configuration

Returns

AsyncIterable<RepoEvent>

Async iterable of events

Remarks

Returns an async iterable for memory-efficient streaming. Use for await...of to process events.

Connection is maintained until disconnect() is called or an unrecoverable error occurs.

Example

const events = consumer.subscribe({
  relay: 'wss://bsky.network',
  cursor: await consumer.getCurrentCursor(),
  filter: {
    collections: [toNSID('pub.chive.preprint.submission')!]
  }
});

for await (const event of events) {
  if (event.$type === 'com.atproto.sync.subscribeRepos#commit') {
    for (const op of event.ops) {
      console.log(`${op.action} ${op.path}`);
    }
    await consumer.saveCursor(event.seq);
  }
}
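If ops need to be narrowed further on the client side, the collection can be read from `op.path`, which in AT Protocol commit operations has the form `<collection NSID>/<record key>`. The sketch below is illustrative only: `RepoOpLike`, `collectionOf`, and `opsInCollections` are hypothetical names, and the real `RepoEvent` op type may carry more fields.

```typescript
// Minimal op shape assumed for this sketch; the real op type may differ.
interface RepoOpLike {
  action: string;
  path: string; // "<collection NSID>/<record key>"
}

// Extract the collection NSID: everything before the first slash.
function collectionOf(path: string): string {
  const slash = path.indexOf('/');
  return slash === -1 ? path : path.slice(0, slash);
}

// Keep only the ops whose collection is in the allowed set.
function opsInCollections<T extends RepoOpLike>(
  ops: readonly T[],
  collections: ReadonlySet<string>
): T[] {
  return ops.filter((op) => collections.has(collectionOf(op.path)));
}
```

Inside the commit branch of the loop above, `opsInCollections(event.ops, new Set(['pub.chive.preprint.submission']))` would return only the ops for that collection.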