IndexingService
The IndexingService orchestrates firehose consumption and event processing. It consumes the AT Protocol relay firehose, filters for pub.chive.* records, and dispatches events to appropriate handlers.
Architecture
FirehoseConsumer → EventFilter → CommitHandler → EventQueue → Processor → DLQ
Components
| Component | File | Purpose |
|---|---|---|
| FirehoseConsumer | firehose-consumer.ts | WebSocket connection to relay |
| EventFilter | event-filter.ts | Filters for pub.chive.* collections |
| CommitHandler | commit-handler.ts | Parses CAR files, extracts records |
| CursorManager | cursor-manager.ts | Persists cursor for resumption |
| EventQueue | event-queue.ts | BullMQ async processing queue |
| ReconnectionManager | reconnection-manager.ts | Exponential backoff reconnection |
| DLQHandler | dlq-handler.ts | Dead letter queue for failures |
Usage
import { IndexingService } from '@/services/indexing';
const indexingService = container.resolve(IndexingService);
// Start consuming firehose
await indexingService.start();
// Check status
const status = await indexingService.getStatus();
// { status: 'running', cursor: 12345678, lag: '2s', eventsProcessed: 50000 }
// Graceful shutdown
await indexingService.stop();
Firehose consumption
The service connects to the configured relay (default: wss://bsky.network) and subscribes to the com.atproto.sync.subscribeRepos stream:
// Connection options
const options = {
relay: process.env.RELAY_URL || 'wss://bsky.network',
cursor: await this.cursorManager.getCursor(),
collections: ['pub.chive.*'],
};
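As a rough illustration of how the relay URL and stored cursor could combine into a subscription endpoint, here is a minimal sketch. `buildSubscribeUrl` is a hypothetical helper, not part of the service. Omitting the `cursor` parameter when nothing has been stored yet is an assumption, made so that a fresh indexer starts from the live stream.

```typescript
// Hypothetical helper (not from the Chive codebase): builds the WebSocket URL
// for the com.atproto.sync.subscribeRepos stream on a given relay.
function buildSubscribeUrl(relay: string, cursor?: number): string {
  const url = new URL('/xrpc/com.atproto.sync.subscribeRepos', relay);
  // Assumption: a missing or zero cursor means "no saved position",
  // so the parameter is left off entirely.
  if (cursor !== undefined && cursor > 0) {
    url.searchParams.set('cursor', String(cursor));
  }
  return url.toString();
}
```

The resulting string can be passed to whichever WebSocket client the consumer uses.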
Event filtering
Only events matching pub.chive.* collections are processed:
const CHIVE_COLLECTIONS = [
'pub.chive.eprint.submission',
'pub.chive.eprint.version',
'pub.chive.eprint.changelog',
'pub.chive.eprint.tag',
'pub.chive.eprint.userTag',
'pub.chive.eprint.citation',
'pub.chive.eprint.relatedWork',
'pub.chive.review.comment',
'pub.chive.review.endorsement',
'pub.chive.review.entityLink',
'pub.chive.annotation.comment',
'pub.chive.annotation.entityLink',
'pub.chive.graph.node',
'pub.chive.graph.edge',
'pub.chive.graph.nodeProposal',
'pub.chive.graph.edgeProposal',
'pub.chive.graph.vote',
'pub.chive.actor.profile',
'pub.chive.actor.profileConfig',
];
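The filter check itself can be sketched as follows. `matchesCollection` and `isChiveCollection` are hypothetical names; the sketch assumes patterns are either exact NSIDs or end in a `.*` wildcard, as in the `collections` option shown earlier.

```typescript
// Hypothetical matcher: supports exact NSIDs and a trailing ".*" wildcard.
function matchesCollection(nsid: string, pattern: string): boolean {
  if (pattern.endsWith('.*')) {
    // Keep the trailing dot ("pub.chive.") so "pub.chivex.foo" does not match.
    const prefix = pattern.slice(0, -1);
    return nsid.startsWith(prefix);
  }
  return nsid === pattern;
}

// An op path has the form "<collection>/<rkey>"; only the collection is matched.
function isChiveCollection(path: string, patterns: readonly string[]): boolean {
  const [collection = ''] = path.split('/');
  return patterns.some((pattern) => matchesCollection(collection, pattern));
}
```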
Commit parsing
The CommitHandler supports two event formats: full firehose events (with CAR blocks) and Jetstream events (with pre-decoded records).
Event format detection
CommitHandler automatically detects the event format:
- Jetstream events: Operations have record fields already attached. No CAR block parsing required.
- Full firehose events: Operations reference CIDs; records must be decoded from CAR blocks.
// Jetstream event detection
const opsWithRecords = event.ops as readonly RepoOpWithRecord[];
const hasPreDecodedRecords = opsWithRecords.some(
(op) => (op.action === 'create' || op.action === 'update') && op.record !== undefined
);
if (hasPreDecodedRecords) {
// Return ops with pre-decoded records directly (Jetstream path)
return opsWithRecords.map((op) => ({
action: op.action,
path: op.path,
cid: op.cid,
record: op.record,
}));
}
// Fall through to CAR block parsing (full firehose path)
Jetstream events
Jetstream provides events with records pre-decoded as JSON. This eliminates the need for:
- CAR (Content Addressable aRchive) file parsing
- CBOR (Concise Binary Object Representation) decoding
- CID-based block lookups
The FirehoseConsumer transforms Jetstream's JSON format into the standard RepoEvent format, attaching records directly to operations:
// Jetstream JSON format
{
"did": "did:plc:abc123",
"time_us": 1725911162329308,
"kind": "commit",
"commit": {
"rev": "...",
"operation": "create",
"collection": "pub.chive.eprint.submission",
"rkey": "3kj5h2k3j5h",
"record": { /* decoded record data */ },
"cid": "bafyrei..."
}
}
// Transformed to RepoEvent with attached record
{
"$type": "com.atproto.sync.subscribeRepos#commit",
"repo": "did:plc:abc123",
"commit": "bafyrei...",
"ops": [{
"action": "create",
"path": "pub.chive.eprint.submission/3kj5h2k3j5h",
"cid": "bafyrei...",
"record": { /* decoded record data */ } // Pre-attached
}],
"seq": 1725911162329308,
"time": "2024-09-09T..."
}
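A minimal sketch of that transformation, following the two JSON shapes above. The type names and `toRepoEvent` are hypothetical. As in the example, the sketch reuses `time_us` as `seq` and the record CID as the `commit` value; the real FirehoseConsumer may differ.

```typescript
// Hypothetical types mirroring the JSON shapes shown above.
interface JetstreamCommitEvent {
  did: string;
  time_us: number;
  kind: 'commit';
  commit: {
    rev: string;
    operation: 'create' | 'update' | 'delete';
    collection: string;
    rkey: string;
    record?: unknown; // absent for deletes
    cid?: string; // absent for deletes
  };
}

interface SketchRepoOp {
  action: 'create' | 'update' | 'delete';
  path: string;
  cid: string | undefined;
  record: unknown;
}

interface SketchRepoEvent {
  $type: string;
  repo: string;
  commit: string;
  ops: SketchRepoOp[];
  seq: number;
  time: string;
}

function toRepoEvent(js: JetstreamCommitEvent): SketchRepoEvent {
  const { commit } = js;
  return {
    $type: 'com.atproto.sync.subscribeRepos#commit',
    repo: js.did,
    commit: commit.cid ?? '', // assumption: record CID stands in for the commit CID
    ops: [
      {
        action: commit.operation,
        path: `${commit.collection}/${commit.rkey}`,
        cid: commit.cid,
        record: commit.record, // pre-decoded by Jetstream
      },
    ],
    seq: js.time_us,
    // time_us is in microseconds; Date expects milliseconds.
    time: new Date(Math.floor(js.time_us / 1000)).toISOString(),
  };
}
```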
Full firehose events
Full firehose events include binary CAR files containing IPLD blocks. CommitHandler decodes these using:
- @ipld/car for CAR file parsing
- @ipld/dag-cbor for CBOR decoding
- multiformats/cid for CID parsing
interface ParsedCommit {
repo: string; // DID of the repository
rev: string; // Revision/CID of the commit
ops: Operation[]; // Create, update, delete operations
blobs: CID[]; // Referenced blob CIDs
}
interface Operation {
action: 'create' | 'update' | 'delete';
path: string; // Collection + rkey
cid?: CID; // Record CID (absent for deletes)
record?: unknown; // Decoded record (absent for deletes)
}
RepoOpWithRecord interface
For Jetstream events, operations include pre-decoded records:
/**
* Extended RepoOp with pre-decoded record (from Jetstream).
*/
interface RepoOpWithRecord extends RepoOp {
record?: unknown;
}
This interface extends the base RepoOp to include the optional record field that Jetstream populates.
Performance comparison
| Operation | Full Firehose | Jetstream |
|---|---|---|
| Format | Binary CBOR | JSON |
| Record decoding | Required (CAR + CBOR) | Not required |
| Block lookups | Required (by CID) | Not required |
| Network overhead | Higher (includes MST blocks and signatures) | Lower (record data only) |
| CPU usage | Higher | Lower |
| Cryptographic verification | Possible | Not possible |
Jetstream is the default for Chive because:
- Lower CPU overhead (no CBOR decoding)
- Simpler processing (records already decoded)
- JSON format easier to debug
- Sufficient for indexing (cryptographic verification not required for read-only AppView)
Use the full firehose only when cryptographic verification of record content is required.
Event processing
Events are dispatched to the appropriate service based on collection:
async processEvent(event: RepoEvent): Promise<void> {
for (const op of event.ops) {
const collection = op.path.split('/')[0];
// Metadata shared by every handler below
const metadata = {
uri: `at://${event.repo}/${op.path}`,
cid: op.cid,
pdsEndpoint: await this.resolvePds(event.repo),
};
switch (collection) {
case 'pub.chive.eprint.submission':
await this.eprintService.indexEprint(op.record, metadata);
break;
case 'pub.chive.eprint.version':
// Upsert version metadata into eprint_versions_index
// On delete: remove from eprint_versions_index
break;
case 'pub.chive.review.comment':
await this.reviewService.indexReview(op.record, metadata);
break;
case 'pub.chive.review.endorsement':
await this.reviewService.indexEndorsement(op.record, metadata);
break;
case 'pub.chive.graph.node':
case 'pub.chive.graph.edge':
await this.personalGraphService.indexRecord(op, metadata);
break;
case 'pub.chive.annotation.entityLink':
case 'pub.chive.review.entityLink':
await this.annotationService.indexEntityLink(op.record, metadata);
break;
// ... other collections
}
}
}
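One alternative to a long switch is a lookup table keyed by collection. The sketch below is illustrative only: `HandlerRegistry`, `OpHandler`, and `parseOpPath` are hypothetical names, and the handlers are synchronous to keep the example short.

```typescript
// Hypothetical handler signature: receives the decoded record and its AT URI.
type OpHandler = (record: unknown, uri: string) => void;

// Splits "<collection>/<rkey>" and rejects malformed paths.
function parseOpPath(path: string): { collection: string; rkey: string } {
  const slash = path.indexOf('/');
  if (slash <= 0 || slash === path.length - 1) {
    throw new Error(`Malformed op path: ${path}`);
  }
  return { collection: path.slice(0, slash), rkey: path.slice(slash + 1) };
}

class HandlerRegistry {
  private readonly handlers = new Map<string, OpHandler>();

  // Several collections may share one handler (e.g. graph nodes and edges).
  register(collections: readonly string[], handler: OpHandler): void {
    for (const collection of collections) {
      this.handlers.set(collection, handler);
    }
  }

  // Returns false when no handler is registered for the collection.
  dispatch(repo: string, path: string, record: unknown): boolean {
    const { collection } = parseOpPath(path);
    const handler = this.handlers.get(collection);
    if (handler === undefined) return false;
    handler(record, `at://${repo}/${path}`);
    return true;
  }
}
```

A registry keeps the dispatch logic flat as collections are added, at the cost of losing the compiler's view of which case handles which record type.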
Cursor management
The cursor tracks the last processed event for resumption:
class CursorManager {
private cursor = 0; // Last processed sequence number, held in memory
private flushInterval?: NodeJS.Timeout; // Timer driving the batched flush
async getCursor(): Promise<number> {
// Load from PostgreSQL on startup
const row = await this.db.query('SELECT cursor FROM indexing_state WHERE id = $1', [
'firehose',
]);
return row?.cursor ?? 0;
}
async updateCursor(cursor: number): Promise<void> {
this.cursor = cursor;
// Batched flush every 5 seconds
}
async flush(): Promise<void> {
await this.db.query('UPDATE indexing_state SET cursor = $1 WHERE id = $2', [
this.cursor,
'firehose',
]);
}
}
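The batching idea can be sketched without a database: track the newest cursor in memory and only write when it has moved since the last flush. `CursorTracker` is a hypothetical helper; the actual write (the `UPDATE` above) is left to the caller.

```typescript
// Hypothetical in-memory tracker that decides when a flush is worthwhile.
class CursorTracker {
  private current = 0;
  private flushed = 0;

  // Record a processed sequence number; out-of-order values never move it back.
  advance(seq: number): void {
    if (seq > this.current) this.current = seq;
  }

  // Value that still needs persisting, or undefined if nothing changed.
  pending(): number | undefined {
    return this.current > this.flushed ? this.current : undefined;
  }

  // Call after the database write for `value` has succeeded.
  markFlushed(value: number): void {
    if (value > this.flushed) this.flushed = value;
  }
}
```

A timer firing every flush interval would call `pending()`, run the update if a value is returned, and then call `markFlushed()`. Marking only after a successful write means a failed flush is retried on the next tick.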
Error handling
Dead letter queue
Failed events are sent to a dead letter queue for manual inspection:
interface DLQEntry {
id: number;
seq: number;
repoDid: string;
eventType: string;
eventData: DLQEvent;
errorMessage: string;
errorType: ErrorType;
retryCount: number;
createdAt: Date;
lastRetryAt?: Date;
}
// Events are retried 3 times with exponential backoff
// After 3 failures, moved to DLQ
Error classification
Errors are classified to determine retry behavior:
enum ErrorType {
TRANSIENT = 'transient', // Network errors, timeouts - retry
PERMANENT = 'permanent', // Invalid record, schema mismatch - DLQ
RATE_LIMIT = 'rate_limit', // Rate limit - backoff and retry
}
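Combining the classification with the retry limit described earlier, the retry-or-DLQ decision might look like the sketch below. `decideRetry` and the 1-second base delay are assumptions; the string union mirrors the enum values above.

```typescript
// Mirrors the ErrorType values above as a string union.
type ErrorKind = 'transient' | 'permanent' | 'rate_limit';

type RetryDecision = { action: 'retry'; delayMs: number } | { action: 'dlq' };

const MAX_RETRIES = 3; // from the retry policy described above
const BASE_DELAY_MS = 1000; // assumption: not specified in this document

// Hypothetical decision function: permanent errors and exhausted retries
// go to the DLQ; everything else is retried with exponential backoff.
function decideRetry(kind: ErrorKind, retryCount: number): RetryDecision {
  if (kind === 'permanent' || retryCount >= MAX_RETRIES) {
    return { action: 'dlq' };
  }
  // Delay doubles with each attempt: 1s, 2s, 4s.
  return { action: 'retry', delayMs: BASE_DELAY_MS * Math.pow(2, retryCount) };
}
```

The sketch treats rate-limit errors like transient ones. A real implementation would likely honor a server-provided retry delay when one is available.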
Backpressure handling
The service implements backpressure to prevent memory exhaustion:
const MAX_QUEUE_SIZE = 10000;
if (this.eventQueue.size() > MAX_QUEUE_SIZE) {
this.logger.warn('Queue full, applying backpressure');
await this.pauseConsumer();
await this.eventQueue.drain(MAX_QUEUE_SIZE / 2);
await this.resumeConsumer();
}
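The pause-then-resume behavior above amounts to a high/low watermark. A minimal sketch of that state machine follows; `BackpressureGate` is a hypothetical name, and the thresholds are supplied by the caller.

```typescript
type GateSignal = 'pause' | 'resume' | 'none';

// Hypothetical watermark gate: pause above `high`, resume at or below `low`.
class BackpressureGate {
  private paused = false;
  private readonly high: number;
  private readonly low: number;

  constructor(high: number, low: number) {
    if (low >= high) throw new Error('low watermark must be below high watermark');
    this.high = high;
    this.low = low;
  }

  // Feed the current queue depth; returns the action the consumer should take.
  observe(queueSize: number): GateSignal {
    if (!this.paused && queueSize > this.high) {
      this.paused = true;
      return 'pause';
    }
    if (this.paused && queueSize <= this.low) {
      this.paused = false;
      return 'resume';
    }
    return 'none';
  }
}
```

Using two thresholds instead of one avoids rapid pause/resume flapping when the queue hovers near the limit.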
Reconnection
The ReconnectionManager handles network failures:
const RECONNECTION_CONFIG = {
initialDelay: 1000, // 1 second
maxDelay: 300000, // 5 minutes
multiplier: 2, // Exponential backoff
jitter: 0.1, // 10% randomization
};
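To show how these four settings could combine into a delay, here is a minimal sketch. `reconnectDelay` is a hypothetical function, and applying jitter after the cap (so a delay may exceed `maxDelay` by up to the jitter fraction) is an assumption about ordering.

```typescript
interface ReconnectionConfig {
  initialDelay: number; // ms
  maxDelay: number; // ms
  multiplier: number;
  jitter: number; // fraction of the delay, e.g. 0.1 for ±10%
}

// Hypothetical delay calculation; `attempt` is zero-based.
// `random` is injectable so the result can be made deterministic in tests.
function reconnectDelay(
  attempt: number,
  config: ReconnectionConfig,
  random: () => number = Math.random
): number {
  const base = Math.min(
    config.maxDelay,
    config.initialDelay * Math.pow(config.multiplier, attempt)
  );
  // Map random() from [0, 1) onto [-jitter, +jitter) of the base delay.
  const spread = base * config.jitter * (2 * random() - 1);
  return Math.round(base + spread);
}
```

With the configuration above, the un-jittered delays run 1s, 2s, 4s, and so on, reaching the 5-minute cap at the ninth retry.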
Graceful shutdown
The service drains queues and flushes cursor before shutdown:
async stop(): Promise<void> {
this.logger.info('Initiating graceful shutdown');
// Stop accepting new events
await this.consumer.disconnect();
// Wait for queue to drain (max 30 seconds)
await this.eventQueue.drain(30000);
// Flush cursor to database
await this.cursorManager.flush();
this.logger.info('Shutdown complete');
}
Metrics
The service exposes Prometheus metrics:
| Metric | Type | Description |
|---|---|---|
| chive_indexing_events_total | Counter | Total events processed |
| chive_indexing_events_failed | Counter | Failed events |
| chive_indexing_lag_seconds | Gauge | Time behind firehose head |
| chive_indexing_queue_size | Gauge | Current queue depth |
| chive_indexing_cursor | Gauge | Current cursor position |
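As an illustration of what the lag gauge measures, the value could be derived from an event timestamp as sketched below. `lagSeconds` is a hypothetical helper, and the sketch assumes microsecond timestamps like Jetstream's `time_us`; how the service actually computes lag is not stated here.

```typescript
// Hypothetical helper: seconds between an event's timestamp and "now".
// eventTimeUs is in microseconds (as in Jetstream's time_us); nowMs in milliseconds.
function lagSeconds(eventTimeUs: number, nowMs: number = Date.now()): number {
  const lagMs = nowMs - eventTimeUs / 1000;
  // Clock skew can make an event appear to be from the future; clamp to zero.
  return Math.max(0, lagMs / 1000);
}
```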
Running the indexer
The indexer runs as a separate process from the API server. In production, it's deployed as the chive-indexer container.
Entry point
The indexer entry point is src/indexer.ts, which:
- Initializes all database connections (PostgreSQL, Redis, Elasticsearch, Neo4j)
- Creates the services needed for event processing (EprintService, ReviewService, KnowledgeGraphService, ActivityService)
- Creates the event processor using createEventProcessor()
- Starts the IndexingService to consume the firehose
- Handles graceful shutdown on SIGTERM/SIGINT
Development
# Run the indexer locally
pnpm exec tsx src/indexer.ts
Production (Docker)
In production, the indexer runs as the chive-indexer service:
chive-indexer:
image: chive:latest
command: ['node', '--enable-source-maps', 'dist/src/indexer.js']
environment:
- ATPROTO_RELAY_URL=wss://bsky.network
- INDEXER_CONCURRENCY=10
Why a separate process?
The indexer runs separately from the API for several reasons:
- Resource isolation: Firehose consumption is CPU/memory intensive
- Independent scaling: Can scale indexer independently from API
- Failure isolation: Indexer failures don't affect API availability
- Different lifecycle: Indexer runs continuously, API handles request/response
Configuration
interface IndexingConfig {
relay: string; // Relay WebSocket URL
collections: string[]; // Collections to filter
batchSize: number; // Events per batch
flushInterval: number; // Cursor flush interval (ms)
maxRetries: number; // Max retry attempts
queueConcurrency: number; // Parallel processors
}
Environment variables:
| Variable | Default | Description |
|---|---|---|
| RELAY_URL | wss://bsky.network | AT Protocol relay URL |
| INDEXING_BATCH_SIZE | 100 | Events per batch |
| INDEXING_FLUSH_INTERVAL | 5000 | Cursor flush interval (ms) |
| INDEXING_MAX_RETRIES | 3 | Max retry attempts |
| INDEXING_QUEUE_CONCURRENCY | 10 | Parallel processors |
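One way to turn these variables into a typed configuration is sketched below. `loadIndexingConfig`, `intFromEnv`, and `EnvIndexingConfig` are hypothetical names; the defaults come from the table above. The environment is passed in as an argument (for example `process.env`) so the function is easy to test.

```typescript
// Subset of IndexingConfig that is driven by environment variables.
interface EnvIndexingConfig {
  relay: string;
  batchSize: number;
  flushInterval: number; // ms
  maxRetries: number;
  queueConcurrency: number;
}

type Env = Record<string, string | undefined>;

// Parses a positive integer, falling back to the default when unset.
function intFromEnv(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${key} must be a positive integer, got "${raw}"`);
  }
  return value;
}

function loadIndexingConfig(env: Env): EnvIndexingConfig {
  return {
    relay: env['RELAY_URL'] || 'wss://bsky.network',
    batchSize: intFromEnv(env, 'INDEXING_BATCH_SIZE', 100),
    flushInterval: intFromEnv(env, 'INDEXING_FLUSH_INTERVAL', 5000),
    maxRetries: intFromEnv(env, 'INDEXING_MAX_RETRIES', 3),
    queueConcurrency: intFromEnv(env, 'INDEXING_QUEUE_CONCURRENCY', 10),
  };
}
```

Failing fast on a malformed value surfaces misconfiguration at startup rather than as odd runtime behavior.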