#!/usr/bin/env python3
"""Fetch GitHub Actions workflow runs for a given date and summarize costs.

Usage:
    python fetch_actions_data.py [OPTIONS]

Options:
    --date YYYY-MM-DD   Date to query (default: yesterday)
    --mode brief|full   Output mode (default: full)
                          brief: billable minutes/hours table only
                          full:  detailed breakdown with per-run list
    --repo OWNER/NAME   Repository (default: zeroclaw-labs/zeroclaw)
    -h, --help          Show this help message
"""

import argparse
import json
import subprocess
from datetime import datetime, timedelta, timezone


def parse_args():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Fetch GitHub Actions workflow runs and summarize costs.",
    )
    yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")
    parser.add_argument(
        "--date",
        default=yesterday,
        help="Date to query in YYYY-MM-DD format (default: yesterday)",
    )
    parser.add_argument(
        "--mode",
        choices=["brief", "full"],
        default="full",
        help="Output mode: 'brief' for billable hours only, 'full' for detailed breakdown (default: full)",
    )
    parser.add_argument(
        "--repo",
        default="zeroclaw-labs/zeroclaw",
        help="Repository in OWNER/NAME format (default: zeroclaw-labs/zeroclaw)",
    )
    return parser.parse_args()


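# NOTE: the fetch helpers below shell out to unauthenticated curl. GitHub's REST
# API allows roughly 60 unauthenticated requests/hour per IP, and this script
# issues one request per sampled run, so a busy day can hit that ceiling.
# Passing a token (e.g. -H "Authorization: Bearer $GITHUB_TOKEN") raises it.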
def fetch_runs(repo, date_str, page=1, per_page=100):
    """Fetch one page of workflow runs created on the given date."""
    # The `created` filter accepts GitHub search date syntax: an exact date,
    # as here, or a range such as 2025-01-01..2025-01-07.
    url = (
        f"https://api.github.com/repos/{repo}/actions/runs"
        f"?created={date_str}&per_page={per_page}&page={page}"
    )
    result = subprocess.run(
        ["curl", "-sS", "-H", "Accept: application/vnd.github+json", url],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f"curl failed for {url}: {result.stderr.strip()}")
    return json.loads(result.stdout)


def fetch_jobs(repo, run_id):
    """Fetch jobs for a specific run."""
    url = f"https://api.github.com/repos/{repo}/actions/runs/{run_id}/jobs?per_page=100"
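    # Only the first page is requested, so a run with more than 100 jobs
    # would be undercounted here.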
    result = subprocess.run(
        ["curl", "-sS", "-H", "Accept: application/vnd.github+json", url],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f"curl failed for {url}: {result.stderr.strip()}")
    return json.loads(result.stdout)


def parse_duration(started, completed):
    """Return duration in seconds between two ISO timestamps."""
    if not started or not completed:
        return 0
    try:
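        # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11,
        # so map it to the equivalent +00:00 offset first.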
        s = datetime.fromisoformat(started.replace("Z", "+00:00"))
        c = datetime.fromisoformat(completed.replace("Z", "+00:00"))
        return max(0, (c - s).total_seconds())
    except Exception:
        return 0


def main():
    args = parse_args()
    repo = args.repo
    date_str = args.date
    brief = args.mode == "brief"

    print(f"Fetching workflow runs for {repo} on {date_str}...")
    print("=" * 100)

    all_runs = []
    for page in range(1, 5):  # up to 4 pages of 100 = 400 runs; busier days are silently truncated
        data = fetch_runs(repo, date_str, page=page)
        runs = data.get("workflow_runs", [])
        if not runs:
            break
        all_runs.extend(runs)
        if len(runs) < 100:  # a short page means we have reached the end
            break

    print(f"Total workflow runs found: {len(all_runs)}")
    print()

    # Group by workflow name
    workflow_stats = {}
    for run in all_runs:
        name = run.get("name") or "Unknown"
        event = run.get("event") or "unknown"
        conclusion = run.get("conclusion") or "unknown"  # None until a run completes
        run_id = run.get("id")

        if name not in workflow_stats:
            workflow_stats[name] = {
                "count": 0,
                "events": {},
                "conclusions": {},
                "total_job_seconds": 0,
                "total_jobs": 0,
                "run_ids": [],
            }

        workflow_stats[name]["count"] += 1
        workflow_stats[name]["events"][event] = workflow_stats[name]["events"].get(event, 0) + 1
        workflow_stats[name]["conclusions"][conclusion] = workflow_stats[name]["conclusions"].get(conclusion, 0) + 1
        workflow_stats[name]["run_ids"].append(run_id)

    # For each workflow, sample up to 3 runs to get job-level timing
    print("Sampling job-level timing (up to 3 runs per workflow)...")
    print()

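    # The first three run IDs are assumed representative; if the sampled runs
    # happen to be short failures or unusually long retries, the extrapolated
    # totals will skew accordingly.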
    for name, stats in workflow_stats.items():
        sample_ids = stats["run_ids"][:3]
        for run_id in sample_ids:
            jobs_data = fetch_jobs(repo, run_id)
            jobs = jobs_data.get("jobs", [])
            for job in jobs:
                started = job.get("started_at")
                completed = job.get("completed_at")
                duration = parse_duration(started, completed)
                stats["total_job_seconds"] += duration
                stats["total_jobs"] += 1

        # Extrapolate: if we sampled N runs but there are M total, scale up
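        # e.g. if 3 sampled runs of a workflow totaled 12 job-minutes and the
        # workflow ran 30 times, the estimate is 12 * (30 / 3) = 120 minutes.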
        sampled = len(sample_ids)
        total = stats["count"]
        if 0 < sampled < total:
            scale = total / sampled
            stats["estimated_total_seconds"] = stats["total_job_seconds"] * scale
        else:
            stats["estimated_total_seconds"] = stats["total_job_seconds"]

    # Print summary sorted by estimated cost (descending)
    sorted_workflows = sorted(
        workflow_stats.items(),
        key=lambda x: x[1]["estimated_total_seconds"],
        reverse=True,
    )

    if brief:
        # Brief mode: compact billable hours table
        print(f"{'Workflow':<40} {'Runs':>5} {'Est.Mins':>9} {'Est.Hours':>10}")
        print("-" * 68)
        grand_total_minutes = 0
        for name, stats in sorted_workflows:
            est_mins = stats["estimated_total_seconds"] / 60
            grand_total_minutes += est_mins
            print(f"{name:<40} {stats['count']:>5} {est_mins:>9.1f} {est_mins/60:>10.2f}")
        print("-" * 68)
        print(f"{'TOTAL':<40} {len(all_runs):>5} {grand_total_minutes:>9.0f} {grand_total_minutes/60:>10.1f}")
        print(f"\nProjected monthly: ~{grand_total_minutes/60*30:.0f} hours")
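        # The monthly projection is a straight 30x multiple of one day; it
        # ignores weekends and day-to-day variation, so treat it as an order
        # of magnitude rather than a forecast.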
    else:
        # Full mode: detailed breakdown with per-run list
        print("=" * 100)
        print(f"{'Workflow':<40} {'Runs':>5} {'SampledJobs':>12} {'SampledMins':>12} {'Est.TotalMins':>14} {'Events'}")
        print("-" * 100)

        grand_total_minutes = 0
        for name, stats in sorted_workflows:
            sampled_mins = stats["total_job_seconds"] / 60
            est_total_mins = stats["estimated_total_seconds"] / 60
            grand_total_minutes += est_total_mins
            events_str = ", ".join(f"{k}={v}" for k, v in stats["events"].items())
            conclusions_str = ", ".join(f"{k}={v}" for k, v in stats["conclusions"].items())
            print(
                f"{name:<40} {stats['count']:>5} {stats['total_jobs']:>12} "
                f"{sampled_mins:>12.1f} {est_total_mins:>14.1f} {events_str}"
            )
            print(f"{'':>40} {'':>5} {'':>12} {'':>12} {'':>14} outcomes: {conclusions_str}")

        print("-" * 100)
        print(f"{'GRAND TOTAL':>40} {len(all_runs):>5} {'':>12} {'':>12} {grand_total_minutes:>14.1f}")
        print(f"\nEstimated total billable minutes on {date_str}: {grand_total_minutes:.0f} min ({grand_total_minutes/60:.1f} hours)")
        print()

        # Also show the raw run list
        print("\n" + "=" * 100)
        print("DETAILED RUN LIST")
        print("=" * 100)
        for run in all_runs:
            name = run.get("name") or "Unknown"
            event = run.get("event") or "unknown"
            conclusion = run.get("conclusion") or "unknown"  # avoid formatting None below
            run_id = run.get("id")
            started = run.get("run_started_at", "?")
            print(f"  [{run_id}] {name:<40} conclusion={conclusion:<12} event={event:<20} started={started}")


if __name__ == "__main__":
    main()