Stream patterns

Use this page for multi-stream composition examples that go beyond the basic Stream, Stream.send(), consumer, and transform references.

Fan-out

Send one source stream into multiple downstream streams when different processors or storage paths should receive different shapes of the same event flow.

app/fan-out.ts
```typescript
import { Stream } from "@514labs/moose-lib";

interface OrderEvent {
  orderId: string;
  userId: string;
  amount: number;
}

interface AnalyticsOrderEvent extends OrderEvent {}

interface HighValueOrderEvent extends OrderEvent {
  priority: "high";
}

const orders = new Stream<OrderEvent>("orders");
const analyticsOrders = new Stream<AnalyticsOrderEvent>("analytics_orders");
const highValueOrders = new Stream<HighValueOrderEvent>("high_value_orders");

// Forward every order unchanged to the analytics stream.
orders.addTransform(analyticsOrders, (order) => order);

// Forward only orders of 1000 or more, tagged as high priority.
orders.addTransform(highValueOrders, (order) => {
  if (order.amount < 1000) {
    // Returning undefined skips this record.
    return undefined;
  }

  return {
    ...order,
    priority: "high",
  };
});
```
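The filtering branch in the high-value transform above can be checked locally before any stream is running. The following is a minimal sketch, not part of the library: `toHighValueOrder`, `sampleOrders`, and `forwarded` are hypothetical names, and it assumes, as the example above suggests, that a transform returning `undefined` forwards nothing.

```typescript
// Types copied from app/fan-out.ts.
interface OrderEvent {
  orderId: string;
  userId: string;
  amount: number;
}

interface HighValueOrderEvent extends OrderEvent {
  priority: "high";
}

// Hypothetical named version of the inline high-value transform.
// Assumption: returning undefined means "do not forward this record".
function toHighValueOrder(order: OrderEvent): HighValueOrderEvent | undefined {
  if (order.amount < 1000) {
    return undefined;
  }
  return { ...order, priority: "high" };
}

// Hypothetical sample data for a quick local check.
const sampleOrders: OrderEvent[] = [
  { orderId: "o-1", userId: "u-1", amount: 250 },
  { orderId: "o-2", userId: "u-2", amount: 1000 },
  { orderId: "o-3", userId: "u-3", amount: 4200 },
];

// Apply the transform, then discard the records it filtered out.
const forwarded = sampleOrders
  .map(toHighValueOrder)
  .filter((order): order is HighValueOrderEvent => order !== undefined);

// o-2 and o-3 meet the threshold; o-1 is dropped.
console.log(forwarded.map((order) => order.orderId));
```

Because the function has the same shape as the inline callback, it could be passed directly as the second argument of `orders.addTransform(highValueOrders, ...)`.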

Fan-in

Send multiple source streams into one destination stream when different producers should land in a shared downstream schema.

app/fan-in.ts
```typescript
import { Stream } from "@514labs/moose-lib";

interface WebEvent {
  userId: string;
  path: string;
}

interface MobileEvent {
  userId: string;
  screen: string;
}

interface UnifiedEvent {
  userId: string;
  source: "web" | "mobile";
  name: string;
}

const webEvents = new Stream<WebEvent>("web_events");
const mobileEvents = new Stream<MobileEvent>("mobile_events");
const unifiedEvents = new Stream<UnifiedEvent>("unified_events");

// Map web page views into the shared schema.
webEvents.addTransform(unifiedEvents, (event) => ({
  userId: event.userId,
  source: "web",
  name: event.path,
}));

// Map mobile screen views into the same shared schema.
mobileEvents.addTransform(unifiedEvents, (event) => ({
  userId: event.userId,
  source: "mobile",
  name: event.screen,
}));
```
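To see why the `source` tag matters once both producers land in one stream, the two mappings above can be modeled as plain functions. This is an illustrative sketch only: `fromWeb`, `fromMobile`, `unified`, and `countsBySource` are hypothetical names, and the array stands in for the destination stream.

```typescript
// Types copied from app/fan-in.ts.
interface WebEvent {
  userId: string;
  path: string;
}

interface MobileEvent {
  userId: string;
  screen: string;
}

interface UnifiedEvent {
  userId: string;
  source: "web" | "mobile";
  name: string;
}

// Hypothetical named versions of the two inline transforms.
function fromWeb(webEvent: WebEvent): UnifiedEvent {
  return { userId: webEvent.userId, source: "web", name: webEvent.path };
}

function fromMobile(mobileEvent: MobileEvent): UnifiedEvent {
  return { userId: mobileEvent.userId, source: "mobile", name: mobileEvent.screen };
}

// An in-memory stand-in for the shared destination stream.
const unified: UnifiedEvent[] = [
  fromWeb({ userId: "u-1", path: "/pricing" }),
  fromMobile({ userId: "u-2", screen: "Checkout" }),
  fromWeb({ userId: "u-3", path: "/docs" }),
];

// Downstream code branches on the tag without knowing the original shapes.
const countsBySource = { web: 0, mobile: 0 };
for (const record of unified) {
  countsBySource[record.source] += 1;
}

console.log(countsBySource);
```

Keeping `source` as a union of string literals lets the compiler reject a producer that writes an unexpected tag.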

Using intermediate streams

Use multiple intermediate streams when records should move through distinct processing steps.

app/chained-stages.ts
```typescript
import { Stream } from "@514labs/moose-lib";

interface RawEvent {
  userId: string;
  email: string;
}

interface ValidatedEvent extends RawEvent {}

interface EnrichedEvent extends ValidatedEvent {
  emailDomain: string;
}

const rawEvents = new Stream<RawEvent>("raw_events");
const validatedEvents = new Stream<ValidatedEvent>("validated_events");
const enrichedEvents = new Stream<EnrichedEvent>("enriched_events");

// Stage 1: drop records with a missing user ID or a malformed email.
rawEvents.addTransform(validatedEvents, (event) => {
  if (!event.userId || !event.email.includes("@")) {
    return undefined;
  }

  return event;
});

// Stage 2: derive the email domain from records that passed validation.
validatedEvents.addTransform(enrichedEvents, (event) => ({
  ...event,
  emailDomain: event.email.split("@")[1],
}));
```
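The two stages above can also be composed in memory to confirm that a record rejected by validation never reaches enrichment. This is a rough sketch under stated assumptions: `validate`, `enrich`, and `runStages` are hypothetical names, no stream is involved, and `undefined` is assumed to mean the record is dropped.

```typescript
// Types copied from app/chained-stages.ts.
interface RawEvent {
  userId: string;
  email: string;
}

interface ValidatedEvent extends RawEvent {}

interface EnrichedEvent extends ValidatedEvent {
  emailDomain: string;
}

// Hypothetical stage 1: same check as the first transform.
function validate(raw: RawEvent): ValidatedEvent | undefined {
  if (!raw.userId || !raw.email.includes("@")) {
    return undefined;
  }
  return raw;
}

// Hypothetical stage 2: same enrichment as the second transform.
// The empty-string fallback only keeps the sketch compiling under
// stricter index-access settings; validated emails always contain "@".
function enrich(validated: ValidatedEvent): EnrichedEvent {
  return { ...validated, emailDomain: validated.email.split("@")[1] ?? "" };
}

// Chain the stages: stage 2 runs only if stage 1 produced a record.
function runStages(raw: RawEvent): EnrichedEvent | undefined {
  const validated = validate(raw);
  return validated === undefined ? undefined : enrich(validated);
}

console.log(runStages({ userId: "u-1", email: "ada@example.com" }));
console.log(runStages({ userId: "u-2", email: "not-an-email" }));
```

Splitting validation and enrichment this way means each stage can assume the guarantees of the one before it, which is the same reason to give each step its own intermediate stream.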

Related capabilities

  • Stream for defining the streams used in each pattern
  • Transform functions for the single-transform reference page
  • Consumer functions when a stage should trigger side effects instead of writing to another stream
  • Dead letter queues when failed records in multi-stage flows should be captured
