Transform functions
Use Stream.addTransform() in TypeScript or stream.add_transform() in Python to map records from one Stream into another. Transform functions are useful when records should be filtered, enriched, reshaped, or split before they continue through your pipeline.
Basic usage
app/transform.ts
```typescript
import { Stream } from "@514labs/moose-lib";

interface RawEvent {
  userId: string;
  timestamp: number;
}

interface ProcessedEvent {
  userId: string;
  processedAt: Date;
}

const rawEvents = new Stream<RawEvent>("raw_events");
const processedEvents = new Stream<ProcessedEvent>("processed_events");

rawEvents.addTransform(processedEvents, (event) => {
  if (!event.userId) {
    return undefined;
  }
  return {
    userId: event.userId,
    processedAt: new Date(event.timestamp),
  };
});
```
Transform behavior
- A transform reads from one source stream and writes to one destination stream.
- Return one record to emit one downstream record.
- Return undefined in TypeScript or None in Python to skip a record.
- Return an array in TypeScript or a list in Python to emit multiple downstream records from one input record.
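The three return shapes listed above can be sketched as a single transform body. This is a minimal illustration in plain TypeScript rather than a definitive implementation: the `pages` and `page` fields, the `TransformResult` alias, and the `emitted` helper are assumptions made for this example and are not part of moose-lib. `emitted` only mimics, locally, how many downstream records each return value would produce.

```typescript
// Record shapes follow the Basic usage example.
// `pages` and `page` are hypothetical fields added to demonstrate fan-out.
interface RawEvent {
  userId: string;
  timestamp: number;
  pages?: string[];
}

interface ProcessedEvent {
  userId: string;
  processedAt: Date;
  page?: string;
}

// Hypothetical alias for the three return shapes a transform may produce.
type TransformResult<T> = T | T[] | undefined;

function toProcessed(event: RawEvent): TransformResult<ProcessedEvent> {
  // Skip: returning undefined emits no downstream record.
  if (!event.userId) {
    return undefined;
  }
  const processedAt = new Date(event.timestamp);
  // Fan-out: returning an array emits one downstream record per element.
  if (event.pages && event.pages.length > 0) {
    return event.pages.map((page) => ({ userId: event.userId, processedAt, page }));
  }
  // One-to-one: returning a single record emits exactly one downstream record.
  return { userId: event.userId, processedAt };
}

// Hypothetical local helper (not a moose-lib API): normalizes a transform
// result into the list of records that would be written downstream.
function emitted<T>(result: TransformResult<T>): T[] {
  if (result === undefined) {
    return [];
  }
  return Array.isArray(result) ? result : [result];
}
```

A function shaped like `toProcessed` could be passed as the second argument to addTransform, in the same position as the inline callback in Basic usage.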
Use a dead letter queue when failed transforms should be captured instead of dropped.
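To make "captured instead of dropped" concrete, the following sketch wraps a transform so that a thrown error is recorded alongside the original record. It deliberately does not use the Moose dead letter queue API: `DeadLetter`, `withDeadLetters`, and the in-memory `deadLetters` array are hypothetical stand-ins chosen for this illustration, and the timestamp check is an assumed failure condition.

```typescript
// Hypothetical shape for a captured failure (not a moose-lib type).
interface DeadLetter<T> {
  original: T;
  errorMessage: string;
  failedAt: Date;
}

// Hypothetical wrapper: runs the transform and, if it throws, stores the
// input record and error message instead of losing them.
function withDeadLetters<In, Out>(
  transform: (record: In) => Out | undefined,
  deadLetters: DeadLetter<In>[],
): (record: In) => Out | undefined {
  return (record) => {
    try {
      return transform(record);
    } catch (err) {
      deadLetters.push({
        original: record,
        errorMessage: err instanceof Error ? err.message : String(err),
        failedAt: new Date(),
      });
      // Nothing is emitted downstream for a failed record.
      return undefined;
    }
  };
}

// In-memory stand-in for a dead letter destination.
const deadLetters: DeadLetter<{ userId: string; timestamp: number }>[] = [];

const safeTransform = withDeadLetters(
  (event: { userId: string; timestamp: number }) => {
    // Assumed failure condition for the example: an unusable timestamp.
    if (!Number.isFinite(event.timestamp)) {
      throw new Error("invalid timestamp");
    }
    return { userId: event.userId, processedAt: new Date(event.timestamp) };
  },
  deadLetters,
);
```

The point of the pattern is that a failed record stays inspectable and replayable, which a bare try/catch that returns undefined would not provide.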
Related capabilities
- Stream for defining the source and destination streams
- Stream.send() for getting records into the source stream
- Consumer functions when records should trigger side effects instead of writing to another stream
- Stream patterns for fan-in, fan-out, and chained stream examples
- Dead letter queues for failed-transform handling