Batch Load Historical Star Data
In this section, we'll load historical star data from GitHub's API to complement our real-time star events. This will give us a complete picture of who has starred our repository over time.
Create Data Models and Functions
First, we need to create a data model for historical star data and a streaming function to process it.
Create HistoricalStargazer Data Model
Create a new file named HistoricalStargazer in your datamodels directory:
import { Key, DataModelConfig, IngestionConfig, IngestionFormat } from "@514labs/moose-lib";

// Configure the data model to accept JSON arrays for batch ingestion
export const HistoricalStargazerConfig: DataModelConfig<HistoricalStargazer> = {
  ingestion: {
    format: IngestionFormat.JSON_ARRAY, // Enables batch processing of records
  },
};

export interface HistoricalStargazer {
  starred_at: Key<Date>;
  login: string;
  avatar_url: string;
  repos_url: string;
}
from dataclasses import dataclass
from datetime import datetime
from moose_lib import Key, moose_data_model, DataModelConfig, IngestionConfig, IngestionFormat

# Configuration for batch loading stargazer data
# IngestionFormat.JSON_ARRAY enables the ingestion endpoint to accept arrays of records
# This is more efficient than sending individual records when batch loading
batch_load_config = DataModelConfig(
    ingestion=IngestionConfig(
        format=IngestionFormat.JSON_ARRAY,
    )
)

@moose_data_model(batch_load_config)
@dataclass
class HistoricalStargazer:
    starred_at: datetime
    login: Key[str]
    avatar_url: str
    repos_url: str
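Because the ingestion format is set to IngestionFormat.JSON_ARRAY, the model's /ingest/HistoricalStargazer endpoint accepts an array of records in a single POST. Here's a minimal sketch of what such a batch request looks like, assuming your local dev server is running on the default http://localhost:4000 (the same endpoint the ingest script below uses; the record values are purely illustrative):

import requests

# Minimal sketch: POST a small JSON array to the batch ingestion endpoint.
# The record shape mirrors the HistoricalStargazer data model above;
# the values are illustrative only.
batch = [
    {
        "starred_at": "2024-01-15T18:30:00Z",
        "login": "octocat",
        "avatar_url": "https://avatars.githubusercontent.com/u/583231",
        "repos_url": "https://api.github.com/users/octocat/repos",
    },
]

response = requests.post("http://localhost:4000/ingest/HistoricalStargazer", json=batch)
response.raise_for_status()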
Create Streaming Function
This section assumes you've already created the StargazerProjectInfo data model from the previous section. If you haven't done so yet, refer back to the Process Real-Time Events section to create it first.

Create a streaming function that transforms historical stargazer data into StargazerProjectInfo records:
import { HistoricalStargazer } from "datamodels/HistoricalStargazer";
import { StargazerProjectInfo } from "datamodels/StargazerProjectInfo";

export default async function run(
  source: HistoricalStargazer
): Promise<StargazerProjectInfo[] | null> {
  const repositories = await callGitHubAPI(source.repos_url);

  const stargazerProjects = repositories.map((repo: any) => ({
    starred_at: new Date(source.starred_at),
    stargazerName: source.login,
    repoName: repo.name,
    repoFullName: repo.full_name,
    description: repo.description,
    repoUrl: repo.html_url,
    repoStars: repo.stargazers_count,
    repoWatchers: repo.watchers_count,
    language: repo.language || "Multiple Languages",
    repoSizeKb: repo.size,
    createdAt: new Date(repo.created_at),
    updatedAt: new Date(repo.updated_at),
  }));

  return stargazerProjects;
}

async function callGitHubAPI(url: string): Promise<any> {
  const response = await fetch(url);
  return response.json();
}
from app.datamodels.HistoricalStargazer import HistoricalStargazer
from app.datamodels.StargazerProjectInfo import StargazerProjectInfo
from moose_lib import StreamingFunction
from typing import Optional
import requests


def call_github_api(url: str) -> dict:
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def fn(source: HistoricalStargazer) -> Optional[list[StargazerProjectInfo]]:
    repositories = call_github_api(source.repos_url)

    data = []
    for repo in repositories:
        data.append(
            StargazerProjectInfo(
                starred_at=source.starred_at,
                stargazer_login=source.login,
                repo_name=repo["name"],
                repo_full_name=repo["full_name"],
                description=repo["description"],
                repo_url=repo["html_url"],
                repo_stars=repo["stargazers_count"],
                repo_watchers=repo["watchers_count"],
                language=repo["language"] or "Multiple Languages",
                repo_size_kb=repo["size"],
                created_at=repo["created_at"],
                updated_at=repo["updated_at"],
            )
        )
    return data


my_function = StreamingFunction(
    run=fn
)
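Note that call_github_api fetches each stargazer's repos_url without authentication, and unauthenticated GitHub API requests are limited to 60 per hour. If you expect more than a handful of stargazers, one option (a sketch, not part of the function above) is to reuse the same GITHUB_ACCESS_TOKEN environment variable the ingest script below relies on, assuming that variable is available in the environment where your streaming functions run:

import os
import requests

# Optional variant of call_github_api that sends an Authorization header when
# GITHUB_ACCESS_TOKEN is set, to avoid GitHub's unauthenticated rate limit.
def call_github_api(url: str) -> dict:
    headers = {}
    token = os.getenv("GITHUB_ACCESS_TOKEN")
    if token:
        headers["Authorization"] = f"token {token}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()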
Create the Ingest Script
Create a new file for the ingest script. We recommend creating a new workflows directory in your app folder to host your ingest scripts.
mkdir -p app/workflows
touch app/workflows/ingest_stargazers.ts
mkdir -p app/workflows
touch app/workflows/ingest_stargazers.py
Add the following code to your new file:
// Coming soon! For now, please use the Python version
import os
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv
from pathlib import Path
from typing import Optional, Dict, List


class StargazerIngester:
    def __init__(self, owner: str, repo: str, token: str, base_url: str):
        self.owner = owner
        self.repo = repo
        self.token = token
        self.base_url = base_url
        self.headers = {
            'Accept': 'application/vnd.github.v3.star+json',
            'Authorization': f'token {token}'
        }

    def fetch_stargazers(self, before_date: Optional[datetime] = None) -> List[Dict]:
        """Fetch all stargazers before a given date"""
        stargazers = []
        page = 1
        per_page = 100

        while True:
            url = f'https://api.github.com/repos/{self.owner}/{self.repo}/stargazers'
            params = {'per_page': per_page, 'page': page}
            response = requests.get(url, headers=self.headers, params=params)

            if response.status_code != 200:
                print(f'Error fetching stargazers: {response.status_code} {response.reason}')
                break

            data = response.json()
            if not data:
                break

            # Filter by date if specified
            if before_date:
                data = [
                    star for star in data
                    if datetime.strptime(star["starred_at"], "%Y-%m-%dT%H:%M:%SZ") < before_date
                ]
                if not data:  # Stop if we've passed our date threshold
                    break

            stargazers.extend(data)
            page += 1

        return stargazers

    def ingest_stargazers(self, stargazers: List[Dict]) -> int:
        """Ingest stargazers in batch into the HistoricalStargazer table"""
        # Transform the stargazers into the expected format
        stargazer_data = [
            {
                "starred_at": stargazer["starred_at"],
                "login": stargazer["user"]["login"],
                "avatar_url": stargazer["user"]["avatar_url"],
                "repos_url": stargazer["user"]["repos_url"]
            }
            for stargazer in stargazers
        ]

        # Send the entire batch in a single request
        response = requests.post(
            f"{self.base_url}/ingest/HistoricalStargazer",
            json=stargazer_data
        )
        if response.ok:
            print(f"Ingested {len(stargazers)} stargazers")
        else:
            print(f"Failed to ingest {len(stargazers)} stargazers: {response.status_code}")

        return len(stargazers)


def main():
    # Setup
    load_dotenv()

    # Configuration
    token = os.getenv('GITHUB_ACCESS_TOKEN')
    owner = os.getenv('GITHUB_OWNER', '514-labs')
    repo = os.getenv('GITHUB_REPO', 'moose')
    base_url = os.getenv('MOOSE_API_HOST', 'http://localhost:4000')

    if not token:
        raise ValueError("Please set the GITHUB_ACCESS_TOKEN environment variable.")

    ingester = StargazerIngester(owner, repo, token, base_url)

    # Set cutoff date (default to yesterday evening)
    cutoff_date = datetime.utcnow().replace(hour=18, minute=0, second=0, microsecond=0) - timedelta(days=1)

    # Fetch and ingest
    stargazers = ingester.fetch_stargazers(before_date=cutoff_date)
    total_ingested = ingester.ingest_stargazers(stargazers)

    print(f"\nCompleted: Ingested {total_ingested} stargazers")


if __name__ == '__main__':
    main()
For Python users, you'll need to install the python-dotenv package (the script also uses requests, which the streaming function above already depends on):
pip install python-dotenv
This script:
- Fetches historical stargazer data from GitHub's API using pagination
- Processes all stargazers into a batch
- Sends the entire batch to your Moose application in a single request
- Includes error handling and progress reporting
- Uses environment variables for configuration
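For reference, the Accept: application/vnd.github.v3.star+json header is what makes GitHub include the starred_at timestamp. Each element of the stargazers response then looks roughly like the snippet below (illustrative values, unrelated fields omitted), which is why the script reads starred_at from the top level and the remaining fields from the nested user object:

# Roughly the shape of one element returned by the stargazers endpoint when the
# request sends Accept: application/vnd.github.v3.star+json.
# Values are illustrative; GitHub returns more fields than shown here.
example_record = {
    "starred_at": "2024-01-15T18:30:00Z",
    "user": {
        "login": "octocat",
        "avatar_url": "https://avatars.githubusercontent.com/u/583231",
        "repos_url": "https://api.github.com/users/octocat/repos",
    },
}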
Configure Environment Variables
Create a .env file in your project root with your GitHub credentials:
GITHUB_ACCESS_TOKEN=your_github_token
GITHUB_OWNER=your_github_repo_owner
GITHUB_REPO=your_github_repo
MOOSE_API_HOST=http://localhost:4000
Make sure to replace your_github_token with a valid GitHub Personal Access Token, and set GITHUB_OWNER and GITHUB_REPO to the repository you want to track. Keep your token secure and never commit it to version control. We recommend storing it in your .env file, which should be listed in your .gitignore.
Run the Batch Load Script
The ingest script will:
- Fetch all historical stargazers from GitHub's API
- Send every stargazer record to your Moose application's HistoricalStargazer ingestion endpoint as a single batch
- Trigger the streaming function, which automatically processes each record and populates the StargazerProjectInfo table
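If you'd like to sanity-check the pipeline before loading the full history, you can import the ingester from the script and send just a few records first. This is an optional, hypothetical dry run (it assumes your project root is on the Python path, consistent with the app.datamodels imports used above):

# Hypothetical dry run: ingest only the first five stargazers to confirm the
# endpoint and streaming function behave as expected before the full load.
import os
from dotenv import load_dotenv
from app.workflows.ingest_stargazers import StargazerIngester

load_dotenv()
ingester = StargazerIngester(
    owner=os.getenv("GITHUB_OWNER", "514-labs"),
    repo=os.getenv("GITHUB_REPO", "moose"),
    token=os.environ["GITHUB_ACCESS_TOKEN"],
    base_url=os.getenv("MOOSE_API_HOST", "http://localhost:4000"),
)
stargazers = ingester.fetch_stargazers()
ingester.ingest_stargazers(stargazers[:5])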
Run the script:
ts-node app/workflows/ingest_stargazers.ts
python app/workflows/ingest_stargazers.py
You should see output like:
Completed: Ingested 36 stargazers
Verify the Data
Query your StargazerProjectInfo table to see the historical data:
SELECT
    stargazer_login,
    starred_at,
    COUNT(DISTINCT repo_name) AS num_repos
FROM local.StargazerProjectInfo_0_0
GROUP BY stargazer_login, starred_at
ORDER BY starred_at DESC
LIMIT 5;
You now have a complete dataset of both historical and real-time star events! This data will be used in the next sections to analyze trends and patterns in your repository's stargazers.