
Moose Streams

Moose Streams adds continuous ingest and event processing around Moose OLAP. Use it when data arrives continuously, needs to be transformed before storage, or should land in ClickHouse tables through managed stream pipelines backed by Kafka or Redpanda.

app/stream.ts

import { Stream } from "@514labs/moose-lib";
import { table } from "./table";

interface Record {
  id: string;
  name: string;
  timestamp: Date;
}

export const stream = new Stream<Record>("stream", {
  destination: table,
  version: "0.0",
});

Most Moose projects start by defining an OLAP table, then attach a stream when they need continuous ingest or real-time processing before data becomes queryable in ClickHouse.

Enable Moose Streams before defining streams

Most Moose projects already have streaming enabled. If you disabled it, turn features.streaming_engine back on in the project-root moose.config.toml before you create stream resources or attach them to OLAP tables.

moose.config.toml
[features]
streaming_engine = true

Choose the starting point that matches whether your ClickHouse table is already modeled.

You already have an OLAP table
Start here if you already modeled the ClickHouse table and want stream data to land in it.
Sync to table →
You need to define storage first
Start with Moose OLAP if you need to model the ClickHouse table before adding stream ingest.
Model an OLAP table →

What can you do with Moose Streams?

Define streams

  • Define a stream with a typed payload backed by a Kafka or Redpanda topic

Send records

The pattern for sending data to a Stream depends on where the producer runs.

  • Publish from Moose code with Stream.send() when the producer runs inside your Moose project and can import the Stream object directly
  • Publish from external systems with the Ingest API when the producer runs outside your Moose project and should send JSON over HTTP
  • Use Schema registry when stream payloads should use Confluent Schema Registry
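For producers outside the project, records go to the Ingest API as JSON over HTTP. The sketch below builds such a request; the base URL, port, and `/ingest/<streamName>` path are assumptions for illustration, so check your project's Ingest API configuration for the real values.

```typescript
// Sketch: publish a record to a Moose Ingest API endpoint over HTTP.
// The endpoint path and port are illustrative assumptions.
interface UserEvent {
  id: string;
  name: string;
  timestamp: string; // ISO 8601 string over the wire
}

function buildIngestRequest(baseUrl: string, streamName: string, record: UserEvent) {
  return {
    url: `${baseUrl}/ingest/${streamName}`,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(record),
    },
  };
}

const { url, init } = buildIngestRequest("http://localhost:4000", "stream", {
  id: "evt-1",
  name: "signup",
  timestamp: new Date(0).toISOString(),
});
// await fetch(url, init); // send once the Moose dev server is running
console.log(url);
```

Producers inside the project skip the HTTP hop entirely and call `Stream.send()` on the imported `Stream` object.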

Process records

  • Run consumer functions when records should trigger side effects
  • Run transform functions when records should be filtered, enriched, or rewritten into another stream
  • Use stream patterns for fan-in, fan-out, and chained processing flows
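A transform function sits between two streams: it can drop a record, enrich it, or rewrite it into the destination's shape. Below is a minimal sketch of that logic; the stream wiring is shown only in a comment because it requires a running Moose project, and the `addTransform` call shape is an assumption to verify against the Stream API reference.

```typescript
// Sketch: a transform that filters and enriches records between streams.
interface RawEvent {
  id: string;
  name: string;
  timestamp: Date;
}

interface EnrichedEvent extends RawEvent {
  nameLength: number;
}

// Returning null drops the record; returning an object forwards it.
export function enrich(record: RawEvent): EnrichedEvent | null {
  if (record.name.trim() === "") return null; // filter out blank names
  return { ...record, nameLength: record.name.length };
}

// Assumed wiring inside a Moose project (check the Stream API reference):
// rawStream.addTransform(enrichedStream, enrich);

console.log(enrich({ id: "1", name: "ok", timestamp: new Date(0) }));
```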

Land records in ClickHouse

  • Sync to table when stream records should land in Moose OLAP tables

Recover failures

  • Use dead letter queues when failed records should be retained for recovery
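The dead-letter pattern wraps a consumer so a failing record is captured with its error instead of being lost. This is a hand-rolled sketch of the routing idea only; the managed `DeadLetterQueue` wiring is project-specific, so consult the dead letter queue documentation for the real API.

```typescript
// Sketch: route consumer failures to a dead-letter handler for later recovery.
interface DeadLetter<T> {
  original: T;   // the record that failed
  error: string; // why it failed
  failedAt: string;
}

function withDeadLetter<T>(
  handler: (record: T) => void,
  onDeadLetter: (letter: DeadLetter<T>) => void,
): (record: T) => void {
  return (record: T) => {
    try {
      handler(record);
    } catch (err) {
      onDeadLetter({
        original: record,
        error: err instanceof Error ? err.message : String(err),
        failedAt: new Date().toISOString(),
      });
    }
  };
}

// Usage: failed records accumulate instead of crashing the consumer.
const letters: DeadLetter<{ id: string }>[] = [];
const consume = withDeadLetter<{ id: string }>(
  (r) => { if (r.id === "bad") throw new Error("boom"); },
  (l) => letters.push(l),
);
consume({ id: "ok" });
consume({ id: "bad" });
console.log(letters.length); // 1
```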

Related resources

  • Stream
  • Stream patterns
  • Configuration
