Stream

Use Stream<T> to define a typed Kafka- or Redpanda-backed stream in your Moose project. A Stream names the topic, defines the record shape, and becomes the object you publish to, consume from, transform, or sync into ClickHouse.

Enable Moose Streams before defining streams

Turn on features.streaming_engine in moose.config.toml before creating Stream resources.

Export Required

Moose discovers resource definitions only through your root app/index.ts barrel file. Re-export the stream shown here from that file, or Moose will not pick it up.

TypeScript example: export { stream }

Learn more about resource discovery: local development / hosted.

Basic usage

app/stream.ts
import { Stream } from "@514labs/moose-lib";

interface Record {
  id: string;
  name: string;
  timestamp: Date;
}

export const stream = new Stream<Record>("stream");

Add a destination table

Attach a destination OLAP table when records from the stream should be batched and written into ClickHouse automatically.

app/stream.ts
import { Stream } from "@514labs/moose-lib";
import { table } from "./table";

interface Record {
  id: string;
  name: string;
  timestamp: Date;
}

export const stream = new Stream<Record>("stream", {
  destination: table,
  version: "0.0",
});

For batching and delivery behavior after records enter ClickHouse sync, see Sync to table.

Configuration options

| Option | What it does | Default |
| --- | --- | --- |
| parallelism | Number of partitions for the stream. | 1 |
| retentionPeriod | How long records stay in the topic, in seconds. | 604800 (7 days) |
| destination | OLAP table to sync stream records into ClickHouse. | - |
| version | Version label Moose uses when managing the stream resource. | - |
| lifeCycle | Control how Moose manages the stream when code changes. | - |
| defaultDeadLetterQueue | Default dead letter queue for consumers and transforms attached to this stream. | - |
| schemaConfig | Confluent Schema Registry configuration for this stream. | - |
| metadata.description | Optional description metadata. | - |
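
To make the two documented defaults concrete, here is a minimal, self-contained sketch of how unset options could resolve to the values in the table above. StreamOptionsSketch and resolveDefaults are hypothetical names used only for illustration; they are not part of @514labs/moose-lib, and the library's real option type may differ. Only the option names and default values come from the table.

```typescript
// Illustrative only: a local stand-in for two of the options listed above.
// These names are hypothetical and are not exported by @514labs/moose-lib.
interface StreamOptionsSketch {
  parallelism?: number; // number of partitions for the stream
  retentionPeriod?: number; // how long records stay in the topic, in seconds
}

// Documented defaults: 1 partition and 604800 seconds (7 days) of retention.
const DOCUMENTED_DEFAULTS: Required<StreamOptionsSketch> = {
  parallelism: 1,
  retentionPeriod: 7 * 24 * 60 * 60,
};

// Fill in any option the caller left unset with its documented default.
function resolveDefaults(
  options: StreamOptionsSketch = {},
): Required<StreamOptionsSketch> {
  return {
    parallelism: options.parallelism ?? DOCUMENTED_DEFAULTS.parallelism,
    retentionPeriod:
      options.retentionPeriod ?? DOCUMENTED_DEFAULTS.retentionPeriod,
  };
}
```

Under these assumptions, resolveDefaults({ parallelism: 4 }) keeps the explicit partition count and falls back to the 7-day retention period.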

Example configuration

app/stream.ts
import { DeadLetterQueue, LifeCycle, Stream } from "@514labs/moose-lib";

interface Record {
  id: string;
  timestamp: Date;
}

const streamDLQ = new DeadLetterQueue<Record>("high_throughput_dlq");

export const stream = new Stream<Record>("high_throughput", {
  version: "0.1",
  parallelism: 4,
  retentionPeriod: 86400,
  defaultDeadLetterQueue: streamDLQ,
  lifeCycle: LifeCycle.FULLY_MANAGED,
});
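
The retentionPeriod value of 86400 in this example is one day expressed in seconds. A small helper can make such values easier to read. The sketch below uses the hypothetical name daysToSeconds, which is not part of @514labs/moose-lib.

```typescript
// Hypothetical helper, not part of @514labs/moose-lib:
// converts a number of days into the seconds value retentionPeriod expects.
const SECONDS_PER_DAY = 24 * 60 * 60;

function daysToSeconds(days: number): number {
  if (!Number.isFinite(days) || days < 0) {
    throw new RangeError("days must be a non-negative finite number");
  }
  // Round so fractional days still produce a whole number of seconds.
  return Math.round(days * SECONDS_PER_DAY);
}

const oneDay = daysToSeconds(1); // 86400, the value used in the example
const oneWeek = daysToSeconds(7); // 604800, the documented default
```

With a helper like this, retentionPeriod: daysToSeconds(1) would be equivalent to the literal 86400.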

Related capabilities

  • Stream.send() when you need to send records into a stream
  • Consumer functions when you want code to run as records arrive
  • Transform functions when you want to reshape or route records between streams
  • Stream patterns for fan-in, fan-out, and multi-stage stream composition
  • Sync to table when stream data should land in ClickHouse
  • Schema registry when stream payloads should use Confluent Schema Registry
  • Dead letter queues when failed records should be captured for recovery
  • LifeCycle management guide for resource management details
