Process Raw Events with Streaming Functions

Transform Raw GitHub Star Events with a Streaming Function


In this section, we'll enrich raw GitHub star events with information about the repositories owned by the users who starred your repository. A Streaming Function processes each incoming event in real time.

Create StargazerProjectInfo Data Model

Let's define the data model that will store the enriched information.

Create StargazerProjectInfo.ts

Add a new file named StargazerProjectInfo.ts in your /datamodels directory:

    • RawStarEvent.ts
    • StargazerProjectInfo.ts

    Define the Data Model

    Include the following fields in StargazerProjectInfo.ts:

    • starred_at (Date): Timestamp of the star event (used as the Key)
    • stargazerName (string): Username of the stargazer
    • repoName (string): Name of the repository
    • repoFullName (string): Full name of the repository
    • description (string): Repository description
    • repoUrl (string): Repository URL
    • repoStars (number): Number of stars the repository has
    • repoWatchers (number): Number of watchers the repository has
    • language (string): Programming language of the repository
    • repoSizeKb (number): Size of the repository in kilobytes
    • createdAt (Date): Repository creation date
    • updatedAt (Date): Last update date of the repository

    Try It Yourself

    Try creating the StargazerProjectInfo Data Model yourself. The solution is available if you get stuck.
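
One possible shape for the model, written directly from the field list above. Treat it as a sketch rather than the official solution: in a Moose project the key field is normally typed with the Key helper imported from @514labs/moose-lib (an assumption here), so a local stand-in alias is defined to let the snippet compile on its own. The sampleProject record is made up purely to show that the shape type-checks.

```typescript
// Stand-in for the Key<T> helper that a Moose project would usually import
// from "@514labs/moose-lib" (assumption); defined locally so this compiles alone.
type Key<T> = T;

export interface StargazerProjectInfo {
  starred_at: Key<Date>; // timestamp of the star event, used as the key
  stargazerName: string; // username of the stargazer
  repoName: string;
  repoFullName: string;
  description: string;
  repoUrl: string;
  repoStars: number;
  repoWatchers: number;
  language: string;
  repoSizeKb: number; // repository size in kilobytes
  createdAt: Date;
  updatedAt: Date;
}

// Hypothetical sample record; every value below is invented for illustration.
const sampleProject: StargazerProjectInfo = {
  starred_at: new Date("2024-01-01T00:00:00Z"),
  stargazerName: "octocat",
  repoName: "hello-world",
  repoFullName: "octocat/hello-world",
  description: "Example repository",
  repoUrl: "https://github.com/octocat/hello-world",
  repoStars: 42,
  repoWatchers: 42,
  language: "TypeScript",
  repoSizeKb: 128,
  createdAt: new Date("2020-01-01T00:00:00Z"),
  updatedAt: new Date("2023-12-31T00:00:00Z"),
};
```

In the real datamodels/StargazerProjectInfo.ts you would drop the local alias and the sample record and keep only the exported interface.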

    Initialize the Streaming Function

    We'll create a Streaming Function to transform RawStarEvent data into StargazerProjectInfo.

    Create the Function

    Run the following command in your terminal:

    Terminal
    npx moose-cli function init --source RawStarEvent --destination StargazerProjectInfo

    This generates RawStarEvent__StargazerProjectInfo.ts in the functions folder:

    • RawStarEvent__StargazerProjectInfo.ts

    Review the Boilerplate

    The generated file includes:

    functions/RawStarEvent__StargazerProjectInfo.ts
    import { RawStarEvent } from "datamodels/RawStarEvent";
    import { StargazerProjectInfo } from "datamodels/StargazerProjectInfo";
     
    export default function run(source: RawStarEvent): StargazerProjectInfo | null {
      return {
        starred_at: new Date(),
        stargazerName: "",
        repoName: "",
        repoFullName: "",
        description: "",
        repoUrl: "",
        repoStars: 0,
        repoWatchers: 0,
        language: "",
        repoSizeKb: 0,
        createdAt: new Date(),
        updatedAt: new Date(),
      };
    }

    • When a new RawStarEvent is ingested, the Streaming Function automatically processes it by calling run().
    • The boilerplate file imports RawStarEvent and StargazerProjectInfo, and its run() function returns a StargazerProjectInfo object filled with placeholder values.

    Next, you will implement the transformation inside run(): fetch the stargazer's own repositories from the GitHub API, then map each repository in the response to a StargazerProjectInfo record.

    Implement the Transformation Logic

    Replace the boilerplate run() function with the following code to enrich the data:

    functions/RawStarEvent__StargazerProjectInfo.ts
    import { RawStarEvent } from "datamodels/RawStarEvent";
    import { StargazerProjectInfo } from "datamodels/StargazerProjectInfo";
     
    export default async function run(
      source: RawStarEvent
    ): Promise<StargazerProjectInfo[] | null> {
      // Skip un-star events and events that carry no star timestamp.
      if (source.action === "deleted" || !source.starred_at) {
        return null;
      }
     
      const repositories = await callGitHubAPI(source.sender.repos_url);
     
      const stargazerProjects = repositories.map((repo: any) => ({
        starred_at: new Date(source.starred_at),
        stargazerName: source.sender.login,
        repoName: repo.name,
        repoFullName: repo.full_name,
        description: repo.description || "",
        repoUrl: repo.html_url,
        repoStars: repo.stargazers_count,
        repoWatchers: repo.watchers_count,
        language: repo.language || "Multiple Languages",
        repoSizeKb: repo.size,
        createdAt: new Date(repo.created_at),
        updatedAt: new Date(repo.updated_at),
      }));
     
      return stargazerProjects;
    }
     
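    // Fetch a GitHub API URL and return the parsed JSON body.
    async function callGitHubAPI(url: string): Promise<any> {
      const response = await fetch(url);
      if (!response.ok) {
        throw new Error(`GitHub API request failed: ${response.status}`);
      }
      return response.json();
    }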

    This code fetches the stargazer's repositories from the GitHub API and maps each repository to a StargazerProjectInfo record.
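
Unauthenticated GitHub API requests are rate limited, so a busy repository can exhaust the quota quickly. The sketch below shows one more defensive variant of the helper, assuming you have a personal access token available; the names FetchLike, buildGitHubHeaders, and fetchRepos are hypothetical, and the fetch function is passed in as a parameter so the helper can be exercised with a fake in tests. This variant returns an empty array on any failure, which is a design choice you may not want if you prefer failures to surface as errors.

```typescript
// Minimal structural type for a fetch-like function, so a fake can be passed in tests.
type FetchLike = (
  url: string,
  init?: { headers?: Record<string, string> }
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

// Build request headers. The token is optional; where it comes from is up to you.
function buildGitHubHeaders(token?: string): Record<string, string> {
  const headers: Record<string, string> = {
    Accept: "application/vnd.github+json",
  };
  if (token) {
    headers.Authorization = `Bearer ${token}`;
  }
  return headers;
}

// Fetch a repos_url and return its repositories, or [] if the call fails
// or the body is not an array (for example, a rate-limit error object).
async function fetchRepos(
  fetchImpl: FetchLike,
  url: string,
  token?: string
): Promise<unknown[]> {
  const response = await fetchImpl(url, { headers: buildGitHubHeaders(token) });
  if (!response.ok) {
    return [];
  }
  const body = await response.json();
  return Array.isArray(body) ? body : [];
}
```

Inside run(), this could be called as fetchRepos(fetch, source.sender.repos_url, token), where fetch is the global available in recent Node.js versions and token is read from wherever you keep secrets.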

    Returning an Array

    Moose will store each StargazerProjectInfo record in the returned array as its own row in the StargazerProjectInfo table.

    The run() function returns null for deleted stars and for events without a starred_at timestamp. Moose skips these events and writes nothing to the table for them.
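
To make the null-versus-array behavior concrete, the sketch below separates the mapping step from the network call so it can be run against sample data. The names RawStarEventLike, GitHubRepoLike, and toStargazerProjects are hypothetical, the types are trimmed to the fields this tutorial uses, and all sample values are made up.

```typescript
// Hypothetical, trimmed-down shapes covering only the fields used in this tutorial.
interface RawStarEventLike {
  action: string;
  starred_at?: string | null;
  sender: { login: string; repos_url: string };
}

interface GitHubRepoLike {
  name: string;
  full_name: string;
  description: string | null;
  html_url: string;
  stargazers_count: number;
  watchers_count: number;
  language: string | null;
  size: number;
  created_at: string;
  updated_at: string;
}

// Returns null to skip the event, or one record per repository.
function toStargazerProjects(source: RawStarEventLike, repos: GitHubRepoLike[]) {
  const starredAt = source.starred_at;
  if (source.action === "deleted" || !starredAt) {
    return null;
  }
  return repos.map((repo) => ({
    starred_at: new Date(starredAt),
    stargazerName: source.sender.login,
    repoName: repo.name,
    repoFullName: repo.full_name,
    description: repo.description ?? "",
    repoUrl: repo.html_url,
    repoStars: repo.stargazers_count,
    repoWatchers: repo.watchers_count,
    language: repo.language || "Multiple Languages",
    repoSizeKb: repo.size,
    createdAt: new Date(repo.created_at),
    updatedAt: new Date(repo.updated_at),
  }));
}

// Made-up sample data to show both outcomes.
const sampleRepo: GitHubRepoLike = {
  name: "hello-world",
  full_name: "octocat/hello-world",
  description: null,
  html_url: "https://github.com/octocat/hello-world",
  stargazers_count: 3,
  watchers_count: 3,
  language: null,
  size: 10,
  created_at: "2020-01-01T00:00:00Z",
  updated_at: "2021-01-01T00:00:00Z",
};
const createdEvent: RawStarEventLike = {
  action: "created",
  starred_at: "2024-01-01T00:00:00Z",
  sender: { login: "octocat", repos_url: "https://api.github.com/users/octocat/repos" },
};

const skipped = toStargazerProjects({ ...createdEvent, action: "deleted" }, [sampleRepo]); // null
const rows = toStargazerProjects(createdEvent, [sampleRepo, sampleRepo]); // two records
```

Here skipped is null because the action is "deleted", while rows holds two records, one per repository passed in, each of which would become its own row.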

    Test the Streaming Function

    Trigger a New Event

    Star your repository again to generate a new RawStarEvent.

    Verify the Transformation

    In your terminal, confirm that the event was processed:

    Received RawStarEvent_0_0 -> StargazerProjectInfo_0_0 1 message(s)

    Check the Data

    Query the StargazerProjectInfo table to see the enriched data:

    SELECT * FROM local.StargazerProjectInfo_0_0

    You have now created a real-time data processing service using a Streaming Function to enrich GitHub star events with additional repository information.