Streaming Functions
Streaming Functions enable real-time data transformation, filtering, and enrichment between source and destination Data Models. They allow you to build dynamic streaming pipelines that can integrate external data, reshape payloads, trigger events on the fly, or implement any other logic you need.
- Streaming functions take a source Data Model as input and return a destination Data Model as output.
- You can implement any logic within a streaming function, including using libraries and calling external APIs.
- These functions are automatically triggered by new data points in the source Data Model’s streaming topic.
- The output is seamlessly forwarded to the destination Data Model’s streaming topic.
Example:

```typescript
import { Foo, Bar } from "../datamodels/models.ts";

export default function transform(foo: Foo): Bar {
  // Transformation logic
  return {
    primaryKey: foo.primaryKey,
    utcTimestamp: foo.timestamp,
    hasText: foo.optionalText !== null,
    textLength: foo.optionalText ? foo.optionalText.length : 0,
  } as Bar;
}
```

```python
from moose_lib import StreamingFunction
from app.datamodels.models import Foo, Bar

def transform(foo: Foo) -> Bar:
    # Transformation logic
    return Bar(
        primary_key=foo.primary_key,
        utc_timestamp=foo.timestamp,
        has_text=foo.optional_text is not None,
        text_length=len(foo.optional_text) if foo.optional_text else 0,
    )

Foo__Bar = StreamingFunction(run=transform)
```
Quickstart
The fastest way to get started with a new Streaming Function is to use the CLI:
```shell
# TypeScript projects
npx moose-cli function init --source <SOURCE_DATA_MODEL> --destination <DESTINATION_DATA_MODEL>

# Python projects
moose-cli function init --source <SOURCE_DATA_MODEL> --destination <DESTINATION_DATA_MODEL>
```
This generates a new file in your /functions directory with the following naming convention:

- SourceDataModel__DestinationDataModel.ts (TypeScript)
- SourceDataModel__DestinationDataModel.py (Python)

You can alternatively create a new file manually in the /functions directory. The file name must follow the pattern of separating the source and destination Data Model names with two underscores.
Run `moose ls` to see a list of available Data Models in your project.
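The two-underscore convention is what links a function file to its source and destination Data Models. As a hypothetical illustration (this is not Moose's actual parser, and the model names are invented), the two names can be recovered by splitting the filename on the double underscore:

```python
# Hypothetical illustration of the naming convention: the part before the
# double underscore names the source Data Model, the part after names the
# destination Data Model.
filename = "UserEvent__EnrichedUserEvent.py"  # example name, not from a real project
source_model, destination_model = filename.removesuffix(".py").split("__")
print(source_model, destination_model)
```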
Implementing the Streaming Function
Inside the file, you define a processing function as follows:

```typescript
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { DestinationDataModel } from "../datamodels/DestinationDataModel";

export default function functionName(
  source: SourceDataModel,
): DestinationDataModel[] | DestinationDataModel | null {
  // Transformation logic
}
```

The default export is critical because Moose uses it to identify the Streaming Function's entry point. The function name can be anything you want.

```python
from app.datamodels.SourceDataModel import SourceDataModel
from app.datamodels.DestinationDataModel import DestinationDataModel
from moose_lib import StreamingFunction

def functionName(source: SourceDataModel) -> DestinationDataModel | list[DestinationDataModel] | None:
    # Transformation logic
    ...

my_streaming_function = StreamingFunction(run=functionName)
```

Assign your function to the `run` parameter of the `StreamingFunction` class to designate it as the entry point. The function name can be anything you want.
Streaming Function Examples with Referenced Data Models
In your Streaming Function, you can apply any transformation logic you need—filtering, enriching, or reshaping the data. Below are some common scenarios.
Basic Data Manipulation
Transform or combine the source data into a different structure before returning it to the destination:
```typescript
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { ManipulatedDestinationDataModel } from "../datamodels/ManipulatedDestinationDataModel";

export default function manipulateData(source: SourceDataModel): ManipulatedDestinationDataModel {
  // Manipulate multiple fields from the source data into the destination format.
  return {
    id: source.id,
    summedNumber: source.numberField1 + source.numberField2,
    uppercasedString: source.stringField.toUpperCase(),
    utcMonth: source.dateField.getUTCMonth(),
    utcYear: source.dateField.getUTCFullYear(),
    utcDay: source.dateField.getUTCDate(),
  };
}
```

```python
from moose_lib import StreamingFunction
from app.datamodels.SourceDataModel import SourceDataModel
from app.datamodels.ManipulatedDestinationDataModel import ManipulatedDestinationDataModel

def manipulate_data(source: SourceDataModel) -> ManipulatedDestinationDataModel:
    # Manipulate multiple fields from the source data into the destination format.
    return ManipulatedDestinationDataModel(
        id=source.id,
        summed_integer=source.integer_field1 + source.integer_field2,
        uppercased_string=source.string_field.upper(),
        utc_month=source.datetime_field.month,
        utc_year=source.datetime_field.year,
        utc_day=source.datetime_field.day,
    )

streaming_function = StreamingFunction(run=manipulate_data)
```
Data Validation and Filtering
Return `null` to discard invalid or unwanted data:

```typescript
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { ValidatedDestinationDataModel } from "../datamodels/ValidatedDestinationDataModel";

export default function validateData(source: SourceDataModel): ValidatedDestinationDataModel | null {
  // Validate the source data before processing.
  if (!source.requiredField) {
    // Discard the data if validation fails.
    return null;
  }
  return {
    id: source.id,
    validField: source.requiredField,
    timestamp: source.timestamp,
  };
}
```

Return `None` to discard invalid or unwanted data:

```python
from moose_lib import StreamingFunction
from app.datamodels.SourceDataModel import SourceDataModel
from app.datamodels.ValidatedDestinationDataModel import ValidatedDestinationDataModel

def validate_data(source: SourceDataModel) -> ValidatedDestinationDataModel | None:
    # Validate the source data before processing.
    if not source.required_field:
        # Discard the data if validation fails.
        return None
    return ValidatedDestinationDataModel(
        id=source.id,
        valid_field=source.required_field,
        timestamp=source.timestamp,
    )

streaming_function = StreamingFunction(run=validate_data)
```
Data Augmentation with External API
Include external data by calling APIs from within the function:
```typescript
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { AugmentedDestinationDataModel } from "../datamodels/AugmentedDestinationDataModel";

export default async function augmentData(source: SourceDataModel): Promise<AugmentedDestinationDataModel> {
  // Fetch additional information from an external API
  const response = await fetch(`https://api.example.com/data/${source.id}`);
  const extraData = await response.json();

  // Combine source data with fetched extra data
  return {
    ...source,
    extraField: extraData.field,
    additionalInfo: extraData.info,
  };
}
```
Remember to return a promise or to handle async requests properly, so Moose can await the results of your fetch calls.
```python
from moose_lib import StreamingFunction
from app.datamodels.SourceDataModel import SourceDataModel
from app.datamodels.AugmentedDestinationDataModel import AugmentedDestinationDataModel
import requests

def augment_data(source: SourceDataModel) -> AugmentedDestinationDataModel:
    # Fetch additional information from an external API
    response = requests.get(f"https://api.example.com/data/{source.id}")
    extra_data = response.json()

    # Combine source data with fetched extra data
    return AugmentedDestinationDataModel(
        id=source.id,
        extra_field=extra_data['field'],
        additional_info=extra_data['info'],
        # Add other necessary fields here
    )

streaming_function = StreamingFunction(run=augment_data)
```
Flattening and Unnesting Data
Return an array of destination Data Models to produce multiple rows from a single source entry:
```typescript
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { UnnestedDestinationDataModel } from "../datamodels/UnnestedDestinationDataModel";

export default function reshapeData(source: SourceDataModel): UnnestedDestinationDataModel[] {
  // Unnest a list of objects into their own table rows
  return source.nestedListOfObjects.map((object) => ({
    parentId: source.id, // Keep the ID of the parent object
    ...object,
  }));
}
```
Return a list of destination Data Models to produce multiple rows from a single source entry:
```python
from moose_lib import StreamingFunction
from app.datamodels.SourceDataModel import SourceDataModel
from app.datamodels.DestinationDataModel import DestinationDataModel

def reshape_data(source: SourceDataModel) -> list[DestinationDataModel]:
    rows = []
    for object in source.nested_list_of_objects:
        rows.append(DestinationDataModel(
            parent_id=source.id,  # Keep the ID of the parent object
            field_a=object.field_a,
            field_b=object.field_b,
        ))
    return rows

streaming_function = StreamingFunction(run=reshape_data)
```
Testing a Streaming Function
Once you have defined and saved your Streaming Function, you can verify it is working by following these steps:
Send Test Data to the Source Data Model
Ingest some test data into the source Data Model. When Moose processes new messages on the source’s streaming topic, your Streaming Function should be invoked.
Check out the Ingesting Data section of the documentation.
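A quick sanity check before ingesting live data: since a streaming function is an ordinary function, you can call it directly with a hand-built payload. This sketch mirrors the Foo/Bar example from the start of this page, with hypothetical dataclasses standing in for the generated models:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# Hypothetical stand-ins for the generated Foo / Bar Data Models
@dataclass
class Foo:
    primary_key: str
    timestamp: datetime
    optional_text: Optional[str]

@dataclass
class Bar:
    primary_key: str
    utc_timestamp: datetime
    has_text: bool
    text_length: int

def transform(foo: Foo) -> Bar:
    return Bar(
        primary_key=foo.primary_key,
        utc_timestamp=foo.timestamp,
        has_text=foo.optional_text is not None,
        text_length=len(foo.optional_text) if foo.optional_text else 0,
    )

# Invoke the transform directly with a sample payload
result = transform(Foo("id-1", datetime.now(timezone.utc), "hello"))
```

If the direct call produces the shape you expect, remaining issues are more likely in naming or wiring than in the transformation logic itself.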
Check CLI Output
Monitor the terminal or logs. You should see a message indicating that Moose received and processed the data, for example:
```
Received SourceDataModel -> DestinationDataModel 1 message(s)
```
If you don't see this message, check the following:

- Make sure your filename is correct and follows the naming convention: SourceDataModel__DestinationDataModel, with the two names separated by two underscores.
- Save the file after making changes.
- Check your function logic for errors.
- Run `moose logs --tail` to see logs in real time.
Inspect the Destination Table
After data is processed, check the destination Data Model’s table in the database to confirm the data has been correctly transformed.
You can run the following command to preview the contents of the destination table:
```shell
# TypeScript projects
npx moose-cli peek <DESTINATION_DATA_MODEL>

# Python projects
moose-cli peek <DESTINATION_DATA_MODEL>
```
Next Steps
You now have a complete overview of how to initialize, implement, and test your Streaming Functions in Moose. Use them to filter, enrich, or transform incoming data in real time, ensuring that only the most relevant data moves through your pipelines. For more advanced topics, explore: