Implementing Streaming Functions

Streaming Functions are ordinary TypeScript functions. Inside the function, you can use the full power of the language to process, enrich, filter, or otherwise manipulate the source data as needed.

This guide introduces some common operations you might perform within a Streaming Function.

Need Help Setting Up Your Streaming Function File?

Check out the Creating Streaming Functions guide.

Basic Data Manipulation

/functions/SourceDataModel__ManipulatedDestinationDataModel.ts
import { SourceDataModel } from "datamodels/path/to/SourceDataModel.ts";
import { ManipulatedDestinationDataModel } from "datamodels/path/to/ManipulatedDestinationDataModel.ts";
 
export default function manipulateData(source: SourceDataModel): ManipulatedDestinationDataModel {
  // Manipulate multiple fields from the source data into the destination format.
  const manipulatedNumber = source.numberField1 + source.numberField2;
  const manipulatedString = source.stringField.toUpperCase();
  const manipulatedDate = new Date(source.dateField).toISOString();
 
  return {
    id: source.id,
    manipulatedNumber: manipulatedNumber,
    manipulatedString: manipulatedString,
    manipulatedDate: manipulatedDate,
  };
}

Data Validation and Filtering

To discard a record, return null from the function. This lets you filter out data that fails validation.

/functions/SourceDataModel__ValidatedDestinationDataModel.ts
import { SourceDataModel, ValidatedDestinationDataModel } from "datamodels/models.ts";
 
export default function validateData(source: SourceDataModel): ValidatedDestinationDataModel | null {
  // Validate the source data before processing.
  if (!source.requiredField) {
    // Discard the data if validation fails.
    return null;
  }
 
  return {
    id: source.id,
    validField: source.requiredField,
    timestamp: new Date(source.timestamp),
  };
}
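
The filtering behavior can be tried outside Moose with a small self-contained sketch. The interfaces, field types, and sample records below are hypothetical stand-ins for your own Data Models, and the extra timestamp check is an illustrative addition rather than something the example above requires. The `map` and `filter` calls only imitate the discard step locally; they are not how Moose itself processes records.

```typescript
// Hypothetical model shapes for illustration only.
interface SourceDataModel {
  id: string;
  requiredField?: string;
  timestamp: string;
}

interface ValidatedDestinationDataModel {
  id: string;
  validField: string;
  timestamp: Date;
}

function validateData(source: SourceDataModel): ValidatedDestinationDataModel | null {
  // Discard records with a missing or empty required field.
  if (!source.requiredField) {
    return null;
  }

  // Discard records whose timestamp cannot be parsed (Invalid Date).
  const timestamp = new Date(source.timestamp);
  if (isNaN(timestamp.getTime())) {
    return null;
  }

  return { id: source.id, validField: source.requiredField, timestamp };
}

// Local imitation of the discard step: null results are dropped.
const incoming: SourceDataModel[] = [
  { id: "a", requiredField: "ok", timestamp: "2024-01-01T00:00:00Z" },
  { id: "b", timestamp: "2024-01-01T00:00:00Z" },
  { id: "c", requiredField: "ok", timestamp: "not-a-date" },
];

const kept = incoming
  .map(validateData)
  .filter((record): record is ValidatedDestinationDataModel => record !== null);

// Only record "a" passes both checks.
console.log(kept.map((record) => record.id));
```

Parsing the timestamp before returning keeps an Invalid Date from reaching the destination, which is usually easier to debug than a failure further downstream.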

Data Augmentation with External API

You can use the fetch API to enrich your source data with additional data from an external API.

/functions/SourceDataModel__AugmentedDestinationDataModel.ts
import { SourceDataModel } from "datamodels/path/to/SourceDataModel.ts";
import { AugmentedDestinationDataModel } from "datamodels/path/to/AugmentedDestinationDataModel.ts";
 
export default async function augmentData(source: SourceDataModel): Promise<AugmentedDestinationDataModel> {
  // Fetch additional information from an external API
  const response = await fetch(`https://api.example.com/data/${source.id}`);
  const extraData = await response.json();
 
  // Combine source data with fetched extra data
  return {
    ...source,
    extraField: extraData.field,
    additionalInfo: extraData.info,
  };
}
MooseTip:

Because fetch is asynchronous, declare the function as async, use await on the request, and return a Promise of the destination Data Model.
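
External APIs can fail or return an error status, so it may be worth deciding up front what the function should do in that case. The following is a minimal sketch, not a prescribed Moose pattern: the model shapes, the nullable enrichment fields, the `Fetcher` type, and the `augmentWithFallback` name are all assumptions made for illustration. The fetch implementation is passed in as a parameter so the function can be exercised without network access.

```typescript
// Hypothetical model shapes for illustration only.
interface SourceDataModel {
  id: string;
  label: string;
}

interface AugmentedDestinationDataModel extends SourceDataModel {
  extraField: string | null;
  additionalInfo: string | null;
}

// Minimal slice of the fetch API that this sketch relies on.
type Fetcher = (url: string) => Promise<{ ok: boolean; json(): Promise<unknown> }>;

async function augmentWithFallback(
  source: SourceDataModel,
  fetcher: Fetcher,
): Promise<AugmentedDestinationDataModel> {
  // Fallback used whenever enrichment is unavailable.
  const fallback = { ...source, extraField: null, additionalInfo: null };

  try {
    const response = await fetcher(`https://api.example.com/data/${source.id}`);
    if (!response.ok) {
      // Non-2xx status: keep the record, leave the enrichment empty.
      return fallback;
    }

    const extraData = (await response.json()) as { field?: string; info?: string };
    return {
      ...source,
      extraField: extraData.field ?? null,
      additionalInfo: extraData.info ?? null,
    };
  } catch {
    // Network failure or invalid JSON: same fallback.
    return fallback;
  }
}
```

In a real function you would typically pass the global `fetch` as the second argument. Whether to fall back to empty values, as here, or to handle the failure differently depends on how the destination table is meant to be used.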

Splitting Data Into Multiple Entries

To create multiple entries, you can return an array of the destination Data Model. A common use case is when you have a Data Model with a nested list of objects that you want to unnest and store in their own table. Moose will convert each object inside the array into a separate database entry.

/functions/SourceDataModel__UnnestedDestinationDataModel.ts
import { SourceDataModel, UnnestedDestinationDataModel } from "datamodels/models.ts";
 
export default function reshapeData(source: SourceDataModel): UnnestedDestinationDataModel[] {
  // Unnest a list of objects into their own table rows
  const objects = source.nestedListOfObjects;
 
  return objects.map((object) => ({
    ...object,
    id: source.id, // Set last so the parent ID is kept even if the nested object has its own id
  }));
}
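
To make the unnesting concrete, here is a small self-contained sketch with sample data. The `LineItem` shape, the field names, and the sample order are hypothetical; substitute your own Data Models. It also guards against a missing list, which is an added assumption about how incomplete records might arrive.

```typescript
// Hypothetical model shapes for illustration only.
interface LineItem {
  sku: string;
  quantity: number;
}

interface SourceDataModel {
  id: string;
  nestedListOfObjects?: LineItem[];
}

interface UnnestedDestinationDataModel extends LineItem {
  id: string;
}

function reshapeData(source: SourceDataModel): UnnestedDestinationDataModel[] {
  // Treat a missing list as empty so the function returns no rows instead of throwing.
  const objects = source.nestedListOfObjects ?? [];

  // One output row per nested object, each tagged with the parent ID.
  return objects.map((object) => ({
    ...object,
    id: source.id,
  }));
}

const rows = reshapeData({
  id: "order-1",
  nestedListOfObjects: [
    { sku: "A", quantity: 2 },
    { sku: "B", quantity: 1 },
  ],
});

// Two rows, both carrying id "order-1".
console.log(rows);
```

Returning an empty array for a record with no nested objects produces no rows for that record.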