
Core business services

This guide covers the core services that form Chive's business logic layer. These services sit between the firehose indexing pipeline and the storage backends.

Overview

Chive's core services are:

  • EprintService: Indexes eprints from the firehose into PostgreSQL and Elasticsearch
  • BlobProxyService: Proxies PDF and image blobs from user PDSes with 3-tier caching
  • MetricsService: Tracks view counts and downloads using Redis data structures
  • PDSSyncService: Detects stale indexes and triggers re-sync from source PDSes
  • AnnotationService: Indexes inline text annotations and entity links with threaded discussions
  • ReviewService: Indexes document-level reviews and endorsements
  • RecordMigrator: Transforms legacy PDS records at index time to match the current schema revision

All services follow ATProto compliance rules: they index data from the firehose but never write to user PDSes.

Architecture

Service dependencies

Data flow

  1. User creates a record in their PDS
  2. PDS publishes event to the ATProto relay
  3. Chive's firehose consumer receives the event and routes it by collection:
    • Eprint submissions (pub.chive.eprint.submission)
    • Inline annotations (pub.chive.annotation.comment)
    • Entity links (pub.chive.annotation.entityLink, pub.chive.review.entityLink)
    • Document-level reviews and endorsements
  4. EprintService indexes eprint records to PostgreSQL and Elasticsearch
  5. AnnotationService indexes annotations and entity links to PostgreSQL
  6. ReviewService indexes document-level reviews and endorsements
  7. MetricsService tracks views when users access the eprint
  8. BlobProxyService fetches PDFs from the user's PDS on demand
  9. PDSSyncService periodically checks for stale records
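Step 3 above amounts to a lookup from a record's collection to the service that indexes it. The following is a minimal sketch of that idea, not Chive's actual consumer code: `ServiceName`, `ROUTES`, `collectionOf`, and `routeByCollection` are hypothetical names. The review collection names are taken from the `$type` values shown in the ReviewService section, and routing both entity-link collections to AnnotationService is an assumption based on step 5.

```typescript
// Hypothetical routing table: collection NSID -> service that indexes it.
type ServiceName = 'EprintService' | 'AnnotationService' | 'ReviewService';

const ROUTES = new Map<string, ServiceName>([
  ['pub.chive.eprint.submission', 'EprintService'],
  ['pub.chive.annotation.comment', 'AnnotationService'],
  ['pub.chive.annotation.entityLink', 'AnnotationService'],
  ['pub.chive.review.entityLink', 'AnnotationService'], // assumption: handled with other entity links
  ['pub.chive.review.comment', 'ReviewService'],
  ['pub.chive.review.endorsement', 'ReviewService'],
]);

// Extract the collection segment from an AT-URI of the form
// at://<authority>/<collection>/<rkey>.
function collectionOf(uri: string): string | undefined {
  const match = /^at:\/\/[^/]+\/([^/]+)\/[^/]+$/.exec(uri);
  return match ? match[1] : undefined;
}

// Returns the responsible service, or undefined for collections Chive ignores.
function routeByCollection(uri: string): ServiceName | undefined {
  const collection = collectionOf(uri);
  return collection === undefined ? undefined : ROUTES.get(collection);
}
```

Records from collections outside the table (for example ordinary Bluesky posts) return `undefined` and can be skipped by the consumer.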

Service layer

| Service | Responsibility |
| --- | --- |
| EprintService | Eprint indexing, search, metadata storage |
| BlobProxyService | PDF/image proxy with 3-tier cache |
| MetricsService | View counts, downloads, trending |
| PDSSyncService | Staleness detection, PDS re-sync |
| AnnotationService | Inline annotations, entity links, threading |
| ReviewService | Document-level reviews, endorsements |

EprintService

Indexes eprints from firehose events into local storage for search and discovery.

Key methods

import { EprintService } from '@/services/eprint/eprint-service.js';

const service = new EprintService({
storage, // IStorageBackend (PostgreSQL adapter)
search, // ISearchEngine (Elasticsearch adapter)
repository, // IRepository (PDS client)
identity, // IIdentityResolver (DID resolver)
logger,
});

// Index an eprint from firehose event
await service.indexEprint(eprint, {
uri: 'at://did:plc:abc/pub.chive.eprint.submission/xyz' as AtUri,
cid: 'bafyrei...' as CID,
pdsUrl: 'https://pds.example.com',
indexedAt: new Date(),
});

// Retrieve by URI
const result = await service.getEprint(uri);

// List by author
const eprints = await service.getEprintsByAuthor(authorDid);

// Handle deletion
await service.indexEprintDelete(uri);

ATProto compliance

EprintService stores BlobRefs (CID pointers), not blob data:

// Correct: Store BlobRef metadata
const storedEprint: StoredEprint = {
uri,
cid,
author: record.author,
title: record.title,
abstract: record.abstract,
documentBlobRef: record.document, // CID reference only
documentFormat: 'pdf',
pdsUrl: metadata.pdsUrl, // Track source PDS
indexedAt: new Date(),
};

// Wrong: Never store blob content
// await storage.storeBlob(pdfBuffer); // FORBIDDEN

BlobProxyService

Serves PDFs and images through a 3-tier cache hierarchy.

Cache tiers

| Tier | Storage | TTL | Use case |
| --- | --- | --- | --- |
| L1 | Redis | 1 hour | Hot blobs, repeated requests |
| L2 | Cloudflare R2 | 24 hours | Warm blobs, CDN edge delivery |
| L3 | User PDS | N/A | Source of truth |
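A read through the three tiers can be sketched as a loop that tries each tier in order and backfills the faster tiers on a hit. This is an illustrative in-memory model only: `BlobTier`, `memoryTier`, and `fetchBlob` are hypothetical names, TTL handling is omitted, and the real BlobProxyService talks to Redis, R2, and the user's PDS.

```typescript
type TierName = 'L1' | 'L2' | 'L3';

interface BlobTier {
  readonly name: TierName;
  get(cid: string): Promise<Uint8Array | undefined>;
  // Absent on the source-of-truth tier: Chive never writes to a user PDS.
  set?(cid: string, data: Uint8Array): Promise<void>;
}

// In-memory stand-in for a tier (hypothetical; for illustration only).
function memoryTier(
  name: TierName,
  writable: boolean,
  seed: Array<[string, Uint8Array]> = [],
): BlobTier {
  const store = new Map<string, Uint8Array>(seed);
  const tier: BlobTier = { name, get: async (cid) => store.get(cid) };
  if (writable) {
    tier.set = async (cid, data) => {
      store.set(cid, data);
    };
  }
  return tier;
}

// Try tiers in order; on a hit, copy the blob into every faster tier that missed.
async function fetchBlob(
  tiers: BlobTier[],
  cid: string,
): Promise<{ data: Uint8Array; servedFrom: TierName } | undefined> {
  const missed: BlobTier[] = [];
  for (const tier of tiers) {
    const data = await tier.get(cid);
    if (data === undefined) {
      missed.push(tier);
      continue;
    }
    for (const faster of missed) {
      await faster.set?.(cid, data);
    }
    return { data, servedFrom: tier.name };
  }
  return undefined;
}
```

With tiers ordered `[L1, L2, L3]`, the first request for a blob is served from the PDS and later requests are served from L1 until the cached copy expires.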

RedisCache usage

import { RedisCache } from '@/services/blob-proxy/redis-cache.js';

const cache = new RedisCache({
redis, // ioredis client
defaultTTL: 3600, // 1 hour
beta: 1.0, // Probabilistic early expiration factor
maxBlobSize: 5 * 1024 * 1024, // 5MB limit
keyPrefix: 'blob:',
logger,
});

// Cache a blob
await cache.set(cid, buffer, 'application/pdf', 7200);

// Retrieve with early expiration check
const result = await cache.get(cid);
if (result?.isEarlyFetch) {
// Background refresh before TTL expires
scheduleRefresh(cid);
}

// Check existence
const exists = await cache.has(cid);

// Remove from cache
await cache.delete(cid);

// Get cache stats
const stats = await cache.getStats();
console.log(`Keys: ${stats.keyCount}, Memory: ${stats.memoryUsage}`);

Probabilistic early expiration

The cache uses the XFetch algorithm to prevent cache stampedes. As a cached entry approaches expiration, isEarlyFetch returns true with increasing probability, allowing a background refresh before the TTL expires.
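The XFetch decision rule can be sketched as follows. This is a minimal illustration of the published rule (refresh when `now - delta * beta * ln(u) >= expiry` for a uniform random `u`), not the RedisCache source: the function name `shouldFetchEarly` and the `deltaMs` parameter (an estimate of how long a refetch takes) are assumptions, while `beta` corresponds to the configuration option shown above.

```typescript
// Decide whether to refresh a cached entry before it expires.
// XFetch rule: refresh when  now - delta * beta * ln(u) >= expiresAt,
// where u is uniform on (0, 1]. Since ln(u) <= 0, the subtracted term adds a
// random, non-negative "head start" that grows with delta and beta.
function shouldFetchEarly(
  nowMs: number,
  expiresAtMs: number,
  deltaMs: number, // assumed: estimated time to refetch the blob
  beta: number = 1.0, // > 1 refreshes earlier, < 1 refreshes later
  random: () => number = Math.random,
): boolean {
  const u = 1 - random(); // Math.random() is in [0, 1), so u is in (0, 1]
  const headStartMs = -deltaMs * beta * Math.log(u);
  return nowMs + headStartMs >= expiresAtMs;
}
```

Because the head start is random, concurrent readers are unlikely to all decide to refresh at the same instant, which is what prevents the stampede. The injectable `random` parameter exists only to make the function easy to test.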

MetricsService

Tracks views, downloads, and trending eprints using Redis data structures.

Data structures

  • Counters (INCR): Total view and download counts
  • HyperLogLog (PFADD): Unique viewer estimation
  • Sorted Sets (ZADD): Time-windowed metrics for trending
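To make the mapping from metrics to Redis commands concrete, the sketch below builds the commands a single view might translate to. The key layout, the member format in the sorted set, and the function names are assumptions for illustration; only the command names (INCR, PFADD, ZADD) come from the list above, and ZCOUNT is the standard Redis command for counting sorted-set members in a score range.

```typescript
type RedisCommand = readonly string[];

// Commands for recording one view (hypothetical key layout).
function viewCommands(
  prefix: string,
  eprintUri: string,
  viewerDid: string | undefined,
  nowMs: number,
): RedisCommand[] {
  const commands: RedisCommand[] = [
    // Counter: total views.
    ['INCR', `${prefix}views:${eprintUri}`],
    // Sorted set: score is the timestamp, so a time window is a score range.
    ['ZADD', `${prefix}views:window:${eprintUri}`, String(nowMs), `${nowMs}:${viewerDid ?? 'anon'}`],
  ];
  if (viewerDid !== undefined) {
    // HyperLogLog: approximate count of distinct viewers.
    commands.push(['PFADD', `${prefix}unique:${eprintUri}`, viewerDid]);
  }
  return commands;
}

// Command for counting views within the last windowMs milliseconds.
function windowCountCommand(
  prefix: string,
  eprintUri: string,
  nowMs: number,
  windowMs: number,
): RedisCommand {
  return ['ZCOUNT', `${prefix}views:window:${eprintUri}`, String(nowMs - windowMs), String(nowMs)];
}
```

Anonymous views still increment the counter and the time window but are left out of the HyperLogLog, since there is no identifier to deduplicate on.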

Recording metrics

import { MetricsService } from '@/services/metrics/metrics-service.js';

const service = new MetricsService({
storage,
redis,
logger,
keyPrefix: 'metrics:',
});

// Record a view (with optional viewer DID for unique tracking)
await service.recordView(eprintUri, viewerDid);

// Record a download
await service.recordDownload(eprintUri, viewerDid);

// Batch operations (uses Redis pipeline)
await service.batchIncrement([
{ type: 'view', uri: uri1, viewerDid: did1 },
{ type: 'view', uri: uri1, viewerDid: did2 },
{ type: 'download', uri: uri2, viewerDid: did1 },
]);

Querying metrics

// Get all metrics for an eprint
const metrics = await service.getMetrics(eprintUri);
console.log(`Total views: ${metrics.totalViews}`);
console.log(`Unique viewers: ${metrics.uniqueViews}`);
console.log(`Views (24h): ${metrics.views24h}`);
console.log(`Views (7d): ${metrics.views7d}`);

// Get view count only
const viewCount = await service.getViewCount(eprintUri);

// Get trending eprints
const trending = await service.getTrending('24h', 10);
for (const entry of trending) {
console.log(`${entry.uri}: ${entry.score} views`);
}

Persistence

MetricsService stores counters in Redis for speed. Call flushToDatabase() periodically to persist to PostgreSQL for durability:

// Flush Redis counters to PostgreSQL
const result = await service.flushToDatabase();
if (result.ok) {
console.log(`Flushed ${result.value} records`);
}

PDSSyncService

Detects when indexed records become stale and refreshes them from the source PDS.

Staleness detection

A record is stale when:

  1. The indexed CID differs from the current PDS CID
  2. The record hasn't been synced within the threshold (default: 7 days)

import { PDSSyncService } from '@/services/pds-sync/sync-service.js';
import { createResiliencePolicy } from '@/services/common/resilience.js';

const resiliencePolicy = createResiliencePolicy({
circuitBreaker: { name: 'pds-sync', failureThreshold: 5, timeout: 60000 },
retry: { name: 'pds-sync', maxAttempts: 3, baseDelay: 100 },
});

const service = new PDSSyncService({
storage,
repository,
resiliencePolicy,
logger,
});

// Check if a record is stale
const check = await service.checkStaleness(uri);
if (check.isStale) {
console.log(`Record ${uri} is stale`);
console.log(`Indexed CID: ${check.indexedCID}`);
console.log(`PDS CID: ${check.pdsCID}`);
}

// Refresh a record from PDS
const result = await service.refreshRecord(uri);
if (result.ok) {
console.log(`Changed: ${result.value.changed}`);
console.log(`Previous CID: ${result.value.previousCID}`);
console.log(`Current CID: ${result.value.currentCID}`);
}

// Track PDS update after indexing
await service.trackPDSUpdate(uri, cid, pdsUrl);
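The two staleness conditions listed under "Staleness detection" can be expressed as a single predicate. The sketch below assumes that either condition alone makes a record stale; `IndexedState` and `isStale` are hypothetical names and are not part of PDSSyncService.

```typescript
const DEFAULT_THRESHOLD_MS = 7 * 24 * 60 * 60 * 1000; // 7 days

interface IndexedState {
  indexedCid: string; // CID stored when the record was last indexed
  lastSyncedAt: Date; // time of the last successful sync
}

// A record is stale if its CID changed on the PDS or it has not been synced recently.
function isStale(
  indexed: IndexedState,
  currentPdsCid: string,
  now: Date,
  thresholdMs: number = DEFAULT_THRESHOLD_MS,
): boolean {
  const cidChanged = indexed.indexedCid !== currentPdsCid;
  const syncTooOld = now.getTime() - indexed.lastSyncedAt.getTime() > thresholdMs;
  return cidChanged || syncTooOld;
}
```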

Background sync job

// Find stale records (older than 7 days)
const staleUris = await service.detectStaleRecords();

// Refresh each stale record
for (const uri of staleUris) {
  const result = await service.refreshRecord(uri);
  if (!result.ok) {
    logger.error('Failed to refresh record', result.error, { uri });
  }
}

PDSDiscoveryService

Discovers and scans Personal Data Servers (PDSes) that may contain Chive records but are not connected to the main relay firehose.

Components

| Component | Purpose |
| --- | --- |
| PDSRegistry | Tracks known PDSes and their scan state |
| PDSDiscoveryService | Discovers PDSes from PLC directory, relays, DIDs |
| PDSScanner | Scans PDSes for pub.chive.* records |

Discovery sources

import { PDSDiscoveryService } from '@/services/pds-discovery/discovery-service.js';

const discovery = new PDSDiscoveryService(registry, logger, redis);

// Discover from PLC directory (streams incrementally)
for await (const pds of discovery.discoverFromPLCDirectory(cursor)) {
console.log(`Found: ${pds.pdsUrl}`);
}

// Discover from relay listHosts
const relayPDSes = await discovery.discoverFromRelay('wss://bsky.network');

// Discover from DIDs in indexed records
const didPDSes = await discovery.discoverFromDIDMentions(authorDids);

Scanning PDSes

import { PDSScanner } from '@/services/pds-discovery/pds-scanner.js';

const scanner = new PDSScanner(registry, eprintService, logger, {
requestsPerMinute: 10,
maxRecordsPerPDS: 1000,
});

// Scan a PDS for Chive records
const result = await scanner.scanPDS('https://pds.example.com');
console.log(`Found ${result.chiveRecordCount} records`);

// Scan a specific DID
const count = await scanner.scanDID(pdsUrl, 'did:plc:abc123');

User PDS registration

Users can register their PDS via pub.chive.sync.registerPDS. If authenticated, their records are scanned immediately.

See PDS Discovery for detailed documentation.

AnnotationService

Indexes inline text annotations and entity links from the firehose with support for threaded discussions and W3C Web Annotation targets. AnnotationService uses generated lexicon types with runtime validation.

Type imports

import {
isRecord as isAnnotationRecord,
type Main as AnnotationRecord,
} from '../../lexicons/generated/types/pub/chive/annotation/comment.js';
import {
isRecord as isEntityLinkRecord,
type Main as EntityLinkRecord,
} from '../../lexicons/generated/types/pub/chive/annotation/entityLink.js';

Key methods

import { AnnotationService } from '@/services/annotation/annotation-service.js';

const service = new AnnotationService({ storage, logger });

// Index an annotation from firehose event
await service.indexAnnotation(record, {
uri: 'at://did:plc:abc/pub.chive.annotation.comment/xyz' as AtUri,
cid: 'bafyrei...' as CID,
pdsUrl: 'https://pds.example.com',
indexedAt: new Date(),
});

// Index an entity link from firehose event
await service.indexEntityLink(entityLinkRecord, metadata);

// Get annotations for an eprint
const annotations = await service.getAnnotationsForEprint(eprintUri, {
motivation: 'questioning',
limit: 50,
});

// Get annotations for a specific PDF page
const pageAnnotations = await service.getAnnotationsForPage(eprintUri, 5);

// Get threaded annotation with replies (recursive CTE)
const thread = await service.getAnnotationThread(annotationUri);

// Get annotations by author
const authorAnnotations = await service.getAnnotationsByAuthor(authorDid);

Database tables

AnnotationService writes to two PostgreSQL tables:

annotations_index: Inline text annotations with W3C targets

  • Columns: uri, cid, eprint_uri, annotator_did, body (JSONB), anchor (JSONB, NOT NULL), page_number, motivation, parent_annotation, reply_count, timestamps, pds_url
  • Indexed by: eprint_uri, annotator_did, page_number, parent_annotation

entity_links_index: Entity links connecting text spans to knowledge graph entities

  • Columns: uri, cid, eprint_uri, creator_did, anchor (JSONB, NOT NULL), page_number, entity_type, entity_data (JSONB), entity_label, confidence, timestamps, pds_url
  • GIN index on entity_data for JSON querying

Threading

Threads are built using recursive CTEs with unlimited depth:

const thread = await service.getAnnotationThread(annotationUri);
// Returns: { root: AnnotationView, replies: AnnotationThread[], totalReplies: number }
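For intuition about the returned shape, the sketch below builds the same nested structure in memory from a flat list of rows. It is a stand-in for the recursive CTE, not the service's implementation: `FlatAnnotation`, `Thread`, and `buildThread` are hypothetical names, `totalReplies` is assumed to count all descendants, and the input is assumed to contain no reply cycles.

```typescript
interface FlatAnnotation {
  uri: string;
  parentAnnotation: string | null; // null for a top-level annotation
  text: string;
}

interface Thread {
  root: FlatAnnotation;
  replies: Thread[];
  totalReplies: number; // assumed: all descendants, not just direct replies
}

function buildThread(rows: FlatAnnotation[], rootUri: string): Thread | undefined {
  // Group rows by parent so each node's children can be found in O(1).
  const byParent = new Map<string, FlatAnnotation[]>();
  let root: FlatAnnotation | undefined;
  for (const row of rows) {
    if (row.uri === rootUri) root = row;
    if (row.parentAnnotation !== null) {
      const siblings = byParent.get(row.parentAnnotation) ?? [];
      siblings.push(row);
      byParent.set(row.parentAnnotation, siblings);
    }
  }
  if (root === undefined) return undefined;

  // Recurse to arbitrary depth, mirroring the unlimited depth of the CTE.
  const build = (node: FlatAnnotation): Thread => {
    const replies = (byParent.get(node.uri) ?? []).map(build);
    const totalReplies = replies.reduce((sum, reply) => sum + 1 + reply.totalReplies, 0);
    return { root: node, replies, totalReplies };
  };
  return build(root);
}
```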

ATProto compliance

AnnotationService indexes records from the firehose and never writes to user PDSes. All index tables are rebuildable from firehose events and track PDS source URLs for staleness detection.

ReviewService

Indexes document-level reviews and endorsements with support for threaded discussions and soft deletion. ReviewService uses generated lexicon types with runtime validation (see Lexicon type validation for the pattern). For inline text annotations, see AnnotationService above.

Type imports

ReviewService imports generated types from the lexicon code generator:

import {
isRecord as isCommentRecord,
type Main as CommentRecord,
} from '../../lexicons/generated/types/pub/chive/review/comment.js';
import {
isRecord as isEndorsementRecord,
type Main as EndorsementRecord,
} from '../../lexicons/generated/types/pub/chive/review/endorsement.js';

// Re-export for external use
export type { CommentRecord as ReviewComment, EndorsementRecord as Endorsement };

The isRecord type guards validate records at runtime, and the service re-exports types with domain names (ReviewComment, Endorsement) for external consumers.

Indexing reviews

import { ReviewService } from '@/services/review/review-service.js';

const service = new ReviewService({
storage,
logger,
});

// Index a review comment
const review: ReviewComment = {
$type: 'pub.chive.review.comment',
subject: { uri: eprintUri, cid: eprintCid },
text: 'The methodology section needs more detail on sample size.',
reviewType: 'methodology',
parent: parentReviewUri, // Optional: for threaded replies
createdAt: new Date().toISOString(),
};

await service.indexReview(review, metadata);

// Index an endorsement
const endorsement: Endorsement = {
$type: 'pub.chive.review.endorsement',
eprintUri: eprintUri,
contributions: ['reproducibility', 'methodological'],
createdAt: new Date().toISOString(),
};

await service.indexEndorsement(endorsement, metadata);

Soft deletion

Reviews support soft deletion to preserve thread integrity while hiding content. The deleted field indicates whether a review has been soft-deleted:

// Check if review is deleted
if (review.deleted) {
// Show "[deleted]" placeholder instead of content
}

// The API response includes the deleted flag
interface ReviewResponse {
uri: string;
author: AuthorView;
record: ReviewRecord;
deleted: boolean; // true if soft-deleted
indexedAt: string;
}

Soft-deleted reviews:

  • Remain in the thread structure to preserve reply context
  • Have their content hidden from display
  • Can be identified by the deleted: true flag in API responses

Threading with ThreadingHandler

import { ThreadingHandler } from '@/services/review/threading-handler.js';

const handler = new ThreadingHandler({
maxDepth: 20,
sortByDate: true,
});

// Build thread tree from flat review list
const reviews = await service.getReviews(eprintUri);
const threads = handler.buildThreads(reviews);

// Each thread has nested replies
for (const thread of threads) {
console.log(`Root: ${thread.root.text}`);
console.log(`Replies: ${thread.totalReplies}`);

for (const reply of thread.replies) {
console.log(` Reply: ${reply.root.text}`);
}
}

// Flatten threads back to list
const flat = handler.flattenThreads(threads);

// Find specific thread containing a review
const found = handler.findThread(threads, reviewUri);

// Get ancestor chain for a nested review
const ancestors = handler.getAncestors(threads, nestedReviewUri);

Testing

Run integration tests

Integration tests require the Docker test stack:

# Start test infrastructure
./scripts/start-test-stack.sh

# Run integration tests
npm run test:integration

# Run specific service tests
npx vitest run tests/integration/services/eprint
npx vitest run tests/integration/services/metrics
npx vitest run tests/integration/services/blob-proxy
npx vitest run tests/integration/services/pds-sync
npx vitest run tests/integration/services/review

Run compliance tests

# Verify ATProto compliance
npm run test:compliance

# Run core services compliance tests
npx vitest run tests/compliance/core-services-compliance.test.ts

Run performance benchmarks

# Install k6
brew install k6 # macOS
# or: https://k6.io/docs/getting-started/installation/

# Run benchmarks
k6 run tests/performance/k6/scenarios/eprint-indexing.js
k6 run tests/performance/k6/scenarios/blob-proxy-load.js
k6 run tests/performance/k6/scenarios/search-query.js
k6 run tests/performance/k6/scenarios/metrics-recording.js

Performance targets

| Operation | Target p95 |
| --- | --- |
| Eprint indexing | < 200ms |
| Blob cache hit (L1) | < 50ms |
| Blob cache miss (L2/L3) | < 200ms |
| Search query | < 300ms |
| Metrics recording | < 10ms |

Error handling

All services return Result<T, Error> types for operations that can fail:

const result = await service.indexEprint(eprint, metadata);

if (result.ok) {
  console.log('Indexed successfully');
} else {
  // Handle typed errors
  if (result.error instanceof ValidationError) {
    console.log('Invalid eprint data:', result.error.message);
  } else if (result.error instanceof DatabaseError) {
    console.log('Storage failed:', result.error.message);
  }
}
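The examples in this guide rely only on the `ok`, `value`, and `error` fields of a result. A discriminated union with that shape could look like the sketch below. The exact definition used by Chive is not shown in this guide, so treat the type, the stand-in `ValidationError` class, and the `parseLimit` and `summarize` helpers as assumptions for illustration.

```typescript
// Assumed shape: success carries a value, failure carries a typed error.
type Result<T, E extends Error = Error> =
  | { ok: true; value: T }
  | { ok: false; error: E };

// Stand-in error class; the real one is defined elsewhere in the codebase.
class ValidationError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ValidationError';
    // Keeps instanceof working if the code is compiled to an ES5 target.
    Object.setPrototypeOf(this, ValidationError.prototype);
  }
}

// Hypothetical operation that can fail without throwing.
function parseLimit(raw: string): Result<number, ValidationError> {
  const n = Number(raw);
  if (!Number.isInteger(n) || n <= 0) {
    return { ok: false, error: new ValidationError(`Invalid limit: ${raw}`) };
  }
  return { ok: true, value: n };
}

// Checking `ok` narrows the union, so `value` and `error` are type-safe.
function summarize(result: Result<number>): string {
  if (result.ok) return `limit=${result.value}`;
  return result.error instanceof ValidationError
    ? `invalid: ${result.error.message}`
    : `failed: ${result.error.message}`;
}
```

Returning a result instead of throwing forces callers to handle the failure branch before they can read `value`.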

DiscoveryService

Provides personalized eprint recommendations and related paper discovery using Semantic Scholar, OpenAlex, and citation graph data.

Key constraint

All discovery features recommend only eprints indexed in Chive. External APIs (Semantic Scholar, OpenAlex) are used as enrichment signals, not as recommendation sources.
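One way to picture this constraint is as a filter applied to externally scored candidates: anything that does not resolve to an indexed Chive eprint is dropped. The sketch below assumes candidates are matched by DOI and that the lookup map uses lowercase DOIs as keys; `ExternalCandidate`, `ChiveRecommendation`, and `restrictToIndexed` are hypothetical names.

```typescript
interface ExternalCandidate {
  doi: string;
  score: number; // enrichment signal from an external API
}

interface ChiveRecommendation {
  uri: string; // AT-URI of an eprint indexed in Chive
  score: number;
}

// Keep only candidates that map to an indexed eprint, highest score first.
function restrictToIndexed(
  candidates: ExternalCandidate[],
  indexedUriByDoi: ReadonlyMap<string, string>, // assumed: keys are lowercase DOIs
): ChiveRecommendation[] {
  const kept: ChiveRecommendation[] = [];
  for (const candidate of candidates) {
    const uri = indexedUriByDoi.get(candidate.doi.toLowerCase());
    if (uri !== undefined) kept.push({ uri, score: candidate.score });
  }
  return kept.sort((a, b) => b.score - a.score);
}
```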

Key methods

import { DiscoveryService } from '@/services/discovery/discovery-service.js';

const service = new DiscoveryService(logger, db, searchEngine, rankingService, citationGraph);

// Optional: Enable external API enrichment
service.setPluginManager(pluginManager);

// Enrich an eprint with external metadata
const enrichment = await service.enrichEprint({
uri: eprintUri,
doi: '10.1234/example',
title: 'Example Paper',
});
console.log(`Citations: ${enrichment.citationCount}`);

// Find related Chive eprints
const related = await service.findRelatedEprints(eprintUri, {
signals: ['citations', 'concepts', 'semantic'],
limit: 10,
minScore: 0.3,
});

// Get personalized recommendations using configurable weights
const recommendations = await service.getRecommendationsForUser(userDid, {
signals: ['fields', 'citations', 'semantic'],
weights: {
semantic: 30,
coCitation: 25,
conceptOverlap: 20,
authorNetwork: 15,
collaborative: 10,
},
limit: 20,
});

// Record user interaction for feedback loop
await service.recordInteraction(userDid, {
eprintUri,
type: 'dismiss',
recommendationId: 'rec-123',
});

// Get citation network data
const counts = await service.getCitationCounts(eprintUri);
const citing = await service.getCitingPapers(eprintUri, { limit: 20 });
const references = await service.getReferences(eprintUri, { limit: 20 });

Recommendation signals

| Signal | Description | Weight |
| --- | --- | --- |
| fields | User's research fields (from profile or claimed papers) | 60% |
| citations | Papers citing user's work | 25% |
| semantic | SPECTER2 similarity to claimed papers | 30% |
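Configurable weights such as those passed to `getRecommendationsForUser` are typically combined as a weighted average of per-signal scores. The sketch below shows one such combination; it assumes each signal score lies in [0, 1] and that signals without a score are excluded from the normalization. `combineSignals` is a hypothetical helper, not the DiscoveryService implementation.

```typescript
// Weighted average of the signals that actually have a score.
function combineSignals(
  scores: Readonly<Record<string, number | undefined>>,
  weights: Readonly<Record<string, number>>,
): number {
  let weightedSum = 0;
  let weightTotal = 0;
  for (const signal of Object.keys(weights)) {
    const score = scores[signal];
    const weight = weights[signal];
    if (score === undefined || weight === undefined) continue; // skip missing signals
    weightedSum += weight * score;
    weightTotal += weight;
  }
  return weightTotal === 0 ? 0 : weightedSum / weightTotal;
}

// Weights copied from the getRecommendationsForUser example above.
const exampleWeights = {
  semantic: 30,
  coCitation: 25,
  conceptOverlap: 20,
  authorNetwork: 15,
  collaborative: 10,
};
```

Dividing by the sum of the weights that were used means the weights do not need to add up to 100, and a candidate is not penalized for a signal that could not be computed.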

RankingService

Personalizes search results based on user fields and query relevance.

Score components

| Component | Weight | Description |
| --- | --- | --- |
| Field match | 60% | Category overlap with user's research fields |
| Text relevance | 40% | BM25-style text matching (title + abstract) |
| Recency | Boost | Exponential decay with 30-day half-life |
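The components above can be combined as in the following sketch. The 60/40 weighting and the 30-day half-life come from the table; how the recency boost is applied is not specified in this guide, so the multiplicative form and the `maxBoost` parameter are assumptions, and all function names are hypothetical.

```typescript
const HALF_LIFE_DAYS = 30;

// Exponential decay: 1.0 for a new eprint, 0.5 after 30 days, 0.25 after 60.
function recencyScore(ageDays: number): number {
  return Math.pow(0.5, Math.max(0, ageDays) / HALF_LIFE_DAYS);
}

// Weighted sum of field match and text relevance (both assumed in [0, 1]).
function baseScore(
  fieldMatch: number,
  textRelevance: number,
  fieldWeight: number = 0.6,
  textWeight: number = 0.4,
): number {
  return fieldWeight * fieldMatch + textWeight * textRelevance;
}

// Assumed form of the boost: scale the base score by up to (1 + maxBoost).
function boostedScore(base: number, ageDays: number, maxBoost: number = 0.1): number {
  return base * (1 + maxBoost * recencyScore(ageDays));
}
```

With these assumptions, an eprint published today with a perfect field match and a text relevance of 0.5 has a base score of 0.8 and a boosted score of 0.88.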

Usage

import { RankingService } from '@/services/search/ranking-service.js';

const service = new RankingService(db, logger, textScorer, categoryMatcher, {
fieldWeight: 0.6,
textWeight: 0.4,
maxUserFields: 10,
mode: 'heuristic', // or 'ltr' for Learning to Rank
});

// Rank search results by personalized relevance
const ranked = await service.rank(searchResults, {
userDid: user.did,
query: 'attention mechanisms',
});

for (const { item, score, fieldMatchScore, textRelevanceScore } of ranked) {
console.log(`${item.title}: ${score}`);
}

// Get user's research fields (cached for 5 minutes)
const fields = await service.getUserFields(userDid);

// Rank with discovery signals
const rankedWithDiscovery = await service.rankWithDiscoverySignals(
items,
{ userDid, query },
{ s2Scores, citationScores, conceptScores }
);

LTR feature extraction

RankingService extracts feature vectors for Learning to Rank model training:

interface LTRFeatureVector {
textRelevance: number;
fieldMatchScore: number;
titleMatchScore: number;
abstractMatchScore: number;
recencyScore: number;
bm25Score: number;
originalPosition: number;
// Discovery signals
specter2Similarity?: number;
coCitationScore?: number;
conceptOverlapScore?: number;
}

ClaimingService

Manages authorship claims for imported eprints using multi-authority verification.

Evidence types and weights

| Evidence Type | Weight | Source |
| --- | --- | --- |
| orcid-match | 35% | ORCID API (cryptographically verified) |
| author-claim | 20% | Self-attestation with evidence |
| semantic-scholar-match | 15% | S2 claimed author profile |
| openreview-match | 15% | OpenReview authenticated profile |
| openalex-match | 10% | OpenAlex ORCID-linked author |
| arxiv-ownership | 10% | arXiv author ownership system |
| institutional-email | 8% | Handle domain verification |
| ror-affiliation | 5% | ROR organization match |
| coauthor-overlap | 5% | Network analysis |
| name-match | 2% | Fuzzy name matching |
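This guide does not specify how the evidence weights are combined. One plausible reading, sketched below, is an additive score in which each evidence type counts once and the total is capped at 1.0. The additive rule, the cap, and the `claimScore` function are assumptions; only the weights come from the table.

```typescript
// Weights from the table above, expressed as fractions.
const EVIDENCE_WEIGHTS: Readonly<Record<string, number>> = {
  'orcid-match': 0.35,
  'author-claim': 0.2,
  'semantic-scholar-match': 0.15,
  'openreview-match': 0.15,
  'openalex-match': 0.1,
  'arxiv-ownership': 0.1,
  'institutional-email': 0.08,
  'ror-affiliation': 0.05,
  'coauthor-overlap': 0.05,
  'name-match': 0.02,
};

// Assumed combination rule: sum distinct evidence weights, capped at 1.0.
function claimScore(evidenceTypes: string[]): number {
  const seen = new Set<string>();
  let total = 0;
  for (const type of evidenceTypes) {
    if (seen.has(type)) continue; // each evidence type counts once
    seen.add(type);
    total += EVIDENCE_WEIGHTS[type] ?? 0; // unknown types contribute nothing
  }
  return Math.min(1, total);
}
```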

Claim workflow

The ClaimingService supports two claim types with no verification gatekeeping:

  1. External claims - Import papers from arXiv, Semantic Scholar, etc. with prefilled data
  2. Co-author claims - Request to be added as co-author on existing PDS records

import { ClaimingService } from '@/services/claiming/claiming-service.js';

const service = new ClaimingService(logger, db, importService, identity);
service.setPluginManager(pluginManager); // Enable external search

// Search external sources for eprints
const results = await service.searchEprints({
query: 'attention mechanisms',
sources: ['arxiv', 'semanticscholar'],
limit: 20,
});

// Get prefilled submission data for external claim
const prefilled = await service.getSubmissionData('arxiv', '2309.12345');
// Returns form data for the submission wizard

// User completes submission wizard and creates record in their PDS
// Then mark the claim as complete
await service.completeClaim(claim.id, canonicalUri);

// Co-author claim workflow
const request = await service.requestCoauthorship(
eprintUri, // AT-URI of existing paper
userDid, // Claimant
'Jane Smith', // Display name
1, // Author index being claimed
'J. Smith' // Author name on paper
);

// PDS owner approves/rejects
await service.approveCoauthorRequest(requestId, ownerDid);

ATProto compliance

The ClaimingService never writes to user PDSes. The claim flow is:

  1. User searches external sources via Chive
  2. User gets prefilled submission data
  3. User creates canonical record in THEIR PDS
  4. Chive indexes the record from firehose
  5. For co-author claims, PDS owner updates their record (Chive never writes)

SearchService

Full-text search with Elasticsearch and faceted filtering.

Features

  • BM25 text ranking
  • PMEST faceted search (Personality, Matter, Energy, Space, Time)
  • Field highlighting
  • Autocomplete suggestions
  • Search tracking for analytics

import { SearchService } from '@/services/search/search-service.js';

const service = new SearchService(elasticsearch, storage, logger);

// Full-text search
const results = await service.search({
q: 'neural attention mechanisms',
limit: 20,
cursor: 'page2',
});

// Faceted search
const facetedResults = await service.facetedSearch({
q: 'machine learning',
facets: {
fields: ['cs.AI', 'cs.LG'],
sources: ['arxiv'],
dateRange: { from: '2024-01-01' },
},
});

// Get facet counts
const counts = await service.getFacetCounts({
q: 'neural networks',
});

// Autocomplete
const suggestions = await service.autocomplete('atten', 8);

BacklinkService

Tracks references to Chive eprints from Bluesky posts.

| Source | Description |
| --- | --- |
| bluesky_post | Bluesky post mentioning the eprint |
| bluesky_reply | Reply discussing the eprint |
| bluesky_quote | Quote post of the eprint |

import { BacklinkService } from '@/services/backlink/backlink-service.js';

const service = new BacklinkService(db, firehoseClient, logger);

// Get backlinks for an eprint
const backlinks = await service.getBacklinks(eprintUri, {
limit: 20,
sourceTypes: ['bluesky_post', 'bluesky_quote'],
});

// Get backlink counts
const counts = await service.getBacklinkCounts(eprintUri);
console.log(`Total: ${counts.total}, Bluesky posts: ${counts.blueskyPosts}`);

ActivityService

Tracks user activity for analytics and discovery feedback.

Activity categories

| Category | Actions |
| --- | --- |
| read | view, dwell |
| engage | click, download, share |
| contribute | review, endorse, tag, claim |

import { ActivityService } from '@/services/activity/activity-service.js';

const service = new ActivityService(db, logger);

// Log an activity
await service.logActivity({
userDid,
category: 'read',
action: 'view',
targetUri: eprintUri,
collection: 'pub.chive.eprint.submission',
metadata: { source: 'search', query: 'neural networks' },
});

// Get user's activity feed
const feed = await service.getActivityFeed(userDid, {
category: 'contribute',
limit: 50,
});

// Mark activity as failed (for retry logic)
await service.markFailed(activityId, 'API error');

NotificationService

Server-sent events (SSE) for real-time notifications.

Notification types

| Type | Description |
| --- | --- |
| new_review | New review on user's eprint |
| review_reply | Reply to user's review |
| endorsement | New endorsement on user's eprint |
| claim_update | Claim status change |
| proposal_update | Governance proposal update |

import { NotificationService } from '@/services/notification/notification-service.js';

const service = new NotificationService(redis, db, logger);

// Subscribe to user notifications (returns SSE stream)
const stream = service.subscribe(userDid);

// Send notification
await service.send(userDid, {
type: 'new_review',
title: 'New review on your eprint',
body: 'Alice left a review on "Attention Is All You Need"',
targetUri: reviewUri,
eprintUri,
});

// Batch notifications
await service.sendBatch([
{ userDid: did1, notification: notif1 },
{ userDid: did2, notification: notif2 },
]);
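On the wire, a server-sent event is a block of `field: value` lines terminated by a blank line. The sketch below formats a notification that way, following the standard SSE event stream format. `toSseMessage` is a hypothetical helper, and using the notification type as the SSE `event` name is an assumption about how NotificationService frames its messages.

```typescript
// Serialize one notification as a server-sent event.
function toSseMessage(
  event: string,
  data: Readonly<Record<string, unknown>>,
  id?: string,
): string {
  const lines: string[] = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  lines.push(`event: ${event}`);
  // Each line of the payload needs its own "data:" prefix.
  for (const line of JSON.stringify(data).split('\n')) {
    lines.push(`data: ${line}`);
  }
  // A blank line marks the end of the event.
  return lines.join('\n') + '\n\n';
}
```

A browser client would receive this through `EventSource` and could listen for the named event with `addEventListener('new_review', handler)`.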

KnowledgeGraphService

Manages the knowledge graph nodes, edges, and governance proposals.

Terminology

The knowledge graph uses "node" as the universal term for all graph entities. Node types are distinguished by their kind and subkind properties:

| Kind | Subkind | Description |
| --- | --- | --- |
| type | endorsement-kind | Endorsement contribution types |
| type | license | License types |
| type | methodology | Research methodologies |
| type | paper-type | Paper types |
| object | field | Academic fields |
| object | institution | Research institutions |
| object | author | Individual researchers |
| object | eprint | Eprint nodes |

Node operations

import { KnowledgeGraphService } from '@/services/knowledge-graph/knowledge-graph-service.js';

const service = new KnowledgeGraphService(neo4j, storage, logger);

// Get node by ID
const node = await service.getNode('cs.AI');

// Get node hierarchy via edges
const children = await service.getNodeChildren('cs');
const parents = await service.getNodeParents('cs.AI');
const related = await service.getRelatedNodes('cs.AI');

// Search nodes
const matches = await service.searchNodes('artificial intelligence', {
subkind: 'field',
});

// Get eprints in field
const eprints = await service.getFieldEprints('cs.AI', {
limit: 20,
sort: 'recent',
});

Proposal operations

// Create node proposal
const nodeProposal = await service.createNodeProposal({
proposalType: 'create',
kind: 'object',
subkind: 'field',
proposedNode: {
id: 'cs.QML',
label: 'Quantum Machine Learning',
alternateLabels: ['QML'],
description: 'Algorithms combining quantum computing with ML',
},
rationale: 'Emerging interdisciplinary field',
proposerDid: userDid,
});

// Create edge proposal for parent relationship
const edgeProposal = await service.createEdgeProposal({
proposalType: 'create',
proposedEdge: {
sourceUri: nodeProposal.uri,
targetUri:
'at://did:plc:chive-governance/pub.chive.graph.node/726c5017-723e-5ae5-a1e2-f12e636eb709',
relationSlug: 'broader',
},
rationale: 'QML is a subfield of AI',
proposerDid: userDid,
});

// Vote on proposal
await service.vote(nodeProposal.id, userDid, {
vote: 'approve',
weight: 1.0, // Based on user tier
});

// Get proposal status
const status = await service.getProposalStatus(nodeProposal.id);
console.log(`Approval: ${status.approvalPercentage}%`);
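A weighted approval percentage can be computed as the share of total vote weight that went to approval. This guide does not define the formula behind `approvalPercentage`, so the sketch below is one plausible reading: the `'reject'` vote value, the `WeightedVote` type, and the function itself are assumptions.

```typescript
interface WeightedVote {
  vote: 'approve' | 'reject'; // 'reject' is an assumed counterpart to 'approve'
  weight: number; // based on the voter's tier
}

// Percentage of total vote weight cast in favor; 0 when nobody has voted.
function approvalPercentage(votes: WeightedVote[]): number {
  let approveWeight = 0;
  let totalWeight = 0;
  for (const { vote, weight } of votes) {
    totalWeight += weight;
    if (vote === 'approve') approveWeight += weight;
  }
  return totalWeight === 0 ? 0 : (approveWeight / totalWeight) * 100;
}
```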

Common patterns

Result types

All services return Result<T, Error> types for operations that can fail:

const result = await service.someOperation();

if (result.ok) {
  console.log('Success:', result.value);
} else {
  if (result.error instanceof ValidationError) {
    console.log('Validation failed:', result.error.message);
  } else if (result.error instanceof NotFoundError) {
    console.log('Not found:', result.error.message);
  }
}

Resilience policies

Services use circuit breakers and retries for external API calls:

import { createResiliencePolicy } from '@/services/common/resilience.js';

const policy = createResiliencePolicy({
circuitBreaker: {
name: 'external-api',
failureThreshold: 5,
timeout: 60000,
},
retry: {
name: 'external-api',
maxAttempts: 3,
baseDelay: 100,
maxDelay: 5000,
},
});
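To show what the retry options control, the sketch below implements a simple retry loop with capped exponential backoff. It is not the `createResiliencePolicy` implementation: the doubling schedule is an assumption about how `baseDelay` and `maxDelay` are used, the circuit breaker is left out, and `backoffDelay` and `withRetry` are hypothetical names.

```typescript
// Delay before the next attempt: baseDelay, 2x, 4x, ... capped at maxDelay.
function backoffDelay(attempt: number, baseDelay: number = 100, maxDelay: number = 5000): number {
  return Math.min(maxDelay, baseDelay * Math.pow(2, attempt - 1));
}

// Run an operation up to maxAttempts times, sleeping between failures.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts: number = 3,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // No sleep after the final attempt; the error is rethrown instead.
      if (attempt < maxAttempts) await sleep(backoffDelay(attempt));
    }
  }
  throw lastError;
}
```

With `maxAttempts: 3` and `baseDelay: 100`, a call that fails twice and then succeeds waits 100 ms and then 200 ms. The `sleep` parameter is injectable so tests can avoid real timers.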

Testing

Run integration tests

# Start test infrastructure
./scripts/start-test-stack.sh

# Run all integration tests
npm run test:integration

# Run specific service tests
npx vitest run tests/integration/services/discovery
npx vitest run tests/integration/services/claiming
npx vitest run tests/integration/services/search

Run compliance tests

npm run test:compliance

Performance targets

| Operation | Target p95 |
| --- | --- |
| Eprint indexing | < 200ms |
| Search query | < 300ms |
| Discovery recommendations | < 500ms |
| Claim evidence collection | < 2000ms |
| Blob cache hit (L1) | < 50ms |
| Metrics recording | < 10ms |
