IndexingService

The IndexingService orchestrates firehose consumption and event processing. It consumes the AT Protocol relay firehose, filters for pub.chive.* records, and dispatches each event to the appropriate handler.

Architecture

FirehoseConsumer → EventFilter → CommitHandler → EventQueue → Processor → DLQ

Components

| Component | File | Purpose |
| --- | --- | --- |
| FirehoseConsumer | firehose-consumer.ts | WebSocket connection to relay |
| EventFilter | event-filter.ts | Filters for pub.chive.* collections |
| CommitHandler | commit-handler.ts | Parses CAR files, extracts records |
| CursorManager | cursor-manager.ts | Persists cursor for resumption |
| EventQueue | event-queue.ts | BullMQ async processing queue |
| ReconnectionManager | reconnection-manager.ts | Exponential backoff reconnection |
| DLQHandler | dlq-handler.ts | Dead letter queue for failures |

Usage

import { IndexingService } from '@/services/indexing';

const indexingService = container.resolve(IndexingService);

// Start consuming firehose
await indexingService.start();

// Check status
const status = await indexingService.getStatus();
// { status: 'running', cursor: 12345678, lag: '2s', eventsProcessed: 50000 }

// Graceful shutdown
await indexingService.stop();

Firehose consumption

The service connects to the configured relay (default: wss://bsky.network) and subscribes to the com.atproto.sync.subscribeRepos stream:

// Connection options
const options = {
  relay: process.env.RELAY_URL || 'wss://bsky.network',
  cursor: await this.cursorManager.getCursor(),
  collections: ['pub.chive.*'],
};

Event filtering

Only events matching pub.chive.* collections are processed:

const CHIVE_COLLECTIONS = [
  'pub.chive.eprint.submission',
  'pub.chive.eprint.version',
  'pub.chive.eprint.changelog',
  'pub.chive.eprint.tag',
  'pub.chive.eprint.userTag',
  'pub.chive.eprint.citation',
  'pub.chive.eprint.relatedWork',
  'pub.chive.review.comment',
  'pub.chive.review.endorsement',
  'pub.chive.review.entityLink',
  'pub.chive.annotation.comment',
  'pub.chive.annotation.entityLink',
  'pub.chive.graph.node',
  'pub.chive.graph.edge',
  'pub.chive.graph.nodeProposal',
  'pub.chive.graph.edgeProposal',
  'pub.chive.graph.vote',
  'pub.chive.actor.profile',
  'pub.chive.actor.profileConfig',
];
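
The pub.chive.* wildcard used in the connection options can be matched with a plain prefix check. The following is a minimal sketch of that idea, not the actual EventFilter implementation; `collectionOf`, `matchesPattern`, and `isChiveOp` are hypothetical names introduced for illustration.

```typescript
// Extracts the collection NSID from an op path such as
// "pub.chive.eprint.submission/3kj5h2k3j5h".
function collectionOf(path: string): string {
  const slash = path.indexOf('/');
  return slash === -1 ? path : path.slice(0, slash);
}

// Supports exact names and a trailing ".*" wildcard (assumed semantics).
function matchesPattern(collection: string, pattern: string): boolean {
  if (pattern.endsWith('.*')) {
    // "pub.chive.*" becomes the prefix "pub.chive." so that
    // "pub.chivex.foo" does not match by accident.
    return collection.startsWith(pattern.slice(0, -1));
  }
  return collection === pattern;
}

// Returns true when the op path belongs to one of the given patterns.
function isChiveOp(
  path: string,
  patterns: readonly string[] = ['pub.chive.*']
): boolean {
  const collection = collectionOf(path);
  return patterns.some((pattern) => matchesPattern(collection, pattern));
}
```

Keeping the trailing dot in the prefix is the important detail: it restricts matches to the pub.chive namespace rather than to any NSID that merely starts with the same characters.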

Commit parsing

The CommitHandler supports two event formats: full firehose events (with CAR blocks) and Jetstream events (with pre-decoded records).

Event format detection

CommitHandler automatically detects the event format:

  1. Jetstream events: Operations have record fields already attached. No CAR block parsing required.
  2. Full firehose events: Operations reference CIDs; records must be decoded from CAR blocks.

// Jetstream event detection
const opsWithRecords = event.ops as readonly RepoOpWithRecord[];
const hasPreDecodedRecords = opsWithRecords.some(
  (op) => (op.action === 'create' || op.action === 'update') && op.record !== undefined
);

if (hasPreDecodedRecords) {
  // Return ops with pre-decoded records directly (Jetstream path)
  return opsWithRecords.map((op) => ({
    action: op.action,
    path: op.path,
    cid: op.cid,
    record: op.record,
  }));
}

// Fall through to CAR block parsing (full firehose path)

Jetstream events

Jetstream provides events with records pre-decoded as JSON. This eliminates the need for:

  • CAR (Content Addressable aRchive) file parsing
  • CBOR (Concise Binary Object Representation) decoding
  • CID-based block lookups

The FirehoseConsumer transforms Jetstream's JSON format into the standard RepoEvent format, attaching records directly to operations:

// Jetstream JSON format
{
  "did": "did:plc:abc123",
  "time_us": 1725911162329308,
  "kind": "commit",
  "commit": {
    "rev": "...",
    "operation": "create",
    "collection": "pub.chive.eprint.submission",
    "rkey": "3kj5h2k3j5h",
    "record": { /* decoded record data */ },
    "cid": "bafyrei..."
  }
}

// Transformed to RepoEvent with attached record
{
  "$type": "com.atproto.sync.subscribeRepos#commit",
  "repo": "did:plc:abc123",
  "commit": "bafyrei...",
  "ops": [{
    "action": "create",
    "path": "pub.chive.eprint.submission/3kj5h2k3j5h",
    "cid": "bafyrei...",
    "record": { /* decoded record data */ } // Pre-attached
  }],
  "seq": 1725911162329308,
  "time": "2024-09-09T..."
}
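
The mapping between the two shapes above can be sketched as a pure function. This is an illustration under stated assumptions, not the FirehoseConsumer's actual code: the type definitions are inferred from the JSON examples, `toRepoEvent` is a hypothetical name, and filling the event-level `commit` field from the record CID is a guess, since the example elides both values.

```typescript
// Shapes inferred from the JSON examples; not the project's real types.
interface JetstreamCommitEvent {
  did: string;
  time_us: number;
  kind: 'commit';
  commit: {
    rev: string;
    operation: 'create' | 'update' | 'delete';
    collection: string;
    rkey: string;
    record?: unknown; // absent on delete
    cid?: string; // absent on delete
  };
}

interface RepoOpWithRecord {
  action: 'create' | 'update' | 'delete';
  path: string;
  cid?: string;
  record?: unknown;
}

interface RepoEvent {
  $type: 'com.atproto.sync.subscribeRepos#commit';
  repo: string;
  commit: string;
  ops: RepoOpWithRecord[];
  seq: number;
  time: string;
}

function toRepoEvent(event: JetstreamCommitEvent): RepoEvent {
  const { commit } = event;
  const op: RepoOpWithRecord = {
    action: commit.operation,
    // Collection and rkey are joined into the firehose-style path.
    path: `${commit.collection}/${commit.rkey}`,
  };
  if (commit.operation !== 'delete') {
    if (commit.cid !== undefined) op.cid = commit.cid;
    op.record = commit.record;
  }
  return {
    $type: 'com.atproto.sync.subscribeRepos#commit',
    repo: event.did,
    commit: commit.cid ?? '', // ASSUMPTION: source of this field is not documented
    ops: [op],
    seq: event.time_us, // microsecond timestamp doubles as the sequence number
    time: new Date(Math.floor(event.time_us / 1000)).toISOString(),
  };
}
```

Because each Jetstream message carries exactly one operation, the resulting `ops` array always has a single element in this sketch.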

Full firehose events

Full firehose events include binary CAR files containing IPLD blocks. CommitHandler decodes these using:

  1. @ipld/car for CAR file parsing
  2. @ipld/dag-cbor for CBOR decoding
  3. multiformats/cid for CID parsing

interface ParsedCommit {
  repo: string; // DID of the repository
  rev: string; // Revision/CID of the commit
  ops: Operation[]; // Create, update, delete operations
  blobs: CID[]; // Referenced blob CIDs
}

interface Operation {
  action: 'create' | 'update' | 'delete';
  path: string; // Collection + rkey
  cid?: CID; // Record CID (omitted for deletes)
  record?: unknown; // Decoded record (omitted for deletes)
}

RepoOpWithRecord interface

For Jetstream events, operations include pre-decoded records:

/**
 * Extended RepoOp with pre-decoded record (from Jetstream).
 */
interface RepoOpWithRecord extends RepoOp {
  record?: unknown;
}

This interface extends the base RepoOp to include the optional record field that Jetstream populates.

Performance comparison

| Aspect | Full Firehose | Jetstream |
| --- | --- | --- |
| Format | Binary CBOR | JSON |
| Record decoding | Required (CAR + CBOR) | Not required |
| Block lookups | Required (by CID) | Not required |
| Network overhead | Higher (binary) | Lower (text) |
| CPU usage | Higher | Lower |
| Cryptographic verification | Possible | Not possible |

Jetstream is the default for Chive because:

  1. Lower CPU overhead (no CBOR decoding)
  2. Simpler processing (records already decoded)
  3. JSON format easier to debug
  4. Sufficient for indexing (cryptographic verification not required for read-only AppView)

Use the full firehose only when cryptographic verification of record content is required.

Event processing

Events are dispatched to the appropriate service based on collection:

async processEvent(event: RepoEvent): Promise<void> {
  for (const op of event.ops) {
    const collection = op.path.split('/')[0];

    // Metadata passed to every handler below
    const metadata = {
      uri: `at://${event.repo}/${op.path}`,
      cid: op.cid,
      pdsEndpoint: await this.resolvePds(event.repo),
    };

    switch (collection) {
      case 'pub.chive.eprint.submission':
        await this.eprintService.indexEprint(op.record, metadata);
        break;
      case 'pub.chive.eprint.version':
        // Upsert version metadata into eprint_versions_index
        // On delete: remove from eprint_versions_index
        break;
      case 'pub.chive.review.comment':
        await this.reviewService.indexReview(op.record, metadata);
        break;
      case 'pub.chive.review.endorsement':
        await this.reviewService.indexEndorsement(op.record, metadata);
        break;
      case 'pub.chive.graph.node':
      case 'pub.chive.graph.edge':
        await this.personalGraphService.indexRecord(op, metadata);
        break;
      case 'pub.chive.annotation.entityLink':
      case 'pub.chive.review.entityLink':
        await this.annotationService.indexEntityLink(op.record, metadata);
        break;
      // ... other collections
    }
  }
}
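
As the number of collections grows, the same collection-based routing can be expressed as a lookup table. The sketch below is one possible alternative to the switch statement, not how the service is actually written; `Target`, `ROUTES`, and `routeOp` are hypothetical names, and the table covers only the collections shown in the snippet above.

```typescript
// Logical handler groups, named after the services in the switch above.
type Target = 'eprint' | 'review' | 'graph' | 'annotation';

// A Map avoids accidental hits on Object.prototype keys.
const ROUTES = new Map<string, Target>([
  ['pub.chive.eprint.submission', 'eprint'],
  ['pub.chive.review.comment', 'review'],
  ['pub.chive.review.endorsement', 'review'],
  ['pub.chive.graph.node', 'graph'],
  ['pub.chive.graph.edge', 'graph'],
  ['pub.chive.annotation.entityLink', 'annotation'],
  ['pub.chive.review.entityLink', 'annotation'],
]);

interface RoutedOp {
  collection: string;
  rkey: string;
  uri: string; // at://<did>/<collection>/<rkey>
  target: Target | undefined; // undefined when no handler is registered
}

function routeOp(repo: string, path: string): RoutedOp {
  const slash = path.indexOf('/');
  // A valid path has a non-empty collection and a non-empty rkey.
  if (slash <= 0 || slash === path.length - 1) {
    throw new Error(`Malformed op path: "${path}"`);
  }
  const collection = path.slice(0, slash);
  return {
    collection,
    rkey: path.slice(slash + 1),
    uri: `at://${repo}/${path}`,
    target: ROUTES.get(collection),
  };
}
```

Validating the path up front means a malformed operation fails loudly instead of silently falling through every case.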

Cursor management

The cursor tracks the last processed event for resumption:

class CursorManager {
  private cursor = 0;
  private flushInterval?: NodeJS.Timeout;

  async getCursor(): Promise<number> {
    // Load from PostgreSQL on startup
    const row = await this.db.query('SELECT cursor FROM indexing_state WHERE id = $1', [
      'firehose',
    ]);
    // Keep the in-memory value in sync so an early flush does not write 0
    this.cursor = row?.cursor ?? 0;
    return this.cursor;
  }

  async updateCursor(cursor: number): Promise<void> {
    // Only updates memory; the value is persisted by flush()
    this.cursor = cursor;
    // Batched flush every 5 seconds
  }

  async flush(): Promise<void> {
    await this.db.query('UPDATE indexing_state SET cursor = $1 WHERE id = $2', [
      this.cursor,
      'firehose',
    ]);
  }
}
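
The "batched flush every 5 seconds" comment leaves the timer wiring out. A minimal sketch of that batching follows, with the database hidden behind a hypothetical `CursorStore` interface; `BatchedCursor`, `pending`, and the dirty flag are illustrative assumptions, not the real CursorManager internals.

```typescript
// Hypothetical persistence boundary (for example, the UPDATE query above).
interface CursorStore {
  save(cursor: number): Promise<void>;
}

class BatchedCursor {
  private readonly store: CursorStore;
  private readonly intervalMs: number;
  private cursor = 0;
  private dirty = false; // true when memory is ahead of the store
  private timer: ReturnType<typeof setInterval> | undefined;

  constructor(store: CursorStore, intervalMs = 5000) {
    this.store = store;
    this.intervalMs = intervalMs;
  }

  start(): void {
    if (this.timer !== undefined) return;
    this.timer = setInterval(() => {
      // A failed flush restores `dirty`, so the next tick retries it.
      this.flush().catch(() => undefined);
    }, this.intervalMs);
  }

  // Cheap in-memory update; called once per processed event.
  update(cursor: number): void {
    this.cursor = cursor;
    this.dirty = true;
  }

  // The value the next flush would write, or undefined if nothing changed.
  pending(): number | undefined {
    return this.dirty ? this.cursor : undefined;
  }

  async flush(): Promise<void> {
    if (!this.dirty) return; // skip redundant writes
    const value = this.cursor;
    this.dirty = false;
    try {
      await this.store.save(value);
    } catch (err) {
      this.dirty = true;
      throw err;
    }
  }

  // Stops the timer and performs a final flush (used during shutdown).
  async stop(): Promise<void> {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
    await this.flush();
  }
}
```

The trade-off of batching is that a crash can lose up to one interval of cursor progress, so handlers need to tolerate replayed events after a restart.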

Error handling

Dead letter queue

Failed events are sent to a dead letter queue for manual inspection:

interface DLQEntry {
  id: number;
  seq: number;
  repoDid: string;
  eventType: string;
  eventData: DLQEvent;
  errorMessage: string;
  errorType: ErrorType;
  retryCount: number;
  createdAt: Date;
  lastRetryAt?: Date;
}

// Events are retried 3 times with exponential backoff
// After 3 failures, moved to DLQ

Error classification

Errors are classified to determine retry behavior:

enum ErrorType {
  TRANSIENT = 'transient', // Network errors, timeouts - retry
  PERMANENT = 'permanent', // Invalid record, schema mismatch - DLQ
  RATE_LIMIT = 'rate_limit', // Rate limit - backoff and retry
}
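
How an error is mapped to one of these types, and what happens next, might look like the sketch below. The heuristics are assumptions chosen for illustration (HTTP 429 means rate limit, validation and syntax errors are permanent, everything else is retried); `ErrorKind`, `classifyError`, and `nextAction` are hypothetical names, and the string union simply mirrors the enum values above.

```typescript
// Mirrors the ErrorType enum values as a string union.
type ErrorKind = 'transient' | 'permanent' | 'rate_limit';

// ASSUMPTION: error names that indicate a record can never succeed.
const PERMANENT_NAMES = new Set(['ValidationError', 'SyntaxError']);

function classifyError(err: unknown): ErrorKind {
  if (typeof err !== 'object' || err === null) return 'transient';
  const e = err as { status?: unknown; name?: unknown };
  if (e.status === 429) return 'rate_limit';
  if (typeof e.name === 'string' && PERMANENT_NAMES.has(e.name)) {
    return 'permanent';
  }
  // Unknown failures are retried; the retry cap sends them to the DLQ.
  return 'transient';
}

// Decides whether a failed event is retried or moved to the DLQ.
function nextAction(
  kind: ErrorKind,
  retryCount: number,
  maxRetries = 3
): 'retry' | 'dlq' {
  if (kind === 'permanent') return 'dlq'; // retrying cannot help
  return retryCount < maxRetries ? 'retry' : 'dlq';
}
```

Defaulting unknown errors to transient is a deliberate choice in this sketch: a misclassified permanent error costs at most three wasted retries, whereas a misclassified transient error would drop a valid event straight into the DLQ.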

Backpressure handling

The service implements backpressure to prevent memory exhaustion:

const MAX_QUEUE_SIZE = 10000;

if (this.eventQueue.size() > MAX_QUEUE_SIZE) {
  this.logger.warn('Queue full, applying backpressure');
  await this.pauseConsumer();
  await this.eventQueue.drain(MAX_QUEUE_SIZE / 2);
  await this.resumeConsumer();
}
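
The pause-at-full, resume-at-half behaviour above is a high/low watermark scheme. The decision logic can be isolated from the queue and the consumer, as in this sketch; `Watermark` and `observe` are hypothetical names, and the thresholds are taken from the snippet above (10000 and half of it).

```typescript
type Signal = 'pause' | 'resume' | 'none';

// Tracks whether the consumer is paused and emits a signal only on a
// state change, so callers never pause or resume twice in a row.
class Watermark {
  private readonly high: number;
  private readonly low: number;
  private paused = false;

  constructor(high: number, low: number) {
    if (low >= high) {
      throw new Error('low watermark must be below high watermark');
    }
    this.high = high;
    this.low = low;
  }

  observe(depth: number): Signal {
    if (!this.paused && depth > this.high) {
      this.paused = true;
      return 'pause';
    }
    if (this.paused && depth <= this.low) {
      this.paused = false;
      return 'resume';
    }
    return 'none';
  }
}

// Usage: const gate = new Watermark(10000, 5000);
// call gate.observe(queueDepth) after each enqueue or completion.
```

The gap between the two thresholds provides hysteresis: without it, a queue hovering around the limit would toggle the WebSocket consumer on every event.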

Reconnection

The ReconnectionManager handles network failures:

const RECONNECTION_CONFIG = {
  initialDelay: 1000, // 1 second
  maxDelay: 300000, // 5 minutes
  multiplier: 2, // Exponential backoff
  jitter: 0.1, // 10% randomization
};
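
To make these numbers concrete, the delay before reconnection attempt n (counting from 0) can be computed as min(maxDelay, initialDelay × multiplier^n), then shifted by a random amount. The sketch below assumes the jitter is symmetric (plus or minus 10%) and applied after the cap; the actual ReconnectionManager may differ, and `reconnectDelay` is a hypothetical name.

```typescript
interface ReconnectionConfig {
  initialDelay: number; // ms
  maxDelay: number; // ms
  multiplier: number;
  jitter: number; // fraction of the base delay, e.g. 0.1 for 10%
}

const DEFAULT_RECONNECTION_CONFIG: ReconnectionConfig = {
  initialDelay: 1000,
  maxDelay: 300000,
  multiplier: 2,
  jitter: 0.1,
};

// `random` is injectable so the result is deterministic in tests.
function reconnectDelay(
  attempt: number,
  config: ReconnectionConfig = DEFAULT_RECONNECTION_CONFIG,
  random: () => number = Math.random
): number {
  const exponential = config.initialDelay * Math.pow(config.multiplier, attempt);
  const base = Math.min(config.maxDelay, exponential);
  const spread = base * config.jitter;
  // random() is in [0, 1), so the offset is in [-spread, +spread).
  // Because jitter follows the cap, the result can exceed maxDelay slightly.
  return Math.round(base + (random() * 2 - 1) * spread);
}
```

With the defaults and no jitter, the sequence is 1 s, 2 s, 4 s, 8 s and so on, reaching the 5-minute cap at the ninth retry (attempt 9 would be 512 s uncapped). Jitter matters when many indexer instances lose the same relay at once, since it spreads their reconnects out.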

Graceful shutdown

The service drains the event queue and flushes the cursor before shutting down:

async stop(): Promise<void> {
  this.logger.info('Initiating graceful shutdown');

  // Stop accepting new events
  await this.consumer.disconnect();

  // Wait for queue to drain (max 30 seconds)
  await this.eventQueue.drain(30000);

  // Flush cursor to database
  await this.cursorManager.flush();

  this.logger.info('Shutdown complete');
}

Metrics

The service exposes Prometheus metrics:

| Metric | Type | Description |
| --- | --- | --- |
| chive_indexing_events_total | Counter | Total events processed |
| chive_indexing_events_failed | Counter | Failed events |
| chive_indexing_lag_seconds | Gauge | Time behind firehose head |
| chive_indexing_queue_size | Gauge | Current queue depth |
| chive_indexing_cursor | Gauge | Current cursor position |
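
One plausible way to derive the lag gauge, assuming it is approximated by comparing the microsecond timestamp of the most recently processed event with the local clock, is sketched below. This is an assumption about the calculation, not a description of the service's code, and `lagSeconds` is a hypothetical helper.

```typescript
// eventTimeUs: event timestamp in microseconds (as in Jetstream's time_us)
// nowMs: current wall-clock time in milliseconds
function lagSeconds(eventTimeUs: number, nowMs: number = Date.now()): number {
  const eventMs = eventTimeUs / 1000;
  // Clamp at zero so clock skew never produces a negative lag.
  return Math.max(0, (nowMs - eventMs) / 1000);
}
```

This measures the age of the last processed event rather than the true distance from the head of the stream, so the value also rises when the network is simply quiet.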

Running the indexer

The indexer runs as a separate process from the API server. In production, it's deployed as the chive-indexer container.

Entry point

The indexer entry point is src/indexer.ts, which:

  1. Initializes all database connections (PostgreSQL, Redis, Elasticsearch, Neo4j)
  2. Creates the services needed for event processing (EprintService, ReviewService, KnowledgeGraphService, ActivityService)
  3. Creates the event processor using createEventProcessor()
  4. Starts the IndexingService to consume the firehose
  5. Handles graceful shutdown on SIGTERM/SIGINT

Development

# Run the indexer locally
pnpm exec tsx src/indexer.ts

Production (Docker)

In production, the indexer runs as the chive-indexer service:

chive-indexer:
  image: chive:latest
  command: ['node', '--enable-source-maps', 'dist/src/indexer.js']
  environment:
    - ATPROTO_RELAY_URL=wss://bsky.network
    - INDEXER_CONCURRENCY=10

Why a separate process?

The indexer runs separately from the API for several reasons:

  1. Resource isolation: Firehose consumption is CPU/memory intensive
  2. Independent scaling: Can scale indexer independently from API
  3. Failure isolation: Indexer failures don't affect API availability
  4. Different lifecycle: Indexer runs continuously, API handles request/response

Configuration

interface IndexingConfig {
  relay: string; // Relay WebSocket URL
  collections: string[]; // Collections to filter
  batchSize: number; // Events per batch
  flushInterval: number; // Cursor flush interval (ms)
  maxRetries: number; // Max retry attempts
  queueConcurrency: number; // Parallel processors
}

Environment variables:

| Variable | Default | Description |
| --- | --- | --- |
| RELAY_URL | wss://bsky.network | AT Protocol relay URL |
| INDEXING_BATCH_SIZE | 100 | Events per batch |
| INDEXING_FLUSH_INTERVAL | 5000 | Cursor flush interval (ms) |
| INDEXING_MAX_RETRIES | 3 | Max retry attempts |
| INDEXING_QUEUE_CONCURRENCY | 10 | Parallel processors |
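
Reading these variables into an IndexingConfig with the documented defaults could look like the following sketch. `loadIndexingConfig` and `intFromEnv` are hypothetical helpers, and since no environment variable for `collections` is documented, the sketch hard-codes the pub.chive.* pattern as an assumption.

```typescript
interface IndexingConfig {
  relay: string;
  collections: string[];
  batchSize: number;
  flushInterval: number; // ms
  maxRetries: number;
  queueConcurrency: number;
}

type Env = Readonly<Record<string, string | undefined>>;

// Parses a positive integer, falling back when the variable is unset.
function intFromEnv(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${name} must be a positive integer, got "${raw}"`);
  }
  return value;
}

function loadIndexingConfig(env: Env): IndexingConfig {
  return {
    relay: env['RELAY_URL'] || 'wss://bsky.network',
    collections: ['pub.chive.*'], // ASSUMPTION: not configurable via env
    batchSize: intFromEnv(env, 'INDEXING_BATCH_SIZE', 100),
    flushInterval: intFromEnv(env, 'INDEXING_FLUSH_INTERVAL', 5000),
    maxRetries: intFromEnv(env, 'INDEXING_MAX_RETRIES', 3),
    queueConcurrency: intFromEnv(env, 'INDEXING_QUEUE_CONCURRENCY', 10),
  };
}

// Usage in Node.js: const config = loadIndexingConfig(process.env);
```

Failing fast on a malformed value at startup is usually preferable to discovering a NaN batch size after the indexer has begun consuming events.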