Class: ATRepository
Defined in: src/atproto/repository/at-repository.ts:124
AT Protocol repository implementation.
Remarks
Implements IRepository for fetching records and blobs from user PDSes.
Uses @atproto/api SDK for AT Protocol compliance and the identity resolver
for DID-to-PDS endpoint resolution.
Architecture:
ATRepository
├── IIdentityResolver (DID → PDS URL)
├── AtpAgent (per-PDS HTTP client)
└── IPolicy (circuit breaker + retry)
Resilience: All PDS requests are wrapped in a resilience policy that provides:
- Circuit breaker: Fast-fail when PDS is unhealthy
- Retry: Automatic retry with exponential backoff
Caching: This implementation does NOT cache responses. Caching should be handled by a higher-level service (e.g., BlobProxyService for blobs).
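The two resilience behaviours described above can be illustrated with a minimal, self-contained sketch. It is not the project's IPolicy or createResiliencePolicy implementation: SimpleCircuitBreaker, CircuitOpenError, retryWithBackoff, and the resetAfterMs and baseDelayMs parameters are all hypothetical names chosen for illustration.

```typescript
// Hypothetical sketch: none of these names come from the ATRepository source.
type Operation<T> = () => Promise<T>;

class CircuitOpenError extends Error {
  constructor() {
    super('circuit open: failing fast');
    this.name = 'CircuitOpenError';
  }
}

class SimpleCircuitBreaker {
  private consecutiveFailures = 0;
  private openedAt: number | null = null;
  private readonly failureThreshold: number;
  private readonly resetAfterMs: number;

  constructor(failureThreshold: number, resetAfterMs: number) {
    this.failureThreshold = failureThreshold;
    this.resetAfterMs = resetAfterMs;
  }

  async execute<T>(op: Operation<T>): Promise<T> {
    if (this.openedAt !== null) {
      if (Date.now() - this.openedAt < this.resetAfterMs) {
        // Fast-fail while the target is considered unhealthy
        throw new CircuitOpenError();
      }
      // Cool-down elapsed: let one trial call through (half-open)
      this.openedAt = null;
    }
    try {
      const result = await op();
      this.consecutiveFailures = 0;
      return result;
    } catch (err) {
      this.consecutiveFailures += 1;
      if (this.consecutiveFailures >= this.failureThreshold) {
        this.openedAt = Date.now();
      }
      throw err;
    }
  }
}

async function retryWithBackoff<T>(
  op: Operation<T>,
  maxAttempts: number,
  baseDelayMs: number,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await op();
    } catch (err) {
      lastError = err;
      // Retrying against an open circuit is pointless; stop immediately
      if (err instanceof CircuitOpenError || attempt === maxAttempts) break;
      // Delay doubles each attempt: base, 2 * base, 4 * base, ...
      const delayMs = baseDelayMs * 2 ** (attempt - 1);
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}
```

In this sketch the retry wraps the breaker, e.g. `retryWithBackoff(() => breaker.execute(fetchFromPds), 3, 100)`, so every attempt counts toward the failure threshold. Whether the real policy composes the two in that order is an assumption.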
Example
const repository = new ATRepository({
  identity: didResolver,
  resiliencePolicy: createResiliencePolicy({
    circuitBreaker: { name: 'pds', failureThreshold: 5 },
    retry: { name: 'pds', maxAttempts: 3 }
  }),
  logger,
  config: { timeoutMs: 30000 }
});

// Fetch a single record
const record = await repository.getRecord<PreprintRecord>(uri);

// List records from a collection
for await (const record of repository.listRecords<PreprintRecord>(did, nsid)) {
  console.log(record.value.title);
}

// Fetch a blob
const stream = await repository.getBlob(did, cid);
Implements
IRepository
Constructors
new ATRepository()
new ATRepository(options): ATRepository
Defined in: src/atproto/repository/at-repository.ts:153
Creates a new ATRepository.
Parameters
options
Repository options
Returns
ATRepository
Example
const repository = new ATRepository({
  identity: didResolver,
  resiliencePolicy,
  logger
});
Methods
getBlob()
getBlob(did, cid): Promise<ReadableStream<Uint8Array>>
Defined in: src/atproto/repository/at-repository.ts:406
Fetches a blob from a user's PDS.
Parameters
did
Repository DID
cid
Blob CID
Returns
Promise<ReadableStream<Uint8Array>>
Blob data as readable stream
Remarks
This method fetches blobs for proxying only; Chive does not store blob data. The stream should be piped directly to the client.
Process:
- Resolve DID to PDS endpoint
- Construct blob URL using {pdsUrl}/xrpc/com.atproto.sync.getBlob
- Fetch blob with resilience
- Return as ReadableStream
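The URL construction step can be sketched as follows. This is an illustration rather than the code in at-repository.ts: buildBlobUrl is a hypothetical helper, and the example PDS host is made up. The did and cid query parameter names are the ones defined by the com.atproto.sync.getBlob endpoint.

```typescript
// Hypothetical helper: builds the getBlob URL for an already-resolved PDS endpoint.
function buildBlobUrl(pdsUrl: string, did: string, cid: string): string {
  // Resolve the XRPC path against the PDS origin
  const url = new URL('/xrpc/com.atproto.sync.getBlob', pdsUrl);
  // searchParams percent-encodes the values (the colons in a DID, for example)
  url.searchParams.set('did', did);
  url.searchParams.set('cid', cid);
  return url.toString();
}

// Example with a made-up PDS host and placeholder CID
const blobUrl = buildBlobUrl('https://pds.example.com', 'did:plc:abc123', 'bafyexamplecid');
console.log(blobUrl);
```

Building the query with URLSearchParams rather than string concatenation keeps the DID and CID correctly escaped whatever characters they contain.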
Throws
BlobFetchError Thrown when blob fetch fails.
Example
const pdfStream = await repository.getBlob(
  toDID('did:plc:abc123')!,
  toCID('bafyreib2rxk...')!
);

// Pipe to response
return new Response(pdfStream, {
  headers: { 'Content-Type': 'application/pdf' }
});
Implementation of
IRepository.getBlob
getRecord()
getRecord<T>(uri, options?): Promise<null | RepositoryRecord<T>>
Defined in: src/atproto/repository/at-repository.ts:198
Fetches a single record by AT URI.
Type Parameters
• T
Record value type
Parameters
uri
AT URI of the record
options?
Fetch options (CID for specific version)
Returns
Promise<null | RepositoryRecord<T>>
Record if found, null if not found
Remarks
This method fetches the record from the user's PDS. It returns null if the record doesn't exist or the PDS is unreachable.
Process:
- Parse AT URI to extract DID, collection, rkey
- Resolve DID to PDS endpoint
- Get or create AtpAgent for PDS
- Fetch record via com.atproto.repo.getRecord
- Return typed RepositoryRecord
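The first step, splitting an AT URI into its parts, might look like the sketch below. ParsedAtUri and parseAtUri are hypothetical names, not the project's own parser, and the sketch only handles the three-segment at://{did}/{collection}/{rkey} form used in the examples on this page.

```typescript
// Hypothetical types and helper, for illustration only.
interface ParsedAtUri {
  did: string;
  collection: string;
  rkey: string;
}

function parseAtUri(uri: string): ParsedAtUri | null {
  // Expect exactly: at://<authority>/<collection>/<rkey>
  const match = /^at:\/\/([^/]+)\/([^/]+)\/([^/]+)$/.exec(uri);
  if (match === null) {
    return null;
  }
  const [, did, collection, rkey] = match;
  return { did, collection, rkey };
}

const parts = parseAtUri('at://did:plc:abc123/pub.chive.preprint.submission/xyz789');
if (parts !== null) {
  console.log(parts.did, parts.collection, parts.rkey);
}
```

Returning null for malformed input mirrors the null-on-not-found convention of getRecord. How the real implementation reports an invalid URI is not stated here.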
Throws
RecordFetchError Thrown when fetch fails due to network error or parse error. NOT thrown for 404 (returns null instead).
Example
const record = await repository.getRecord<PreprintRecord>(
  toAtUri('at://did:plc:abc123/pub.chive.preprint.submission/xyz789')!
);

if (record) {
  console.log('Title:', record.value.title);
  console.log('Author:', record.author);
  console.log('CID:', record.cid);
}
Implementation of
IRepository.getRecord
listRecords()
listRecords<T>(did, collection, options?): AsyncIterable<RepositoryRecord<T>>
Defined in: src/atproto/repository/at-repository.ts:293
Lists records from a collection in a user's repository.
Type Parameters
• T
Record value type
Parameters
did
Repository DID
collection
Collection NSID
options?
List options (limit, cursor, reverse)
Returns
AsyncIterable<RepositoryRecord<T>>
Async iterable of records
Remarks
Returns an async iterable for memory-efficient streaming of large collections. Automatically handles pagination via cursors.
Process:
- Resolve DID to PDS endpoint
- Get or create AtpAgent for PDS
- Fetch records in pages via com.atproto.repo.listRecords
- Yield records one at a time
- Continue until no more pages or limit reached
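The cursor-driven loop in the steps above can be sketched as an async generator. Page, FetchPage, and paginate are hypothetical names. The fetchPage callback stands in for whatever call retrieves one page from the PDS, and the sketch makes no claim about how the actual listRecords is written.

```typescript
// Hypothetical shapes: one page of results plus an optional continuation cursor.
interface Page<T> {
  records: T[];
  cursor?: string;
}

type FetchPage<T> = (cursor: string | undefined) => Promise<Page<T>>;

async function* paginate<T>(fetchPage: FetchPage<T>, limit?: number): AsyncGenerator<T> {
  let cursor: string | undefined;
  let yielded = 0;
  do {
    // Fetch the next page, passing the cursor from the previous one
    const page = await fetchPage(cursor);
    for (const record of page.records) {
      if (limit !== undefined && yielded >= limit) {
        return; // limit reached mid-page
      }
      yield record; // hand records to the caller one at a time
      yielded += 1;
    }
    cursor = page.cursor;
    // Stop when the server returns no cursor or the limit is reached
  } while (cursor !== undefined && (limit === undefined || yielded < limit));
}
```

Because the generator only requests the next page when the consumer asks for more records, a caller that breaks out of its for await loop early avoids fetching the remaining pages.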
Example
const preprints = repository.listRecords<PreprintRecord>(
  toDID('did:plc:abc123')!,
  toNSID('pub.chive.preprint.submission')!,
  { limit: 10 }
);

for await (const record of preprints) {
  console.log('Preprint:', record.value.title);
}