Stream
Use Stream<T> to define a typed Kafka or Redpanda-backed stream in your Moose project. A Stream names the topic, defines the record shape, and becomes the object you publish to, consume from, transform, or sync into ClickHouse.
Enable Moose Streams before defining streams
Turn on features.streaming_engine in moose.config.toml before creating Stream resources.
Export Required
Moose only discovers resource definitions through your root app/index.ts barrel file. Re-export the stream shown here from that file or Moose will not pick up those definitions.
TypeScript example: export { stream }
Learn more about resource discovery: local development / hosted.
Basic usage
import { Stream } from "@514labs/moose-lib"; interface Record { id: string; name: string; timestamp: Date;} export const stream = new Stream<Record>("stream");Add a destination table
Attach a destination OLAP table when records from the stream should be batched and written into ClickHouse automatically.
import { Stream } from "@514labs/moose-lib";import { table } from "./table"; interface Record { id: string; name: string; timestamp: Date;} export const stream = new Stream<Record>("stream", { destination: table, version: "0.0",});For batching and delivery behavior after records enter ClickHouse sync, see Sync to table.
Configuration options
| Option | What it does | Default |
|---|---|---|
parallelism | Number of partitions for the stream. | 1 |
retentionPeriod | How long records stay in the topic, in seconds. | 604800 (7 days) |
destination | OLAP table to sync stream records into ClickHouse. | - |
version | Version label Moose uses when managing the stream resource. | - |
lifeCycle | Control how Moose manages the stream when code changes. | - |
defaultDeadLetterQueue | Default dead letter queue for consumers and transforms attached to this stream. | - |
schemaConfig | Confluent Schema Registry configuration for this stream. | - |
metadata.description | Optional description metadata. | - |
Example configuration
import { DeadLetterQueue, LifeCycle, Stream } from "@514labs/moose-lib"; interface Record { id: string; timestamp: Date;} const streamDLQ = new DeadLetterQueue<Record>("high_throughput_dlq"); export const stream = new Stream<Record>("high_throughput", { version: "0.1", parallelism: 4, retentionPeriod: 86400, defaultDeadLetterQueue: streamDLQ, lifeCycle: LifeCycle.FULLY_MANAGED,});Related capabilities
Stream.send()when you need to send records into a stream- Consumer functions when you want code to run as records arrive
- Transform functions when you want to reshape or route records between streams
- Stream patterns for fan-in, fan-out, and multi-stage stream composition
- Sync to table when stream data should land in ClickHouse
- Schema registry when stream payloads should use Confluent Schema Registry
- Dead letter queues when failed records should be captured for recovery
- LifeCycle management guide for resource management details