#!/usr/bin/env python3
# Origin: reconstructed from patch d93d3ca7ecc52cab507f0b618bc112bdbd4d59fd
# ("feat: add provenance dashboard", part of #752), training/provenance_dashboard.py
"""
Provenance Dashboard

Shows statistics about training data provenance:
- Pair count by model over time
- Pair count by source
- Exclusion statistics
- Provenance coverage

Usage:
    python provenance_dashboard.py data/merged_training_data.jsonl
    python provenance_dashboard.py data/ --all
"""

import argparse
import json
import sys
from pathlib import Path
from collections import defaultdict
from datetime import datetime

try:
    from training_pair_provenance import get_provenance_stats
except ImportError:
    # Allow running the script from another working directory: retry with the
    # script's own directory on the import path.
    sys.path.insert(0, str(Path(__file__).parent))
    from training_pair_provenance import get_provenance_stats


def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file and return its parsed records in file order.

    Blank (whitespace-only) lines are skipped.

    Args:
        path: Location of the JSONL file.

    Returns:
        One parsed JSON value per non-blank line.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
        json.JSONDecodeError: If a line is not valid JSON. The message is
            prefixed with the file path and the line number within the file.
    """
    entries = []
    # JSON text is UTF-8 by specification; do not depend on the locale default.
    with open(path, encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError as err:
                # Each line is parsed on its own, so the decoder always reports
                # "line 1"; re-raise the same exception type with the real
                # position in the file so the bad record can be found.
                raise json.JSONDecodeError(
                    f"{path}, line {lineno}: {err.msg}", err.doc, err.pos
                ) from err
    return entries


def analyze_temporal(pairs: list[dict]) -> dict:
    """Count pairs per calendar date taken from their provenance timestamp.

    The date is the first 10 characters of ``pair["provenance"]["timestamp"]``
    (``YYYY-MM-DD`` when the timestamp is ISO formatted). Pairs with no
    provenance, a non-dict provenance, or a missing, empty or non-string
    timestamp are grouped under ``"unknown"``.

    Args:
        pairs: Training pair records (dicts).

    Returns:
        Mapping ``date -> {"total": int, "with_provenance": int}``.
    """
    by_date = defaultdict(lambda: {"total": 0, "with_provenance": 0})

    for pair in pairs:
        prov = pair.get("provenance")
        if not isinstance(prov, dict):
            # Covers an explicit null and malformed values; both are treated
            # as "no provenance" rather than crashing on .get() below.
            prov = {}

        ts = prov.get("timestamp", "")
        date = ts[:10] if isinstance(ts, str) and ts else "unknown"

        bucket = by_date[date]
        bucket["total"] += 1
        if prov:
            bucket["with_provenance"] += 1

    return dict(by_date)


def _print_breakdown(heading: str, counts: dict, total: int, label_width: int) -> None:
    """Print one "label  count (pct%) bar" section, ordered by label."""
    print(f"\n{heading}")
    # Labels are passed through str() so a non-string key (e.g. None) can be
    # neither a sorting nor a formatting error; string keys are unaffected.
    for label, count in sorted(counts.items(), key=lambda item: str(item[0])):
        pct = round(count / total * 100, 1) if total else 0
        bar = "█" * int(pct / 2)  # one block per 2 percentage points
        print(f" {str(label):{label_width}} {count:6} ({pct:5.1f}%) {bar}")


def print_dashboard(pairs: list[dict], title: str = "Training Data Provenance") -> None:
    """Print the provenance dashboard for ``pairs`` to stdout.

    Sections: coverage summary, breakdown by source type, breakdown by model,
    per-date counts (the 10 most recent dates, followed by the "unknown"
    bucket when present) and quality indicators.

    Args:
        pairs: Training pair records (dicts).
        title: Heading shown at the top of the dashboard.

    The statistics come from ``get_provenance_stats``; this function reads its
    ``total_pairs``, ``with_provenance``, ``coverage_pct``, ``excluded``,
    ``by_source_type`` and ``by_model`` entries.
    """
    stats = get_provenance_stats(pairs)
    temporal = analyze_temporal(pairs)
    total = stats["total_pairs"]
    rule = "=" * 60

    print(f"\n{rule}")
    print(f" {title}")
    print(rule)

    # Coverage
    print("\nPROVENANCE COVERAGE")
    print(f" Total pairs: {total}")
    print(f" With provenance: {stats['with_provenance']}")
    print(f" Coverage: {stats['coverage_pct']}%")
    print(f" Excluded: {stats['excluded']}")

    # By source type / by model
    _print_breakdown("BY SOURCE TYPE", stats["by_source_type"], total, 15)
    _print_breakdown("BY MODEL", stats["by_model"], total, 20)

    # Temporal
    if temporal:
        print("\nBY DATE (last 10)")
        # "unknown" is not a date: keep it out of the 10-date window so it
        # cannot displace a real date, and list it after the dated rows.
        rows = sorted(d for d in temporal if d != "unknown")[-10:]
        if "unknown" in temporal:
            rows.append("unknown")
        for date in rows:
            d = temporal[date]
            print(f" {date} total={d['total']:4} provenance={d['with_provenance']:4}")

    # Quality indicators
    print("\nQUALITY INDICATORS")
    coverage = stats["coverage_pct"]
    if not total:
        # An empty dataset has no coverage to judge; do not call it "low".
        print(" ℹ No pairs loaded — coverage not applicable")
    elif coverage == 100:
        print(" ✓ Full provenance coverage")
    elif coverage >= 90:
        print(" ⚠ Near-full coverage — some pairs missing provenance")
    else:
        print(" ✗ Low coverage — many pairs missing provenance")

    if stats["excluded"] > 0:
        print(f" ℹ {stats['excluded']} pairs excluded from training")

    print()


def _render_file(path: Path) -> bool:
    """Load one JSONL file and print its dashboard; return False if loading failed."""
    try:
        pairs = load_jsonl(path)
    except (OSError, ValueError) as err:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"ERROR: could not load {path}: {err}", file=sys.stderr)
        return False
    print_dashboard(pairs, f"Provenance: {path.name}")
    return True


def main():
    """Command-line entry point.

    Prints a dashboard for a single JSONL file, or for every ``*.jsonl`` file
    directly inside a directory when ``--all`` is given. Diagnostics go to
    stderr; the process exits with status 1 if the input path is unusable or
    any file fails to load.
    """
    parser = argparse.ArgumentParser(description="Provenance Dashboard")
    parser.add_argument("input", type=str, help="Path to JSONL or directory")
    parser.add_argument("--all", action="store_true", help="Process all JSONL in directory")
    args = parser.parse_args()

    input_path = Path(args.input)

    if input_path.is_file():
        ok = _render_file(input_path)
    elif input_path.is_dir() and args.all:
        jsonl_files = sorted(input_path.glob("*.jsonl"))
        if not jsonl_files:
            # Not treated as a failure, but say so instead of printing nothing.
            print(f"WARNING: no .jsonl files found in {input_path}", file=sys.stderr)
        # Build the full list first so one bad file does not stop the others
        # from being reported.
        ok = all([_render_file(jsonl_file) for jsonl_file in jsonl_files])
    elif input_path.is_dir():
        print(f"ERROR: {input_path} is a directory (use --all for directories)", file=sys.stderr)
        ok = False
    else:
        print(f"ERROR: {input_path} does not exist or is not a regular file", file=sys.stderr)
        ok = False

    if not ok:
        sys.exit(1)


if __name__ == "__main__":
    main()