Stream Processing

Streaming Functions

Streaming Functions enable real-time data transformation, filtering, and enrichment between source and destination Data Models. They allow you to build dynamic streaming pipelines that can integrate external data, reshape payloads, trigger events on the fly, or implement any other logic you need.

  • Streaming functions take a source Data Model as input and return a destination Data Model as output.
  • You can implement any logic within a streaming function, including using libraries and calling external APIs.
  • These functions are automatically triggered by new data points in the source Data Model’s streaming topic.
  • The output is seamlessly forwarded to the destination Data Model’s streaming topic.

Example:

/functions/Foo__Bar.ts
import { Foo, Bar } from "../datamodels/models.ts";
 
export default function transform(foo: Foo): Bar {
  // Transformation logic
  return {
    primaryKey: foo.primaryKey,
    utcTimestamp: foo.timestamp,
    hasText: foo.optionalText !== null,
    textLength: foo.optionalText ? foo.optionalText.length : 0,
  } as Bar;
}

Quickstart

The fastest way to get started with a new Streaming Function is to use the CLI:

Terminal
npx moose-cli function init --source <SOURCE_DATA_MODEL> --destination <DESTINATION_DATA_MODEL>

This generates a new file in your /functions directory with the following naming convention:

SourceDataModel__DestinationDataModel.ts

You can alternatively create a new file manually in the /functions directory. The file name must follow the same convention: the source and destination Data Model names separated by two underscores.

Make Sure the Data Models Exist

Run moose ls to see a list of available data models in your project.

Implementing the Streaming Function

Inside the file, you define a processing function as follows:

/functions/SourceDataModel__DestinationDataModel.ts
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { DestinationDataModel } from "../datamodels/DestinationDataModel";
 
export default function functionName(
  source: SourceDataModel,
): DestinationDataModel[] | DestinationDataModel | null {
  // Transformation logic
}
Default Export

The default export is critical because Moose uses it to identify the Streaming Function’s entry point. The function name can be anything you want.
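To make the rule concrete, here is a minimal sketch of a function file that pairs a named helper with the required default export. The model shapes are hypothetical and defined inline for illustration; in a real project they would be imported from your /datamodels directory:

```typescript
// Hypothetical model shapes, inlined for illustration;
// in a real project these live in your /datamodels directory.
interface SourceDataModel {
  id: string;
  value: number;
}

interface DestinationDataModel {
  id: string;
  doubledValue: number;
}

// Named helpers are fine -- only the default export is treated
// as the Streaming Function's entry point.
function double(n: number): number {
  return n * 2;
}

// The exported function's name is arbitrary; the default export is what counts.
export default function anyNameYouLike(
  source: SourceDataModel,
): DestinationDataModel {
  return {
    id: source.id,
    doubledValue: double(source.value),
  };
}
```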


Streaming Function Examples with Referenced Data Models

In your Streaming Function, you can apply any transformation logic you need, such as filtering, enriching, or reshaping the data. Below are some common scenarios.

Basic Data Manipulation

Transform or combine the source data into a different structure before returning it to the destination:

/functions/SourceDataModel__ManipulatedDestinationDataModel.ts
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { ManipulatedDestinationDataModel } from "../datamodels/ManipulatedDestinationDataModel";
 
export default function manipulateData(source: SourceDataModel): ManipulatedDestinationDataModel {
  // Manipulate multiple fields from the source data into the destination format.
  return {
    id: source.id,
    summedNumber: source.numberField1 + source.numberField2,
    uppercasedString: source.stringField.toUpperCase(),
    utcMonth: source.dateField.getUTCMonth(),
    utcYear: source.dateField.getUTCFullYear(),
    utcDay: source.dateField.getUTCDate()
  };
}

Data Validation and Filtering

Return null to discard invalid or unwanted data:

/functions/SourceDataModel__ValidatedDestinationDataModel.ts
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { ValidatedDestinationDataModel } from "../datamodels/ValidatedDestinationDataModel";
 
export default function validateData(source: SourceDataModel): ValidatedDestinationDataModel | null {
  // Validate the source data before processing.
  if (!source.requiredField) {
    // Discard the data if validation fails.
    return null;
  }
 
  return {
    id: source.id,
    validField: source.requiredField,
    timestamp: source.timestamp,
  };
}

Data Augmentation with External API

Include external data by calling APIs from within the function:

/functions/SourceDataModel__AugmentedDestinationDataModel.ts
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { AugmentedDestinationDataModel } from "../datamodels/AugmentedDestinationDataModel";
 
export default async function augmentData(source: SourceDataModel): Promise<AugmentedDestinationDataModel> {
  // Fetch additional information from an external API
  const response = await fetch(`https://api.example.com/data/${source.id}`);
  const extraData = await response.json();
 
  // Combine source data with fetched extra data
  return {
    ...source,
    extraField: extraData.field,
    additionalInfo: extraData.info,
  };
}
MooseTip:

Declare your function as async (or return a Promise) and await your fetch calls, so Moose can wait for the results before forwarding data downstream.
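A related concern is error handling: an unhandled fetch failure will crash the function on that message. One sketch of a defensive pattern is to wrap the call in a try/catch and return null to discard the message when the API is unreachable or rejects the request. The model shapes and the API URL below are hypothetical:

```typescript
// Hypothetical model shapes, inlined for illustration.
interface SourceDataModel {
  id: string;
}

interface AugmentedDestinationDataModel {
  id: string;
  extraField: string | null;
}

export default async function augmentSafely(
  source: SourceDataModel,
): Promise<AugmentedDestinationDataModel | null> {
  try {
    // Hypothetical enrichment endpoint -- replace with your own API.
    const response = await fetch(`https://api.example.com/data/${source.id}`);
    if (!response.ok) {
      // Discard the message if the API rejects the request.
      return null;
    }
    const extraData = await response.json();
    return { id: source.id, extraField: extraData.field ?? null };
  } catch {
    // Network or parse errors: drop the message rather than crash.
    return null;
  }
}
```

Whether to drop, retry, or forward a partially-enriched record on failure is a design choice; returning null is simply the least-surprising default, since it reuses the same discard semantics as validation.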

Flattening and Unnesting Data

Return an array of destination Data Models to produce multiple rows from a single source entry:

/functions/SourceDataModel__UnnestedDestinationDataModel.ts
import { SourceDataModel } from "../datamodels/SourceDataModel";
import { UnnestedDestinationDataModel } from "../datamodels/UnnestedDestinationDataModel";
 
export default function reshapeData(source: SourceDataModel): UnnestedDestinationDataModel[] {
  // Unnest a list of strings into their own table rows
  return source.nestedListOfObjects.map((object) => ({
    parentId: source.id, // Keep the ID of the parent object
    ...object,
  }));
}

Testing a Streaming Function

Once you have defined and saved your Streaming Function, you can verify it is working by following these steps:

Send Test Data to the Source Data Model

Ingest some test data into the source Data Model. When Moose processes new messages on the source’s streaming topic, your Streaming Function should be invoked.

Need Help Ingesting Data?

Check out the Ingesting Data section of the documentation.
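As a sketch, you could post a test record to your local dev server's ingestion endpoint for the source Data Model. The port, path, and record fields below are assumptions; adjust them to match your project and your source Data Model's schema:

```typescript
// Hypothetical test record -- match the fields of your source Data Model.
const testRecord = {
  id: "test-1",
  requiredField: "hello",
  timestamp: new Date().toISOString(),
};

async function ingestTestRecord(record: unknown): Promise<number> {
  // Assumed local dev ingestion endpoint; the final path segment is
  // the source Data Model's name.
  const response = await fetch("http://localhost:4000/ingest/SourceDataModel", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(record),
  });
  return response.status;
}
```

With the dev server running, a success status confirms the record was accepted; watch the CLI output to see the Streaming Function fire.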

Check CLI Output

Monitor the terminal or logs. You should see a message indicating that Moose received and processed the data, for example:

Received SourceDataModel -> DestinationDataModel 1 message(s)
Don't See This Message? Try These Steps:
  • Make sure your filename is correct and follows the naming convention of SourceDataModel__DestinationDataModel separated by two underscores.
  • Save the file after making changes.
  • Check your function logic for errors.
  • Run moose logs --tail to see logs in real time.

Inspect the Destination Table

After data is processed, check the destination Data Model’s table in the database to confirm the data has been correctly transformed.

You can run the following command to preview the contents of the destination table:

npx moose-cli peek <DESTINATION_DATA_MODEL>

Next Steps

You now have a complete overview of how to initialize, implement, and test your Streaming Functions in Moose. Use them to filter, enrich, or transform incoming data in real time, ensuring that only the most relevant data moves through your pipelines. For more advanced topics, explore: