#!/usr/bin/env python3 """ Progress Tracker — Pipeline 10.8 Track improvement metrics over time. Are we getting better? Metrics tracked: 1. Test coverage — % of Python functions with associated tests (test:source file ratio + line coverage if available) 2. Doc coverage — % of Python callables with docstrings (AST-based) 3. Issue close rate — closed / (opened + closed) per week (Gitea API) 4. Dep freshness — % of requirements pinned vs outdated (pip list --outdated) Output: - metrics/snapshots/YYYY-MM-DD.json — one snapshot per run - metrics/TRENDS.md — cumulative markdown table - stdout summary Usage: python3 scripts/progress_tracker.py python3 scripts/progress_tracker.py --json python3 scripts/progress_tracker.py --output metrics/TRENDS.md Weekly cron: 0 9 * * 1 cd /path/to/compounding-intelligence && python3 scripts/progress_tracker.py """ import argparse import json import os import re import subprocess import sys from collections import defaultdict from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # ── Configuration ────────────────────────────────────────────────────────── SCRIPT_DIR = Path(__file__).resolve().parent REPO_ROOT = SCRIPT_DIR.parent METRICS_DIR = REPO_ROOT / "metrics" SNAPSHOTS_DIR = METRICS_DIR / "snapshots" TOKEN_PATH = Path.home() / ".config" / "gitea" / "token" GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1" ORG = "Timmy_Foundation" # Ensure paths exist SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True) # ── Helpers ───────────────────────────────────────────────────────────────── def run_cmd(cmd: List[str], cwd: Path = REPO_ROOT) -> str: """Run a shell command and return stdout (stderr merged).""" result = subprocess.run( cmd, capture_output=True, text=True, cwd=cwd, timeout=30 ) if result.returncode != 0: return "" return result.stdout.strip() def slugify_date(dt: datetime) -> str: return dt.strftime("%Y-%m-%d") def snapshot_path(dt: datetime) -> Path: return SNAPSHOTS_DIR / f"{slugify_date(dt)}.json" def load_snapshots() -> List[Dict[str, Any]]: """Load all existing snapshots sorted by date.""" snapshots = [] for f in sorted(SNAPSHOTS_DIR.glob("*.json")): try: with open(f) as fp: snapshots.append(json.load(fp)) except Exception: continue return snapshots # ── Metric 1: Test Coverage ───────────────────────────────────────────────── def collect_test_coverage() -> Dict[str, Any]: """ Compute test coverage metrics. Counts test_*.py and *_test.py files vs non-test .py source files. Also attempts to read .coverage if present. """ all_py = list(REPO_ROOT.rglob("*.py")) source_files = [] test_files = [] for p in all_py: try: rel_parts = p.relative_to(REPO_ROOT).parts except ValueError: continue # Skip hidden/cache/temp dirs (check only relative parts) if any(part.startswith('.') or part.startswith('__') for part in rel_parts): continue if any(part in ('node_modules', 'venv', '.venv', 'env', '.pytest_cache') for part in rel_parts): continue if p.name.startswith("test_") or p.name.endswith("_test.py"): test_files.append(p) else: source_files.append(p) # Try to get line coverage from .coverage coverage_percent = None coverage_tool = None coverage_file = REPO_ROOT / ".coverage" if coverage_file.exists(): try: import coverage # type: ignore # Use coverage API if available cov = coverage.Coverage(data_file=str(coverage_file)) cov.load() total = cov.report() coverage_percent = total if isinstance(total, float) else None coverage_tool = "coverage" except Exception: # Fallback: parse `coverage report` output out = run_cmd(["coverage", "report", "--skip-empty"]) if out: for line in out.splitlines(): if "TOTAL" in line: parts = line.split() if len(parts) >= 2: try: coverage_percent = float(parts[-1].rstrip('%')) coverage_tool = "coverage" break except ValueError: pass return { "test_files": len(test_files), "source_files": len(source_files), "test_to_source_ratio": round(len(test_files) / len(source_files), 4) if source_files else 0.0, "coverage_tool": coverage_tool, "coverage_percent": coverage_percent, } # ── Metric 2: Doc Coverage ────────────────────────────────────────────────── def collect_doc_coverage() -> Dict[str, Any]: """ Check AST of Python files for docstrings. Returns: callables_total, callables_with_doc, doc_coverage_percent """ import ast all_py = list(REPO_ROOT.rglob("*.py")) source_files = [] test_files = [] for p in all_py: try: rel_parts = p.relative_to(REPO_ROOT).parts except ValueError: continue if any(part.startswith('.') or part.startswith('__') for part in rel_parts): continue if any(part in ('node_modules', 'venv', '.venv', 'env', '.pytest_cache') for part in rel_parts): continue if p.name.startswith("test_") or p.name.endswith("_test.py"): test_files.append(p) else: source_files.append(p) total_callables = 0 with_doc = 0 for p in source_files + test_files: try: with open(p) as f: tree = ast.parse(f.read(), filename=str(p)) for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): total_callables += 1 doc = ast.get_docstring(node) if doc and doc.strip(): with_doc += 1 except Exception: continue return { "callables_total": total_callables, "callables_with_doc": with_doc, "doc_coverage_percent": round((with_doc / total_callables * 100) if total_callables else 0.0, 2), } # ── Metric 3: Issue Close Rate ────────────────────────────────────────────── def collect_issue_metrics() -> Dict[str, Any]: """ Use Gitea API to get issue open/close stats for the last 7 days. Returns counts and close rate. """ token = "" if TOKEN_PATH.exists(): token = TOKEN_PATH.read_text().strip() if not token: return { "opened_last_7d": None, "closed_last_7d": None, "close_rate": None, "total_open": None, "note": "Gitea token not available" } try: from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError except ImportError: return {"error": "urllib not available"} now = datetime.now(timezone.utc) week_ago = now - timedelta(days=7) since = week_ago.strftime("%Y-%m-%d") headers = {"Authorization": f"token {token}"} base_url = f"{GITEA_API_BASE}/repos/{ORG}/compounding-intelligence/issues" try: # Get issues from last 7 days url = f"{base_url}?state=all&since={since}&per_page=100" req = Request(url, headers=headers) with urlopen(req, timeout=15) as resp: issues = json.loads(resp.read()) opened = 0 closed = 0 for issue in issues: created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00")) if created >= week_ago: opened += 1 if issue.get("state") == "closed": closed_at_str = issue.get("closed_at") if closed_at_str: closed_at = datetime.fromisoformat(closed_at_str.replace("Z", "+00:00")) if closed_at >= week_ago: closed += 1 # Total open issues req2 = Request(f"{base_url}?state=open&per_page=1", headers=headers) with urlopen(req2, timeout=15) as resp: total_open = int(resp.headers.get("X-Total-Count", "0")) total = opened + closed close_rate = closed / total if total > 0 else 0.0 return { "opened_last_7d": opened, "closed_last_7d": closed, "close_rate": round(close_rate, 4), "total_open": total_open, } except Exception as e: return { "opened_last_7d": None, "closed_last_7d": None, "close_rate": None, "total_open": None, "error": str(e)[:100], "note": "Gitea API unavailable" } # ── Metric 4: Dependency Freshness ───────────────────────────────────────── def collect_dep_freshness() -> Dict[str, Any]: """ Check requirements.txt for outdated dependencies using pip list --outdated. Returns freshness percentage and outdated list. """ req_file = REPO_ROOT / "requirements.txt" if not req_file.exists(): return { "total_deps": 0, "outdated_deps": 0, "freshness_percent": 100.0, "outdated_list": [], "note": "requirements.txt not found" } # Parse requirements (very simple: take name before comparison op) reqs = [] with open(req_file) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue m = re.match(r"^([a-zA-Z0-9_.-]+)", line) if m: reqs.append(m.group(1)) if not reqs: return {"total_deps": 0, "outdated_deps": 0, "freshness_percent": 100.0, "outdated_list": []} # Query pip for outdated packages (may fail if pip not available) outdated_names = set() try: out = run_cmd(["pip", "list", "--outdated", "--format=json"]) if out: data = json.loads(out) outdated_names = {item["name"].lower() for item in data} except Exception: pass outdated = [p for p in reqs if p.lower() in outdated_names] total = len(reqs) outdated_count = len(outdated) freshness = round(((total - outdated_count) / total * 100) if total else 100.0, 1) return { "total_deps": total, "outdated_deps": outdated_count, "freshness_percent": freshness, "outdated_list": outdated, } # ── Snapshot & Trends ─────────────────────────────────────────────────────── def take_snapshot() -> Dict[str, Any]: """Collect all metrics and return a snapshot dict.""" now = datetime.now(timezone.utc) test_cov = collect_test_coverage() doc_cov = collect_doc_coverage() issues = collect_issue_metrics() deps = collect_dep_freshness() return { "timestamp": now.isoformat(), "date": slugify_date(now), "metrics": { "test_coverage": test_cov, "doc_coverage": doc_cov, "issues": issues, "dependencies": deps, } } def save_snapshot(snapshot: Dict[str, Any]) -> Path: path = snapshot_path(datetime.fromisoformat(snapshot["timestamp"])) with open(path, "w") as f: json.dump(snapshot, f, indent=2) return path def generate_trends(snapshots: List[Dict[str, Any]], output_path: Optional[Path] = None) -> str: """Generate markdown trends table; optionally write to file.""" if not snapshots: msg = "# Progress Tracker — Trends\n\nNo snapshots yet. Run `progress_tracker.py` to create the first snapshot." if output_path: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(msg) return msg lines = [ "# Progress Tracker — Trends", f"\nLast updated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}", f"\nSnapshots: {len(snapshots)}\n", "| Date | Test Files → Source | Doc Coverage | Issues Closed/Opened (7d) | Dep Freshness |", "|------|---------------------|--------------|---------------------------|---------------|", ] for snap in reversed(snapshots): # chronological date = snap["date"] m = snap["metrics"] tc = m["test_coverage"] test_str = f"{tc['test_files']}/{tc['source_files']} ({tc['test_to_source_ratio']:.2f})" doc_str = f"{m['doc_coverage']['doc_coverage_percent']:.1f}%" issues_str = f"{m['issues'].get('closed_last_7d','-')}/{m['issues'].get('opened_last_7d','-')}" dep_str = f"{m['dependencies'].get('freshness_percent','?')}%" lines.append(f"| {date} | {test_str} | {doc_str} | {issues_str} | {dep_str} |") # Current snapshot summary cur = snapshots[-1] cm = cur["metrics"] lines.append(f"\n## Current Snapshot ({cur['date']})\n") tc = cm["test_coverage"] cov_line = f"- Test coverage: {tc['coverage_percent']:.1f}% (via {tc['coverage_tool']})\n" if tc["coverage_percent"] else "- Test coverage: (pytest-cov not configured)\n" lines.append(cov_line) lines.append(f"- Doc coverage: {cm['doc_coverage']['doc_coverage_percent']:.1f}%") im = cm["issues"] if im.get("close_rate") is not None: lines.append(f"- Issue close rate (7d): {im['close_rate']*100:.1f}% ({im['closed_last_7d']} closed, {im['opened_last_7d']} opened)") else: lines.append(f"- Issue metrics: {im.get('note','unavailable')}") dd = cm["dependencies"] lines.append(f"- Dep freshness: {dd.get('freshness_percent','?')}% outdated ({dd.get('outdated_deps',0)}/{dd.get('total_deps',0)} deps)") if dd.get('outdated_list'): lines.append(f" Outdated: {', '.join(dd['outdated_list'][:5])}") content = "\n".join(lines) + "\n" if output_path: output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(content) return content # ── Main ───────────────────────────────────────────────────────────────────── def main() -> int: parser = argparse.ArgumentParser(description="Progress Tracker — 10.8") parser.add_argument("--json", action="store_true", help="Emit snapshot as JSON only") parser.add_argument("--output", type=Path, default=METRICS_DIR / "TRENDS.md", help="Write trends markdown to this file") args = parser.parse_args() snapshot = take_snapshot() all_snapshots = load_snapshots() path_written = save_snapshot(snapshot) if args.json: print(json.dumps(snapshot, indent=2)) return 0 trends = generate_trends(all_snapshots + [snapshot], output_path=args.output) # Print current snapshot summary print(f"Snapshot saved: {path_written}\n") print(f"Progress Tracker — {snapshot['date']}") print("=" * 50) m = snapshot["metrics"] tc = m["test_coverage"] print(f"Test files: {tc['test_files']} | Source files: {tc['source_files']} | Ratio: {tc['test_to_source_ratio']:.3f}") if tc["coverage_percent"] is not None: print(f"Line coverage: {tc['coverage_percent']:.1f}% (via {tc['coverage_tool']})") else: print("Line coverage: (not available — run `pytest --cov`)") print() dc = m["doc_coverage"] print(f"Callables with docstrings: {dc['callables_with_doc']}/{dc['callables_total']} ({dc['doc_coverage_percent']:.1f}%)") print() im = m["issues"] if im.get("close_rate") is not None: print(f"Issues (7d): {im['closed_last_7d']} closed / {im['opened_last_7d']} opened → close rate: {im['close_rate']*100:.1f}%") print(f"Total open: {im['total_open']}") else: print(f"Issues: {im.get('note','unavailable')}") print() dd = m["dependencies"] print(f"Dependencies: {dd.get('total_deps',0)} total, {dd.get('outdated_deps',0)} outdated") if dd.get('outdated_list'): shown = dd['outdated_list'][:5] print(f"Outdated: {', '.join(shown)}" + ("..." if len(dd['outdated_list']) > 5 else "")) print(f"\nTrends written to: {args.output}") return 0 if __name__ == "__main__": sys.exit(main())