
PDS discovery

The PDS discovery system enables Chive to index records from Personal Data Servers (PDSes) that are not connected to the main relay firehose. This ensures full coverage of Chive records across the AT Protocol network.

Problem

The primary data source for Chive is the relay firehose, which streams events from PDSes subscribed to the relay. However, some PDSes operate independently:

  • Self-hosted PDSes not connected to any relay
  • PDSes using alternative relays
  • New PDSes with records created before connecting to a relay

Without proactive discovery, records on these PDSes would never appear in Chive's index.

Components

The PDS Discovery system consists of three main components:

| Component | Purpose | Source |
| --- | --- | --- |
| PDSRegistry | Tracks known PDSes and their scan state | src/services/pds-discovery/pds-registry.ts |
| PDSDiscoveryService | Discovers PDSes from various sources | src/services/pds-discovery/discovery-service.ts |
| PDSScanner | Scans PDSes for all pub.chive.* collections | src/services/pds-discovery/pds-scanner.ts |

PDSRegistry

Maintains a database of known PDSes with their scan state:

interface PDSRegistryEntry {
  pdsUrl: string;
  discoveredAt: Date;
  discoverySource: DiscoverySource;
  status: PDSStatus;
  lastScanAt?: Date;
  nextScanAt?: Date;
  hasChiveRecords?: boolean;
  chiveRecordCount: number;
  consecutiveFailures: number;
  scanPriority: number;
}

type PDSStatus = 'pending' | 'active' | 'scanning' | 'unreachable' | 'no_chive_records';

type DiscoverySource =
  | 'plc_enumeration' // Found via PLC directory
  | 'relay_listhosts' // Found via relay's listHosts
  | 'user_registration' // User-submitted PDS URL
  | 'did_mention'; // Found via DID in indexed record

Key methods:

interface IPDSRegistry {
  registerPDS(pdsUrl: string, source: DiscoverySource): Promise<void>;
  getPDSesForScan(limit: number): Promise<PDSRegistryEntry[]>;
  markScanStarted(pdsUrl: string): Promise<void>;
  markScanCompleted(pdsUrl: string, result: ScanResult): Promise<void>;
  markScanFailed(pdsUrl: string, error: string): Promise<void>;
  getPDSStats(): Promise<{
    total: number;
    active: number;
    withChiveRecords: number;
    unreachable: number;
  }>;
}

PDSDiscoveryService

Discovers PDSes from multiple sources:

1. PLC Directory enumeration

Streams the PLC directory export to find unique PDS endpoints:

const discoveryService = new PDSDiscoveryService(registry, logger, redis);

// Stream through PLC directory (rate-limited)
for await (const pds of discoveryService.discoverFromPLCDirectory()) {
  console.log(`Found PDS: ${pds.pdsUrl} from ${pds.discoveredFrom}`);
}
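As a rough illustration of the enumeration step, the sketch below pulls unique PDS endpoints out of PLC export text. It assumes the export is newline-delimited JSON in which each entry's operation carries `services.atproto_pds.endpoint` (with a top-level `service` string on legacy operations). The `PlcExportEntry` type and `extractPdsEndpoints` function are hypothetical names for this example, not Chive's implementation.

```typescript
// Hypothetical helper for illustration; not part of Chive's codebase.
// Assumed shape of one line of the PLC directory export.
interface PlcExportEntry {
  did: string;
  nullified?: boolean;
  operation?: {
    services?: Record<string, { type?: string; endpoint?: string }>;
    service?: string; // legacy operations are assumed to store the PDS here
  };
}

// Collects the unique PDS endpoints named in newline-delimited JSON text.
function extractPdsEndpoints(jsonl: string): Set<string> {
  const endpoints = new Set<string>();
  for (const line of jsonl.split('\n')) {
    const trimmed = line.trim();
    if (trimmed === '') continue;

    let parsed: unknown;
    try {
      parsed = JSON.parse(trimmed);
    } catch {
      continue; // skip malformed lines rather than aborting the whole run
    }
    if (typeof parsed !== 'object' || parsed === null) continue;

    const entry = parsed as PlcExportEntry;
    if (entry.nullified === true) continue; // ignore invalidated operations

    const endpoint =
      entry.operation?.services?.['atproto_pds']?.endpoint ?? entry.operation?.service;
    if (typeof endpoint !== 'string' || !/^https?:\/\//.test(endpoint)) continue;

    // Strip trailing slashes so equivalent URLs deduplicate.
    endpoints.add(endpoint.replace(/\/+$/, ''));
  }
  return endpoints;
}
```

Returning a `Set` keeps the result deduplicated even when many DIDs share one PDS, which is the common case.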

2. Relay listHosts

Queries relays for their subscribed PDSes:

const pdses = await discoveryService.discoverFromRelay('wss://bsky.network');
// Returns all PDSes known to the relay
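The sketch below shows one way the relay query could be built. It assumes the relay serves the `com.atproto.sync.listHosts` XRPC query over HTTPS on the same host as its WebSocket endpoint, and that each returned host has a `hostname` field. The helper names (`relayXrpcBase`, `listHostsUrl`, `hostsToPdsUrls`) are hypothetical, and whether `discoverFromRelay` works this way internally is not confirmed by this page.

```typescript
// Assumed (partial) response shape of com.atproto.sync.listHosts.
interface ListHostsResponse {
  cursor?: string;
  hosts: Array<{ hostname: string }>;
}

// Hypothetical: maps a relay WebSocket URL to the HTTPS base used for XRPC queries.
function relayXrpcBase(relayUrl: string): string {
  return relayUrl
    .replace(/^wss:\/\//, 'https://')
    .replace(/^ws:\/\//, 'http://')
    .replace(/\/+$/, '');
}

// Builds the URL for one page of listHosts results.
function listHostsUrl(relayUrl: string, cursor?: string): string {
  const query =
    cursor === undefined ? 'limit=100' : `limit=100&cursor=${encodeURIComponent(cursor)}`;
  return `${relayXrpcBase(relayUrl)}/xrpc/com.atproto.sync.listHosts?${query}`;
}

// Converts one response page into candidate PDS URLs (HTTPS is assumed).
function hostsToPdsUrls(page: ListHostsResponse): string[] {
  return page.hosts.map((host) => `https://${host.hostname}`);
}
```

A caller would fetch `listHostsUrl(relay, cursor)` repeatedly, passing each page's `cursor` back in until the response omits it.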

3. DID mentions

Extracts PDS endpoints from DIDs found in indexed records:

const authorDids = ['did:plc:abc123', 'did:plc:xyz789'];
const pdses = await discoveryService.discoverFromDIDMentions(authorDids);
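Resolving a DID to its PDS comes down to reading the `#atproto_pds` service entry from the DID document. The following is a minimal sketch of that lookup, assuming `did:plc` documents are served by `https://plc.directory` and `did:web` documents live at `/.well-known/did.json`. The function names are hypothetical and this is not Chive's resolver.

```typescript
interface DidService {
  id: string;
  type: string;
  serviceEndpoint: unknown;
}

interface DidDocument {
  id: string;
  service?: DidService[];
}

// Hypothetical: where to fetch the DID document for the two supported DID methods.
function didDocumentUrl(did: string): string | undefined {
  if (did.startsWith('did:plc:')) return `https://plc.directory/${did}`;
  if (did.startsWith('did:web:')) {
    // Assumes a hostname-only did:web; a port is percent-encoded in the DID.
    const host = decodeURIComponent(did.slice('did:web:'.length));
    return `https://${host}/.well-known/did.json`;
  }
  return undefined; // unsupported DID method
}

// Extracts the PDS endpoint from a resolved DID document, if one is declared.
function pdsEndpointFromDidDocument(doc: DidDocument): string | undefined {
  const service = doc.service?.find(
    (s) =>
      (s.id === '#atproto_pds' || s.id === `${doc.id}#atproto_pds`) &&
      s.type === 'AtprotoPersonalDataServer',
  );
  return typeof service?.serviceEndpoint === 'string' ? service.serviceEndpoint : undefined;
}
```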

PDSScanner

Scans PDSes for all pub.chive.* records and indexes them via the appropriate services.

Instantiation

The scanner requires service dependencies for each record type it indexes:

import { PDSScanner } from '@/services/pds-discovery/pds-scanner.js';
import { EprintService } from '@/services/eprint/eprint-service.js';
import { ReviewService } from '@/services/review/review-service.js';
import { CollectionService } from '@/services/collection/collection-service.js';
import { AnnotationService } from '@/services/annotation/annotation-service.js';
import { PersonalGraphService } from '@/services/personal-graph/personal-graph-service.js';

const scanner = new PDSScanner(
  registry,
  eprintService,
  reviewService,
  logger,
  { requestsPerMinute: 10, scanTimeoutMs: 60000, maxRecordsPerPDS: 1000 },
  collectionService,
  annotationService,
  personalGraphService,
  pool
);

// Scan a single PDS
const result = await scanner.scanPDS('https://pds.example.com');
console.log(`Found ${result.chiveRecordCount} records`);

// Scan multiple PDSes concurrently
const results = await scanner.scanMultiplePDSes(pdsUrls, 2);

Supported collections

The scanner indexes records from all pub.chive.* collections:

| Collection | Indexed via | Description |
| --- | --- | --- |
| pub.chive.eprint.submission | EprintService | Core eprint submissions |
| pub.chive.eprint.version | Direct SQL insert | Eprint version metadata |
| pub.chive.eprint.userTag | Direct SQL insert | User-assigned tags on eprints |
| pub.chive.eprint.tag | Direct SQL insert | Author-assigned tags |
| pub.chive.eprint.citation | Direct SQL insert | Extracted and curated citations |
| pub.chive.eprint.relatedWork | Direct SQL insert | Related paper links |
| pub.chive.eprint.changelog | EprintService | Eprint edit changelogs |
| pub.chive.review.comment | ReviewService | Review comments with threading |
| pub.chive.review.endorsement | ReviewService | Endorsements with contribution types |
| pub.chive.review.entityLink | AnnotationService | Entity links on reviews |
| pub.chive.graph.node | PersonalGraphService | Knowledge graph and personal nodes |
| pub.chive.graph.edge | PersonalGraphService | Graph edges (contains, subcollection-of, etc.) |
| pub.chive.graph.nodeProposal | (logged only) | Governance node proposals |
| pub.chive.graph.edgeProposal | (logged only) | Governance edge proposals |
| pub.chive.graph.vote | (logged only) | Governance votes |
| pub.chive.annotation.comment | AnnotationService | Inline text annotations |
| pub.chive.annotation.entityLink | AnnotationService | Entity links on annotations |
| pub.chive.actor.profile | Direct SQL insert | Actor profile records |
| pub.chive.actor.profileConfig | Direct SQL insert | Profile display configuration |

Governance records (nodeProposal, edgeProposal, vote) are logged during scans but not fully indexed. Governance indexing is planned for a future release.

Each collection is scanned with cursor-based pagination, fetching up to 100 records per page and continuing until all records are retrieved or the per-PDS maximum is reached.
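The pagination loop can be sketched as follows. The page fetcher is injected so the loop stays independent of the HTTP client; in practice it would wrap a `com.atproto.repo.listRecords` call for one repo and collection. `listAllRecords` and the `FetchPage` type are hypothetical names for this example, not the scanner's actual code.

```typescript
interface ListedRecord {
  uri: string;
  cid: string;
  value: unknown;
}

interface ListRecordsPage {
  records: ListedRecord[];
  cursor?: string;
}

// Hypothetical: fetches one page, given the previous cursor and a page size.
type FetchPage = (cursor: string | undefined, limit: number) => Promise<ListRecordsPage>;

// Follows cursors until the collection is exhausted or maxRecords is reached.
async function listAllRecords(
  fetchPage: FetchPage,
  maxRecords: number,
  pageSize = 100,
): Promise<ListedRecord[]> {
  const all: ListedRecord[] = [];
  let cursor: string | undefined;

  while (all.length < maxRecords) {
    // Never request more than the remaining budget.
    const limit = Math.min(pageSize, maxRecords - all.length);
    const page = await fetchPage(cursor, limit);
    all.push(...page.records.slice(0, limit));

    // Stop on a missing cursor, or on an empty page that still carries one.
    if (page.cursor === undefined || page.records.length === 0) break;
    cursor = page.cursor;
  }
  return all;
}
```

Treating an empty page as the end guards against servers that return a cursor on the final page, which would otherwise cost one extra request per collection or loop indefinitely.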

Record routing

The scanner routes records to the appropriate service based on collection type:

// Simplified routing logic in indexRecord()
switch (collection) {
  case 'pub.chive.eprint.submission': {
    // Braces scope the const declaration to this case
    const transformed = transformPDSRecord(record.value, uri, cid);
    await this.eprintService.indexEprint(transformed, metadata);
    break;
  }

  case 'pub.chive.eprint.version':
    await this.indexVersion(uri, cid, record.value, pdsUrl);
    break;

  case 'pub.chive.review.comment':
    await this.reviewService.indexReview(record.value, metadata);
    break;

  case 'pub.chive.review.endorsement':
    await this.reviewService.indexEndorsement(record.value, metadata);
    break;

  case 'pub.chive.review.entityLink':
  case 'pub.chive.annotation.entityLink':
    await this.annotationService.indexEntityLink(record.value, metadata);
    break;

  case 'pub.chive.annotation.comment':
    await this.annotationService.indexAnnotation(record.value, metadata);
    break;

  case 'pub.chive.graph.node':
    await this.personalGraphService.indexNode(record.value, metadata);
    break;

  case 'pub.chive.graph.edge':
    await this.personalGraphService.indexEdge(record.value, metadata);
    break;

  case 'pub.chive.eprint.userTag':
  case 'pub.chive.eprint.tag':
    await this.indexUserTag(uri, cid, record.value, pdsUrl);
    break;

  // Governance records are logged but not indexed
  case 'pub.chive.graph.nodeProposal':
  case 'pub.chive.graph.edgeProposal':
  case 'pub.chive.graph.vote':
    this.logger.debug('Scanned governance record', { uri, collection });
    break;
}

Runtime validation

Records are validated at runtime using generated lexicon type guards before indexing. This approach avoids unsafe type assertions and ensures schema compliance.

For eprint submissions, the scanner uses transformPDSRecord() which performs structural validation:

import { transformPDSRecord } from '@/services/eprint/pds-record-transformer.js';

// Throws ValidationError if record is malformed
const eprint = transformPDSRecord(record.value, uri, cid);

For reviews and endorsements, ReviewService uses the generated isRecord type guards from the lexicon types:

// In ReviewService.indexReview()
import {
  isRecord as isCommentRecord,
  type Main as CommentRecord,
} from '@/lexicons/generated/types/pub/chive/review/comment.js';

async indexReview(record: unknown, metadata: RecordMetadata): Promise<Result<void, ValidationError>> {
  // Runtime validation using generated type guard
  if (!isCommentRecord(record)) {
    return Err(new ValidationError(
      'Record does not match pub.chive.review.comment schema',
      'record',
      'schema'
    ));
  }

  // TypeScript now knows record is CommentRecord
  const comment = record as CommentRecord;
  // ... proceed with indexing
}

The generated isRecord function checks that:

  1. The record is a non-null object
  2. The $type field matches the expected lexicon ID (e.g., pub.chive.review.comment)
  3. Required fields are present with correct types

This pattern provides type safety without relying on unchecked as casts or any types.
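To make the three checks concrete, here is a hand-written guard in the same spirit. It is only an illustration: the real guards are generated from the lexicon, and the `CommentLike` type and its `createdAt` field are assumptions made for this example, not the actual pub.chive.review.comment schema.

```typescript
// Hypothetical, trimmed-down record type; the real lexicon defines its own fields.
interface CommentLike {
  $type: 'pub.chive.review.comment';
  createdAt: string; // assumed required field, for illustration only
}

function isCommentLike(value: unknown): value is CommentLike {
  // 1. Must be a non-null object.
  if (typeof value !== 'object' || value === null) return false;
  // 2. $type must match the expected lexicon ID.
  if (!('$type' in value) || value.$type !== 'pub.chive.review.comment') return false;
  // 3. Required fields must be present with the correct types.
  return 'createdAt' in value && typeof value.createdAt === 'string';
}

// Usage: inside the branch, TypeScript narrows `input` to CommentLike.
function describe(input: unknown): string {
  return isCommentLike(input) ? `comment created at ${input.createdAt}` : 'not a comment';
}
```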

User registration endpoint

Users can register their PDS for scanning via the pub.chive.sync.registerPDS endpoint:

POST /xrpc/pub.chive.sync.registerPDS

{
  "pdsUrl": "https://my-pds.example.com"
}

Response:

{
  "pdsUrl": "https://my-pds.example.com",
  "registered": true,
  "status": "scanned",
  "message": "PDS registered and 5 record(s) indexed from your account."
}

If the user is authenticated, their DID is scanned immediately. Otherwise, the PDS is queued for the next scan cycle.

Scheduled scanning

The PDSScanSchedulerJob runs periodic scans:

const scanJob = new PDSScanSchedulerJob({
  registry,
  scanner,
  logger,
  scanIntervalMs: 900000, // 15 minutes
  batchSize: 5,
  concurrency: 2,
});

await scanJob.start();

Scan priority:

| Priority | Condition | Scan frequency |
| --- | --- | --- |
| High | Has Chive records | Every 24 hours |
| Medium | New/pending PDS | Next scheduled run |
| Low | No Chive records | Every 7 days |

Failed scans use exponential backoff (2^n hours, max 16 hours). After 5 consecutive failures, the PDS is marked as unreachable.
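The scheduling rules above can be expressed as a few small functions. This is a sketch under one assumption the text does not spell out: that n is the consecutive-failure count after the failed scan, so the delays run 2, 4, 8, then 16 hours before the fifth failure marks the PDS unreachable. All names here are hypothetical.

```typescript
const HOUR_MS = 60 * 60 * 1000;
const MAX_BACKOFF_HOURS = 16;
const MAX_CONSECUTIVE_FAILURES = 5;

// Delay after the n-th consecutive failure: 2^n hours, capped at 16 hours.
function backoffHours(consecutiveFailures: number): number {
  return Math.min(2 ** consecutiveFailures, MAX_BACKOFF_HOURS);
}

interface RetryPlan {
  unreachable: boolean;
  nextScanAt?: Date; // absent once the PDS is considered unreachable
}

// Decides what happens after a failed scan.
function planAfterFailure(consecutiveFailures: number, now: Date): RetryPlan {
  if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
    return { unreachable: true };
  }
  const delayMs = backoffHours(consecutiveFailures) * HOUR_MS;
  return { unreachable: false, nextScanAt: new Date(now.getTime() + delayMs) };
}

// Decides the next scan time after a successful scan, per the priority table.
function planAfterSuccess(hasChiveRecords: boolean, now: Date): Date {
  const hours = hasChiveRecords ? 24 : 7 * 24;
  return new Date(now.getTime() + hours * HOUR_MS);
}
```

Passing `now` in explicitly keeps the functions deterministic, which makes the schedule straightforward to unit test.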

ATProto compliance

The PDS Discovery system is fully compliant with ATProto principles:

  • Read-only: Uses standard XRPC calls (listRepos, listRecords) to read from PDSes
  • Never writes: Chive never writes to user PDSes
  • Rebuildable: All indexed data can be rebuilt from source PDSes
  • Rate-limited: Respects PDS rate limits (configurable requests per minute)
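One simple way to honor a requests-per-minute budget is to space requests evenly. The class below is a minimal sketch of that idea, not the scanner's actual limiter; `IntervalRateLimiter` and its `reserve` method are hypothetical names, and the clock is injectable so the behavior can be tested without waiting.

```typescript
class IntervalRateLimiter {
  private readonly intervalMs: number;
  private readonly now: () => number;
  private nextAllowedAt = 0;

  constructor(requestsPerMinute: number, now: () => number = () => Date.now()) {
    // 10 requests per minute means one request every 6000 ms.
    this.intervalMs = 60_000 / requestsPerMinute;
    this.now = now;
  }

  // Reserves the next free slot and returns how long the caller should wait for it.
  reserve(): number {
    const current = this.now();
    const startAt = Math.max(current, this.nextAllowedAt);
    this.nextAllowedAt = startAt + this.intervalMs;
    return startAt - current;
  }
}

// Usage: wait for the reserved slot before issuing each request.
async function throttled<T>(limiter: IntervalRateLimiter, request: () => Promise<T>): Promise<T> {
  const waitMs = limiter.reserve();
  if (waitMs > 0) await new Promise<void>((resolve) => setTimeout(resolve, waitMs));
  return request();
}
```

Even spacing is stricter than a token bucket because it allows no bursts, which suits a crawler that should stay well inside a remote server's limits.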

Configuration

interface PDSDiscoveryConfig {
  plcDirectoryUrl: string; // Default: 'https://plc.directory'
  plcRateLimitPerSecond: number; // Default: 5
  enabled: boolean; // Default: true
}

interface PDSScannerConfig {
  requestsPerMinute: number; // Default: 10
  scanTimeoutMs: number; // Default: 60000
  maxRecordsPerPDS: number; // Default: 1000
}

Database schema

The registry uses a PostgreSQL table:

CREATE TABLE pds_registry (
  pds_url TEXT PRIMARY KEY,
  discovered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  discovery_source TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  last_scan_at TIMESTAMPTZ,
  next_scan_at TIMESTAMPTZ,
  has_chive_records BOOLEAN,
  chive_record_count INTEGER NOT NULL DEFAULT 0,
  consecutive_failures INTEGER NOT NULL DEFAULT 0,
  scan_priority INTEGER NOT NULL DEFAULT 100,
  last_error TEXT,
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Usage examples

Manual PDS registration

import { PDSRegistry } from '@/services/pds-discovery/pds-registry.js';

const registry = new PDSRegistry(pool, logger);
await registry.registerPDS('https://pds.example.com', 'user_registration');

Running a discovery cycle

import { PDSDiscoveryService } from '@/services/pds-discovery/discovery-service.js';

const discovery = new PDSDiscoveryService(registry, logger, redis);

// Discover from relays
const result = await discovery.runDiscoveryCycle(['wss://bsky.network', 'wss://bsky.social']);

console.log(`Discovered ${result.discovered} PDSes`);

Scanning a specific DID

import { PDSScanner } from '@/services/pds-discovery/pds-scanner.js';

const scanner = new PDSScanner(registry, eprintService, reviewService, logger);

// Scan a specific DID on a known PDS
const recordsIndexed = await scanner.scanDID('https://pds.example.com', 'did:plc:abc123');

The scanDID method scans all supported collections for the given DID and returns the total number of records indexed.

Metrics

The PDSScanner exposes Prometheus metrics for observability:

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| chive_pds_scan_duration_seconds | Histogram | status | Duration of PDS scan operations |
| chive_pds_scans_total | Counter | status | Total scan operations by status |
| chive_pds_records_scanned | Counter | collection | Records scanned by collection |
| chive_pds_records_indexed | Counter | collection, status | Records indexed by status |
| chive_pds_record_index_duration | Histogram | collection | Duration of record indexing |

Status values for chive_pds_records_indexed:

  • success: Record indexed successfully
  • error: Indexing failed (validation or database error)
  • skipped: Record type not yet indexed (e.g., governance records)