Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
e7b9ec8c50 feat: add fleet cost report for #520
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 26s
Smoke Test / smoke (pull_request) Failing after 28s
Agent PR Gate / gate (pull_request) Failing after 36s
Agent PR Gate / report (pull_request) Successful in 9s
2026-04-22 03:58:25 -04:00
Alexander Whitestone
2f490e7087 test: define fleet cost report for #520 2026-04-22 03:45:30 -04:00
6 changed files with 322 additions and 670 deletions

View File

@@ -1,18 +0,0 @@
{
"owner": "Timmy_Foundation",
"repos": [
"timmy-home",
"timmy-config",
"fleet-ops",
"the-beacon",
"the-door",
"the-nexus"
],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6
}
}

View File

@@ -1,70 +0,0 @@
# Burn-down Velocity Tracking
Refs #519.
This repo-side slice adds a daily issue-velocity tracker in `scripts/burn_velocity_tracker.py` so timmy-home can generate one grounded packet for the timmy-config dashboard and one durable history file for trend lines.
## What it emits
Daily run outputs:
- `~/.timmy/burn-velocity/latest.json` — machine-readable payload for the timmy-config dashboard
- `~/.timmy/burn-velocity/latest.md` — operator-facing markdown summary
- `~/.timmy/burn-velocity/history.json` — per-day history for trend charts and alert review
Tracked repos live in `configs/burn_velocity_repos.json`.
## Cron command
```bash
cd ~/timmy-home && \
python3 scripts/burn_velocity_tracker.py \
--config configs/burn_velocity_repos.json \
--output-json ~/.timmy/burn-velocity/latest.json \
--output-md ~/.timmy/burn-velocity/latest.md \
--history-file ~/.timmy/burn-velocity/history.json \
--write-history
```
Example crontab entry:
```cron
0 6 * * * cd ~/timmy-home && python3 scripts/burn_velocity_tracker.py --config configs/burn_velocity_repos.json --output-json ~/.timmy/burn-velocity/latest.json --output-md ~/.timmy/burn-velocity/latest.md --history-file ~/.timmy/burn-velocity/history.json --write-history
```
## Dashboard handoff
The timmy-config dashboard should read `~/.timmy/burn-velocity/latest.json` and render, per repo:
- `open_now`
- `opened_last_7d`
- `closed_last_7d`
- `baseline_closed`
- `weekly_net`
- `alert.status`
- `alert.kind`
- `alert.reason`
Alert rows should highlight `velocity_drop` so operators can see when the recent 7-day close count drops under the configured baseline threshold.
## Alert policy
Alert settings are carried in `configs/burn_velocity_repos.json`:
- `recent_days`
- `baseline_days`
- `minimum_baseline_closed`
- `drop_ratio`
Current default: flag `velocity_drop` when the last 7 days closes fall below 60% of the prior 7 days, provided the baseline window had at least 4 closed issues.
## Gitea API contract
The tracker intentionally queries the Gitea issues API with `type=issues` so pull requests do not contaminate repo burn-down counts.
Live collection shape:
- open backlog uses `/repos/{owner}/{repo}/issues?state=open&type=issues`
- recent event scan uses `/repos/{owner}/{repo}/issues?state=all&type=issues&since=...`
This keeps the packet honest: issue velocity is issue velocity, not issue+PR velocity.
## Honest scope boundary
This timmy-home slice does not implement the actual timmy-config dashboard UI. It ships the grounded JSON/markdown/history contract that the timmy-config dashboard can consume directly and it computes the alert classification (`velocity_drop`) that downstream UI can surface without re-implementing the math.

View File

@@ -1,406 +0,0 @@
#!/usr/bin/env python3
"""Burn-down velocity tracker for Timmy Foundation issue throughput.
Refs: timmy-home #519
"""
from __future__ import annotations
import argparse
import json
from datetime import date, datetime, time, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib import parse, request
from base64 import b64encode
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
DEFAULT_CONFIG_FILE = Path(__file__).resolve().parent.parent / "configs" / "burn_velocity_repos.json"
DEFAULT_OUTPUT_DIR = Path.home() / ".timmy" / "burn-velocity"
DEFAULT_OUTPUT_JSON = DEFAULT_OUTPUT_DIR / "latest.json"
DEFAULT_OUTPUT_MD = DEFAULT_OUTPUT_DIR / "latest.md"
DEFAULT_HISTORY_FILE = DEFAULT_OUTPUT_DIR / "history.json"
DEFAULT_CONFIG = {
"owner": DEFAULT_OWNER,
"repos": ["timmy-home", "timmy-config", "fleet-ops", "the-beacon", "the-door", "the-nexus"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def parse_iso8601(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def normalize_today(value: str | date | None = None) -> date:
if value is None:
return datetime.now(timezone.utc).date()
if isinstance(value, date):
return value
return date.fromisoformat(value)
def build_day_window(today: date, lookback_days: int) -> list[date]:
start = today - timedelta(days=lookback_days - 1)
return [start + timedelta(days=offset) for offset in range(lookback_days)]
def filter_issue_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
return [item for item in items if not item.get("pull_request")]
def build_daily_series(items: list[dict[str, Any]], today: date, lookback_days: int) -> list[dict[str, int | str]]:
days = build_day_window(today, lookback_days)
counts = {day.isoformat(): {"opened": 0, "closed": 0} for day in days}
start_day = days[0]
for item in filter_issue_items(items):
created_at = parse_iso8601(item.get("created_at"))
if created_at is not None:
created_day = created_at.date()
if start_day <= created_day <= today:
counts[created_day.isoformat()]["opened"] += 1
closed_at = parse_iso8601(item.get("closed_at"))
if closed_at is not None:
closed_day = closed_at.date()
if start_day <= closed_day <= today:
counts[closed_day.isoformat()]["closed"] += 1
return [
{
"date": day.isoformat(),
"opened": counts[day.isoformat()]["opened"],
"closed": counts[day.isoformat()]["closed"],
}
for day in days
]
def summarize_velocity_alert(
*, recent_closed: int, baseline_closed: int, open_now: int, config: dict[str, Any]
) -> dict[str, Any]:
minimum_baseline = int(config.get("minimum_baseline_closed", 4))
drop_ratio = float(config.get("drop_ratio", 0.6))
if baseline_closed >= minimum_baseline and recent_closed < baseline_closed * drop_ratio:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": (
f"velocity_drop: closed {recent_closed} in the last {config.get('recent_days', 7)}d "
f"vs {baseline_closed} in the prior {config.get('baseline_days', 7)}d"
),
}
if open_now > 0 and baseline_closed >= minimum_baseline and recent_closed == 0:
return {
"status": "drop",
"kind": "velocity_drop",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity_drop: no issues closed in the recent window while backlog is still open",
}
return {
"status": "ok",
"kind": "none",
"recent_closed": recent_closed,
"baseline_closed": baseline_closed,
"reason": "velocity stable",
}
def _sum_window(daily: list[dict[str, int | str]], field: str, days: int) -> int:
if days <= 0:
return 0
return sum(int(item[field]) for item in daily[-days:])
def _sum_baseline_window(daily: list[dict[str, int | str]], recent_days: int, baseline_days: int) -> int:
if baseline_days <= 0:
return 0
if recent_days <= 0:
return sum(int(item["closed"]) for item in daily[-baseline_days:])
baseline_slice = daily[-(recent_days + baseline_days) : -recent_days]
return sum(int(item["closed"]) for item in baseline_slice)
def build_velocity_report(config: dict[str, Any], snapshot: dict[str, Any], today: str | date | None = None) -> dict[str, Any]:
report_day = normalize_today(today)
generated_at = snapshot.get("generated_at") or datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
owner = config.get("owner", DEFAULT_OWNER)
repos = list(config.get("repos") or sorted((snapshot.get("repos") or {}).keys()))
lookback_days = int(config.get("lookback_days", 14))
alert_config = dict(DEFAULT_CONFIG["alert"])
alert_config.update(config.get("alert") or {})
recent_days = int(alert_config.get("recent_days", 7))
baseline_days = int(alert_config.get("baseline_days", 7))
repo_reports: list[dict[str, Any]] = []
total_open_now = 0
total_closed_last_7d = 0
repos_with_alerts: list[str] = []
for repo_name in repos:
repo_snapshot = (snapshot.get("repos") or {}).get(repo_name, {})
open_issues = filter_issue_items(list(repo_snapshot.get("open_issues") or []))
recent_issues = filter_issue_items(list(repo_snapshot.get("recent_issues") or []))
daily = build_daily_series(recent_issues, report_day, lookback_days)
open_now = len(open_issues)
opened_last_7d = _sum_window(daily, "opened", recent_days)
closed_last_7d = _sum_window(daily, "closed", recent_days)
baseline_closed = _sum_baseline_window(daily, recent_days, baseline_days)
weekly_net = opened_last_7d - closed_last_7d
alert = summarize_velocity_alert(
recent_closed=closed_last_7d,
baseline_closed=baseline_closed,
open_now=open_now,
config=alert_config,
)
repo_report = {
"repo": repo_name,
"open_now": open_now,
"opened_last_7d": opened_last_7d,
"closed_last_7d": closed_last_7d,
"baseline_closed": baseline_closed,
"weekly_net": weekly_net,
"daily": daily,
"alert": alert,
}
repo_reports.append(repo_report)
total_open_now += open_now
total_closed_last_7d += closed_last_7d
if alert["status"] != "ok":
repos_with_alerts.append(repo_name)
return {
"owner": owner,
"generated_at": generated_at,
"generated_day": report_day.isoformat(),
"lookback_days": lookback_days,
"dashboard_contract_version": 1,
"repos": repo_reports,
"summary": {
"total_open_now": total_open_now,
"total_closed_last_7d": total_closed_last_7d,
"repos_with_alerts": repos_with_alerts,
},
}
def render_markdown(report: dict[str, Any]) -> str:
lines = [
"# Burn-down Velocity Tracking",
"",
f"Generated: {report['generated_at']}",
f"Owner: {report['owner']}",
f"Lookback days: {report['lookback_days']}",
"",
"## Per-repo velocity",
"",
"| Repo | Open now | Opened 7d | Closed 7d | Previous 7d | Alert |",
"| --- | ---: | ---: | ---: | ---: | --- |",
]
for repo in report["repos"]:
alert_label = repo["alert"]["kind"] if repo["alert"]["status"] != "ok" else "ok"
lines.append(
f"| {repo['repo']} | {repo['open_now']} | {repo['opened_last_7d']} | {repo['closed_last_7d']} | {repo['baseline_closed']} | {alert_label} |"
)
lines.extend(
[
"",
"## Dashboard handoff for timmy-config",
"",
"The timmy-config dashboard should consume `~/.timmy/burn-velocity/latest.json` and render, for each repo:",
"- `open_now`",
"- `opened_last_7d`",
"- `closed_last_7d`",
"- `baseline_closed`",
"- `alert.status` / `alert.kind` / `alert.reason`",
"",
"Cron should also persist `~/.timmy/burn-velocity/history.json` so timmy-config can plot the daily trend line instead of only the latest snapshot.",
"",
"## Alerts",
"",
]
)
alerts = [repo for repo in report["repos"] if repo["alert"]["status"] != "ok"]
if not alerts:
lines.append("- none")
else:
for repo in alerts:
lines.append(f"- {repo['repo']}: {repo['alert']['reason']}")
return "\n".join(lines) + "\n"
def update_history(history_path: Path, report: dict[str, Any]) -> dict[str, Any]:
if history_path.exists():
history = json.loads(history_path.read_text(encoding="utf-8"))
else:
history = {"days": []}
entry = {
"date": report["generated_day"],
"generated_at": report["generated_at"],
"summary": report["summary"],
"repos": report["repos"],
}
retained = [item for item in history.get("days", []) if item.get("date") != report["generated_day"]]
retained.append(entry)
retained.sort(key=lambda item: item["date"])
history["days"] = retained
history_path.parent.mkdir(parents=True, exist_ok=True)
history_path.write_text(json.dumps(history, indent=2), encoding="utf-8")
return history
class GiteaClient:
def __init__(self, token: str, owner: str = DEFAULT_OWNER, base_url: str = DEFAULT_BASE_URL):
self.token = token
self.owner = owner
self.base_url = base_url.rstrip("/")
def _headers(self) -> list[dict[str, str]]:
return [
{"Authorization": f"token {self.token}", "Accept": "application/json"},
{
"Authorization": "Basic " + b64encode(f"{self.token}:".encode()).decode(),
"Accept": "application/json",
},
]
def _request_json(self, url: str) -> list[dict[str, Any]]:
last_error: Exception | None = None
for headers in self._headers():
try:
req = request.Request(url, headers=headers)
with request.urlopen(req, timeout=30) as response:
return json.loads(response.read().decode())
except Exception as exc: # pragma: no cover - exercised only on live API failure
last_error = exc
if last_error is None: # pragma: no cover - defensive
raise RuntimeError("request failed without an exception")
raise last_error
def list_issues(self, repo: str, *, state: str, since: str | None = None) -> list[dict[str, Any]]:
issues: list[dict[str, Any]] = []
page = 1
while True:
query = {"state": state, "type": "issues", "limit": 100, "page": page}
if since:
query["since"] = since
url = f"{self.base_url}/repos/{self.owner}/{repo}/issues?{parse.urlencode(query)}"
batch = self._request_json(url)
if not batch:
break
issues.extend(filter_issue_items(batch))
page += 1
return issues
def load_json(path: Path, default: Any) -> Any:
if not path.exists():
return default
return json.loads(path.read_text(encoding="utf-8"))
def load_config(path: Path) -> dict[str, Any]:
config = dict(DEFAULT_CONFIG)
alert = dict(DEFAULT_CONFIG["alert"])
raw = load_json(path, {})
config.update(raw)
alert.update(raw.get("alert") or {})
config["alert"] = alert
return config
def collect_live_snapshot(
config: dict[str, Any], *, today: str | date | None = None, token_file: Path = DEFAULT_TOKEN_FILE, base_url: str = DEFAULT_BASE_URL
) -> dict[str, Any]:
token = token_file.read_text(encoding="utf-8").strip()
report_day = normalize_today(today)
since_day = report_day - timedelta(days=int(config.get("lookback_days", 14)) - 1)
since_timestamp = datetime.combine(since_day, time.min, tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")
client = GiteaClient(token=token, owner=config.get("owner", DEFAULT_OWNER), base_url=base_url)
repos = list(config.get("repos") or [])
repo_payload = {}
for repo in repos:
repo_payload[repo] = {
"open_issues": client.list_issues(repo, state="open"),
"recent_issues": client.list_issues(repo, state="all", since=since_timestamp),
}
return {
"generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"repos": repo_payload,
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Track per-repo issue burn-down velocity and emit timmy-config dashboard payloads.")
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_FILE, help="Repo tracking config JSON")
parser.add_argument("--snapshot-file", type=Path, help="Use a pre-fetched snapshot JSON instead of calling Gitea")
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE, help="Gitea token file for live collection")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gitea API base URL")
parser.add_argument("--today", help="Override report date (YYYY-MM-DD)")
parser.add_argument("--output-json", type=Path, default=DEFAULT_OUTPUT_JSON, help="Path for latest JSON payload")
parser.add_argument("--output-md", type=Path, default=DEFAULT_OUTPUT_MD, help="Path for latest markdown summary")
parser.add_argument("--history-file", type=Path, default=DEFAULT_HISTORY_FILE, help="Path for persisted daily history JSON")
parser.add_argument("--write-history", action="store_true", help="Update the daily history file after generating the report")
parser.add_argument("--json", action="store_true", help="Print JSON instead of markdown to stdout")
return parser.parse_args()
def main() -> None:
args = parse_args()
config = load_config(args.config)
if args.snapshot_file:
snapshot = load_json(args.snapshot_file, {"repos": {}})
else:
snapshot = collect_live_snapshot(config, today=args.today, token_file=args.token_file, base_url=args.base_url)
report = build_velocity_report(config, snapshot, today=args.today)
args.output_json.parent.mkdir(parents=True, exist_ok=True)
args.output_md.parent.mkdir(parents=True, exist_ok=True)
args.output_json.write_text(json.dumps(report, indent=2), encoding="utf-8")
args.output_md.write_text(render_markdown(report), encoding="utf-8")
if args.write_history:
update_history(args.history_file, report)
if args.json:
print(json.dumps(report, indent=2))
else:
print(render_markdown(report))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
"""Fleet cost report generator.
Reads Timmy's sovereignty metrics database and estimates paid API spend by
agent/provider lane. Default output targets the local timmy-config reports
folder so the cost report can be filed from the sidecar repo.
"""
from __future__ import annotations
import argparse
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable
DB_PATH = Path.home() / ".timmy" / "metrics" / "model_metrics.db"
AGENT_LANES = (
{
"agent": "Timmy Cloud Lane",
"provider": "OpenRouter",
"patterns": ("openrouter/", "google/", "deepseek/", "x-ai/", "mistral/"),
"notes": "Cloud fallback and external reasoning routed through OpenRouter-compatible lanes.",
},
{
"agent": "Ezra",
"provider": "Anthropic",
"patterns": ("claude-", "anthropic/claude"),
"notes": "Archivist / long-form reasoning house on Claude-family models.",
},
{
"agent": "Bezalel",
"provider": "OpenAI",
"patterns": ("gpt-", "openai/", "codex"),
"notes": "Forge / implementation house on Codex/OpenAI-backed execution lanes.",
},
{
"agent": "Allegro",
"provider": "Kimi / Moonshot",
"patterns": ("kimi", "moonshot"),
"notes": "Tempo-and-dispatch house on Kimi / Moonshot direct API lanes.",
},
)
def default_report_path(report_date: str | None = None) -> Path:
if report_date is None:
report_date = datetime.now().strftime("%Y-%m-%d")
return Path.home() / "code" / "timmy-config" / "reports" / "production" / f"{report_date}-fleet-cost-report.md"
def match_lane(model: str) -> dict | None:
lowered = (model or "").lower()
for lane in AGENT_LANES:
if any(pattern in lowered for pattern in lane["patterns"]):
return lane
return None
def load_cost_rows(days: int = 30, db_path: Path = DB_PATH) -> list[tuple[str, int, int, int, float]]:
if not db_path.exists():
return []
cutoff = (datetime.now() - timedelta(days=days)).timestamp()
with sqlite3.connect(str(db_path)) as conn:
rows = conn.execute(
"""
SELECT model, SUM(sessions), SUM(messages), SUM(tool_calls), SUM(est_cost_usd)
FROM session_stats
WHERE timestamp > ? AND is_local = 0
GROUP BY model
ORDER BY SUM(est_cost_usd) DESC, model ASC
""",
(cutoff,),
).fetchall()
return [
(model, int(sessions or 0), int(messages or 0), int(tool_calls or 0), float(cost or 0.0))
for model, sessions, messages, tool_calls, cost in rows
]
def summarize_rows(rows: Iterable[tuple[str, int, int, int, float]], days: int = 30) -> dict:
rows = list(rows)
agents: dict[str, dict] = {}
providers_seen: set[str] = set()
inventory = [
{
"agent": lane["agent"],
"provider": lane["provider"],
"notes": lane["notes"],
}
for lane in AGENT_LANES
]
for lane in AGENT_LANES:
agents[lane["agent"]] = {
"provider": lane["provider"],
"models": [],
"sessions": 0,
"messages": 0,
"tool_calls": 0,
"monthly_cost_usd": 0.0,
"daily_cost_usd": 0.0,
"notes": lane["notes"],
}
unassigned = {
"provider": "Unassigned",
"models": [],
"sessions": 0,
"messages": 0,
"tool_calls": 0,
"monthly_cost_usd": 0.0,
"daily_cost_usd": 0.0,
"notes": "Observed paid-model spend not yet mapped to a named wizard house.",
}
for model, sessions, messages, tool_calls, monthly_cost in rows:
lane = match_lane(model)
if lane is None:
bucket = unassigned
else:
bucket = agents[lane["agent"]]
providers_seen.add(lane["provider"])
bucket["models"].append(
{
"model": model,
"sessions": sessions,
"messages": messages,
"tool_calls": tool_calls,
"monthly_cost_usd": round(monthly_cost, 4),
}
)
bucket["sessions"] += sessions
bucket["messages"] += messages
bucket["tool_calls"] += tool_calls
bucket["monthly_cost_usd"] += monthly_cost
for bucket in list(agents.values()) + [unassigned]:
bucket["monthly_cost_usd"] = round(bucket["monthly_cost_usd"], 4)
bucket["daily_cost_usd"] = round(bucket["monthly_cost_usd"] / max(days, 1), 4)
if unassigned["models"]:
agents["Unassigned"] = unassigned
providers_seen.add("Unassigned")
total_monthly = round(sum(item["monthly_cost_usd"] for item in agents.values()), 4)
total_daily = round(sum(item["daily_cost_usd"] for item in agents.values()), 4)
provider_order = sorted(providers_seen)
if "Unassigned" in provider_order:
provider_order = [p for p in provider_order if p != "Unassigned"] + ["Unassigned"]
return {
"days": days,
"providers": provider_order,
"inventory": inventory,
"agents": agents,
"total_monthly_cost_usd": total_monthly,
"total_daily_cost_usd": total_daily,
}
def render_markdown(summary: dict, report_date: str | None = None) -> str:
if report_date is None:
report_date = datetime.now().strftime("%Y-%m-%d")
lines = [
f"# Fleet Cost Report — {report_date}",
"",
f"Window: last {summary['days']} days of paid-model session stats from `~/.timmy/metrics/model_metrics.db`.",
"",
"## Paid API inventory",
"",
"| Agent | Provider | Notes |",
"| --- | --- | --- |",
]
for item in summary["inventory"]:
lines.append(f"| {item['agent']} | {item['provider']} | {item['notes']} |")
lines.extend(
[
"",
"## Estimated cost per agent per day",
"",
"| Agent | Provider | Daily cost | Monthly estimate | Sessions | Messages | Tool calls |",
"| --- | --- | ---: | ---: | ---: | ---: | ---: |",
]
)
for agent, data in summary["agents"].items():
lines.append(
f"| {agent} | {data['provider']} | ${data['daily_cost_usd']:.2f} | ${data['monthly_cost_usd']:.2f} | {data['sessions']} | {data['messages']} | {data['tool_calls']} |"
)
lines.extend(
[
"",
f"Total estimated daily paid spend: ${summary['total_daily_cost_usd']:.2f}",
f"Total estimated monthly paid spend: ${summary['total_monthly_cost_usd']:.2f}",
"",
"## Model evidence",
"",
]
)
for agent, data in summary["agents"].items():
lines.append(f"### {agent}")
if not data["models"]:
lines.append("- No paid-model sessions observed in the selected window.")
else:
for model in data["models"]:
lines.append(
f"- `{model['model']}` — {model['sessions']} sessions / {model['messages']} messages / {model['tool_calls']} tool calls / ${model['monthly_cost_usd']:.2f} est."
)
lines.append("")
lines.append("Generated by `python3 scripts/fleet_cost_report.py --days 30`. Default output path targets the local timmy-config report lane.")
lines.append("")
return "\n".join(lines)
def write_report(output_path: Path, summary: dict, report_date: str | None = None) -> Path:
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(render_markdown(summary, report_date=report_date), encoding="utf-8")
return output_path
def main() -> int:
parser = argparse.ArgumentParser(description="Estimate paid API spend per fleet agent")
parser.add_argument("--days", type=int, default=30, help="Lookback window in days")
parser.add_argument("--db-path", default=str(DB_PATH), help="Path to model_metrics.db")
parser.add_argument("--output", help="Optional markdown output path")
parser.add_argument("--date", help="Override report date (YYYY-MM-DD)")
args = parser.parse_args()
rows = load_cost_rows(days=args.days, db_path=Path(args.db_path).expanduser())
summary = summarize_rows(rows, days=args.days)
report_date = args.date or datetime.now().strftime("%Y-%m-%d")
output_path = Path(args.output).expanduser() if args.output else default_report_path(report_date)
write_report(output_path, summary, report_date=report_date)
print(output_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,176 +0,0 @@
from __future__ import annotations
import json
import subprocess
import sys
from datetime import date
from pathlib import Path
from scripts.burn_velocity_tracker import build_velocity_report, render_markdown, update_history
ROOT = Path(__file__).resolve().parent.parent
DOC_PATH = ROOT / "docs" / "BURN_VELOCITY_TRACKING.md"
SNAPSHOT = {
"generated_at": "2026-04-22T12:00:00Z",
"repos": {
"timmy-home": {
"open_issues": [
{"number": 501, "state": "open", "created_at": "2026-04-20T09:00:00Z"},
{"number": 502, "state": "open", "created_at": "2026-04-22T07:00:00Z"},
],
"recent_issues": [
{"number": 401, "state": "closed", "created_at": "2026-04-21T09:00:00Z", "closed_at": "2026-04-22T05:30:00Z"},
{"number": 402, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T05:30:00Z"},
{"number": 403, "state": "closed", "created_at": "2026-04-19T09:00:00Z", "closed_at": "2026-04-20T05:30:00Z"},
{"number": 404, "state": "closed", "created_at": "2026-04-14T09:00:00Z", "closed_at": "2026-04-15T05:30:00Z"},
{"number": 405, "state": "closed", "created_at": "2026-04-13T09:00:00Z", "closed_at": "2026-04-14T05:30:00Z"},
{"number": 406, "state": "closed", "created_at": "2026-04-12T09:00:00Z", "closed_at": "2026-04-13T05:30:00Z"},
{"number": 407, "state": "closed", "created_at": "2026-04-11T09:00:00Z", "closed_at": "2026-04-12T05:30:00Z"},
{"number": 408, "state": "closed", "created_at": "2026-04-10T09:00:00Z", "closed_at": "2026-04-11T05:30:00Z"},
{"number": 409, "state": "closed", "created_at": "2026-04-09T09:00:00Z", "closed_at": "2026-04-10T05:30:00Z"},
{"number": 410, "state": "closed", "created_at": "2026-04-08T09:00:00Z", "closed_at": "2026-04-09T05:30:00Z"},
{"number": 411, "state": "closed", "created_at": "2026-04-07T09:00:00Z", "closed_at": "2026-04-08T05:30:00Z"},
{"number": 412, "state": "closed", "created_at": "2026-04-06T09:00:00Z", "closed_at": "2026-04-07T05:30:00Z"},
{"number": 413, "state": "closed", "created_at": "2026-04-05T09:00:00Z", "closed_at": "2026-04-06T05:30:00Z"},
{"number": 414, "state": "open", "created_at": "2026-04-22T08:45:00Z", "closed_at": None},
{"number": 415, "state": "open", "created_at": "2026-04-17T08:45:00Z", "closed_at": None},
],
},
"timmy-config": {
"open_issues": [
{"number": 601, "state": "open", "created_at": "2026-04-18T09:00:00Z"},
],
"recent_issues": [
{"number": 602, "state": "closed", "created_at": "2026-04-20T09:00:00Z", "closed_at": "2026-04-21T06:00:00Z"},
{"number": 603, "state": "open", "created_at": "2026-04-22T06:00:00Z", "closed_at": None},
],
},
},
}
CONFIG = {
"owner": "Timmy_Foundation",
"repos": ["timmy-home", "timmy-config"],
"lookback_days": 14,
"alert": {
"recent_days": 7,
"baseline_days": 7,
"minimum_baseline_closed": 4,
"drop_ratio": 0.6,
},
}
def test_build_velocity_report_counts_opened_closed_and_flags_drop_alert() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
assert report["generated_day"] == "2026-04-22"
assert report["summary"]["repos_with_alerts"] == ["timmy-home"]
assert report["summary"]["total_open_now"] == 3
home = report["repos"][0]
assert home["repo"] == "timmy-home"
assert home["open_now"] == 2
assert home["opened_last_7d"] == 5
assert home["closed_last_7d"] == 3
assert home["baseline_closed"] == 7
assert home["weekly_net"] == 2
assert home["alert"]["status"] == "drop"
assert home["alert"]["recent_closed"] == 3
assert home["daily"][-1] == {"date": "2026-04-22", "opened": 1, "closed": 1}
timmy_config = report["repos"][1]
assert timmy_config["repo"] == "timmy-config"
assert timmy_config["open_now"] == 1
assert timmy_config["closed_last_7d"] == 1
assert timmy_config["alert"]["status"] == "ok"
def test_render_markdown_includes_dashboard_handoff_and_alerts() -> None:
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
rendered = render_markdown(report)
for snippet in (
"# Burn-down Velocity Tracking",
"## Per-repo velocity",
"timmy-home",
"timmy-config",
"## Dashboard handoff for timmy-config",
"velocity_drop",
"## Alerts",
):
assert snippet in rendered
def test_update_history_replaces_same_day_snapshot(tmp_path: Path) -> None:
history_path = tmp_path / "burn-velocity-history.json"
report = build_velocity_report(CONFIG, SNAPSHOT, today=date(2026, 4, 22))
update_history(history_path, report)
updated = json.loads(json.dumps(report))
updated["repos"][0]["open_now"] = 9
updated["summary"]["total_open_now"] = 10
update_history(history_path, updated)
history = json.loads(history_path.read_text(encoding="utf-8"))
assert [item["date"] for item in history["days"]] == ["2026-04-22"]
assert history["days"][0]["summary"]["total_open_now"] == 10
assert history["days"][0]["repos"][0]["open_now"] == 9
def test_cli_writes_json_markdown_and_history_from_snapshot(tmp_path: Path) -> None:
snapshot_path = tmp_path / "snapshot.json"
output_json = tmp_path / "latest.json"
output_md = tmp_path / "latest.md"
history_path = tmp_path / "history.json"
snapshot_path.write_text(json.dumps(SNAPSHOT), encoding="utf-8")
result = subprocess.run(
[
sys.executable,
"-m",
"scripts.burn_velocity_tracker",
"--snapshot-file",
str(snapshot_path),
"--today",
"2026-04-22",
"--output-json",
str(output_json),
"--output-md",
str(output_md),
"--history-file",
str(history_path),
"--write-history",
"--json",
],
check=True,
cwd=ROOT,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout)
assert payload["summary"]["repos_with_alerts"] == ["timmy-home"]
assert output_json.exists()
assert output_md.exists()
assert history_path.exists()
assert "timmy-config" in output_md.read_text(encoding="utf-8")
def test_repo_contains_burn_velocity_tracking_doc() -> None:
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Burn-down Velocity Tracking",
"python3 scripts/burn_velocity_tracker.py",
"configs/burn_velocity_repos.json",
"~/.timmy/burn-velocity/latest.json",
"timmy-config dashboard",
"type=issues",
"velocity_drop",
]
for snippet in required:
assert snippet in text

View File

@@ -0,0 +1,77 @@
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
import tempfile
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "fleet_cost_report.py"
def load_module():
spec = spec_from_file_location("fleet_cost_report", SCRIPT_PATH)
module = module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
class TestFleetCostReport(unittest.TestCase):
def test_default_output_targets_timmy_config_report_path(self):
module = load_module()
output_path = module.default_report_path("2026-04-22")
self.assertIn("timmy-config", str(output_path))
self.assertTrue(str(output_path).endswith("2026-04-22-fleet-cost-report.md"))
def test_summary_groups_paid_costs_by_agent_and_provider(self):
module = load_module()
rows = [
("claude-sonnet-4-6", 12, 120, 24, 6.0),
("gpt-5.4", 6, 60, 12, 3.0),
("openrouter/google/gemini-2.5-pro", 4, 40, 8, 2.0),
("kimi-k2", 2, 20, 4, 1.0),
]
summary = module.summarize_rows(rows, days=30)
self.assertEqual(summary["providers"], ["Anthropic", "Kimi / Moonshot", "OpenAI", "OpenRouter"])
self.assertAlmostEqual(summary["agents"]["Ezra"]["monthly_cost_usd"], 6.0)
self.assertAlmostEqual(summary["agents"]["Bezalel"]["monthly_cost_usd"], 3.0)
self.assertAlmostEqual(summary["agents"]["Timmy Cloud Lane"]["monthly_cost_usd"], 2.0)
self.assertAlmostEqual(summary["agents"]["Allegro"]["monthly_cost_usd"], 1.0)
self.assertAlmostEqual(summary["agents"]["Ezra"]["daily_cost_usd"], 0.2)
def test_report_render_mentions_inventory_and_agent_costs(self):
module = load_module()
rows = [
("claude-sonnet-4-6", 12, 120, 24, 6.0),
("gpt-5.4", 6, 60, 12, 3.0),
("openrouter/google/gemini-2.5-pro", 4, 40, 8, 2.0),
]
summary = module.summarize_rows(rows, days=30)
report = module.render_markdown(summary, report_date="2026-04-22")
self.assertIn("# Fleet Cost Report — 2026-04-22", report)
self.assertIn("## Paid API inventory", report)
self.assertIn("Anthropic", report)
self.assertIn("OpenRouter", report)
self.assertIn("OpenAI", report)
self.assertIn("## Estimated cost per agent per day", report)
self.assertIn("Timmy Cloud Lane", report)
self.assertIn("Ezra", report)
self.assertIn("Bezalel", report)
def test_write_report_creates_markdown_file(self):
module = load_module()
rows = [("claude-sonnet-4-6", 1, 10, 2, 0.5)]
summary = module.summarize_rows(rows, days=30)
with tempfile.TemporaryDirectory() as tmpdir:
dest = Path(tmpdir) / "fleet-cost.md"
module.write_report(dest, summary, report_date="2026-04-22")
self.assertTrue(dest.exists())
text = dest.read_text()
self.assertIn("Fleet Cost Report", text)
self.assertIn("Ezra", text)
if __name__ == "__main__":
unittest.main()