Consumer functions

Use Stream.addConsumer() (TypeScript) or stream.add_consumer() (Python) to run code whenever a record arrives on a Stream. Consumers are intended for side effects such as notifications, cache updates, webhook delivery, or calls to other services.

Basic usage

app/consumer.ts
import { Stream } from "@514labs/moose-lib";

interface Record {
  id: string;
  name: string;
  timestamp: Date;
  type: "foo" | "bar";
}

const stream = new Stream<Record>("stream");

stream.addConsumer(async (record) => {
  console.log(`Processing ${record.id} for ${record.name} at ${record.timestamp}`);
});
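As an illustration of a side-effect consumer, the sketch below separates the pure formatting step from the delivery step so the formatting can be tested without a running stream. It is a standalone sketch, not part of moose-lib: `EventRecord`, `OutboundMessage`, `buildMessage`, `deliver`, and `outbox` are hypothetical names, and `deliver` only collects messages in memory where a real app would call a notification or webhook service.

```typescript
// Mirrors the record shape used on this page. Renamed (assumption) so the
// snippet does not clash with TypeScript's built-in Record utility type
// when compiled as a standalone file.
interface EventRecord {
  id: string;
  name: string;
  timestamp: Date;
  type: "foo" | "bar";
}

// Hypothetical outbound message shape; not a moose-lib type.
interface OutboundMessage {
  subject: string;
  body: string;
}

// Pure formatting step: no I/O, so it is easy to unit-test.
function buildMessage(record: EventRecord): OutboundMessage {
  return {
    subject: `[${record.type}] ${record.name}`,
    body: `Record ${record.id} arrived at ${record.timestamp.toISOString()}`,
  };
}

// Hypothetical delivery step. Here it only appends to an in-memory outbox.
const outbox: OutboundMessage[] = [];
async function deliver(message: OutboundMessage): Promise<void> {
  outbox.push(message);
}

// Handler with the same (record) => Promise<void> shape as the consumer above.
async function notifyConsumer(record: EventRecord): Promise<void> {
  await deliver(buildMessage(record));
}

// In a Moose app, the handler would be attached with:
//   stream.addConsumer(notifyConsumer);
```

Keeping the handler as a named function rather than an inline arrow makes it possible to call it directly in tests with a hand-built record.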

Multiple consumers

You can attach more than one consumer to the same stream when different side effects should run from the same records.

app/consumer.ts
import { Stream } from "@514labs/moose-lib";

interface Record {
  id: string;
  name: string;
  timestamp: Date;
  type: "foo" | "bar";
}

const stream = new Stream<Record>("stream");

async function foo(record: Record) {
  console.log(`foo: ${record.id}`);
}

async function bar(record: Record) {
  console.log(`bar: ${record.id}`);
}

stream.addConsumer(async (record) => {
  if (record.type === "foo") {
    await foo(record);
  }
});

stream.addConsumer(async (record) => {
  if (record.type === "bar") {
    await bar(record);
  }
});
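To make the delivery pattern in this example concrete, the sketch below uses a small in-memory stand-in for a stream. `InMemoryStream`, `publish`, and `runDemo` are hypothetical and are not the moose-lib implementation; the stand-in only models the idea that each attached consumer is handed every record and decides for itself whether to act on it.

```typescript
interface EventRecord {
  id: string;
  type: "foo" | "bar";
}

type Consumer<T> = (record: T) => void | Promise<void>;

// In-memory stand-in, NOT the moose-lib Stream. It exists only to show
// that every attached consumer sees every published record.
class InMemoryStream<T> {
  private consumers: Consumer<T>[] = [];

  addConsumer(consumer: Consumer<T>): void {
    this.consumers.push(consumer);
  }

  // Hypothetical helper: hand one record to each consumer in turn.
  async publish(record: T): Promise<void> {
    for (const consumer of this.consumers) {
      await consumer(record);
    }
  }
}

const seenByFoo: string[] = [];
const seenByBar: string[] = [];

const demo = new InMemoryStream<EventRecord>();

// Synchronous consumer: reacts only to "foo" records.
demo.addConsumer((record) => {
  if (record.type === "foo") seenByFoo.push(record.id);
});

// Asynchronous consumer: reacts only to "bar" records.
demo.addConsumer(async (record) => {
  if (record.type === "bar") seenByBar.push(record.id);
});

async function runDemo(): Promise<void> {
  await demo.publish({ id: "1", type: "foo" });
  await demo.publish({ id: "2", type: "bar" });
  await demo.publish({ id: "3", type: "foo" });
}
```

After `runDemo()` resolves, `seenByFoo` holds the ids of the two "foo" records and `seenByBar` holds the id of the single "bar" record, even though both consumers were invoked for all three records.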

Consumer behavior

  • Every consumer attached to the stream receives each record.
  • Consumers can be synchronous or asynchronous.
  • Use MooseCache or another external store if consumer logic needs state across records.
  • For broader multi-stream compositions, see Stream patterns.
  • Use a Dead Letter Queue when failed records should be captured instead of dropped.
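The point about keeping state in an external store can be sketched as follows. This is a minimal illustration under stated assumptions: `KeyValueStore`, `MapStore`, and `makeCountingConsumer` are hypothetical names, and the `Map`-backed store stands in for MooseCache or another external store whose actual API is not shown here.

```typescript
// Hypothetical minimal store interface; a real app would back this with
// MooseCache or another external store.
interface KeyValueStore {
  get(key: string): Promise<string | undefined>;
  set(key: string, value: string): Promise<void>;
}

// In-memory implementation used only for illustration and tests.
class MapStore implements KeyValueStore {
  private data = new Map<string, string>();

  async get(key: string): Promise<string | undefined> {
    return this.data.get(key);
  }

  async set(key: string, value: string): Promise<void> {
    this.data.set(key, value);
  }
}

interface EventRecord {
  id: string;
  type: "foo" | "bar";
}

// Builds a consumer that keeps a per-type record count in the store,
// so the count lives outside the handler rather than in a local variable.
function makeCountingConsumer(store: KeyValueStore) {
  return async (record: EventRecord): Promise<void> => {
    const key = `count:${record.type}`;
    const current = Number((await store.get(key)) ?? "0");
    // Read-then-write is not atomic. With concurrent consumers, an atomic
    // increment offered by the real store would be the safer choice.
    await store.set(key, String(current + 1));
  };
}
```

Passing the store in as a parameter keeps the handler free of module-level state and lets tests substitute the in-memory implementation.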

Related capabilities

  • Stream for defining the stream resource
  • Stream.send() for getting records into the stream
  • Transform functions when records should be rewritten or routed to another stream
  • Stream patterns for fan-in, fan-out, and chained stream examples
  • Dead letter queues for failed-consumer handling
