feat(10.8): add Progress Tracker — Tracks metrics over time #250

Open
Rockachopa wants to merge 1 commits from step35/173-10-8-progress-tracker into main

477
scripts/progress_tracker.py Normal file
View File

@@ -0,0 +1,477 @@
#!/usr/bin/env python3
"""
Progress Tracker — Pipeline 10.8
Track improvement metrics over time. Are we getting better?
Metrics tracked:
1. Test coverage — % of Python functions with associated tests (test:source file ratio + line coverage if available)
2. Doc coverage — % of Python callables with docstrings (AST-based)
3. Issue close rate — closed / (opened + closed) per week (Gitea API)
4. Dep freshness — % of requirements pinned vs outdated (pip list --outdated)
Output:
- metrics/snapshots/YYYY-MM-DD.json — one snapshot per run
- metrics/TRENDS.md — cumulative markdown table
- stdout summary
Usage:
python3 scripts/progress_tracker.py
python3 scripts/progress_tracker.py --json
python3 scripts/progress_tracker.py --output metrics/TRENDS.md
Weekly cron:
0 9 * * 1 cd /path/to/compounding-intelligence && python3 scripts/progress_tracker.py
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ── Configuration ──────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
METRICS_DIR = REPO_ROOT / "metrics"
SNAPSHOTS_DIR = METRICS_DIR / "snapshots"
TOKEN_PATH = Path.home() / ".config" / "gitea" / "token"
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
# Ensure paths exist
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)
# ── Helpers ─────────────────────────────────────────────────────────────────
def run_cmd(cmd: List[str], cwd: Path = REPO_ROOT) -> str:
"""Run a shell command and return stdout (stderr merged)."""
result = subprocess.run(
cmd, capture_output=True, text=True, cwd=cwd, timeout=30
)
if result.returncode != 0:
return ""
return result.stdout.strip()
def slugify_date(dt: datetime) -> str:
return dt.strftime("%Y-%m-%d")
def snapshot_path(dt: datetime) -> Path:
return SNAPSHOTS_DIR / f"{slugify_date(dt)}.json"
def load_snapshots() -> List[Dict[str, Any]]:
"""Load all existing snapshots sorted by date."""
snapshots = []
for f in sorted(SNAPSHOTS_DIR.glob("*.json")):
try:
with open(f) as fp:
snapshots.append(json.load(fp))
except Exception:
continue
return snapshots
# ── Metric 1: Test Coverage ─────────────────────────────────────────────────
def collect_test_coverage() -> Dict[str, Any]:
"""
Compute test coverage metrics.
Counts test_*.py and *_test.py files vs non-test .py source files.
Also attempts to read .coverage if present.
"""
all_py = list(REPO_ROOT.rglob("*.py"))
source_files = []
test_files = []
for p in all_py:
try:
rel_parts = p.relative_to(REPO_ROOT).parts
except ValueError:
continue
# Skip hidden/cache/temp dirs (check only relative parts)
if any(part.startswith('.') or part.startswith('__') for part in rel_parts):
continue
if any(part in ('node_modules', 'venv', '.venv', 'env', '.pytest_cache') for part in rel_parts):
continue
if p.name.startswith("test_") or p.name.endswith("_test.py"):
test_files.append(p)
else:
source_files.append(p)
# Try to get line coverage from .coverage
coverage_percent = None
coverage_tool = None
coverage_file = REPO_ROOT / ".coverage"
if coverage_file.exists():
try:
import coverage # type: ignore
# Use coverage API if available
cov = coverage.Coverage(data_file=str(coverage_file))
cov.load()
total = cov.report()
coverage_percent = total if isinstance(total, float) else None
coverage_tool = "coverage"
except Exception:
# Fallback: parse `coverage report` output
out = run_cmd(["coverage", "report", "--skip-empty"])
if out:
for line in out.splitlines():
if "TOTAL" in line:
parts = line.split()
if len(parts) >= 2:
try:
coverage_percent = float(parts[-1].rstrip('%'))
coverage_tool = "coverage"
break
except ValueError:
pass
return {
"test_files": len(test_files),
"source_files": len(source_files),
"test_to_source_ratio": round(len(test_files) / len(source_files), 4) if source_files else 0.0,
"coverage_tool": coverage_tool,
"coverage_percent": coverage_percent,
}
# ── Metric 2: Doc Coverage ──────────────────────────────────────────────────
def collect_doc_coverage() -> Dict[str, Any]:
"""
Check AST of Python files for docstrings.
Returns: callables_total, callables_with_doc, doc_coverage_percent
"""
import ast
all_py = list(REPO_ROOT.rglob("*.py"))
source_files = []
test_files = []
for p in all_py:
try:
rel_parts = p.relative_to(REPO_ROOT).parts
except ValueError:
continue
if any(part.startswith('.') or part.startswith('__') for part in rel_parts):
continue
if any(part in ('node_modules', 'venv', '.venv', 'env', '.pytest_cache') for part in rel_parts):
continue
if p.name.startswith("test_") or p.name.endswith("_test.py"):
test_files.append(p)
else:
source_files.append(p)
total_callables = 0
with_doc = 0
for p in source_files + test_files:
try:
with open(p) as f:
tree = ast.parse(f.read(), filename=str(p))
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
total_callables += 1
doc = ast.get_docstring(node)
if doc and doc.strip():
with_doc += 1
except Exception:
continue
return {
"callables_total": total_callables,
"callables_with_doc": with_doc,
"doc_coverage_percent": round((with_doc / total_callables * 100) if total_callables else 0.0, 2),
}
# ── Metric 3: Issue Close Rate ──────────────────────────────────────────────
def collect_issue_metrics() -> Dict[str, Any]:
"""
Use Gitea API to get issue open/close stats for the last 7 days.
Returns counts and close rate.
"""
token = ""
if TOKEN_PATH.exists():
token = TOKEN_PATH.read_text().strip()
if not token:
return {
"opened_last_7d": None,
"closed_last_7d": None,
"close_rate": None,
"total_open": None,
"note": "Gitea token not available"
}
try:
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
except ImportError:
return {"error": "urllib not available"}
now = datetime.now(timezone.utc)
week_ago = now - timedelta(days=7)
since = week_ago.strftime("%Y-%m-%d")
headers = {"Authorization": f"token {token}"}
base_url = f"{GITEA_API_BASE}/repos/{ORG}/compounding-intelligence/issues"
try:
# Get issues from last 7 days
url = f"{base_url}?state=all&since={since}&per_page=100"
req = Request(url, headers=headers)
with urlopen(req, timeout=15) as resp:
issues = json.loads(resp.read())
opened = 0
closed = 0
for issue in issues:
created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
if created >= week_ago:
opened += 1
if issue.get("state") == "closed":
closed_at_str = issue.get("closed_at")
if closed_at_str:
closed_at = datetime.fromisoformat(closed_at_str.replace("Z", "+00:00"))
if closed_at >= week_ago:
closed += 1
# Total open issues
req2 = Request(f"{base_url}?state=open&per_page=1", headers=headers)
with urlopen(req2, timeout=15) as resp:
total_open = int(resp.headers.get("X-Total-Count", "0"))
total = opened + closed
close_rate = closed / total if total > 0 else 0.0
return {
"opened_last_7d": opened,
"closed_last_7d": closed,
"close_rate": round(close_rate, 4),
"total_open": total_open,
}
except Exception as e:
return {
"opened_last_7d": None,
"closed_last_7d": None,
"close_rate": None,
"total_open": None,
"error": str(e)[:100],
"note": "Gitea API unavailable"
}
# ── Metric 4: Dependency Freshness ─────────────────────────────────────────
def collect_dep_freshness() -> Dict[str, Any]:
"""
Check requirements.txt for outdated dependencies using pip list --outdated.
Returns freshness percentage and outdated list.
"""
req_file = REPO_ROOT / "requirements.txt"
if not req_file.exists():
return {
"total_deps": 0,
"outdated_deps": 0,
"freshness_percent": 100.0,
"outdated_list": [],
"note": "requirements.txt not found"
}
# Parse requirements (very simple: take name before comparison op)
reqs = []
with open(req_file) as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
m = re.match(r"^([a-zA-Z0-9_.-]+)", line)
if m:
reqs.append(m.group(1))
if not reqs:
return {"total_deps": 0, "outdated_deps": 0, "freshness_percent": 100.0, "outdated_list": []}
# Query pip for outdated packages (may fail if pip not available)
outdated_names = set()
try:
out = run_cmd(["pip", "list", "--outdated", "--format=json"])
if out:
data = json.loads(out)
outdated_names = {item["name"].lower() for item in data}
except Exception:
pass
outdated = [p for p in reqs if p.lower() in outdated_names]
total = len(reqs)
outdated_count = len(outdated)
freshness = round(((total - outdated_count) / total * 100) if total else 100.0, 1)
return {
"total_deps": total,
"outdated_deps": outdated_count,
"freshness_percent": freshness,
"outdated_list": outdated,
}
# ── Snapshot & Trends ───────────────────────────────────────────────────────
def take_snapshot() -> Dict[str, Any]:
"""Collect all metrics and return a snapshot dict."""
now = datetime.now(timezone.utc)
test_cov = collect_test_coverage()
doc_cov = collect_doc_coverage()
issues = collect_issue_metrics()
deps = collect_dep_freshness()
return {
"timestamp": now.isoformat(),
"date": slugify_date(now),
"metrics": {
"test_coverage": test_cov,
"doc_coverage": doc_cov,
"issues": issues,
"dependencies": deps,
}
}
def save_snapshot(snapshot: Dict[str, Any]) -> Path:
path = snapshot_path(datetime.fromisoformat(snapshot["timestamp"]))
with open(path, "w") as f:
json.dump(snapshot, f, indent=2)
return path
def generate_trends(snapshots: List[Dict[str, Any]], output_path: Optional[Path] = None) -> str:
"""Generate markdown trends table; optionally write to file."""
if not snapshots:
msg = "# Progress Tracker — Trends\n\nNo snapshots yet. Run `progress_tracker.py` to create the first snapshot."
if output_path:
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(msg)
return msg
lines = [
"# Progress Tracker — Trends",
f"\nLast updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
f"\nSnapshots: {len(snapshots)}\n",
"| Date | Test Files → Source | Doc Coverage | Issues Closed/Opened (7d) | Dep Freshness |",
"|------|---------------------|--------------|---------------------------|---------------|",
]
for snap in reversed(snapshots): # chronological
date = snap["date"]
m = snap["metrics"]
tc = m["test_coverage"]
test_str = f"{tc['test_files']}/{tc['source_files']} ({tc['test_to_source_ratio']:.2f})"
doc_str = f"{m['doc_coverage']['doc_coverage_percent']:.1f}%"
issues_str = f"{m['issues'].get('closed_last_7d','-')}/{m['issues'].get('opened_last_7d','-')}"
dep_str = f"{m['dependencies'].get('freshness_percent','?')}%"
lines.append(f"| {date} | {test_str} | {doc_str} | {issues_str} | {dep_str} |")
# Current snapshot summary
cur = snapshots[-1]
cm = cur["metrics"]
lines.append(f"\n## Current Snapshot ({cur['date']})\n")
tc = cm["test_coverage"]
cov_line = f"- Test coverage: {tc['coverage_percent']:.1f}% (via {tc['coverage_tool']})\n" if tc["coverage_percent"] else "- Test coverage: (pytest-cov not configured)\n"
lines.append(cov_line)
lines.append(f"- Doc coverage: {cm['doc_coverage']['doc_coverage_percent']:.1f}%")
im = cm["issues"]
if im.get("close_rate") is not None:
lines.append(f"- Issue close rate (7d): {im['close_rate']*100:.1f}% ({im['closed_last_7d']} closed, {im['opened_last_7d']} opened)")
else:
lines.append(f"- Issue metrics: {im.get('note','unavailable')}")
dd = cm["dependencies"]
lines.append(f"- Dep freshness: {dd.get('freshness_percent','?')}% outdated ({dd.get('outdated_deps',0)}/{dd.get('total_deps',0)} deps)")
if dd.get('outdated_list'):
lines.append(f" Outdated: {', '.join(dd['outdated_list'][:5])}")
content = "\n".join(lines) + "\n"
if output_path:
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(content)
return content
# ── Main ─────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="Progress Tracker — 10.8")
parser.add_argument("--json", action="store_true", help="Emit snapshot as JSON only")
parser.add_argument("--output", type=Path, default=METRICS_DIR / "TRENDS.md",
help="Write trends markdown to this file")
args = parser.parse_args()
snapshot = take_snapshot()
all_snapshots = load_snapshots()
path_written = save_snapshot(snapshot)
if args.json:
print(json.dumps(snapshot, indent=2))
return 0
trends = generate_trends(all_snapshots + [snapshot], output_path=args.output)
# Print current snapshot summary
print(f"Snapshot saved: {path_written}\n")
print(f"Progress Tracker — {snapshot['date']}")
print("=" * 50)
m = snapshot["metrics"]
tc = m["test_coverage"]
print(f"Test files: {tc['test_files']} | Source files: {tc['source_files']} | Ratio: {tc['test_to_source_ratio']:.3f}")
if tc["coverage_percent"] is not None:
print(f"Line coverage: {tc['coverage_percent']:.1f}% (via {tc['coverage_tool']})")
else:
print("Line coverage: (not available — run `pytest --cov`)")
print()
dc = m["doc_coverage"]
print(f"Callables with docstrings: {dc['callables_with_doc']}/{dc['callables_total']} ({dc['doc_coverage_percent']:.1f}%)")
print()
im = m["issues"]
if im.get("close_rate") is not None:
print(f"Issues (7d): {im['closed_last_7d']} closed / {im['opened_last_7d']} opened → close rate: {im['close_rate']*100:.1f}%")
print(f"Total open: {im['total_open']}")
else:
print(f"Issues: {im.get('note','unavailable')}")
print()
dd = m["dependencies"]
print(f"Dependencies: {dd.get('total_deps',0)} total, {dd.get('outdated_deps',0)} outdated")
if dd.get('outdated_list'):
shown = dd['outdated_list'][:5]
print(f"Outdated: {', '.join(shown)}" + ("..." if len(dd['outdated_list']) > 5 else ""))
print(f"\nTrends written to: {args.output}")
return 0
if __name__ == "__main__":
sys.exit(main())