Dead letter queues
Use `DeadLetterQueue<T>` to capture records that fail during stream processing. A dead letter queue is itself a stream of wrapper records: Moose keeps the original payload and adds details about why the record failed.
Create a dead letter queue
```typescript
import { DeadLetterQueue } from "@514labs/moose-lib";

// Shape of the records this queue will hold when they fail.
interface UserEvent {
  userId: string;
  action: string;
  timestamp: Date;
}

export const userEventDLQ = new DeadLetterQueue<UserEvent>("user_event_dlq");
```
Where you can attach a dead letter queue
Attach a dead letter queue to one of these components when failed records from that stage should be retained for inspection or recovery.
| Attach to | Records are written here when |
|---|---|
| Transform | A transform throws an error while processing a record. |
| Consumer | A consumer throws an error while processing a record. |
| Ingest API | An Ingest API cannot validate or process a record cleanly. |
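
The routing rule in the table can be pictured with a small standalone sketch. It does not use Moose: `runWithDeadLetters` and the `Failure` type are hypothetical names invented for this illustration, and the in-memory arrays stand in for real streams. It only shows the idea that a record whose handler throws is set aside with its error instead of being dropped.

```typescript
// Hypothetical wrapper type for this sketch only (not the Moose dead letter model).
type Failure<T> = {
  originalRecord: T;
  errorMessage: string;
  source: "transform";
};

// Hypothetical helper: runs a transform over records and collects failures
// in a separate array instead of letting them disappear.
function runWithDeadLetters<In, Out>(
  records: In[],
  transform: (record: In) => Out,
): { outputs: Out[]; deadLetters: Failure<In>[] } {
  const outputs: Out[] = [];
  const deadLetters: Failure<In>[] = [];
  for (const record of records) {
    try {
      outputs.push(transform(record));
    } catch (err) {
      // Keep the untouched input together with the reason it failed.
      deadLetters.push({
        originalRecord: record,
        errorMessage: err instanceof Error ? err.message : String(err),
        source: "transform",
      });
    }
  }
  return { outputs, deadLetters };
}

interface RawRecord {
  id: string;
  name: string;
}

const { outputs, deadLetters } = runWithDeadLetters<RawRecord, RawRecord & { processed: Date }>(
  [
    { id: "a1", name: "ok" },
    { id: "", name: "missing id" },
  ],
  (record) => {
    if (!record.id) {
      throw new Error("Invalid id");
    }
    return { ...record, processed: new Date() };
  },
);

console.log(outputs.length, deadLetters.length);
```

Here the first record lands in `outputs` and the second lands in `deadLetters` with the message from the thrown error. In Moose the same separation is configured by passing a dead letter queue to the component rather than by writing the loop yourself.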
Attach a Dead Letter Queue to a transform
```typescript
import { DeadLetterQueue, Stream } from "@514labs/moose-lib";

interface RawRecord {
  id: string;
  name: string;
  timestamp: Date;
}

interface ProcessedRecord extends RawRecord {
  processed: Date;
}

export const rawRecords = new Stream<RawRecord>("raw_records");
export const processedRecords = new Stream<ProcessedRecord>("processed_records");
export const recordDLQ = new DeadLetterQueue<RawRecord>("record_dlq");

rawRecords.addTransform(
  processedRecords,
  (record) => {
    // Throwing here sends the input record to recordDLQ.
    if (!record.id) {
      throw new Error("Invalid id");
    }
    return {
      id: record.id,
      name: record.name,
      timestamp: record.timestamp,
      processed: new Date(),
    };
  },
  {
    deadLetterQueue: recordDLQ,
  },
);
```
Attach a Dead Letter Queue to a consumer
```typescript
import { ConsumerConfig, DeadLetterQueue, Stream } from "@514labs/moose-lib";

interface RawEvent {
  userId: string;
  action: string;
}

export const rawEvents = new Stream<RawEvent>("raw_events");
export const eventDLQ = new DeadLetterQueue<RawEvent>("event_dlq");

rawEvents.addConsumer(
  (event) => {
    // Throwing here sends the event to eventDLQ.
    if (event.action === "forbidden_action") {
      throw new Error("Forbidden action detected");
    }
    console.log(`Processing ${event.action} for ${event.userId}`);
  },
  {
    deadLetterQueue: eventDLQ,
  } satisfies ConsumerConfig<RawEvent>,
);
```
Attach a Dead Letter Queue to an Ingest API
```typescript
import { DeadLetterQueue, IngestApi, Stream } from "@514labs/moose-lib";

interface UserEvent {
  id: string;
  userId: string;
  timestamp: Date;
  eventType: string;
}

const userEvents = new Stream<UserEvent>("user_events");
const userEventDLQ = new DeadLetterQueue<UserEvent>("user_events_dlq");

export const userEventsApi = new IngestApi<UserEvent>("user-events", {
  destination: userEvents,
  deadLetterQueue: userEventDLQ,
});
```
Dead letter payload format
When Moose writes a failed record to a dead letter queue, it wraps the original payload in a dead letter model that adds failure metadata. TypeScript exposes a base `DeadLetterModel` plus a typed `DeadLetter<T>` helper, while Python exposes `DeadLetterModel[T]` directly.
```typescript
interface DeadLetterModel {
  originalRecord: Record<string, any>;
  errorMessage: string;
  errorType: string;
  failedAt: Date;
  source: "api" | "transform" | "table";
}

interface DeadLetter<T> extends DeadLetterModel {
  asTyped: () => T;
}
```

| Property / method | Meaning |
|---|---|
| `originalRecord` | The original failed payload before Moose wrapped it as a dead letter. |
| `errorMessage` | The error message Moose recorded for the failure. |
| `errorType` | The category of failure, such as a validation or transform error. |
| `failedAt` | When Moose wrote the record to the dead letter queue. |
| `source` | Where the failure happened: `api`, `transform`, or `table`. |
| `asTyped()` | Returns the original payload as the expected application type `T`. This helper is available on `DeadLetter<T>`. |
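
To make the wrapper shape concrete, the sketch below redeclares the two interfaces locally and builds a dead letter by hand. It runs without Moose. `wrapFailure` is a hypothetical helper, not part of `@514labs/moose-lib`, and it assumes `asTyped()` simply hands back the original payload as `T`; the real helper may do more, such as reviving dates. Using `error.name` for `errorType` is likewise an assumption made for this example.

```typescript
// Local copies of the shapes described above, so the sketch runs without Moose.
interface DeadLetterModel {
  originalRecord: Record<string, any>;
  errorMessage: string;
  errorType: string;
  failedAt: Date;
  source: "api" | "transform" | "table";
}

interface DeadLetter<T> extends DeadLetterModel {
  asTyped: () => T;
}

interface UserEvent {
  userId: string;
  action: string;
}

// Hypothetical helper (not a Moose API): wraps a failed payload by hand.
function wrapFailure<T extends Record<string, any>>(
  record: T,
  error: Error,
  source: DeadLetterModel["source"],
): DeadLetter<T> {
  return {
    originalRecord: record,
    errorMessage: error.message,
    errorType: error.name, // assumption: the error's name stands in for a category
    failedAt: new Date(),
    source,
    // Assumption: asTyped returns the original payload typed as T.
    asTyped: () => record,
  };
}

const deadLetter = wrapFailure<UserEvent>(
  { userId: "u1", action: "forbidden_action" },
  new Error("Forbidden action detected"),
  "transform",
);

// originalRecord is loosely typed; asTyped() gives back the UserEvent shape.
console.log(deadLetter.errorMessage, deadLetter.asTyped().userId);
```

The point of the split is that `originalRecord` is a loosely typed bag of fields, while `asTyped()` lets downstream code work with the application type without casting at every call site.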
Inspect or recover dead letters
Because a dead letter queue is itself a stream, you can attach consumers or transforms to it for monitoring or recovery.
```typescript
import { DeadLetterQueue } from "@514labs/moose-lib";

interface UserEvent {
  userId: string;
  action: string;
}

export const eventDLQ = new DeadLetterQueue<UserEvent>("event_dlq");

eventDLQ.addConsumer((deadLetter) => {
  // Log why the record failed, then read the original payload with its type.
  console.error(deadLetter.errorMessage);
  const originalEvent = deadLetter.asTyped();
  console.log(originalEvent.userId);
});
```
Dead letter queues are for recovery, not silent drops
Use a dead letter queue when failures should be retained for inspection or replay instead of disappearing from the stream pipeline.
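One way to think about replay is as a triage step: repair what can be repaired and keep the rest for manual inspection. The sketch below is a standalone illustration under stated assumptions. `repairUserEvent` and `planRecovery` are hypothetical names, the repair rule (normalize the action, require a `userId`) is invented for the example, and `ValidationError` is a made-up `errorType` value. It does not call Moose; in a real pipeline the logic would sit inside a consumer or transform attached to the dead letter queue.

```typescript
// Local copy of the dead letter shape so the sketch runs without Moose.
interface DeadLetterModel {
  originalRecord: Record<string, any>;
  errorMessage: string;
  errorType: string;
  failedAt: Date;
  source: "api" | "transform" | "table";
}

interface UserEvent {
  userId: string;
  action: string;
}

// Hypothetical repair rule: a record is replayable only if it still has a userId.
function repairUserEvent(deadLetter: DeadLetterModel): UserEvent | null {
  const { userId, action } = deadLetter.originalRecord;
  if (typeof userId !== "string" || userId.length === 0) {
    return null; // nothing to key the event on, leave it for manual inspection
  }
  // Assumed fix for this example: normalize the action name before replaying.
  const normalized = typeof action === "string" ? action.trim().toLowerCase() : "unknown";
  return { userId, action: normalized };
}

// Splits dead letters into records to replay and records to inspect by hand.
function planRecovery(deadLetters: DeadLetterModel[]): {
  replay: UserEvent[];
  inspect: DeadLetterModel[];
} {
  const replay: UserEvent[] = [];
  const inspect: DeadLetterModel[] = [];
  for (const deadLetter of deadLetters) {
    const repaired = repairUserEvent(deadLetter);
    if (repaired) {
      replay.push(repaired);
    } else {
      inspect.push(deadLetter);
    }
  }
  return { replay, inspect };
}

const plan = planRecovery([
  {
    originalRecord: { userId: "u1", action: " Login " },
    errorMessage: "Unknown action",
    errorType: "ValidationError",
    failedAt: new Date(),
    source: "transform",
  },
  {
    originalRecord: { action: "login" },
    errorMessage: "Missing userId",
    errorType: "ValidationError",
    failedAt: new Date(),
    source: "api",
  },
]);

console.log(plan.replay, plan.inspect.length);
```

Keeping the unrepaired records in `inspect` rather than discarding them preserves the property this section describes: a failure stays visible until someone decides what to do with it.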
Related capabilities
- Transformation Functions when failed transforms should be captured
- Consumer Functions when failed consumers should be captured
- Ingest API when bad input can enter through HTTP ingestion