Implementing Streaming Functions
Viewing typescript
switch to python
Streaming Functions are just ordinary TypescriptPython functions. Inside the function, you can leverage the full power of the language to process, enrich, filter, or otherwise manipulate the source data as needed.
This guide introduces some common operations you might perform within a Streaming Function.
Check out the Creating Streaming Functions guide.
Basic Data Manipulation
import { SourceDataModel } from "datamodels/path/to/SourceDataModel.ts";
import { ManipulatedDestinationDataModel } from "datamodels/path/to/ManipulatedDestinationDataModel.ts";
export default function manipulateData(source: SourceDataModel): ManipulatedDestinationDataModel {
// Manipulate multiple fields from the source data into the destination format.
const manipulatedNumber = source.numberField1 + source.numberField2;
const manipulatedString = source.stringField.toUpperCase();
const manipulatedDate = new Date(source.dateField).toISOString();
return {
id: source.id,
manipulatedNumber: manipulatedNumber,
manipulatedString: manipulatedString,
manipulatedDate: manipulatedDate,
};
}
from moose_lib import StreamingFunction
from app.datamodels.models import SourceDataModel, ManipulatedDestinationDataModel
def manipulate_data(source: SourceDataModel) -> ManipulatedDestinationDataModel:
# Manipulate multiple fields from the source data into the destination format.
manipulated_integer = source.integer_field1 + source.integer_field2
manipulated_string = source.string_field.upper()
manipulated_datetime = source.datetime_field.isoformat()
return ManipulatedDestinationDataModel(
id=source.id,
manipulated_integer=manipulated_integer,
manipulated_string=manipulated_string,
manipulated_datetime=manipulated_datetime,
)
streaming_function = StreamingFunction(
run=manipulate_data
)
Data Validation and Filtering
By returning null
None
you can discard the data.
import { SourceDataModel } from "datamodels/models.ts";
import { DestinationDataModel } from "datamodels/models.ts";
export default function validateData(source: SourceDataModel): ValidatedDestinationDataModel | null {
// Validate the source data before processing.
if (!source.requiredField) {
// Discard the data if validation fails.
return null;
}
return {
id: source.id,
validField: source.requiredField,
timestamp: new Date(source.timestamp),
};
}
from moose_lib import StreamingFunction
from app.datamodels.models import SourceDataModel, ValidatedDestinationDataModel
def validate_data(source: SourceDataModel) -> ValidatedDestinationDataModel | None:
# Validate the source data before processing.
if not source.required_field:
# Discard the data if validation fails.
return None
return ValidatedDestinationDataModel(
id=source.id,
valid_field=source.required_field,
timestamp=source.timestamp
)
streaming_function = StreamingFunction(
run=validate_data
)
Data Augmentation with External API
You can use the fetch
requests
module to enrich your source data with additional data from an external API.
import { SourceDataModel } from "datamodels/path/to/SourceDataModel.ts";
import { AugmentedDestinationDataModel } from "datamodels/path/to/AugmentedDestinationDataModel.ts";
export default async function augmentData(source: SourceDataModel): Promise<AugmentedDestinationDataModel> {
// Fetch additional information from an external API
const response = await fetch(`https://api.example.com/data/${source.id}`);
const extraData = await response.json();
// Combine source data with fetched extra data
return {
...source,
extraField: extraData.field,
additionalInfo: extraData.info,
};
}
Make sure to return a promise to fetch data from an external API. You must also make the function async
and use await
to fetch data from an external API.
from moose_lib import StreamingFunction
from app.datamodels.models import SourceDataModel, AugmentedDestinationDataModel
import requests
def augment_data(source: SourceDataModel) -> AugmentedDestinationDataModel:
# Fetch additional information from an external API
response = requests.get(f"https://api.example.com/data/{source.id}")
extra_data = response.json()
# Combine source data with fetched extra data
return AugmentedDestinationDataModel(
id=source.id,
extra_field=extra_data['field'],
additional_info=extra_data['info'],
# Add other necessary fields here
)
streaming_function = StreamingFunction(
run=augment_data
)
Splitting Data Into Multiple Entries
To create multiple entries, you can return an array of the destination Data Model. A common use case is when you have a Data Model with a nested list of objects that you want to unnest and store in their own table. Moose will convert each object inside the array into a separate database entry.
import { SourceDataModel } from "datamodels/models.ts";
import { UnnestedDestinationDataModel } from "datamodels/models.ts";
export default function reshapeData(source: SourceDataModel): UnnestedDestinationDataModel[] {
// Unnest a list of objects into their own table rows
const objects = source.nestedListOfObjects;
return objects.map((object) => ({
id: source.id, // Keep the ID of the parent object
...object,
}));
}
from moose_lib import StreamingFunction
from app.datamodels.models import SourceDataModel, DestinationDataModel
def reshape_data(source: SourceDataModel) -> list[DestinationDataModel]:
objects = source.nested_list_of_objects
return [
DestinationDataModel(
id=source.id,
**item
)
for item in items
]
streaming_function = StreamingFunction(
run=reshape_data
)