Core business services

This guide covers the core services that form Chive's business logic layer. These services sit between the firehose indexing pipeline and the storage backends.

Overview

The core services and their responsibilities:

  • PreprintService: Indexes preprints from the firehose into PostgreSQL and Elasticsearch
  • BlobProxyService: Proxies PDF and image blobs from user PDSes with 3-tier caching
  • MetricsService: Tracks view counts and downloads using Redis data structures
  • PDSSyncService: Detects stale indexes and triggers re-sync from source PDSes
  • ReviewService: Indexes reviews and endorsements with threaded discussions

All services follow ATProto compliance rules: they index data from the firehose but never write to user PDSes.

Architecture

Service dependencies

┌─────────────────────────────────────────────────────────────┐
│                      Firehose Consumer                      │
│                  (receives ATProto events)                  │
└──────────────────────────┬──────────────────────────────────┘
                           │
           ┌───────────────┼───────────────┐
           ▼               ▼               ▼
    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │  Preprint   │ │   Review    │ │   Metrics   │
    │   Service   │ │   Service   │ │   Service   │
    └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
           │               │               │
    ┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐
    │ PostgreSQL  │ │ PostgreSQL  │ │    Redis    │
    │Elasticsearch│ │             │ │             │
    └─────────────┘ └─────────────┘ └─────────────┘

    ┌─────────────┐ ┌─────────────┐
    │  BlobProxy  │ │   PDSSync   │
    │   Service   │ │   Service   │
    └──────┬──────┘ └──────┬──────┘
           │               │
    ┌──────┴──────┐ ┌──────┴──────┐
    │ Redis (L1)  │ │ PostgreSQL  │
    │ R2/CDN (L2) │ │ IRepository │
    │ PDS (L3)    │ │             │
    └─────────────┘ └─────────────┘

Data flow

  1. User creates a preprint record in their PDS
  2. PDS publishes event to the ATProto relay
  3. Chive's firehose consumer receives the event
  4. PreprintService indexes the record to PostgreSQL and Elasticsearch
  5. MetricsService tracks views when users access the preprint
  6. BlobProxyService fetches PDFs from the user's PDS on demand
  7. PDSSyncService periodically checks for stale records

PreprintService

Indexes preprints from firehose events into local storage for search and discovery.

Key methods

import { PreprintService } from '@/services/preprint/preprint-service.js';

const service = new PreprintService({
  storage, // IStorageBackend (PostgreSQL adapter)
  search, // ISearchEngine (Elasticsearch adapter)
  repository, // IRepository (PDS client)
  identity, // IIdentityResolver (DID resolver)
  logger,
});

// Index a preprint from firehose event
await service.indexPreprint(preprint, {
  uri: 'at://did:plc:abc/pub.chive.preprint.submission/xyz' as AtUri,
  cid: 'bafyrei...' as CID,
  pdsUrl: 'https://pds.example.com',
  indexedAt: new Date(),
});

// Retrieve by URI
const result = await service.getPreprint(uri);

// List by author
const preprints = await service.getPreprintsByAuthor(authorDid);

// Handle deletion
await service.indexPreprintDelete(uri);

ATProto compliance

PreprintService stores BlobRefs (CID pointers), not blob data:

// Correct: Store BlobRef metadata
const storedPreprint: StoredPreprint = {
  uri,
  cid,
  author: record.author,
  title: record.title,
  abstract: record.abstract,
  pdfBlobRef: record.pdfBlobRef, // CID reference only
  pdsUrl: metadata.pdsUrl, // Track source PDS
  indexedAt: new Date(),
};

// Wrong: Never store blob content
// await storage.storeBlob(pdfBuffer); // FORBIDDEN

BlobProxyService

Serves PDFs and images through a 3-tier cache hierarchy.

Cache tiers

Tier | Storage       | TTL      | Use case
L1   | Redis         | 1 hour   | Hot blobs, repeated requests
L2   | Cloudflare R2 | 24 hours | Warm blobs, CDN edge delivery
L3   | User PDS      | N/A      | Source of truth
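
A read-through lookup across these tiers might look like the sketch below. The BlobTier interface and fetchThroughTiers function are hypothetical names, not BlobProxyService's actual API, and the sketch assumes that a hit in a slower tier is copied back into the faster tiers that missed.

```typescript
// Hypothetical tier abstraction; the real service may be structured differently.
interface BlobTier {
  name: string;
  get(cid: string): Promise<Uint8Array | null>;
  // Optional because the source of truth (the user's PDS) is never written to.
  set?(cid: string, data: Uint8Array): Promise<void>;
}

// Walk the tiers in order (L1 -> L2 -> L3). On a hit, backfill every faster
// tier that missed so the next request is served from a cheaper tier.
async function fetchThroughTiers(
  cid: string,
  tiers: BlobTier[],
): Promise<{ data: Uint8Array; servedBy: string } | null> {
  const missed: BlobTier[] = [];
  for (const tier of tiers) {
    const data = await tier.get(cid);
    if (data !== null) {
      await Promise.all(missed.map((t) => t.set?.(cid, data)));
      return { data, servedBy: tier.name };
    }
    missed.push(tier);
  }
  return null; // not found anywhere, including the PDS
}
```

Leaving set undefined on the PDS tier keeps the sketch in line with the compliance rule: cache tiers are written, the PDS is only read.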

RedisCache usage

import { RedisCache } from '@/services/blob-proxy/redis-cache.js';

const cache = new RedisCache({
  redis, // ioredis client
  defaultTTL: 3600, // 1 hour
  beta: 1.0, // Probabilistic early expiration factor
  maxBlobSize: 5 * 1024 * 1024, // 5MB limit
  keyPrefix: 'blob:',
  logger,
});

// Cache a blob
await cache.set(cid, buffer, 'application/pdf', 7200);

// Retrieve with early expiration check
const result = await cache.get(cid);
if (result?.isEarlyFetch) {
  // Background refresh before TTL expires
  scheduleRefresh(cid);
}

// Check existence
const exists = await cache.has(cid);

// Remove from cache
await cache.delete(cid);

// Get cache stats
const stats = await cache.getStats();
console.log(`Keys: ${stats.keyCount}, Memory: ${stats.memoryUsage}`);

Probabilistic early expiration

The cache uses the XFetch algorithm to prevent cache stampedes. As a cached entry approaches expiration, cache.get() sets isEarlyFetch to true with increasing probability, so a single caller can refresh the entry in the background before the TTL expires and many callers miss at once.
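
The XFetch decision rule itself is small. The sketch below is an illustration of the published rule, not RedisCache's code: shouldFetchEarly is a hypothetical helper, and it assumes the cache knows the entry's expiry time and roughly how long the last fetch took (deltaMs).

```typescript
// XFetch rule: refresh early when  now - delta * beta * ln(u) >= expiry,
// where u is uniform in (0, 1]. Since ln(u) <= 0, the subtracted term is a
// random non-negative head start that grows with delta and beta.
function shouldFetchEarly(
  nowMs: number,
  expiresAtMs: number,
  deltaMs: number, // assumed: duration of the last fetch from the slower tier
  beta: number = 1.0, // values above 1 favor earlier refreshes
  random: () => number = Math.random,
): boolean {
  // Math.random() returns [0, 1); flip it to (0, 1] so Math.log never sees 0.
  const u = 1 - random();
  const headStartMs = -deltaMs * beta * Math.log(u);
  return nowMs + headStartMs >= expiresAtMs;
}
```

Far from expiry the head start almost never bridges the gap, so the function returns false; close to expiry it returns true more and more often, and once the entry has expired it always returns true.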

MetricsService

Tracks views, downloads, and trending preprints using Redis data structures.

Data structures

  • Counters (INCR) - Total view and download counts
  • HyperLogLog (PFADD) - Unique viewer estimation
  • Sorted Sets (ZADD) - Time-windowed metrics for trending
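
As a rough illustration of how the three structures fit together, the sketch below records one view against a minimal Redis-like interface. The key names and the recordViewSketch function are assumptions made for this example and do not reflect MetricsService's actual key schema.

```typescript
// Minimal subset of a Redis client; method names mirror the Redis commands.
interface RedisLike {
  incr(key: string): Promise<number>;
  pfadd(key: string, ...members: string[]): Promise<number>;
  zadd(key: string, score: number, member: string): Promise<number>;
}

// Illustrative key layout (hypothetical, not the service's real schema).
async function recordViewSketch(
  redis: RedisLike,
  preprintUri: string,
  viewerDid: string | undefined,
  nowMs: number,
): Promise<void> {
  // Counter: exact running total.
  await redis.incr(`metrics:views:total:${preprintUri}`);

  // HyperLogLog: approximate unique viewers in constant memory.
  if (viewerDid !== undefined) {
    await redis.pfadd(`metrics:views:unique:${preprintUri}`, viewerDid);
  }

  // Sorted set: one member per event, scored by timestamp, so a window such
  // as "last 24h" becomes a score-range count (ZCOUNT) over the set.
  const member = `${nowMs}:${viewerDid ?? 'anon'}`;
  await redis.zadd(`metrics:views:events:${preprintUri}`, nowMs, member);
}
```

Two views from the same viewer in the same millisecond collapse into one sorted-set member in this sketch; a production version would need a more unique member ID.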

Recording metrics

import { MetricsService } from '@/services/metrics/metrics-service.js';

const service = new MetricsService({
  storage,
  redis,
  logger,
  keyPrefix: 'metrics:',
});

// Record a view (with optional viewer DID for unique tracking)
await service.recordView(preprintUri, viewerDid);

// Record a download
await service.recordDownload(preprintUri, viewerDid);

// Batch operations (uses Redis pipeline)
await service.batchIncrement([
  { type: 'view', uri: uri1, viewerDid: did1 },
  { type: 'view', uri: uri1, viewerDid: did2 },
  { type: 'download', uri: uri2, viewerDid: did1 },
]);

Querying metrics

// Get all metrics for a preprint
const metrics = await service.getMetrics(preprintUri);
console.log(`Total views: ${metrics.totalViews}`);
console.log(`Unique viewers: ${metrics.uniqueViews}`);
console.log(`Views (24h): ${metrics.views24h}`);
console.log(`Views (7d): ${metrics.views7d}`);

// Get view count only
const viewCount = await service.getViewCount(preprintUri);

// Get trending preprints
const trending = await service.getTrending('24h', 10);
for (const entry of trending) {
  console.log(`${entry.uri}: ${entry.score} views`);
}

Persistence

MetricsService stores counters in Redis for speed. Call flushToDatabase() periodically to persist to PostgreSQL for durability:

// Flush Redis counters to PostgreSQL
const result = await service.flushToDatabase();
if (result.ok) {
  console.log(`Flushed ${result.value} records`);
}

PDSSyncService

Detects when indexed records become stale and refreshes them from the source PDS.

Staleness detection

A record is stale when either of the following holds:

  1. The indexed CID differs from the current PDS CID
  2. The record hasn't been synced within the threshold (default: 7 days)

import { PDSSyncService } from '@/services/pds-sync/sync-service.js';
import { createResiliencePolicy } from '@/services/common/resilience.js';

const resiliencePolicy = createResiliencePolicy({
  circuitBreaker: { name: 'pds-sync', failureThreshold: 5, timeout: 60000 },
  retry: { name: 'pds-sync', maxAttempts: 3, baseDelay: 100 },
});

const service = new PDSSyncService({
  storage,
  repository,
  resiliencePolicy,
  logger,
});

// Check if a record is stale
const check = await service.checkStaleness(uri);
if (check.isStale) {
  console.log(`Record ${uri} is stale`);
  console.log(`Indexed CID: ${check.indexedCID}`);
  console.log(`PDS CID: ${check.pdsCID}`);
}

// Refresh a record from PDS
const result = await service.refreshRecord(uri);
if (result.ok) {
  console.log(`Changed: ${result.value.changed}`);
  console.log(`Previous CID: ${result.value.previousCID}`);
  console.log(`Current CID: ${result.value.currentCID}`);
}

// Track PDS update after indexing
await service.trackPDSUpdate(uri, cid, pdsUrl);
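
The two staleness rules listed at the start of this section reduce to a small pure function. This is a sketch under assumptions: IndexedRecordState and stalenessReason are hypothetical names, and the real checkStaleness also has to fetch the current CID from the PDS.

```typescript
const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;

type StaleReason = 'cid-mismatch' | 'sync-expired';

// Hypothetical shape of what the index remembers about a record.
interface IndexedRecordState {
  indexedCid: string;
  lastSyncedAt: Date;
}

// Returns why the record is stale, or null when it is fresh.
function stalenessReason(
  indexed: IndexedRecordState,
  currentPdsCid: string,
  now: Date,
  thresholdMs: number = SEVEN_DAYS_MS,
): StaleReason | null {
  // Rule 1: the content changed on the PDS since it was indexed.
  if (indexed.indexedCid !== currentPdsCid) return 'cid-mismatch';
  // Rule 2: the record has not been re-checked within the threshold.
  if (now.getTime() - indexed.lastSyncedAt.getTime() > thresholdMs) return 'sync-expired';
  return null;
}
```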

Background sync job

// Find stale records (older than 7 days)
const staleUris = await service.detectStaleRecords();

// Refresh each stale record
for (const uri of staleUris) {
  const result = await service.refreshRecord(uri);
  if (!result.ok) {
    logger.error('Failed to refresh record', result.error, { uri });
  }
}

ReviewService

Indexes reviews and endorsements with support for threaded discussions.

Indexing reviews

import { ReviewService } from '@/services/review/review-service.js';

const service = new ReviewService({
  storage,
  logger,
});

// Index a review comment
const review: ReviewComment = {
  $type: 'pub.chive.review.comment',
  subject: { uri: preprintUri, cid: preprintCid },
  text: 'The methodology section needs more detail on sample size.',
  reviewType: 'methodology',
  parent: parentReviewUri, // Optional: for threaded replies
  createdAt: new Date().toISOString(),
};

await service.indexReview(review, metadata);

// Index an endorsement
const endorsement: Endorsement = {
  $type: 'pub.chive.review.endorsement',
  subject: { uri: preprintUri, cid: preprintCid },
  endorsementType: 'reproducibility',
  createdAt: new Date().toISOString(),
};

await service.indexEndorsement(endorsement, metadata);

Threading with ThreadingHandler

import { ThreadingHandler } from '@/services/review/threading-handler.js';

const handler = new ThreadingHandler({
  maxDepth: 20,
  sortByDate: true,
});

// Build thread tree from flat review list
const reviews = await service.getReviews(preprintUri);
const threads = handler.buildThreads(reviews);

// Each thread has nested replies
for (const thread of threads) {
  console.log(`Root: ${thread.root.text}`);
  console.log(`Replies: ${thread.totalReplies}`);

  for (const reply of thread.replies) {
    console.log(` Reply: ${reply.root.text}`);
  }
}

// Flatten threads back to list
const flat = handler.flattenThreads(threads);

// Find specific thread containing a review
const found = handler.findThread(threads, reviewUri);

// Get ancestor chain for a nested review
const ancestors = handler.getAncestors(threads, nestedReviewUri);
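
To make the tree-building step concrete, here is a minimal sketch of turning a flat list with parent pointers into nested threads. It is not ThreadingHandler's implementation: FlatReview, ThreadNode, and buildThreadsSketch are hypothetical, the node shape merely mirrors the root, replies, and totalReplies fields used above, and maxDepth handling is left out. It assumes parent links contain no cycles.

```typescript
interface FlatReview {
  uri: string;
  parent?: string; // URI of the review being replied to
  text: string;
  createdAt: string; // ISO 8601 timestamp in a single, consistent format
}

interface ThreadNode {
  root: FlatReview;
  replies: ThreadNode[];
  totalReplies: number; // all descendants, not just direct replies
}

function buildThreadsSketch(reviews: FlatReview[]): ThreadNode[] {
  // Pass 1: one node per review, indexed by URI.
  const nodes = new Map<string, ThreadNode>();
  for (const review of reviews) {
    nodes.set(review.uri, { root: review, replies: [], totalReplies: 0 });
  }

  // Pass 2: attach each node to its parent. Reviews without a parent, or
  // whose parent is not in the list, become thread roots.
  const roots: ThreadNode[] = [];
  nodes.forEach((node) => {
    const parent = node.root.parent ? nodes.get(node.root.parent) : undefined;
    if (parent) {
      parent.replies.push(node);
    } else {
      roots.push(node);
    }
  });

  // Pass 3: sort siblings by date and count descendants bottom-up.
  const byDate = (a: ThreadNode, b: ThreadNode): number =>
    a.root.createdAt < b.root.createdAt ? -1 : a.root.createdAt > b.root.createdAt ? 1 : 0;
  const finalize = (node: ThreadNode): number => {
    node.replies.sort(byDate);
    node.totalReplies = node.replies.reduce((sum, child) => sum + 1 + finalize(child), 0);
    return node.totalReplies;
  };
  roots.sort(byDate).forEach((root) => finalize(root));
  return roots;
}
```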

Testing

Run integration tests

Integration tests require the Docker test stack:

# Start test infrastructure
./scripts/start-test-stack.sh

# Run integration tests
npm run test:integration

# Run specific service tests
npx vitest run tests/integration/services/preprint
npx vitest run tests/integration/services/metrics
npx vitest run tests/integration/services/blob-proxy
npx vitest run tests/integration/services/pds-sync
npx vitest run tests/integration/services/review

Run compliance tests

# Verify ATProto compliance
npm run test:compliance

# Run core services compliance tests
npx vitest run tests/compliance/core-services-compliance.test.ts

Run performance benchmarks

# Install k6
brew install k6 # macOS
# or: https://k6.io/docs/getting-started/installation/

# Run benchmarks
k6 run tests/performance/k6/scenarios/preprint-indexing.js
k6 run tests/performance/k6/scenarios/blob-proxy-load.js
k6 run tests/performance/k6/scenarios/search-query.js
k6 run tests/performance/k6/scenarios/metrics-recording.js

Performance targets

Operation               | Target p95
Preprint indexing       | < 200 ms
Blob cache hit (L1)     | < 50 ms
Blob cache miss (L2/L3) | < 200 ms
Search query            | < 300 ms
Metrics recording       | < 10 ms

Error handling

All services return Result<T, Error> types for operations that can fail:

const result = await service.indexPreprint(preprint, metadata);

if (result.ok) {
  console.log('Indexed successfully');
} else {
  // Handle typed errors
  if (result.error instanceof ValidationError) {
    console.log('Invalid preprint data:', result.error.message);
  } else if (result.error instanceof DatabaseError) {
    console.log('Storage failed:', result.error.message);
  }
}
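
For readers unfamiliar with the pattern, a Result type with the ok, value, and error fields used above can be modeled as a discriminated union. The definition below is an assumption about the shape, inferred from the usage in this guide; Chive's actual type and error classes may differ, and parseLimit is a hypothetical example function.

```typescript
// Assumed shape: `ok` discriminates between the success and failure variants.
type Result<T, E extends Error = Error> =
  | { ok: true; value: T }
  | { ok: false; error: E };

// Stand-in error class for this example only.
class ValidationError extends Error {}

// Hypothetical operation that reports failure as a value instead of throwing.
function parseLimit(raw: string): Result<number, ValidationError> {
  const limit = Number(raw);
  if (!Number.isInteger(limit) || limit <= 0) {
    return { ok: false, error: new ValidationError(`Invalid limit: ${raw}`) };
  }
  return { ok: true, value: limit };
}
```

Because ok is a literal true or false, TypeScript narrows the union inside an if (result.ok) check: value is only accessible on the success branch and error only on the failure branch.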

DiscoveryService

Provides personalized preprint recommendations and related paper discovery using Semantic Scholar, OpenAlex, and citation graph data.

Key constraint

All discovery features recommend only preprints indexed in Chive. External APIs (Semantic Scholar, OpenAlex) are used as enrichment signals, not as recommendation sources.

Key methods

import { DiscoveryService } from '@/services/discovery/discovery-service.js';

const service = new DiscoveryService(
  logger,
  db,
  searchEngine,
  rankingService,
  citationGraph
);

// Optional: Enable external API enrichment
service.setPluginManager(pluginManager);

// Enrich a preprint with external metadata
const enrichment = await service.enrichPreprint({
  uri: preprintUri,
  doi: '10.1234/example',
  title: 'Example Paper',
});
console.log(`Citations: ${enrichment.citationCount}`);

// Find related Chive preprints
const related = await service.findRelatedPreprints(preprintUri, {
  signals: ['citations', 'concepts', 'semantic'],
  limit: 10,
  minScore: 0.3,
});

// Get personalized For You feed
const recommendations = await service.getRecommendationsForUser(userDid, {
  signals: ['fields', 'citations', 'semantic'],
  limit: 20,
});

// Record user interaction for feedback loop
await service.recordInteraction(userDid, {
  preprintUri,
  type: 'dismiss',
  recommendationId: 'rec-123',
});

// Get citation network data
const counts = await service.getCitationCounts(preprintUri);
const citing = await service.getCitingPapers(preprintUri, { limit: 20 });
const references = await service.getReferences(preprintUri, { limit: 20 });

Recommendation signals

Signal    | Description                                             | Weight
fields    | User's research fields (from profile or claimed papers) | 60%
citations | Papers citing user's work                               | 25%
semantic  | SPECTER2 similarity to claimed papers                   | 30%

RankingService

Personalizes search results based on user fields and query relevance.

Score components

Component      | Weight | Description
Field match    | 60%    | Category overlap with user's research fields
Text relevance | 40%    | BM25-style text matching (title + abstract)
Recency        | Boost  | Exponential decay with 30-day half-life
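
The two weighted components and the recency decay can be written out as follows. This is a sketch of the arithmetic only: heuristicScore and recencyDecay are hypothetical helpers, both inputs are assumed to be normalized to the range 0 to 1, and how RankingService folds the recency boost into the final score is not specified here, so the two are kept separate.

```typescript
const HALF_LIFE_DAYS = 30;

// Weighted sum of the two main components (defaults: 60% field, 40% text).
function heuristicScore(
  fieldMatch: number,
  textRelevance: number,
  fieldWeight: number = 0.6,
  textWeight: number = 0.4,
): number {
  return fieldWeight * fieldMatch + textWeight * textRelevance;
}

// Exponential decay with a fixed half-life: 1.0 for a brand-new preprint,
// 0.5 after 30 days, 0.25 after 60 days, and so on.
function recencyDecay(ageDays: number, halfLifeDays: number = HALF_LIFE_DAYS): number {
  return Math.pow(0.5, Math.max(0, ageDays) / halfLifeDays);
}
```

For example, a preprint with a perfect field match and a text relevance of 0.5 scores 0.6 × 1.0 + 0.4 × 0.5 = 0.8 before any recency boost is applied.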

Usage

import { RankingService } from '@/services/search/ranking-service.js';

const service = new RankingService(db, logger, textScorer, categoryMatcher, {
  fieldWeight: 0.6,
  textWeight: 0.4,
  maxUserFields: 10,
  mode: 'heuristic', // or 'ltr' for Learning to Rank
});

// Rank search results by personalized relevance
const ranked = await service.rank(searchResults, {
  userDid: user.did,
  query: 'attention mechanisms',
});

for (const { item, score, fieldMatchScore, textRelevanceScore } of ranked) {
  console.log(`${item.title}: ${score}`);
}

// Get user's research fields (cached for 5 minutes)
const fields = await service.getUserFields(userDid);

// Rank with discovery signals
const rankedWithDiscovery = await service.rankWithDiscoverySignals(
  items,
  { userDid, query },
  { s2Scores, citationScores, conceptScores }
);

LTR feature extraction

RankingService extracts feature vectors for Learning to Rank model training:

interface LTRFeatureVector {
  textRelevance: number;
  fieldMatchScore: number;
  titleMatchScore: number;
  abstractMatchScore: number;
  recencyScore: number;
  bm25Score: number;
  originalPosition: number;
  // Discovery signals
  specter2Similarity?: number;
  coCitationScore?: number;
  conceptOverlapScore?: number;
}

ClaimingService

Manages authorship claims for imported preprints using multi-authority verification.

Evidence types and weights

Evidence Type          | Weight | Source
orcid-match            | 35%    | ORCID API (cryptographically verified)
author-claim           | 20%    | Self-attestation with evidence
semantic-scholar-match | 15%    | S2 claimed author profile
openreview-match       | 15%    | OpenReview authenticated profile
openalex-match         | 10%    | OpenAlex ORCID-linked author
arxiv-ownership        | 10%    | arXiv author ownership system
institutional-email    | 8%     | Handle domain verification
ror-affiliation        | 5%     | ROR organization match
coauthor-overlap       | 5%     | Network analysis
name-match             | 2%     | Fuzzy name matching

Decision thresholds

Decision     | Score  | Workflow
Auto-approve | ≥ 0.90 | Claim approved immediately
Expedited    | ≥ 0.70 | Fast-track review
Manual       | ≥ 0.50 | Standard review queue
Insufficient | < 0.50 | Rejected or needs more evidence
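
One way to combine the evidence weights with these thresholds is sketched below. It rests on assumptions the tables do not state: each evidence type counts once, weights are summed, and the total is capped at 1.0 (the listed weights add up to more than 100%). Apart from 'auto-approve', the decision labels and the decideClaim function are hypothetical. Weights are kept as integer percentage points so that threshold comparisons are exact.

```typescript
// Weights in percentage points, taken from the evidence table.
const EVIDENCE_WEIGHTS = {
  'orcid-match': 35,
  'author-claim': 20,
  'semantic-scholar-match': 15,
  'openreview-match': 15,
  'openalex-match': 10,
  'arxiv-ownership': 10,
  'institutional-email': 8,
  'ror-affiliation': 5,
  'coauthor-overlap': 5,
  'name-match': 2,
} as const;

type EvidenceType = keyof typeof EVIDENCE_WEIGHTS;
type ClaimDecision = 'auto-approve' | 'expedited' | 'manual' | 'insufficient';

function decideClaim(evidence: EvidenceType[]): { score: number; decision: ClaimDecision } {
  // Assumption: duplicate evidence of the same type is counted once.
  const distinct = Array.from(new Set(evidence));
  const total = distinct.reduce((sum, type) => sum + EVIDENCE_WEIGHTS[type], 0);
  const points = Math.min(100, total); // assumption: cap the score at 1.0

  const decision: ClaimDecision =
    points >= 90 ? 'auto-approve'
    : points >= 70 ? 'expedited'
    : points >= 50 ? 'manual'
    : 'insufficient';
  return { score: points / 100, decision };
}
```

Under these assumptions an ORCID match alone (0.35) is insufficient, while ORCID plus a self-attested claim plus a Semantic Scholar match reaches 0.70 and qualifies for expedited review.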

Claim workflow

import { ClaimingService } from '@/services/claiming/claiming-service.js';

const service = new ClaimingService(logger, db, importService, identity);
service.setPluginManager(pluginManager); // Enable external search

// Search external sources for preprints
const papers = await service.searchAllSources({
  query: 'attention mechanisms',
  author: 'Vaswani',
  limit: 20,
});

// Start claim from external search result
const claim = await service.startClaimFromExternal(
  'arxiv',
  '2309.12345',
  userDid
);

// Collect evidence from multiple authorities
const withEvidence = await service.collectEvidence(claim.id);
console.log(`Score: ${withEvidence.verificationScore}`);

// Check decision
const { score, decision } = service.computeScore(withEvidence.evidence);
if (decision === 'auto-approve') {
  // User creates canonical record in their PDS
  // Then complete the claim
  await service.completeClaim(claim.id, canonicalUri);
}

// Get suggested papers for user based on profile
const suggestions = await service.getSuggestedPapers(userDid, {
  limit: 20,
});
for (const paper of suggestions.papers) {
  console.log(`${paper.title} - Match: ${paper.matchScore}%`);
}

ATProto compliance

The ClaimingService never writes to user PDSes. The claim flow is:

  1. User searches external sources via Chive
  2. User starts claim (stored in AppView database)
  3. Chive collects evidence from external authorities
  4. If approved, the user creates the canonical record in their own PDS
  5. Chive indexes the canonical record from firehose
  6. Claim marked complete with link to canonical URI

SearchService

Provides full-text search over Elasticsearch with faceted filtering.

Features

  • BM25 text ranking
  • PMEST faceted search (Personality, Matter, Energy, Space, Time)
  • Field highlighting
  • Autocomplete suggestions
  • Search tracking for analytics

import { SearchService } from '@/services/search/search-service.js';

const service = new SearchService(elasticsearch, storage, logger);

// Full-text search
const results = await service.search({
  q: 'neural attention mechanisms',
  limit: 20,
  cursor: 'page2',
});

// Faceted search
const facetedResults = await service.facetedSearch({
  q: 'machine learning',
  facets: {
    fields: ['cs.AI', 'cs.LG'],
    sources: ['arxiv'],
    dateRange: { from: '2024-01-01' },
  },
});

// Get facet counts
const counts = await service.getFacetCounts({
  q: 'neural networks',
});

// Autocomplete
const suggestions = await service.autocomplete('atten', 8);

BacklinkService

Tracks references to Chive preprints from Bluesky posts.

Source        | Description
bluesky_post  | Bluesky post mentioning the preprint
bluesky_reply | Reply discussing the preprint
bluesky_quote | Quote post of the preprint

import { BacklinkService } from '@/services/backlink/backlink-service.js';

const service = new BacklinkService(db, firehoseClient, logger);

// Get backlinks for a preprint
const backlinks = await service.getBacklinks(preprintUri, {
  limit: 20,
  sourceTypes: ['bluesky_post', 'bluesky_quote'],
});

// Get backlink counts
const counts = await service.getBacklinkCounts(preprintUri);
console.log(`Total: ${counts.total}, Bluesky posts: ${counts.blueskyPosts}`);

ActivityService

Tracks user activity for analytics and discovery feedback.

Activity categories

Category   | Actions
read       | view, dwell
engage     | click, download, share
contribute | review, endorse, tag, claim

import { ActivityService } from '@/services/activity/activity-service.js';

const service = new ActivityService(db, logger);

// Log an activity
await service.logActivity({
  userDid,
  category: 'read',
  action: 'view',
  targetUri: preprintUri,
  collection: 'pub.chive.preprint.submission',
  metadata: { source: 'search', query: 'neural networks' },
});

// Get user's activity feed
const feed = await service.getActivityFeed(userDid, {
  category: 'contribute',
  limit: 50,
});

// Mark activity as failed (for retry logic)
await service.markFailed(activityId, 'API error');

NotificationService

Delivers real-time notifications to users over server-sent events (SSE).

Notification types

Type            | Description
new_review      | New review on user's preprint
review_reply    | Reply to user's review
endorsement     | New endorsement on user's preprint
claim_update    | Claim status change
proposal_update | Governance proposal update

import { NotificationService } from '@/services/notification/notification-service.js';

const service = new NotificationService(redis, db, logger);

// Subscribe to user notifications (returns SSE stream)
const stream = service.subscribe(userDid);

// Send notification
await service.send(userDid, {
  type: 'new_review',
  title: 'New review on your preprint',
  body: 'Alice left a review on "Attention Is All You Need"',
  targetUri: reviewUri,
  preprintUri,
});

// Batch notifications
await service.sendBatch([
  { userDid: did1, notification: notif1 },
  { userDid: did2, notification: notif2 },
]);
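
On the wire, each SSE message is a block of field lines terminated by a blank line. The sketch below shows how a notification could be framed; the field layout follows the server-sent events format, but toSseFrame, the payload shape, and the choice to use the notification type as the SSE event name are assumptions, not NotificationService's actual encoding.

```typescript
// Hypothetical payload shape, modeled on the send() example above.
interface NotificationPayload {
  type: string;
  title: string;
  body: string;
  targetUri: string;
}

// Builds one SSE message: "id", "event", and "data" fields, then a blank line.
function toSseFrame(id: string, notification: NotificationPayload): string {
  // JSON.stringify escapes newlines, so the payload always fits on a single
  // "data:" line and cannot break the framing.
  const data = JSON.stringify(notification);
  return `id: ${id}\nevent: ${notification.type}\ndata: ${data}\n\n`;
}
```

In the browser, an EventSource listener registered for the 'new_review' event would receive this message and could parse event.data as JSON.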

KnowledgeGraphService

Manages the field taxonomy and governance proposals.

Field operations

import { KnowledgeGraphService } from '@/services/knowledge-graph/knowledge-graph-service.js';

const service = new KnowledgeGraphService(neo4j, storage, logger);

// Get field by ID
const field = await service.getField('cs.AI');

// Get field hierarchy
const children = await service.getFieldChildren('cs');
const parent = await service.getFieldParent('cs.AI');
const related = await service.getRelatedFields('cs.AI');

// Search fields
const matches = await service.searchFields('artificial intelligence');

// Get preprints in field
const preprints = await service.getFieldPreprints('cs.AI', {
  limit: 20,
  sort: 'recent',
});

Proposal operations

// Create field proposal
const proposal = await service.createProposal({
  type: 'create',
  title: 'Add Quantum Machine Learning field',
  description: 'Proposed new subfield under cs.AI',
  changes: {
    fieldId: 'cs.QML',
    name: 'Quantum Machine Learning',
    parent: 'cs.AI',
  },
  proposerDid: userDid,
});

// Vote on proposal
await service.vote(proposal.id, userDid, {
  vote: 'approve',
  weight: 1.0, // Based on user tier
});

// Get proposal status
const status = await service.getProposalStatus(proposal.id);
console.log(`Approval: ${status.approvalPercentage}%`);

Common patterns

Result types

All services return Result<T, Error> types for operations that can fail:

const result = await service.someOperation();

if (result.ok) {
  console.log('Success:', result.value);
} else {
  if (result.error instanceof ValidationError) {
    console.log('Validation failed:', result.error.message);
  } else if (result.error instanceof NotFoundError) {
    console.log('Not found:', result.error.message);
  }
}

Resilience policies

Services use circuit breakers and retries for external API calls:

import { createResiliencePolicy } from '@/services/common/resilience.js';

const policy = createResiliencePolicy({
  circuitBreaker: {
    name: 'external-api',
    failureThreshold: 5,
    timeout: 60000,
  },
  retry: {
    name: 'external-api',
    maxAttempts: 3,
    baseDelay: 100,
    maxDelay: 5000,
  },
});
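
To show what the retry options could mean in practice, here is a minimal retry loop. It assumes exponential backoff (the delay doubles after each failed attempt and is capped at maxDelay) and that delays are in milliseconds; whether createResiliencePolicy uses this exact schedule is not stated in this guide. RetryOptions, backoffDelays, and withRetry are hypothetical names, and the circuit breaker is left out.

```typescript
interface RetryOptions {
  maxAttempts: number; // total tries, including the first one
  baseDelay: number; // ms (assumed unit)
  maxDelay: number; // ms (assumed unit)
}

// Delay before retry n (0-based): baseDelay * 2^n, capped at maxDelay.
function backoffDelays(opts: RetryOptions): number[] {
  const delays: number[] = [];
  for (let n = 0; n < opts.maxAttempts - 1; n++) {
    delays.push(Math.min(opts.maxDelay, opts.baseDelay * Math.pow(2, n)));
  }
  return delays;
}

async function withRetry<T>(
  operation: () => Promise<T>,
  opts: RetryOptions,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  const delays = backoffDelays(opts);
  let lastError: unknown;
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // No delay is defined after the final attempt, so the loop just ends.
      const delay = delays[attempt];
      if (delay !== undefined) await sleep(delay);
    }
  }
  throw lastError;
}
```

With the values shown above (maxAttempts: 3, baseDelay: 100, maxDelay: 5000), this schedule waits 100 ms and then 200 ms between the three attempts. Production policies usually add random jitter to the delays so that many clients do not retry in lockstep.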

Testing

Run integration tests

# Start test infrastructure
./scripts/start-test-stack.sh

# Run all integration tests
npm run test:integration

# Run specific service tests
npx vitest run tests/integration/services/discovery
npx vitest run tests/integration/services/claiming
npx vitest run tests/integration/services/search

Run compliance tests

npm run test:compliance

Performance targets

Operation                 | Target p95
Preprint indexing         | < 200 ms
Search query              | < 300 ms
Discovery recommendations | < 500 ms
Claim evidence collection | < 2000 ms
Blob cache hit (L1)       | < 50 ms
Metrics recording         | < 10 ms