We value your privacy

This site uses cookies to improve your browsing experience, analyze site traffic, and show personalized content. See our Privacy Policy.

  1. MooseStack
  2. Moose Streams
  3. Schema registry

Schema registry

Current support

Only JSON Schema is supported today. Avro and Protobuf are planned.

Use schemaConfig (TypeScript) or schema_config (Python) to publish a Stream through Confluent Schema Registry.

Requirements

Configure the Schema Registry URL in moose.config.toml under redpanda_config, or override it with environment variables.

moose.config.toml
[redpanda_config]broker = "localhost:19092"schema_registry_url = "http://localhost:8081"

Environment overrides (either key works):

export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081
# or
export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081

For Python, install confluent-kafka[json,schemaregistry] before publishing to a Schema Registry stream:

pip install "confluent-kafka[json,schemaregistry]"

Attach Schema Registry config to a stream

Set kind to choose the payload format and reference to choose which registered schema Moose uses when it publishes.

sr-stream.ts
import { Stream, type KafkaSchemaConfig } from "@514labs/moose-lib"; interface Event {  id: string;  value: number;} const schemaConfig: KafkaSchemaConfig = {  kind: "JSON",  reference: { subjectLatest: "event-value" },}; export const events = new Stream<Event>("events", {  schemaConfig,}); // Producing uses Schema Registry envelope automaticallyawait events.send({ id: "e1", value: 42 });

Schema Registry config fields

The kind field tells Moose which Schema Registry payload format to use for the stream. JSON publishes records using Schema Registry JSON, and it is the only supported value today.

The reference field tells Moose how to resolve the schema id before it encodes and publishes a record. Use it to look up the latest version for a subject, pin a specific subject version, or use a fixed schema id directly.

TypeScript reference formats:

  • Latest subject version: { subjectLatest: string } looks up the newest schema version registered under that subject.
  • Specific subject version: { subject: string, version: number } pins publishing to one subject version.
  • Fixed schema id: { id: number } uses a known schema id directly.

Runtime behavior

  • If schemaConfig.kind = "JSON" or schema_config.kind = "JSON" is set, Stream.send() publishes records using the Confluent wire format.
  • Moose resolves the schema id from the configured reference before sending unless you provide a fixed id.
  • Moose streaming runners automatically detect the Schema Registry JSON envelope and strip the header before parsing the payload.
  • When an Ingest API routes to a stream configured this way, Moose also publishes using the Schema Registry envelope.
  • Publishing fails if schema_registry_url is not configured.

Pull existing topics and schemas

Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models.

moose kafka pull <bootstrap> \
  --schema-registry http://localhost:8081 \
  --path app/external-topics \
  --include "*" \
  --exclude "{__consumer_offsets,_schemas}"

This writes external topic declarations under the provided path. When Schema Registry subjects are available, Moose also emits typed models from the fetched JSON Schemas.

Related capabilities

  • Stream for defining the stream resource
  • Stream.send() for publishing records to a Schema Registry stream
  • Ingest API for external HTTP ingestion into a Schema Registry stream

On this page

RequirementsAttach Schema Registry config to a streamSchema Registry config fieldsRuntime behaviorPull existing topics and schemasRelated capabilities
Edit this page
FiveonefourFiveonefour
Fiveonefour Docs
MooseStackHostingTemplatesGuides
Release Notes
Source575
  • Overview
Build a New App
  • 5 Minute Quickstart
  • Browse Templates
  • Existing ClickHouse
Add to Existing App
  • Next.js
  • Fastify
Fundamentals
  • Moose Runtime
  • MooseDev MCP
  • Language Server
  • Data Modeling
Moose Modules
  • Moose OLAP
  • Moose Streams
    • Define
    • Stream
    • Send
    • Stream.send()
    • Schema registry
    • Process
    • Consumer functions
    • Transform functions
    • Stream patterns
    • Store
    • Sync to table
    • Recover
    • Dead letter queues
  • Moose Workflows
  • Moose APIs & Web Apps
Deployment & Lifecycle
  • Moose Dev
  • Moose Migrate
  • Moose Deploy
Reference
  • API Reference
  • Query Layer
  • Testing Utilities
  • Data Types
  • Table Engines
  • CLI
  • Configuration
  • Observability Metrics
  • Help
  • Release Notes
Contribution
  • Documentation
  • Framework
sr-stream.ts
import { Stream, type KafkaSchemaConfig } from "@514labs/moose-lib"; interface Event {  id: string;  value: number;} const schemaConfig: KafkaSchemaConfig = {  kind: "JSON",  reference: { subjectLatest: "event-value" },}; export const events = new Stream<Event>("events", {  schemaConfig,}); // Producing uses Schema Registry envelope automaticallyawait events.send({ id: "e1", value: 42 });

TypeScript reference formats:

  • Latest subject version: { subjectLatest: string } looks up the newest schema version registered under that subject.
  • Specific subject version: { subject: string, version: number } pins publishing to one subject version.
  • Fixed schema id: { id: number } uses a known schema id directly.