Moose Streams
Moose Streams adds continuous ingest and event processing around Moose OLAP. Use it when data arrives continuously, needs to be transformed before storage, or should land in ClickHouse tables through managed stream pipelines backed by Kafka or Redpanda.
```typescript
import { Stream } from "@514labs/moose-lib";
import { table } from "./table";

interface Record {
  id: string;
  name: string;
  timestamp: Date;
}

export const stream = new Stream<Record>("stream", {
  destination: table,
  version: "0.0",
});
```

Most Moose projects start by defining an OLAP table, then attach a stream when they need continuous ingest or real-time processing before data becomes queryable in ClickHouse.
Enable Moose Streams before defining streams
Most Moose projects already have streaming enabled. If you disabled it, turn `features.streaming_engine` back on in `moose.config.toml` at the project root before you create stream resources or attach them to OLAP tables.
```toml
[features]
streaming_engine = true
```

Choose the starting point that matches whether your ClickHouse table is already modeled.
You already have an OLAP table
You need to define storage first
What can you do with Moose Streams?
Define streams
- Define a stream with a typed payload backed by a Kafka or Redpanda topic
Send records
The pattern for sending data to a Stream depends on where the producer runs.
- Publish from Moose code with `Stream.send()` when the producer runs inside your Moose project and can import the `Stream` object directly
- Publish from external systems with the Ingest API when the producer runs outside your Moose project and should send JSON over HTTP
- Use Schema registry when stream payloads should use Confluent Schema Registry
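As a minimal sketch of the in-project pattern, here is how a producer might publish to the `stream` defined earlier. The `buildRecord` helper is illustrative, and the `send` call is commented out because it requires a running Moose project with Kafka or Redpanda behind it; check your moose-lib version for the exact `send` signature.

```typescript
// Illustrative payload type matching the stream definition above.
interface Record {
  id: string;
  name: string;
  timestamp: Date;
}

// Hypothetical helper that builds a well-typed record for the stream.
export function buildRecord(id: string, name: string): Record {
  return { id, name, timestamp: new Date() };
}

// Inside Moose code that can import the stream object (illustrative):
// import { stream } from "./stream";
// await stream.send(buildRecord("1", "first"));
```

Keeping payload construction in a small typed helper like this makes the producer easy to unit test without a broker connection.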
Process records
- Run consumer functions when records should trigger side effects
- Run transform functions when records should be filtered, enriched, or rewritten into another stream
- Use stream patterns for fan-in, fan-out, and chained processing flows
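To make the filter/enrich/rewrite idea concrete, here is a sketch of a transform written as a pure function. The types and the `enrich` function are illustrative, and the registration call at the bottom assumes moose-lib's transform API for wiring one stream into another; verify the exact method name against your moose-lib version.

```typescript
// Source payload, matching the shape used elsewhere on this page.
interface RawEvent {
  id: string;
  name: string;
  timestamp: Date;
}

// Destination payload with an added field (enrichment).
interface EnrichedEvent extends RawEvent {
  nameLength: number;
}

// Pure transform: returning null drops the record (filtering),
// returning a value rewrites it into the destination stream's shape.
export function enrich(event: RawEvent): EnrichedEvent | null {
  if (!event.name) return null;
  return { ...event, nameLength: event.name.length };
}

// Illustrative wiring into a pipeline:
// rawStream.addTransform(enrichedStream, enrich);
```

Because the transform logic is a plain function, it can be tested in isolation before being attached to any stream.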
Land records in ClickHouse
- Sync to table when stream records should land in Moose OLAP tables
Recover failures
- Use dead letter queues when failed records should be retained for recovery
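As a sketch of the failure-handling pattern, the validation function below throws on bad records; in Moose, a record that makes a processing function throw is what a dead letter queue retains for recovery. The `validate` function is illustrative, and the commented wiring assumes moose-lib exposes a dead-letter option on transforms; confirm the exact API against your moose-lib version.

```typescript
// Payload shape matching the stream definition earlier on this page.
interface Record {
  id: string;
  name: string;
  timestamp: Date;
}

// Pure validation step: throwing inside a processing function is the
// signal that routes the failing record to the dead letter queue.
export function validate(record: Record): Record {
  if (!record.id) {
    throw new Error("record is missing an id");
  }
  return record;
}

// Illustrative wiring (assumed API, check your moose-lib version):
// const dlq = new DeadLetterQueue<Record>("recordDLQ");
// stream.addTransform(destStream, validate, { deadLetterQueue: dlq });
```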