Class: ATRepository

Defined in: src/atproto/repository/at-repository.ts:124

AT Protocol repository implementation.

Remarks

Implements IRepository for fetching records and blobs from user PDSes. Uses the @atproto/api SDK for AT Protocol compliance and the identity resolver to resolve a DID to its PDS endpoint.

Architecture:

ATRepository
├── IIdentityResolver (DID → PDS URL)
├── AtpAgent (per-PDS HTTP client)
└── IPolicy (circuit breaker + retry)

Resilience: All PDS requests are wrapped in a resilience policy that provides:

  • Circuit breaker: Fast-fail when PDS is unhealthy
  • Retry: Automatic retry with exponential backoff
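
The two behaviours listed above can be sketched as follows. This is an illustrative sketch only, not the IPolicy implementation that ATRepository uses: `SimpleCircuitBreaker`, `retryWithBackoff`, `resetAfterMs`, and `baseDelayMs` are hypothetical names introduced for this example.

```typescript
// Hypothetical sketch of a circuit breaker plus retry with exponential backoff.
// None of these names come from the Chive codebase.
type AsyncFn<T> = () => Promise<T>;

class SimpleCircuitBreaker {
  private failures = 0;
  private openedAt: number | null = null;
  private readonly failureThreshold: number;
  private readonly resetAfterMs: number;

  constructor(failureThreshold: number, resetAfterMs: number) {
    this.failureThreshold = failureThreshold;
    this.resetAfterMs = resetAfterMs;
  }

  async execute<T>(fn: AsyncFn<T>): Promise<T> {
    if (this.openedAt !== null) {
      // Fast-fail while the circuit is open and the cool-down has not elapsed.
      if (Date.now() - this.openedAt < this.resetAfterMs) {
        throw new Error('circuit open');
      }
      // Cool-down elapsed: allow one trial call; a single failure re-opens.
      this.openedAt = null;
      this.failures = this.failureThreshold - 1;
    }
    try {
      const result = await fn();
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures += 1;
      if (this.failures >= this.failureThreshold) {
        this.openedAt = Date.now();
      }
      throw err;
    }
  }
}

async function retryWithBackoff<T>(
  fn: AsyncFn<T>,
  maxAttempts: number,
  baseDelayMs: number,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts - 1) {
        // Wait baseDelayMs, then 2x, 4x, ... before the next attempt.
        const delayMs = baseDelayMs * 2 ** attempt;
        await new Promise((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}
```

One possible composition is `breaker.execute(() => retryWithBackoff(fetchFromPds, 3, 100))`, where `fetchFromPds` is a placeholder for any PDS call. With that ordering the breaker counts a fully retried call as a single failure; whether the real policy nests them this way is not stated here.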

Caching: This implementation does NOT cache responses. Caching should be handled by a higher-level service (e.g., BlobProxyService for blobs).
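
As a rough illustration of caching in a higher-level layer, a caller could wrap repository calls in a small time-to-live cache. This is a sketch under assumptions: `TtlCache` and `getOrLoad` are hypothetical names, and nothing here describes how BlobProxyService actually caches.

```typescript
// Hypothetical in-memory TTL cache that a higher-level service could place
// in front of repository calls. Not part of ATRepository.
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; expiresAt: number }>();
  private readonly ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Returns the cached value for `key` if it is still fresh; otherwise calls
  // `load`, stores the result, and returns it.
  async getOrLoad(key: string, load: () => Promise<V>): Promise<V> {
    const hit = this.entries.get(key);
    if (hit !== undefined && hit.expiresAt > Date.now()) {
      return hit.value;
    }
    const value = await load();
    this.entries.set(key, { value, expiresAt: Date.now() + this.ttlMs });
    return value;
  }
}
```

A caller might then write `cache.getOrLoad(uri, () => repository.getRecord(uri))`, keeping the repository itself free of cache state.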

Example

const repository = new ATRepository({
  identity: didResolver,
  resiliencePolicy: createResiliencePolicy({
    circuitBreaker: { name: 'pds', failureThreshold: 5 },
    retry: { name: 'pds', maxAttempts: 3 }
  }),
  logger,
  config: { timeoutMs: 30000 }
});

// Fetch a single record
const record = await repository.getRecord<PreprintRecord>(uri);

// List records from a collection
for await (const record of repository.listRecords<PreprintRecord>(did, nsid)) {
  console.log(record.value.title);
}

// Fetch a blob
const stream = await repository.getBlob(did, cid);

Implements

IRepository

Constructors

new ATRepository()

new ATRepository(options): ATRepository

Defined in: src/atproto/repository/at-repository.ts:153

Creates a new ATRepository.

Parameters

options

ATRepositoryOptions

Repository options

Returns

ATRepository

Example

const repository = new ATRepository({
  identity: didResolver,
  resiliencePolicy,
  logger
});

Methods

getBlob()

getBlob(did, cid): Promise<ReadableStream<Uint8Array>>

Defined in: src/atproto/repository/at-repository.ts:406

Fetches a blob from a user's PDS.

Parameters

did

DID

Repository DID

cid

CID

Blob CID

Returns

Promise<ReadableStream<Uint8Array>>

Blob data as readable stream

Remarks

This method fetches blobs for proxying only; Chive does not store blob data. The stream should be piped directly to the client.

Process:

  1. Resolve DID to PDS endpoint
  2. Construct blob URL using {pdsUrl}/xrpc/com.atproto.sync.getBlob
  3. Fetch blob with resilience
  4. Return as ReadableStream
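
The URL construction and fetch steps might look roughly like this. It is a minimal sketch, not the actual method body: `buildBlobUrl` and `fetchBlobStream` are hypothetical helpers, DID resolution and the resilience wrapper are omitted, and a plain `Error` stands in for BlobFetchError. The `did` and `cid` query parameters are those defined by the `com.atproto.sync.getBlob` lexicon.

```typescript
// Hypothetical helper: builds the getBlob XRPC URL for a resolved PDS endpoint.
function buildBlobUrl(pdsUrl: string, did: string, cid: string): string {
  const url = new URL('/xrpc/com.atproto.sync.getBlob', pdsUrl);
  url.searchParams.set('did', did);
  url.searchParams.set('cid', cid);
  return url.toString();
}

// Hypothetical helper: fetches the blob and returns the response body stream.
// `fetchImpl` is injectable so the function can be exercised without a network.
async function fetchBlobStream(
  pdsUrl: string,
  did: string,
  cid: string,
  fetchImpl: typeof fetch = fetch,
): Promise<ReadableStream<Uint8Array>> {
  const response = await fetchImpl(buildBlobUrl(pdsUrl, did, cid));
  if (!response.ok || response.body === null) {
    // The real method is documented to throw BlobFetchError here.
    throw new Error(`Blob fetch failed with status ${response.status}`);
  }
  return response.body;
}
```

Returning `response.body` directly keeps the blob as a stream, which matches the note above that the data is proxied rather than stored.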

Throws

BlobFetchError: Thrown when the blob fetch fails.

Example

const pdfStream = await repository.getBlob(
  toDID('did:plc:abc123')!,
  toCID('bafyreib2rxk...')!
);

// Pipe to response
return new Response(pdfStream, {
  headers: { 'Content-Type': 'application/pdf' }
});

Implementation of

IRepository.getBlob


getRecord()

getRecord<T>(uri, options?): Promise<null | RepositoryRecord<T>>

Defined in: src/atproto/repository/at-repository.ts:198

Fetches a single record by AT URI.

Type Parameters

T

Record value type

Parameters

uri

AtUri

AT URI of the record

options?

GetRecordOptions

Fetch options (CID for specific version)

Returns

Promise<null | RepositoryRecord<T>>

Record if found, null if not found

Remarks

This method fetches the record from the user's PDS. If the record doesn't exist or the PDS is unreachable, returns null.

Process:

  1. Parse AT URI to extract DID, collection, rkey
  2. Resolve DID to PDS endpoint
  3. Get or create AtpAgent for PDS
  4. Fetch record via com.atproto.repo.getRecord
  5. Return typed RepositoryRecord
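
Step 1 can be illustrated with a small parser. This is a simplified sketch: `parseAtUri` and `ParsedAtUri` are hypothetical names, and the pattern only handles the `at://<did>/<collection>/<rkey>` form used in the examples on this page, not the full AT URI syntax (handles as authority, for instance, are rejected).

```typescript
// Hypothetical result shape for a parsed record URI.
interface ParsedAtUri {
  did: string;
  collection: string;
  rkey: string;
}

// Splits at://<did>/<collection>/<rkey> into its three parts.
// Returns null when the input does not match that shape.
function parseAtUri(uri: string): ParsedAtUri | null {
  const match = /^at:\/\/(did:[a-z]+:[^/]+)\/([^/]+)\/([^/]+)$/.exec(uri);
  if (match === null) {
    return null;
  }
  const [, did, collection, rkey] = match;
  return { did, collection, rkey };
}
```

For the URI used in the example below, this yields the DID `did:plc:abc123`, the collection `pub.chive.preprint.submission`, and the record key `xyz789`.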

Throws

RecordFetchError: Thrown when the fetch fails due to a network error or parse error. Not thrown for 404 (returns null instead).

Example

const record = await repository.getRecord<PreprintRecord>(
  toAtUri('at://did:plc:abc123/pub.chive.preprint.submission/xyz789')!
);

if (record) {
  console.log('Title:', record.value.title);
  console.log('Author:', record.author);
  console.log('CID:', record.cid);
}

Implementation of

IRepository.getRecord


listRecords()

listRecords<T>(did, collection, options?): AsyncIterable<RepositoryRecord<T>>

Defined in: src/atproto/repository/at-repository.ts:293

Lists records from a collection in a user's repository.

Type Parameters

T

Record value type

Parameters

did

DID

Repository DID

collection

NSID

Collection NSID

options?

ListRecordsOptions

List options (limit, cursor, reverse)

Returns

AsyncIterable<RepositoryRecord<T>>

Async iterable of records

Remarks

Returns an async iterable for memory-efficient streaming of large collections. Automatically handles pagination via cursors.

Process:

  1. Resolve DID to PDS endpoint
  2. Get or create AtpAgent for PDS
  3. Fetch records in pages via com.atproto.repo.listRecords
  4. Yield records one at a time
  5. Continue until no more pages or limit reached
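
Steps 3 to 5 follow the usual cursor-pagination pattern, which an async generator expresses compactly. The sketch below is an assumption about the general shape, not the actual implementation: `paginate`, `Page`, `FetchPage`, and `pageSize` are hypothetical, and `fetchPage` stands in for a call to com.atproto.repo.listRecords.

```typescript
// Hypothetical page shape: a batch of records plus an optional cursor.
interface Page<T> {
  records: T[];
  cursor?: string;
}

// Hypothetical page fetcher: receives the previous cursor and a page size.
type FetchPage<T> = (cursor: string | undefined, pageSize: number) => Promise<Page<T>>;

// Yields records one at a time, fetching the next page only when needed.
// Stops when the server returns no cursor, an empty page, or `limit` is reached.
async function* paginate<T>(
  fetchPage: FetchPage<T>,
  limit: number = Infinity,
  pageSize: number = 100,
): AsyncGenerator<T> {
  if (limit <= 0) {
    return;
  }
  let cursor: string | undefined;
  let yielded = 0;
  let lastPageSize = 0;
  do {
    // Never request more records than the caller still wants.
    const page = await fetchPage(cursor, Math.min(pageSize, limit - yielded));
    lastPageSize = page.records.length;
    for (const record of page.records) {
      if (yielded >= limit) {
        return;
      }
      yield record;
      yielded += 1;
    }
    cursor = page.cursor;
  } while (cursor !== undefined && lastPageSize > 0 && yielded < limit);
}
```

Because the generator is lazy, a consumer that breaks out of its `for await` loop early never triggers the remaining page requests.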

Example

const preprints = repository.listRecords<PreprintRecord>(
  toDID('did:plc:abc123')!,
  toNSID('pub.chive.preprint.submission')!,
  { limit: 10 }
);

for await (const record of preprints) {
  console.log('Preprint:', record.value.title);
}

Implementation of

IRepository.listRecords