Files
timmy-config/scripts/provenance_dashboard.py
2026-04-21 21:23:48 -04:00

132 lines
4.1 KiB
Python

#!/usr/bin/env python3
"""
provenance_dashboard.py — Provenance statistics dashboard.
Shows pair count by model, by source, coverage over time.
Usage:
python3 provenance_dashboard.py
python3 provenance_dashboard.py --days 30
python3 provenance_dashboard.py --json
"""
import json
import os
import sys
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional
# Candidate roots scanned (recursively) for *.jsonl training files.
# Directories that do not exist are silently skipped by scan_training_data().
DATA_DIRS = [
    Path.home() / "timmy-config" / "training-data",
    Path.home() / "timmy-config" / "training" / "data",
]
def scan_training_data(data_dirs: Optional[List[Path]] = None) -> List[dict]:
    """Scan JSONL files under *data_dirs* for provenance metadata.

    Args:
        data_dirs: Directories to scan recursively for ``*.jsonl`` files.
            Defaults to the module-level ``DATA_DIRS``.

    Returns:
        One dict per successfully-parsed JSONL line, each tagged with
        ``"_source_file"`` (path relative to its data dir). Malformed
        lines and unreadable files are skipped, never raised.
    """
    if data_dirs is None:
        data_dirs = DATA_DIRS
    all_pairs: List[dict] = []
    for data_dir in data_dirs:
        if not data_dir.exists():
            continue
        for filepath in data_dir.rglob("*.jsonl"):
            try:
                with open(filepath, encoding="utf-8") as f:
                    rel = str(filepath.relative_to(data_dir))
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        # Tolerate individual bad lines: the original wrapped
                        # the whole file loop in one try, so a single corrupt
                        # line silently discarded every pair in the file.
                        try:
                            pair = json.loads(line)
                        except json.JSONDecodeError:
                            continue
                        # A valid-JSON non-object line (e.g. a bare list)
                        # would crash the item assignment below; skip it.
                        if not isinstance(pair, dict):
                            continue
                        pair["_source_file"] = rel
                        all_pairs.append(pair)
            except IOError:
                continue
    return all_pairs
def compute_stats(pairs: List[dict]) -> dict:
    """Compute provenance statistics for a list of training pairs.

    Returns a dict with the pair total, absolute and percentage coverage
    of the session-id/model/timestamp fields, and frequency tables by
    model (top 15), source, and source file (top 10). An empty input
    yields just ``{"total": 0}``.
    """
    total = len(pairs)
    if not pairs:
        return {"total": 0}

    def count_present(field: str) -> int:
        # Counts pairs whose field is present *and* truthy.
        return sum(1 for item in pairs if item.get(field))

    def pct(n: int) -> float:
        return round(n / total * 100, 1)

    session_n = count_present("source_session_id")
    model_n = count_present("model")
    stamp_n = count_present("timestamp")

    model_freq = Counter(item.get("model", "unknown") for item in pairs)
    # "source_type" takes precedence over the legacy "source" key.
    source_freq = Counter(
        item.get("source_type", item.get("source", "unknown")) for item in pairs
    )
    file_freq = Counter(item.get("_source_file", "unknown") for item in pairs)

    stats = {"total": total}
    stats["with_session_id"] = session_n
    stats["with_model"] = model_n
    stats["with_timestamp"] = stamp_n
    stats["coverage_session_pct"] = pct(session_n)
    stats["coverage_model_pct"] = pct(model_n)
    stats["coverage_timestamp_pct"] = pct(stamp_n)
    stats["by_model"] = dict(model_freq.most_common(15))
    stats["by_source"] = dict(source_freq.most_common())
    stats["by_file"] = dict(file_freq.most_common(10))
    return stats
def render_dashboard(stats: dict):
    """Render the stats dict from compute_stats() as a text dashboard on stdout.

    Args:
        stats: Output of ``compute_stats()``. If ``stats["total"]`` is 0
            (or missing), only a "No training data found." banner is printed.
    """
    print(f"\n{'='*60}")
    print(" PROVENANCE DASHBOARD")
    print(f" {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
    print(f"{'='*60}")
    total = stats.get("total", 0)
    if total == 0:
        print(" No training data found.")
        print(f"{'='*60}\n")
        return
    print(f"\n Total pairs: {total}")
    print("\n Coverage:")
    print(f" source_session_id: {stats['with_session_id']:>6} ({stats['coverage_session_pct']}%)")
    print(f" model: {stats['with_model']:>6} ({stats['coverage_model_pct']}%)")
    print(f" timestamp: {stats['with_timestamp']:>6} ({stats['coverage_timestamp_pct']}%)")
    print("\n By Model:")
    for model, count in sorted(stats["by_model"].items(), key=lambda x: -x[1]):
        # BUG FIX: the original multiplied the EMPTY string ("" * n), so the
        # bar chart always rendered blank. Use a block character, scaled to
        # a max width of 40 columns.
        bar = "█" * (count * 40 // total)
        print(f" {model:<25} {count:>6} {bar}")
    print("\n By Source:")
    for source, count in sorted(stats["by_source"].items(), key=lambda x: -x[1]):
        print(f" {source:<20} {count:>6}")
    print("\n Top Files:")
    for fname, count in list(stats["by_file"].items())[:10]:
        # Keep the trailing (most specific) part of long relative paths.
        short = fname[-40:] if len(fname) > 40 else fname
        print(f" {short:<42} {count:>6}")
    print(f"{'='*60}\n")
def main():
    """CLI entry point: scan training data, compute stats, print the report.

    Flags:
        --json    emit the stats dict as JSON instead of the text dashboard
        --days N  only count pairs whose timestamp falls in the last N days
    """
    import argparse

    parser = argparse.ArgumentParser(description="Provenance Dashboard")
    parser.add_argument("--json", action="store_true", help="JSON output")
    # The module usage text advertises "--days 30", but the flag was never
    # defined, so passing it made argparse exit with an error. Wire it up
    # as an optional recency filter.
    parser.add_argument(
        "--days", type=int, default=None,
        help="Only include pairs with a timestamp in the last N days",
    )
    args = parser.parse_args()

    def parse_timestamp(pair):
        """Best-effort parse of pair['timestamp']; None when absent/unparseable."""
        raw = pair.get("timestamp")
        if not raw:
            return None
        try:
            # NOTE(review): assumes ISO-8601 stamps; a trailing 'Z' is
            # normalized for fromisoformat() — confirm against the writers.
            parsed = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
        except ValueError:
            return None
        if parsed.tzinfo is None:
            # Naive stamps are presumed UTC — TODO confirm.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    pairs = scan_training_data()
    if args.days is not None:
        cutoff = datetime.now(timezone.utc) - timedelta(days=args.days)
        recent = []
        for pair in pairs:
            stamp = parse_timestamp(pair)
            if stamp is not None and stamp >= cutoff:
                recent.append(pair)
        pairs = recent

    stats = compute_stats(pairs)
    if args.json:
        # The original popped "_source_file" from `pairs` here, but stats
        # were already computed and `pairs` is never printed, so that loop
        # had no observable effect; it has been dropped.
        print(json.dumps(stats, indent=2))
    else:
        render_dashboard(stats)


if __name__ == "__main__":
    main()