Transform Raw GitHub Star Events with a Streaming Function
In this section, we'll enrich raw GitHub star event data by fetching additional information about the repositories owned by users who starred your repository. We'll use a Streaming Function to process incoming data in real-time.
Create StargazerProjectInfo Data Model
Let's define the data model that will store the enriched information.
Create StargazerProjectInfo.ts or StargazerProjectInfo.py
Add a new file named StargazerProjectInfo.ts (TypeScript) or StargazerProjectInfo.py (Python) in your /datamodels directory, alongside the existing RawStarEvent model:
- TypeScript: RawStarEvent.ts, StargazerProjectInfo.ts
- Python: RawStarEvent.py, StargazerProjectInfo.py
Define the Data Model
For TypeScript, include the following fields in StargazerProjectInfo.ts:
- starred_at (Date): Timestamp of the star event (used as the Key)
- stargazerName (string): Username of the stargazer
- repoName (string): Name of the repository
- repoFullName (string): Full name of the repository
- description (string): Repository description
- repoUrl (string): Repository URL
- repoStars (number): Number of stars the repository has
- repoWatchers (number): Number of watchers the repository has
- language (string): Programming language of the repository
- repoSizeKb (number): Size of the repository in kilobytes
- createdAt (Date): Repository creation date
- updatedAt (Date): Last update date of the repository
For Python, include the following fields in StargazerProjectInfo.py:
- starred_at (datetime): Timestamp of the star event
- stargazer_login (str): Username of the stargazer (used as the Key)
- repo_name (str): Name of the repository
- repo_full_name (str): Full name of the repository
- description (str): Repository description
- repo_url (str): Repository URL
- repo_stars (int): Number of stars the repository has
- repo_watchers (int): Number of watchers the repository has
- language (str): Programming language of the repository
- repo_size_kb (int): Size of the repository in kilobytes
- created_at (datetime): Repository creation date
- updated_at (datetime): Last update date of the repository
Try creating the StargazerProjectInfo Data Model yourself. The solution is available if you get stuck.
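For a sense of the shape of the Python model, the sketch below declares the same twelve fields as a plain standard-library dataclass. It is a stand-in under stated assumptions, not the Moose solution: a real Moose data model also marks the class and its Key field (stargazer_login) with moose_lib helpers, which are left out here so the snippet runs on its own. The example values are illustrative only.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


# Plain-dataclass stand-in for the Python StargazerProjectInfo Data Model.
# Assumption: the real Moose model additionally uses moose_lib's data-model
# decorator and Key type (on stargazer_login); both are omitted here.
@dataclass
class StargazerProjectInfo:
    starred_at: datetime      # timestamp of the star event
    stargazer_login: str      # stargazer username (the Key in the Python version)
    repo_name: str
    repo_full_name: str
    description: str
    repo_url: str
    repo_stars: int
    repo_watchers: int
    language: str
    repo_size_kb: int
    created_at: datetime
    updated_at: datetime


# Example record with illustrative values
example = StargazerProjectInfo(
    starred_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    stargazer_login="octocat",
    repo_name="hello-world",
    repo_full_name="octocat/hello-world",
    description="My first repository",
    repo_url="https://github.com/octocat/hello-world",
    repo_stars=10,
    repo_watchers=10,
    language="Python",
    repo_size_kb=108,
    created_at=datetime(2020, 1, 1, tzinfo=timezone.utc),
    updated_at=datetime(2023, 6, 1, tzinfo=timezone.utc),
)
```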
Initialize the Streaming Function
We'll create a Streaming Function to transform RawStarEvent data into StargazerProjectInfo.
Create the Function
Run the following command in your terminal:
TypeScript:
```bash
npx moose-cli function init --source RawStarEvent --destination StargazerProjectInfo
```
Python:
```bash
moose-cli function init --source RawStarEvent --destination StargazerProjectInfo
```
This generates RawStarEvent__StargazerProjectInfo.ts (TypeScript) or RawStarEvent__StargazerProjectInfo.py (Python) in the functions folder.
Review the Boilerplate
The generated file includes:
TypeScript:
```typescript
import { RawStarEvent } from "datamodels/RawStarEvent";
import { StargazerProjectInfo } from "datamodels/StargazerProjectInfo";

export default function run(source: RawStarEvent): StargazerProjectInfo | null {
  // Placeholder values; replaced with real logic in the next step
  return {
    starred_at: new Date(),
    stargazerName: "",
    repoName: "",
    repoFullName: "",
    description: "",
    repoUrl: "",
    repoStars: 0,
    repoWatchers: 0,
    language: "",
    repoSizeKb: 0,
    createdAt: new Date(),
    updatedAt: new Date(),
  };
}
```
Python:
```python
# Import your Moose data models to use in the streaming function
from app.datamodels.RawStarEvent import RawStarEvent
from app.datamodels.StargazerProjectInfo import StargazerProjectInfo
from moose_lib import StreamingFunction
from typing import Optional
from datetime import datetime


def fn(source: RawStarEvent) -> Optional[StargazerProjectInfo]:
    # Placeholder values; replaced with real logic in the next step
    return StargazerProjectInfo(
        starred_at=datetime.now(),
        stargazer_login="",
        repo_name="",
        repo_full_name="",
        description="",
        repo_url="",
        repo_stars=0,
        repo_watchers=0,
        language="",
        repo_size_kb=0,
        created_at=datetime.now(),
        updated_at=datetime.now(),
    )


my_function = StreamingFunction(
    run=fn
)
```
- When a new RawStarEvent is ingested, the Streaming Function automatically processes it: in TypeScript with the exported run() function, and in Python with the function assigned to the run property of the StreamingFunction (here, fn()).
- The boilerplate file imports RawStarEvent and StargazerProjectInfo, and its run() or fn() function returns a StargazerProjectInfo object with placeholder values.
You will implement the logic to transform RawStarEvent data into StargazerProjectInfo records within the run() (TypeScript) or fn() (Python) function. Inside this function, you will fetch information about your stargazer's own repositories from the GitHub API and map the retrieved data to StargazerProjectInfo records.
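The fetch-then-map flow can be sketched independently of Moose and the network. In this hypothetical example, StarEvent, Sender, and enrich are simplified stand-ins (not Moose or GitHub types), and the repository fetcher is passed in as a parameter so a fake can replace the real GitHub call. Only three output fields are mapped to keep it short.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

Repo = dict[str, Any]


# Hypothetical, simplified stand-ins for the RawStarEvent fields this step reads
@dataclass
class Sender:
    login: str
    repos_url: str


@dataclass
class StarEvent:
    action: str
    starred_at: Optional[str]
    sender: Sender


def enrich(event: StarEvent, fetch_repos: Callable[[str], list[Repo]]) -> Optional[list[dict]]:
    # Skip un-star events and events without a timestamp
    if event.action == "deleted" or not event.starred_at:
        return None
    # Fetch the stargazer's repositories via the injected fetcher
    repos = fetch_repos(event.sender.repos_url)
    # Emit one output record per repository
    return [
        {
            "stargazer_login": event.sender.login,
            "repo_full_name": repo["full_name"],
            "repo_stars": repo["stargazers_count"],
        }
        for repo in repos
    ]


# Fake fetcher returning canned data instead of calling GitHub
def fake_fetch(url: str) -> list[Repo]:
    return [
        {"full_name": "octocat/alpha", "stargazers_count": 3},
        {"full_name": "octocat/beta", "stargazers_count": 0},
    ]


event = StarEvent("created", "2024-01-01T00:00:00Z", Sender("octocat", "https://api.github.com/users/octocat/repos"))
records = enrich(event, fake_fetch)
```

Injecting the fetcher keeps the mapping logic testable offline; in the real Streaming Function the fetcher role is played by the GitHub API call.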
Implement the Transformation Logic
In TypeScript, replace the boilerplate run() function with the following code to enrich the data:
```typescript
import { RawStarEvent } from "datamodels/RawStarEvent";
import { StargazerProjectInfo } from "datamodels/StargazerProjectInfo";

export default async function run(
  source: RawStarEvent
): Promise<StargazerProjectInfo[] | null> {
  // Skip un-star events and events without a timestamp
  if (source.action === "deleted" || !source.starred_at) {
    return null;
  }

  // Fetch the stargazer's public repositories from the GitHub API
  const repositories = await callGitHubAPI(source.sender.repos_url);

  // Map each repository to one StargazerProjectInfo record
  const stargazerProjects = repositories.map((repo: any) => ({
    starred_at: new Date(source.starred_at),
    stargazerName: source.sender.login,
    repoName: repo.name,
    repoFullName: repo.full_name,
    description: repo.description,
    repoUrl: repo.html_url,
    repoStars: repo.stargazers_count,
    repoWatchers: repo.watchers_count,
    language: repo.language || "Multiple Languages",
    repoSizeKb: repo.size,
    createdAt: new Date(repo.created_at),
    updatedAt: new Date(repo.updated_at),
  }));

  return stargazerProjects;
}

async function callGitHubAPI(url: string): Promise<any> {
  const response = await fetch(url);
  // Surface HTTP errors (e.g. rate limiting) instead of mapping over an error payload
  if (!response.ok) {
    throw new Error(`GitHub API request failed: ${response.status} ${response.statusText}`);
  }
  return response.json();
}
```
In Python, replace the boilerplate fn() function with the following code to enrich the data:
```python
# Import your Moose data models to use in the streaming function
from app.datamodels.RawStarEvent import RawStarEvent
from app.datamodels.StargazerProjectInfo import StargazerProjectInfo
from moose_lib import StreamingFunction, cli_log, CliLogData
from typing import Optional
from datetime import datetime
import requests


def call_github_api(url: str) -> list:
    # Fail fast on HTTP errors (e.g. rate limiting) and avoid hanging indefinitely
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()


def parse_github_timestamp(value: str) -> datetime:
    # GitHub returns ISO 8601 strings ending in "Z"; fromisoformat() before Python 3.11 rejects "Z"
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def fn(source: RawStarEvent) -> Optional[list[StargazerProjectInfo]]:
    # Skip un-star events and events without a timestamp
    if source.action == "deleted" or not source.starred_at:
        cli_log(CliLogData(action=source.action, message="Skipping deleted or without starred_at", message_type="Info"))
        return None

    # Fetch the stargazer's public repositories from the GitHub API
    repositories = call_github_api(source.sender.repos_url)
    cli_log(CliLogData(action="Got repositories", message=f"{len(repositories)}", message_type="Info"))

    # Map each repository to one StargazerProjectInfo record
    data = []
    for repo in repositories:
        data.append(
            StargazerProjectInfo(
                starred_at=source.starred_at,
                stargazer_login=source.sender.login,
                repo_name=repo["name"],
                repo_full_name=repo["full_name"],
                description=repo["description"],
                repo_url=repo["html_url"],
                repo_stars=repo["stargazers_count"],
                repo_watchers=repo["watchers_count"],
                language=repo["language"],
                repo_size_kb=repo["size"],
                created_at=parse_github_timestamp(repo["created_at"]),
                updated_at=parse_github_timestamp(repo["updated_at"]),
            )
        )
    return data


my_function = StreamingFunction(
    run=fn
)
```
This code fetches the stargazer's repositories from the GitHub API and maps each repository to a StargazerProjectInfo record.
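One caveat with this request: GitHub's REST API paginates list endpoints such as repos_url, returning 30 items per page by default (per_page can be raised to at most 100), so stargazers with many repositories are only partially enriched. Below is a minimal sketch of collecting every page. page_url and fetch_all_repos are hypothetical helpers, fetch_page stands in for whatever performs the HTTP request, and stopping at the first short page is a simplification of GitHub's recommended approach of following the Link response header.

```python
from typing import Any, Callable
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def page_url(repos_url: str, page: int, per_page: int = 100) -> str:
    """Return repos_url with the per_page and page query parameters set."""
    parts = urlsplit(repos_url)
    query = dict(parse_qsl(parts.query))
    query.update({"per_page": str(per_page), "page": str(page)})
    return urlunsplit(parts._replace(query=urlencode(query)))


def fetch_all_repos(
    repos_url: str,
    fetch_page: Callable[[str], list[Any]],
    per_page: int = 100,
) -> list[Any]:
    """Collect repositories page by page until a short (or empty) page arrives.

    fetch_page is a hypothetical callable: URL in, list of repo dicts out.
    """
    repos: list[Any] = []
    page = 1
    while True:
        batch = fetch_page(page_url(repos_url, page, per_page))
        repos.extend(batch)
        # A page smaller than per_page means there are no further pages
        if len(batch) < per_page:
            return repos
        page += 1
```

The same loop could wrap call_github_api in Python (or callGitHubAPI in TypeScript) as its fetch_page; unauthenticated requests are also subject to GitHub's rate limits, so each extra page costs a request.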
Moose stores each StargazerProjectInfo record in the returned array as its own row in the StargazerProjectInfo table.
The run() (TypeScript) or fn() (Python) function returns null (TypeScript) or None (Python) for deleted stars and for events without a starred_at timestamp, so Moose skips them; we don't need to process these events.
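To make these return-value rules concrete, the toy model below mimics the behavior described above: a None result produces no rows, and each element of a returned list becomes its own row. apply_streaming_function and toy_fn are illustrative names, not part of moose_lib, and this is not how Moose is implemented internally.

```python
from typing import Callable, Iterable, Optional, TypeVar, Union

In = TypeVar("In")
Out = TypeVar("Out")


def apply_streaming_function(
    fn: Callable[[In], Union[Out, list[Out], None]],
    events: Iterable[In],
) -> list[Out]:
    """Illustrative model: None is dropped, a list yields one row per element,
    and a single object yields one row."""
    rows: list[Out] = []
    for event in events:
        result = fn(event)
        if result is None:
            continue  # e.g. deleted stars: nothing is written
        if isinstance(result, list):
            rows.extend(result)
        else:
            rows.append(result)
    return rows


def toy_fn(event: dict) -> Optional[list[str]]:
    # Stand-in for fn(): skip deletions, otherwise emit one row per repository
    if event["action"] == "deleted":
        return None
    return [f'{event["login"]}/{name}' for name in event["repos"]]


events = [
    {"action": "created", "login": "alice", "repos": ["a1", "a2"]},
    {"action": "deleted", "login": "bob", "repos": ["b1"]},
    {"action": "created", "login": "carol", "repos": []},
]
rows = apply_streaming_function(toy_fn, events)
# rows == ["alice/a1", "alice/a2"]
```

Note that a stargazer with no public repositories (carol) returns an empty list, which, like None, produces no rows.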
Test the Streaming Function
Trigger a New Event
Star your repository again to generate a new RawStarEvent. If it is already starred, unstar it first; the resulting deleted event is skipped by the function.
Verify the Transformation
In your terminal, confirm that the event was processed:
Received RawStarEvent_0_0 -> StargazerProjectInfo_0_0 1 message(s)
Check the Data
Query the StargazerProjectInfo table to see the enriched data:
SELECT * FROM local.StargazerProjectInfo_0_0
You have now created a real-time data processing service using a Streaming Function to enrich GitHub star events with additional repository information.