Stream patterns
Use this page for multi-stream composition examples that go beyond the basic Stream, Stream.send(), consumer, and transform references.
Fan-out
Send one source stream into multiple downstream streams when different processors or storage paths should receive different shapes of the same event flow.
app/fan-out.ts
    import { Stream } from "@514labs/moose-lib";

    interface OrderEvent {
      orderId: string;
      userId: string;
      amount: number;
    }

    interface AnalyticsOrderEvent extends OrderEvent {}

    interface HighValueOrderEvent extends OrderEvent {
      priority: "high";
    }

    const orders = new Stream<OrderEvent>("orders");
    const analyticsOrders = new Stream<AnalyticsOrderEvent>("analytics_orders");
    const highValueOrders = new Stream<HighValueOrderEvent>("high_value_orders");

    // Every order is forwarded unchanged to the analytics stream.
    orders.addTransform(analyticsOrders, (order) => order);

    // Only orders of 1000 or more reach the high-value stream;
    // returning undefined skips the record for this destination.
    orders.addTransform(highValueOrders, (order) => {
      if (order.amount < 1000) {
        return undefined;
      }

      return {
        ...order,
        priority: "high",
      };
    });

Fan-in
Send multiple source streams into one destination stream when different producers should land in a shared downstream schema.
app/fan-in.ts
    import { Stream } from "@514labs/moose-lib";

    interface WebEvent {
      userId: string;
      path: string;
    }

    interface MobileEvent {
      userId: string;
      screen: string;
    }

    interface UnifiedEvent {
      userId: string;
      source: "web" | "mobile";
      name: string;
    }

    const webEvents = new Stream<WebEvent>("web_events");
    const mobileEvents = new Stream<MobileEvent>("mobile_events");
    const unifiedEvents = new Stream<UnifiedEvent>("unified_events");

    // Each source maps its own field (path or screen) onto the shared name field.
    webEvents.addTransform(unifiedEvents, (event) => ({
      userId: event.userId,
      source: "web",
      name: event.path,
    }));

    mobileEvents.addTransform(unifiedEvents, (event) => ({
      userId: event.userId,
      source: "mobile",
      name: event.screen,
    }));

Using intermediate streams
Use multiple intermediate streams when records should move through distinct processing steps.
app/chained-stages.ts
    import { Stream } from "@514labs/moose-lib";

    interface RawEvent {
      userId: string;
      email: string;
    }

    interface ValidatedEvent extends RawEvent {}

    interface EnrichedEvent extends ValidatedEvent {
      emailDomain: string;
    }

    const rawEvents = new Stream<RawEvent>("raw_events");
    const validatedEvents = new Stream<ValidatedEvent>("validated_events");
    const enrichedEvents = new Stream<EnrichedEvent>("enriched_events");

    // Stage 1: skip records with no userId or with an email that lacks "@".
    rawEvents.addTransform(validatedEvents, (event) => {
      if (!event.userId || !event.email.includes("@")) {
        return undefined;
      }

      return event;
    });

    // Stage 2: derive the email domain from the validated record.
    validatedEvents.addTransform(enrichedEvents, (event) => ({
      ...event,
      emailDomain: event.email.split("@")[1],
    }));

Related capabilities
- Stream for defining the streams used in each pattern
- Transform functions for the single-transform reference page
- Consumer functions when a stage should trigger side effects instead of writing to another stream
- Dead letter queues when failed records in multi-stage flows should be captured
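
To illustrate why capturing failed records matters in a multi-stage flow, the sketch below models the validate and enrich stages from app/chained-stages.ts as plain functions and collects failures in an array. It does not use the Moose API: `runStage`, `DeadLetter`, and the in-memory arrays are hypothetical stand-ins for streams and a dead letter queue, chosen only so the example runs on its own. The stricter check in `enrich` (throwing when the domain is empty) is also an assumption added for the illustration.

```typescript
interface RawEvent {
  userId: string;
  email: string;
}

interface EnrichedEvent extends RawEvent {
  emailDomain: string;
}

// Hypothetical shape for a captured failure; not a Moose type.
interface DeadLetter<T> {
  record: T;
  stage: string;
  error: string;
}

// Runs one stage over a batch. A transform that returns undefined drops the
// record; a transform that throws sends the record to the dead-letter array.
function runStage<In, Out>(
  stage: string,
  input: In[],
  transform: (record: In) => Out | undefined,
  deadLetters: DeadLetter<unknown>[],
): Out[] {
  const output: Out[] = [];
  for (const record of input) {
    try {
      const result = transform(record);
      if (result !== undefined) {
        output.push(result);
      }
    } catch (err) {
      deadLetters.push({
        record,
        stage,
        error: err instanceof Error ? err.message : String(err),
      });
    }
  }
  return output;
}

// Same validation rule as the chained-stages example.
const validate = (event: RawEvent): RawEvent | undefined =>
  event.userId && event.email.includes("@") ? event : undefined;

// Assumed stricter enrichment: an empty domain is treated as a failure.
const enrich = (event: RawEvent): EnrichedEvent => {
  const emailDomain = event.email.split("@")[1];
  if (!emailDomain) {
    throw new Error("email has no domain part");
  }
  return { ...event, emailDomain };
};

const deadLetters: DeadLetter<unknown>[] = [];
const raw: RawEvent[] = [
  { userId: "u1", email: "ada@example.com" },
  { userId: "", email: "nobody@example.com" }, // dropped by validate
  { userId: "u3", email: "broken@" }, // passes validate, fails enrich
];

const validated = runStage("validate", raw, validate, deadLetters);
const enriched = runStage("enrich", validated, enrich, deadLetters);
```

Here the record that fails in the second stage is kept alongside the stage name and error message, so it can be inspected or replayed rather than lost partway through the flow. Records that a stage deliberately filters out are not treated as failures.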