Merge pull request #847 from agorevski/algore/cicd-descript-release-matrix

perf(ci): reduce GitHub Actions costs ~60-65% across all workflows
Alex Gorevski 2026-02-19 06:54:40 -08:00 committed by GitHub
commit fedfd6ae01
14 changed files with 271 additions and 64 deletions

fetch_actions_data.py

@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""Fetch GitHub Actions workflow runs for a given date and summarize costs.
Usage:
python fetch_actions_data.py [OPTIONS]
Options:
--date YYYY-MM-DD Date to query (default: yesterday)
--mode brief|full Output mode (default: full)
brief: billable minutes/hours table only
full: detailed breakdown with per-run list
--repo OWNER/NAME Repository (default: zeroclaw-labs/zeroclaw)
-h, --help Show this help message
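
Example (illustrative date):
    python fetch_actions_data.py --date 2026-02-18 --mode brief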
"""
import argparse
import json
import os
import subprocess
from datetime import datetime, timedelta, timezone


def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Fetch GitHub Actions workflow runs and summarize costs.",
)
yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")
parser.add_argument(
"--date",
default=yesterday,
help="Date to query in YYYY-MM-DD format (default: yesterday)",
)
parser.add_argument(
"--mode",
choices=["brief", "full"],
default="full",
help="Output mode: 'brief' for billable hours only, 'full' for detailed breakdown (default: full)",
)
parser.add_argument(
"--repo",
default="zeroclaw-labs/zeroclaw",
help="Repository in OWNER/NAME format (default: zeroclaw-labs/zeroclaw)",
)
return parser.parse_args()


def gh_api_get(url):
    """GET a GitHub API URL via curl and return the parsed JSON body.

    Sends GITHUB_TOKEN from the environment as a bearer token when set;
    unauthenticated requests are rate-limited to 60/hour, which the
    per-workflow job sampling below can easily exceed.
    """
    cmd = ["curl", "-sS", "-H", "Accept: application/vnd.github+json"]
    token = os.environ.get("GITHUB_TOKEN")
    if token:
        cmd += ["-H", f"Authorization: Bearer {token}"]
    result = subprocess.run(cmd + [url], capture_output=True, text=True, check=True)
    return json.loads(result.stdout)


def fetch_runs(repo, date_str, page=1, per_page=100):
    """Fetch workflow runs created on the given date."""
    url = (
        f"https://api.github.com/repos/{repo}/actions/runs"
        f"?created={date_str}&per_page={per_page}&page={page}"
    )
    return gh_api_get(url)


def fetch_jobs(repo, run_id):
    """Fetch jobs for a specific workflow run."""
    url = f"https://api.github.com/repos/{repo}/actions/runs/{run_id}/jobs?per_page=100"
    return gh_api_get(url)
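
# Note: the "created" filter above accepts GitHub's search-style date qualifiers,
# so both a single day ("2026-02-18") and a range ("2026-02-17..2026-02-18")
# should work as --date values, since the value is passed through verbatim.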


def parse_duration(started, completed):
    """Return the duration in seconds between two ISO-8601 timestamps.

    e.g. parse_duration("2026-02-19T06:00:00Z", "2026-02-19T06:02:30Z") -> 150.0
    """
    if not started or not completed:
        return 0
    try:
        s = datetime.fromisoformat(started.replace("Z", "+00:00"))
        c = datetime.fromisoformat(completed.replace("Z", "+00:00"))
        return max(0, (c - s).total_seconds())
    except (AttributeError, ValueError):
        return 0


def main():
    args = parse_args()
    repo = args.repo
    date_str = args.date
    brief = args.mode == "brief"
    print(f"Fetching workflow runs for {repo} on {date_str}...")
    print("=" * 100)
    all_runs = []
    for page in range(1, 5):  # at most 4 pages of 100 runs each (400 runs)
        data = fetch_runs(repo, date_str, page=page)
        runs = data.get("workflow_runs", [])
        if not runs:
            break
        all_runs.extend(runs)
        if len(runs) < 100:
            break  # short page: no further results
    print(f"Total workflow runs found: {len(all_runs)}")
    print()
    # Group runs by workflow name
    workflow_stats = {}
    for run in all_runs:
        name = run.get("name") or "Unknown"  # name can be null in the API payload
        event = run.get("event", "unknown")
        conclusion = run.get("conclusion") or "unknown"  # null while a run is in progress
        run_id = run.get("id")
        if name not in workflow_stats:
            workflow_stats[name] = {
                "count": 0,
                "events": {},
                "conclusions": {},
                "total_job_seconds": 0,
                "total_jobs": 0,
                "run_ids": [],
            }
        stats = workflow_stats[name]
        stats["count"] += 1
        stats["events"][event] = stats["events"].get(event, 0) + 1
        stats["conclusions"][conclusion] = stats["conclusions"].get(conclusion, 0) + 1
        stats["run_ids"].append(run_id)
    # For each workflow, sample up to 3 runs to get job-level timing
    print("Sampling job-level timing (up to 3 runs per workflow)...")
    print()
    for stats in workflow_stats.values():
        sample_ids = stats["run_ids"][:3]
        for run_id in sample_ids:
            jobs_data = fetch_jobs(repo, run_id)
            jobs = jobs_data.get("jobs", [])
            for job in jobs:
                started = job.get("started_at")
                completed = job.get("completed_at")
                stats["total_job_seconds"] += parse_duration(started, completed)
                stats["total_jobs"] += 1
        # Extrapolate: if we sampled N of the M total runs, scale sampled time by M/N
        sampled = len(sample_ids)
        total = stats["count"]
        if 0 < sampled < total:
            stats["estimated_total_seconds"] = stats["total_job_seconds"] * (total / sampled)
        else:
            stats["estimated_total_seconds"] = stats["total_job_seconds"]
    # Print summary sorted by estimated cost (descending)
    sorted_workflows = sorted(
        workflow_stats.items(),
        key=lambda x: x[1]["estimated_total_seconds"],
        reverse=True,
    )
    if brief:
        # Brief mode: compact billable-hours table only
        print(f"{'Workflow':<40} {'Runs':>5} {'Est.Mins':>9} {'Est.Hours':>10}")
        print("-" * 68)
        grand_total_minutes = 0
        for name, stats in sorted_workflows:
            est_mins = stats["estimated_total_seconds"] / 60
            grand_total_minutes += est_mins
            print(f"{name:<40} {stats['count']:>5} {est_mins:>9.1f} {est_mins / 60:>10.2f}")
        print("-" * 68)
        print(f"{'TOTAL':<40} {len(all_runs):>5} {grand_total_minutes:>9.0f} {grand_total_minutes / 60:>10.1f}")
        print(f"\nProjected monthly (this day's total x 30): ~{grand_total_minutes / 60 * 30:.0f} hours")
    else:
        # Full mode: detailed breakdown, then the per-run list
        print("=" * 100)
        print(f"{'Workflow':<40} {'Runs':>5} {'SampledJobs':>12} {'SampledMins':>12} {'Est.TotalMins':>14} {'Events'}")
        print("-" * 100)
        grand_total_minutes = 0
        for name, stats in sorted_workflows:
            sampled_mins = stats["total_job_seconds"] / 60
            est_total_mins = stats["estimated_total_seconds"] / 60
            grand_total_minutes += est_total_mins
            events_str = ", ".join(f"{k}={v}" for k, v in stats["events"].items())
            conclusions_str = ", ".join(f"{k}={v}" for k, v in stats["conclusions"].items())
            print(
                f"{name:<40} {stats['count']:>5} {stats['total_jobs']:>12} "
                f"{sampled_mins:>12.1f} {est_total_mins:>14.1f} {events_str}"
            )
            print(f"{'':>40} {'':>5} {'':>12} {'':>12} {'':>14} outcomes: {conclusions_str}")
        print("-" * 100)
        print(f"{'GRAND TOTAL':>40} {len(all_runs):>5} {'':>12} {'':>12} {grand_total_minutes:>14.1f}")
        print(f"\nEstimated total billable minutes on {date_str}: {grand_total_minutes:.0f} min ({grand_total_minutes / 60:.1f} hours)")
        print()
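        # Note: GitHub bills each job's runtime rounded up to the next full minute,
        # so summing raw wall-clock seconds slightly underestimates true billable
        # minutes.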
        # Also show the raw run list
        print("\n" + "=" * 100)
        print("DETAILED RUN LIST")
        print("=" * 100)
        for run in all_runs:
            name = run.get("name") or "Unknown"
            event = run.get("event", "unknown")
            conclusion = run.get("conclusion") or "unknown"
            run_id = run.get("id")
            started = run.get("run_started_at", "?")
            print(f"  [{run_id}] {name:<40} conclusion={conclusion:<12} event={event:<20} started={started}")


if __name__ == "__main__":
    main()