Schema registry
Current support
Only JSON Schema is supported today. Avro and Protobuf are planned.
Use schemaConfig (TypeScript) or schema_config (Python) to publish a Stream through Confluent Schema Registry.
Requirements
Configure the Schema Registry URL in moose.config.toml under redpanda_config, or override it with environment variables.
[redpanda_config]
broker = "localhost:19092"
schema_registry_url = "http://localhost:8081"

Environment overrides (either key works):
export MOOSE_REDPANDA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081
# or
export MOOSE_KAFKA_CONFIG__SCHEMA_REGISTRY_URL=http://localhost:8081

For Python, install confluent-kafka[json,schemaregistry] before publishing to a Schema Registry stream:
pip install "confluent-kafka[json,schemaregistry]"

Attach Schema Registry config to a stream
Set kind to choose the payload format and reference to choose which registered schema Moose uses when it publishes.
import { Stream, type KafkaSchemaConfig } from "@514labs/moose-lib";

interface Event {
  id: string;
  value: number;
}

const schemaConfig: KafkaSchemaConfig = {
  kind: "JSON",
  reference: { subjectLatest: "event-value" },
};

export const events = new Stream<Event>("events", {
  schemaConfig,
});

// Producing uses the Schema Registry envelope automatically
await events.send({ id: "e1", value: 42 });

Schema Registry config fields
The kind field tells Moose which Schema Registry payload format to use for the stream. JSON publishes records using Schema Registry JSON, and it is the only supported value today.
The reference field tells Moose how to resolve the schema id before it encodes and publishes a record. Use it to look up the latest version for a subject, pin a specific subject version, or use a fixed schema id directly.
TypeScript reference formats:
- Latest subject version: { subjectLatest: string } looks up the newest schema version registered under that subject.
- Specific subject version: { subject: string, version: number } pins publishing to one subject version.
- Fixed schema id: { id: number } uses a known schema id directly.
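The three reference shapes above can be modelled as a union type. The following is a minimal sketch, not the moose-lib definition: the type name SchemaReference and the helper describeReference are hypothetical and exist only for illustration. Only the field names subjectLatest, subject, version, and id come from the list above.

```typescript
// Hypothetical local type mirroring the three documented reference shapes.
// It is illustrative and is not imported from @514labs/moose-lib.
type SchemaReference =
  | { subjectLatest: string }
  | { subject: string; version: number }
  | { id: number };

// Hypothetical helper: reports which lookup a given reference implies.
function describeReference(ref: SchemaReference): string {
  if ("id" in ref) {
    // Fixed schema id: no subject lookup is needed.
    return `fixed schema id ${ref.id}`;
  }
  if ("subjectLatest" in ref) {
    // Newest version registered under the subject.
    return `latest version of subject "${ref.subjectLatest}"`;
  }
  // Remaining case: a pinned subject version.
  return `version ${ref.version} of subject "${ref.subject}"`;
}

const latest: SchemaReference = { subjectLatest: "event-value" };
const pinned: SchemaReference = { subject: "event-value", version: 3 };
const fixed: SchemaReference = { id: 17 };
```

Because each shape has a distinct key, the compiler can narrow the union with the in operator, so a mixed object such as one carrying only version without subject is rejected at compile time.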
Runtime behavior
- If schemaConfig.kind = "JSON" or schema_config.kind = "JSON" is set, Stream.send() publishes records using the Confluent wire format.
- Moose resolves the schema id from the configured reference before sending unless you provide a fixed id.
- Moose streaming runners automatically detect the Schema Registry JSON envelope and strip the header before parsing the payload.
- When an Ingest API routes to a stream configured this way, Moose also publishes using the Schema Registry envelope.
- Publishing fails if schema_registry_url is not configured.
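For context, the Confluent wire format prefixes each payload with a 5-byte header: a zero magic byte followed by the schema id as a 4-byte big-endian integer. The sketch below illustrates that framing and how a consumer might detect and strip it. The function names encodeEnvelope and decodeEnvelope are hypothetical, and this is not Moose's internal implementation, only an illustration of the envelope described above.

```typescript
// Illustrative framing: magic byte 0x00 + 4-byte big-endian schema id + JSON payload.
const MAGIC_BYTE = 0;
const HEADER_LENGTH = 5;

// Hypothetical helper: frames a record with the 5-byte header.
function encodeEnvelope(schemaId: number, record: unknown): Uint8Array {
  const payload = new TextEncoder().encode(JSON.stringify(record));
  const framed = new Uint8Array(HEADER_LENGTH + payload.length);
  const view = new DataView(framed.buffer);
  view.setUint8(0, MAGIC_BYTE);
  view.setUint32(1, schemaId, false); // false selects big-endian byte order
  framed.set(payload, HEADER_LENGTH);
  return framed;
}

// Hypothetical helper: strips the header when present, then parses the JSON body.
function decodeEnvelope(message: Uint8Array): { schemaId: number | null; record: unknown } {
  // Plain JSON text never starts with a 0x00 byte, so the magic byte
  // distinguishes framed messages from unframed ones.
  const hasHeader = message.length > HEADER_LENGTH && message[0] === MAGIC_BYTE;
  const view = new DataView(message.buffer, message.byteOffset, message.byteLength);
  const schemaId = hasHeader ? view.getUint32(1, false) : null;
  const body = hasHeader ? message.subarray(HEADER_LENGTH) : message;
  return { schemaId, record: JSON.parse(new TextDecoder().decode(body)) };
}
```

Calling decodeEnvelope(encodeEnvelope(42, { id: "e1", value: 42 })) returns the original record together with schema id 42, while an unframed JSON message yields a schemaId of null.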
Pull existing topics and schemas
Use the CLI to pull external topics and optionally fetch JSON Schemas from Schema Registry to emit typed models.
moose kafka pull <bootstrap> \
--schema-registry http://localhost:8081 \
--path app/external-topics \
--include "*" \
--exclude "{__consumer_offsets,_schemas}"

This writes external topic declarations under the provided path. When Schema Registry subjects are available, Moose also emits typed models from the fetched JSON Schemas.
Related capabilities
- Stream for defining the stream resource
- Stream.send() for publishing records to a Schema Registry stream
- Ingest API for external HTTP ingestion into a Schema Registry stream