Interface: IEventStreamConsumer
Defined in: src/types/interfaces/event-stream.interface.ts:202
Event stream consumer interface for firehose subscription.
Remarks
Provides a subscription to the AT Protocol firehose for real-time indexing.
Implementation notes:
- Uses a WebSocket connection to the relay
- Implements exponential backoff for reconnection
- Persists the cursor in a database so the subscription can resume
- Filters events by collection NSID
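The reconnection note above mentions exponential backoff without specifying a schedule. The following is a minimal sketch of one common schedule, assuming a fixed multiplier, a cap on the delay, and optional jitter. `BackoffOptions`, `backoffDelay`, and `jitteredDelay` are hypothetical names used for illustration and are not part of this interface.

```typescript
// Hypothetical options; the interface does not define these fields.
interface BackoffOptions {
  baseMs: number; // delay before the first retry
  maxMs: number; // upper bound on any single delay
  factor: number; // multiplier applied per attempt
}

// Delay for a zero-based attempt number, capped at maxMs.
function backoffDelay(attempt: number, opts: BackoffOptions): number {
  const raw = opts.baseMs * Math.pow(opts.factor, attempt);
  return Math.min(raw, opts.maxMs);
}

// "Full jitter" variant: a uniformly random whole-millisecond delay below
// the capped backoff value, so many clients do not reconnect in lockstep.
function jitteredDelay(
  attempt: number,
  opts: BackoffOptions,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * backoffDelay(attempt, opts));
}
```

An implementation would typically reset the attempt counter to zero once a connection has been re-established and events are flowing again.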
Methods
disconnect()
disconnect(): Promise<void>
Defined in: src/types/interfaces/event-stream.interface.ts:299
Disconnects from the firehose.
Returns
Promise<void>
Promise resolving when disconnected
Remarks
Gracefully closes the WebSocket connection and stops event iteration.
Example
await consumer.disconnect();
console.log('Disconnected from firehose');
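Because disconnect() is what closes the connection, callers may want it to run even when event handling throws. The sketch below shows one way to guarantee that with try/finally. `consumeThenDisconnect` and the `Disconnectable` type are hypothetical helpers for illustration, not part of the interface.

```typescript
// Minimal subset of the consumer used by this sketch.
interface Disconnectable {
  disconnect(): Promise<void>;
}

// Processes every event, then disconnects. The finally block runs whether
// the loop ends normally or a handler throws; the error still propagates.
async function consumeThenDisconnect<E>(
  events: AsyncIterable<E>,
  consumer: Disconnectable,
  handle: (event: E) => Promise<void>,
): Promise<void> {
  try {
    for await (const event of events) {
      await handle(event);
    }
  } finally {
    await consumer.disconnect();
  }
}
```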
getCurrentCursor()
getCurrentCursor(): Promise<null | number>
Defined in: src/types/interfaces/event-stream.interface.ts:259
Gets the current cursor position.
Returns
Promise<null | number>
Sequence number or null if never subscribed
Remarks
Returns the last successfully processed sequence number. Use it to resume a subscription after a restart.
Example
const cursor = await consumer.getCurrentCursor();
// Compare against null explicitly so that a cursor of 0 is not treated as missing.
if (cursor !== null) {
  console.log('Resuming from sequence:', cursor);
}
saveCursor()
saveCursor(cursor): Promise<void>
Defined in: src/types/interfaces/event-stream.interface.ts:281
Saves the cursor position for resumption.
Parameters
cursor
number
Sequence number to save
Returns
Promise<void>
Promise resolving when saved
Remarks
Persists the cursor position after an event has been processed successfully. Call it after each event, or once per batch of events to reduce the number of writes.
Example
for await (const event of events) {
  await processEvent(event);
  await consumer.saveCursor(event.seq);
}
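The remarks above allow saving the cursor in batches rather than after every event. A minimal sketch of that approach follows, assuming a fixed batch size. `BatchedCursorSaver`, `record`, and `flush` are hypothetical names, and `CursorSaver` is only the subset of the interface this sketch needs.

```typescript
// Subset of the consumer used here; mirrors saveCursor() above.
interface CursorSaver {
  saveCursor(cursor: number): Promise<void>;
}

// Hypothetical wrapper that persists the cursor once every `batchSize` events.
class BatchedCursorSaver {
  private readonly consumer: CursorSaver;
  private readonly batchSize: number;
  private pending: number | null = null; // latest sequence number not yet saved
  private sinceFlush = 0; // events recorded since the last save

  constructor(consumer: CursorSaver, batchSize: number) {
    this.consumer = consumer;
    this.batchSize = batchSize;
  }

  // Call after an event has been processed successfully.
  async record(seq: number): Promise<void> {
    this.pending = seq;
    this.sinceFlush += 1;
    if (this.sinceFlush >= this.batchSize) {
      await this.flush();
    }
  }

  // Persists the latest recorded sequence number, if any.
  async flush(): Promise<void> {
    if (this.pending === null) return;
    const seq = this.pending;
    this.pending = null;
    this.sinceFlush = 0;
    await this.consumer.saveCursor(seq);
  }
}
```

The trade-off is that after a crash up to `batchSize - 1` already-processed events may be delivered again on resumption, so event handling should be idempotent. Calling `flush()` before disconnecting keeps that replay window as small as possible.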
subscribe()
subscribe(options): AsyncIterable<RepoEvent>
Defined in: src/types/interfaces/event-stream.interface.ts:238
Subscribes to firehose events.
Parameters
options
Subscription configuration
Returns
AsyncIterable<RepoEvent>
Async iterable of events
Remarks
Returns an async iterable for memory-efficient streaming. Use for await...of to process events. The connection is maintained until disconnect() is called or an unrecoverable error occurs.
Example
const events = consumer.subscribe({
  relay: 'wss://bsky.network',
  cursor: await consumer.getCurrentCursor(),
  filter: {
    collections: [toNSID('pub.chive.preprint.submission')!]
  }
});
for await (const event of events) {
  if (event.$type === 'com.atproto.sync.subscribeRepos#commit') {
    for (const op of event.ops) {
      console.log(`${op.action} ${op.path}`);
    }
    await consumer.saveCursor(event.seq);
  }
}
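The example above passes a collection filter and then reads `op.path` for each operation. In AT Protocol commit operations the path has the form `<collection>/<rkey>`, so the collection NSID can be recovered from it. The sketch below shows how filtering by collection might be applied to a commit's operations. `RepoOp`, `collectionOf`, and `filterOps` are hypothetical names, and the `RepoOp` shape is assumed from the fields used in the example rather than taken from the actual type definition.

```typescript
// Assumed shape, based only on the fields used in the example above.
interface RepoOp {
  action: string;
  path: string; // "<collection NSID>/<record key>"
}

// Extracts the collection NSID from an operation path.
function collectionOf(path: string): string {
  const slash = path.indexOf('/');
  return slash === -1 ? path : path.slice(0, slash);
}

// Keeps only operations whose collection is in the allowed set.
function filterOps(
  ops: readonly RepoOp[],
  collections: ReadonlySet<string>,
): RepoOp[] {
  return ops.filter((op) => collections.has(collectionOf(op.path)));
}
```

Using a `Set` keeps each membership check constant-time, which matters when every firehose commit passes through the filter.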