Consumer functions
Use Stream.addConsumer() or stream.add_consumer() to run code whenever a record arrives on a Stream. Consumers are useful for side effects such as notifications, cache updates, webhook delivery, or calling other services.
Basic usage
app/consumer.ts
import { Stream } from "@514labs/moose-lib"; interface Record { id: string; name: string; timestamp: Date; type: "foo" | "bar";} const stream = new Stream<Record>("stream"); stream.addConsumer(async (record) => { console.log(`Processing ${record.id} for ${record.name} at ${record.timestamp}`);});Multiple consumers
You can attach more than one consumer to the same stream when different side effects should run from the same records.
app/consumer.ts
import { Stream } from "@514labs/moose-lib"; interface Record { id: string; name: string; timestamp: Date; type: "foo" | "bar";} const stream = new Stream<Record>("stream"); async function foo(record: Record) { console.log(`foo: ${record.id}`);} async function bar(record: Record) { console.log(`bar: ${record.id}`);} stream.addConsumer(async (record) => { if (record.type === "foo") { await foo(record); }}); stream.addConsumer(async (record) => { if (record.type === "bar") { await bar(record); }});Consumer behavior
- Every consumer attached to the stream receives each record.
- Consumers can be synchronous or asynchronous.
- Use
MooseCacheor another external store if consumer logic needs state across records. - For broader multi-stream compositions, see Stream patterns.
- Use a Dead Letter Queue when failed records should be captured instead of dropped.
Related capabilities
- Stream for defining the stream resource
Stream.send()for getting records into the stream- Transform functions when records should be rewritten or routed to another stream
- Stream patterns for fan-in, fan-out, and chained stream examples
- Dead letter queues for failed-consumer handling