Introduction to Streaming Functions
Viewing typescript
switch to python
Streaming Functions in Moose enable real-time data processing, allowing you to transform, enrich, filter, or trigger actions based on incoming data. These powerful functions can:
- Map data between Data Models
- Enrich data with external information
- Filter out unwanted data
- Implement arbitrary logic to handle and respond to incoming data
They are defined as regular functions in TypeScriptPython files, and are executed in real-time as data flows into your Moose application.
Core Concepts
Streaming Functions act as intermediares between two Data Models: a source and a destination. They are automatically triggered when new data arrives on the source Data Model's streaming topic. These functions transform the incoming data and output it in a format that matches the destination Data Model's schema.
Key points:
- Input: Data from the source Data Model's streaming topic
- Processing: Custom logic defined in the Streaming Function
- Output: Data conforming to the destination Data Model's schema
- Execution: Automatic and real-time, triggered by new messages to the source Data Model's streaming topic
File and Folder Conventions
Streaming Functions are defined in files within the /functions
directory of your Moose app, following this naming pattern:
SourceDataModel__DestinationDataModel.ts
- SourceDataModel__DestinationDataModel.ts
SourceDataModel__DestinationDataModel.py
- AnotherSource__AnotherDestination.py
Use two underscores to separate the source and destination data models in the file name. This convention allows Moose to automatically identify and execute the Streaming Functions.
Streaming Function Definition
A Streaming Function is defined as the default export of its module:
import { SourceDataModel } from "../datamodels/path/to/SourceDataModel";
import { DestinationDataModel } from "../datamodels/path/to/DestinationDataModel";
export default function functionName(
source: SourceDataModel,
): DestinationDataModel[] | DestinationDataModel | null {
// Transformation logic
}
The function accepts a single parameter source
of type SourceDataModel
. This is the data that Moose will pass to the function when it receives a new message on the streaming topic associated with the source Data Model.
The return type can be:
- An array of
DestinationDataModel
objects - A single
DestinationDataModel
object null
if no output should be produced
The function body contains the processing logic. We will go into more detail on this in the next section.
The function name itself is not important, as Moose identifies the function by the file convention.
A Streaming Function is defined as a StreamingFunction
object with a run
attribute that is assigned the function:
from moose_lib import StreamingFunction
from app.datamodels.models import SourceDataModel, DestinationDataModel
def function_name(source: SourceDataModel) -> DestinationDataModel:
# Transformation logic
pass
streaming_function = StreamingFunction(
run=function_name
)
-
The
StreamingFunction
object is how Moose identifies the entry point for the Streaming Function. -
The
run
attribute is assigned the function that Moose will call when it receives a new message on the streaming topic associated with the source Data Model. -
The function accepts a single parameter
source
of typeSourceDataModel
. This is the data that Moose will pass to the function when it receives a new message on the streaming topic associated with the source Data Model. -
The return type is
DestinationDataModel
. If a value is returned Moose automatically serializes it to JSON and writes it to the streaming topic associated with the destination Data Model.
The function name itself is not important, as Moose identifies the function by
whatever is assigned to the run
attribute of the StreamingFunction
object.