Compare commits

...

6 Commits

Author SHA1 Message Date
65bc4f8089 feat: add Makefile targets for provenance tracking (#752)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 28s
PR Checklist / pr-checklist (pull_request) Failing after 3m25s
Smoke Test / smoke (pull_request) Failing after 9s
Validate Config / YAML Lint (pull_request) Failing after 7s
Validate Config / JSON Validate (pull_request) Successful in 7s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m12s
Validate Config / Shell Script Lint (pull_request) Failing after 39s
Validate Config / Cron Syntax Check (pull_request) Successful in 8s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 9s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
2026-04-15 22:46:28 +00:00
dd13c5302f ci: add provenance validation to training data pipeline (#752) 2026-04-15 22:45:23 +00:00
6ea7b4245f feat: add provenance to curated training pairs via make_session (#752) 2026-04-15 22:44:46 +00:00
b87dcfc676 feat: add provenance tracking to trajectory ingestion (#752) 2026-04-15 22:43:39 +00:00
b5f480da47 test: provenance tracking tests — 21 cases (#752) 2026-04-15 22:43:29 +00:00
3b6ff9038e feat: provenance tracking module for training data pairs (#752) 2026-04-15 22:43:24 +00:00
6 changed files with 722 additions and 4 deletions

View File

@@ -22,3 +22,17 @@ jobs:
        run: |
          cd training/data/scene-descriptions
          python3 validate.py *.jsonl
      - name: Validate training data provenance
        run: |
          cd training
          python3 -c "
          from training_pair_provenance import validate_provenance
          import sys, glob
          issues = 0
          for f in glob.glob('data/*.jsonl'):
              report = validate_provenance(f)
              cov = report['coverage']
              have = report['with_provenance']
              total = report['total']
              print(f'{f}: {cov:.0f}% coverage ({have}/{total})')
              missing = report['missing_provenance']
              if missing:
                  print(f' WARNING: {missing} pairs missing provenance')
                  issues += missing
          sys.exit(0)  # warning-only for now; provenance gaps do not fail the check
          "

View File

@@ -99,3 +99,19 @@ convert: ## Convert merged dataset to MLX format (train/valid split)
help: ## Show this help
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | \
		awk 'BEGIN {FS = ":.*?## "}; {printf " \033[36m%-16s\033[0m %s\n", $$1, $$2}'

# ── Provenance ──────────────────────────────────────────────────────────

provenance-validate: ## Validate provenance metadata on all training data
	@python3 training_pair_provenance.py validate data/curated_dataset.jsonl
	@python3 training_pair_provenance.py validate data/merged_training_data.jsonl

provenance-dashboard: ## Show provenance dashboard for training data
	@python3 training_pair_provenance.py dashboard data/merged_training_data.jsonl

provenance-backfill: ## Add provenance to pairs missing it
	@python3 training_pair_provenance.py backfill data/curated_dataset.jsonl --source backfill --model timmy-curated
	@python3 training_pair_provenance.py backfill data/merged_training_data.jsonl --source backfill --model unknown

provenance-test: ## Run provenance tracking tests
	@python3 -m pytest tests/test_provenance.py -v

View File

@@ -11,6 +11,7 @@ Output: ShareGPT JSONL format matching the training pipeline.
import json
import time
from pathlib import Path
from datetime import datetime, timezone
SYSTEM_PROMPT = """# SOUL.md
@@ -49,7 +50,10 @@ I am a small model on someone's machine. I cannot save anyone. But I can refuse
def make_session(session_id, conversations):
    """Create a session in the same format as the training data.

    Includes provenance metadata for tracking curated data origin.
    """
    return {
        "id": session_id,
        "model": "timmy-curated",
@@ -57,6 +61,13 @@ def make_session(session_id, conversations):
        "message_count": len(conversations),
        "tool_call_count": 0,
        "conversations": [{"from": "system", "value": SYSTEM_PROMPT}] + conversations,
        # Provenance metadata — tracks source of this curated pair
        "provenance": {
            "source": "curated",
            "source_session_id": session_id,
            "model": "timmy-curated",
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
    }
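
For context, a quick sketch of how the new provenance block surfaces to callers of make_session (the session id and messages here are illustrative, not from the dataset):

session = make_session("curated-001", [
    {"from": "human", "value": "hello"},
    {"from": "gpt", "value": "hi"},
])
assert session["provenance"]["source"] == "curated"
assert session["provenance"]["source_session_id"] == "curated-001"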

View File

@@ -11,10 +11,14 @@ Applies quality filtering:
- Skip cycles where the model just echoed the perception
- Keep cycles with genuine thought, discovery, or action
Provenance:
- Each ingested pair gets provenance metadata attached
- Tracks source_session_id, model, timestamp, source="trajectory"

Usage:
    python ingest_trajectories.py \\
        --trajectories ~/.nexus/trajectories/ \\
        --curated ../data/curated_dataset.jsonl \\
        --output ../data/merged_training_data.jsonl
"""
@@ -23,6 +27,30 @@ import json
from pathlib import Path
from difflib import SequenceMatcher
try:
    from training_pair_provenance import attach_provenance, extract_trajectory_provenance
except ImportError:
    # Fallback: inline provenance for standalone use
    from datetime import datetime, timezone

    def attach_provenance(pair, source, source_session_id, model, timestamp=None, extras=None):
        pair["provenance"] = {
            "source": source,
            "source_session_id": source_session_id,
            "model": model,
            "timestamp": timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        if extras:
            pair["provenance"].update(extras)
        return pair

    def extract_trajectory_provenance(entry):
        return {
            "source_session_id": entry.get("id") or entry.get("session_id") or "unknown",
            "model": entry.get("model", "unknown"),
            "timestamp": entry.get("started_at") or entry.get("timestamp") or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }


def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file."""
@@ -87,6 +115,7 @@ def merge_datasets(
"trajectory_files": 0,
"trajectory_raw": 0,
"trajectory_quality": 0,
"provenance_attached": 0,
"total_output": 0,
}
@@ -105,6 +134,16 @@ def merge_datasets(
        for cycle in cycles:
            if is_quality_cycle(cycle, min_thought_len):
                # Extract provenance from trajectory entry
                prov = extract_trajectory_provenance(cycle)
                cycle = attach_provenance(
                    cycle,
                    source="trajectory",
                    source_session_id=prov["source_session_id"],
                    model=prov["model"],
                    timestamp=prov["timestamp"],
                )
                stats["provenance_attached"] += 1
                quality_trajectories.append(cycle)

    stats["trajectory_quality"] = len(quality_trajectories)
@@ -165,6 +204,7 @@ def main():
print(f" Trajectory files: {stats['trajectory_files']}")
print(f" Raw cycles: {stats['trajectory_raw']}")
print(f" Quality cycles: {stats['trajectory_quality']}")
print(f" Provenance attached: {stats['provenance_attached']}")
print(f" Total training data: {stats['total_output']}")
print(f"\nOutput: {args.output}")

View File

@@ -0,0 +1,240 @@
#!/usr/bin/env python3
"""Tests for training_pair_provenance.py"""

import json
import sys
import tempfile
import unittest
from pathlib import Path

# Make training_pair_provenance importable when the tests live in tests/
# one level below the module (matches the Makefile's pytest invocation).
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from training_pair_provenance import (
    make_provenance,
    attach_provenance,
    extract_trajectory_provenance,
    pair_fingerprint,
    validate_provenance,
    provenance_dashboard,
    backfill_provenance,
    load_jsonl,
    save_jsonl,
    VALID_SOURCES,
    REQUIRED_FIELDS,
)


class TestMakeProvenance(unittest.TestCase):
    def test_creates_valid_provenance(self):
        prov = make_provenance(
            source="curated",
            source_session_id="test-001",
            model="timmy-curated",
        )
        for field in REQUIRED_FIELDS:
            self.assertIn(field, prov)
        self.assertEqual(prov["source"], "curated")
        self.assertEqual(prov["source_session_id"], "test-001")
        self.assertEqual(prov["model"], "timmy-curated")

    def test_rejects_invalid_source(self):
        with self.assertRaises(ValueError):
            make_provenance(source="bogus", source_session_id="x", model="y")

    def test_accepts_all_valid_sources(self):
        for source in VALID_SOURCES:
            prov = make_provenance(source=source, source_session_id="x", model="y")
            self.assertEqual(prov["source"], source)

    def test_includes_extras(self):
        prov = make_provenance(
            source="curated",
            source_session_id="x",
            model="y",
            extras={"custom_field": "value"},
        )
        self.assertEqual(prov["custom_field"], "value")

    def test_uses_provided_timestamp(self):
        ts = "2026-01-01T00:00:00Z"
        prov = make_provenance(source="curated", source_session_id="x", model="y", timestamp=ts)
        self.assertEqual(prov["timestamp"], ts)

    def test_generates_timestamp_if_not_provided(self):
        prov = make_provenance(source="curated", source_session_id="x", model="y")
        self.assertIn("T", prov["timestamp"])
        self.assertIn("Z", prov["timestamp"])


class TestAttachProvenance(unittest.TestCase):
    def test_attaches_to_pair(self):
        pair = {"id": "test", "conversations": []}
        result = attach_provenance(pair, source="curated", source_session_id="s1", model="m1")
        self.assertIn("provenance", result)
        self.assertEqual(result["provenance"]["source"], "curated")

    def test_does_not_overwrite_existing(self):
        pair = {
            "id": "test",
            "provenance": {"source": "original", "source_session_id": "old", "model": "x"},
        }
        result = attach_provenance(pair, source="new", source_session_id="new", model="y")
        self.assertEqual(result["provenance"]["source"], "original")

    def test_overwrites_with_force(self):
        pair = {
            "id": "test",
            "provenance": {"source": "curated", "source_session_id": "old", "model": "x", "timestamp": "t"},
        }
        result = attach_provenance(
            pair, source="backfill", source_session_id="new", model="y", extras={"force": True}
        )
        self.assertEqual(result["provenance"]["source"], "backfill")


class TestExtractTrajectoryProvenance(unittest.TestCase):
    def test_extracts_from_trajectory(self):
        entry = {
            "id": "session-123",
            "model": "hermes3:latest",
            "started_at": "2026-04-14T10:00:00",
        }
        prov = extract_trajectory_provenance(entry)
        self.assertEqual(prov["source_session_id"], "session-123")
        self.assertEqual(prov["model"], "hermes3:latest")
        self.assertEqual(prov["timestamp"], "2026-04-14T10:00:00")

    def test_uses_defaults_for_missing(self):
        entry = {}
        prov = extract_trajectory_provenance(entry)
        self.assertEqual(prov["source_session_id"], "unknown")
        self.assertEqual(prov["model"], "unknown")
        self.assertIn("T", prov["timestamp"])


class TestPairFingerprint(unittest.TestCase):
    def test_same_content_same_hash(self):
        pair1 = {"conversations": [{"from": "human", "value": "hi"}, {"from": "gpt", "value": "hello"}]}
        pair2 = {"conversations": [{"from": "human", "value": "hi"}, {"from": "gpt", "value": "hello"}]}
        self.assertEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))

    def test_different_content_different_hash(self):
        pair1 = {"conversations": [{"from": "human", "value": "hi"}]}
        pair2 = {"conversations": [{"from": "human", "value": "bye"}]}
        self.assertNotEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))

    def test_ignores_provenance_in_hash(self):
        pair1 = {
            "conversations": [{"from": "human", "value": "hi"}],
            "provenance": {"source": "a"},
        }
        pair2 = {
            "conversations": [{"from": "human", "value": "hi"}],
            "provenance": {"source": "b"},
        }
        self.assertEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))

    def test_ignores_system_prompt(self):
        pair1 = {
            "conversations": [
                {"from": "system", "value": "prompt A"},
                {"from": "human", "value": "hi"},
            ]
        }
        pair2 = {
            "conversations": [
                {"from": "system", "value": "prompt B"},
                {"from": "human", "value": "hi"},
            ]
        }
        self.assertEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))


class TestValidateProvenance(unittest.TestCase):
    def _write_jsonl(self, entries):
        f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        for entry in entries:
            f.write(json.dumps(entry) + "\n")
        f.close()
        return f.name

    def test_all_valid(self):
        path = self._write_jsonl([
            {"id": "1", "provenance": {"source": "curated", "source_session_id": "s1", "model": "m1", "timestamp": "2026-01-01T00:00:00Z"}},
            {"id": "2", "provenance": {"source": "trajectory", "source_session_id": "s2", "model": "m2", "timestamp": "2026-01-01T00:00:00Z"}},
        ])
        report = validate_provenance(path)
        self.assertEqual(report["missing_provenance"], 0)
        self.assertEqual(report["missing_fields"], 0)
        self.assertEqual(report["coverage"], 100.0)

    def test_missing_provenance(self):
        path = self._write_jsonl([
            {"id": "1", "conversations": []},
        ])
        report = validate_provenance(path)
        self.assertEqual(report["missing_provenance"], 1)
        self.assertEqual(report["coverage"], 0.0)

    def test_missing_fields(self):
        path = self._write_jsonl([
            {"id": "1", "provenance": {"source": "curated"}},  # missing session_id, model, timestamp
        ])
        report = validate_provenance(path)
        self.assertEqual(report["missing_fields"], 1)

    def test_invalid_source(self):
        path = self._write_jsonl([
            {"id": "1", "provenance": {"source": "bogus", "source_session_id": "s1", "model": "m1", "timestamp": "2026-01-01T00:00:00Z"}},
        ])
        report = validate_provenance(path)
        self.assertEqual(report["invalid_source"], 1)


class TestBackfillProvenance(unittest.TestCase):
    def _write_jsonl(self, entries):
        f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        for entry in entries:
            f.write(json.dumps(entry) + "\n")
        f.close()
        return f.name

    def test_backfills_missing(self):
        path = self._write_jsonl([
            {"id": "1", "conversations": []},
            {"id": "2", "conversations": [], "provenance": {"source": "existing", "source_session_id": "s", "model": "m", "timestamp": "t"}},
        ])
        out = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False).name
        stats = backfill_provenance(path, source="backfill", model="test-model", output_path=out)
        self.assertEqual(stats["backfilled"], 1)
        self.assertEqual(stats["already_had"], 1)
        entries = load_jsonl(out)
        self.assertEqual(entries[0]["provenance"]["source"], "backfill")
        self.assertEqual(entries[1]["provenance"]["source"], "existing")


class TestDashboard(unittest.TestCase):
    def test_generates_output(self):
        f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        for i in range(5):
            f.write(json.dumps({
                "id": str(i),
                "provenance": {
                    "source": "curated" if i < 3 else "trajectory",
                    "source_session_id": f"s{i}",
                    "model": "timmy-curated" if i < 3 else "hermes3",
                    "timestamp": f"2026-04-{14+i:02d}T00:00:00Z",
                },
            }) + "\n")
        f.close()
        output = provenance_dashboard(f.name)
        self.assertIn("PROVENANCE DASHBOARD", output)
        self.assertIn("timmy-curated", output)
        self.assertIn("hermes3", output)
        self.assertIn("curated", output)


if __name__ == "__main__":
    unittest.main()

View File

@@ -0,0 +1,397 @@
#!/usr/bin/env python3
"""
training_pair_provenance.py — Provenance tracking for training data pairs.
Every training pair should carry metadata about where it came from:
- Which session/trajectory produced it
- Which model generated it
- When it was created
- What source type (curated, trajectory, augmentation)
This module provides utilities to:
1. Attach provenance metadata to training pairs
2. Validate that provenance exists
3. Generate provenance statistics/dashboards
4. Backfill provenance on existing pairs
Usage:
from training_pair_provenance import attach_provenance, validate_provenance, provenance_dashboard
# Attach provenance to a pair
pair = attach_provenance(pair, source="trajectory", session_id="abc123", model="hermes3:latest")
# Validate provenance on a dataset
report = validate_provenance("data/curated_dataset.jsonl")
# Generate dashboard
print(provenance_dashboard("data/merged_training_data.jsonl"))
"""
import json
import hashlib
from pathlib import Path
from typing import Optional
from collections import Counter
from datetime import datetime, timezone

# === Required provenance fields ===
REQUIRED_FIELDS = ["source", "source_session_id", "model", "timestamp"]

# === Valid source types ===
VALID_SOURCES = {"curated", "trajectory", "augmentation", "backfill", "manual"}

def make_provenance(
    source: str,
    source_session_id: str,
    model: str,
    timestamp: Optional[str] = None,
    extras: Optional[dict] = None,
) -> dict:
    """Create a provenance metadata dict.

    Args:
        source: One of curated, trajectory, augmentation, backfill, manual
        source_session_id: Unique ID of the source session/trajectory
        model: Model that generated the content
        timestamp: ISO8601 timestamp (defaults to now)
        extras: Optional additional metadata

    Returns:
        Provenance dict ready to attach to a training pair
    """
    if source not in VALID_SOURCES:
        raise ValueError(f"Invalid source '{source}'. Must be one of: {VALID_SOURCES}")

    prov = {
        "source": source,
        "source_session_id": source_session_id,
        "model": model,
        "timestamp": timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    if extras:
        prov.update(extras)
    return prov

def attach_provenance(
    pair: dict,
    source: str,
    source_session_id: str,
    model: str,
    timestamp: Optional[str] = None,
    extras: Optional[dict] = None,
) -> dict:
    """Attach provenance metadata to a training pair (mutates and returns).

    The pair dict gets a 'provenance' key added. If provenance already exists,
    it is NOT overwritten — use force=True in the extras to override.

    Args:
        pair: Training pair dict (ShareGPT format)
        source: Source type
        source_session_id: Session/trajectory ID
        model: Model name
        timestamp: ISO8601 timestamp
        extras: Additional metadata

    Returns:
        The pair dict with provenance attached
    """
    if "provenance" in pair and not (extras and extras.get("force")):
        return pair

    # Pop 'force' flag before passing to make_provenance
    clean_extras = {k: v for k, v in (extras or {}).items() if k != "force"} or None
    pair["provenance"] = make_provenance(
        source=source,
        source_session_id=source_session_id,
        model=model,
        timestamp=timestamp,
        extras=clean_extras,
    )
    return pair

def extract_trajectory_provenance(trajectory_entry: dict) -> dict:
    """Extract provenance metadata from a trajectory JSONL entry.

    Trajectory entries may have fields like:
    - id / session_id
    - model
    - started_at / timestamp
    - source file path

    Returns dict with extracted fields or sensible defaults.
    """
    return {
        "source_session_id": (
            trajectory_entry.get("id")
            or trajectory_entry.get("session_id")
            or "unknown"
        ),
        "model": trajectory_entry.get("model", "unknown"),
        "timestamp": (
            trajectory_entry.get("started_at")
            or trajectory_entry.get("timestamp")
            or trajectory_entry.get("created_at")
            or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        ),
    }

def pair_fingerprint(pair: dict) -> str:
    """Generate a stable fingerprint for a training pair.

    Used for deduplication and tracking. Based on conversation content,
    not metadata (so same content = same hash regardless of provenance).
    """
    convos = pair.get("conversations", [])
    content_parts = []
    for c in convos:
        if c.get("from") != "system":  # Skip system prompt for fingerprint
            content_parts.append(f"{c.get('from', '')}:{c.get('value', '')}")
    content = "|".join(content_parts)
    return hashlib.sha256(content.encode()).hexdigest()[:16]

def load_jsonl(path) -> list[dict]:
    """Load a JSONL file."""
    path = Path(path)
    entries = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                entries.append(json.loads(line))
    return entries


def save_jsonl(path, entries: list[dict]):
    """Save entries to a JSONL file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        for entry in entries:
            f.write(json.dumps(entry) + "\n")

def validate_provenance(path) -> dict:
    """Validate provenance metadata on all pairs in a JSONL file.

    Returns a report dict with:
    - total: total pairs
    - with_provenance: pairs that have provenance
    - missing_provenance: pairs without provenance
    - missing_fields: pairs with provenance but missing required fields
    - invalid_source: pairs with unrecognized source type
    - issues: list of specific issue descriptions
    """
    path = Path(path)
    if not path.exists():
        return {"error": f"File not found: {path}", "total": 0}

    entries = load_jsonl(path)
    report = {
        "total": len(entries),
        "with_provenance": 0,
        "missing_provenance": 0,
        "missing_fields": 0,
        "invalid_source": 0,
        "issues": [],
    }

    for i, entry in enumerate(entries):
        prov = entry.get("provenance")
        if not prov:
            report["missing_provenance"] += 1
            report["issues"].append(f"Pair {i} (id={entry.get('id', '?')}): no provenance")
            continue

        report["with_provenance"] += 1

        # Check required fields
        missing = [f for f in REQUIRED_FIELDS if f not in prov]
        if missing:
            report["missing_fields"] += 1
            report["issues"].append(
                f"Pair {i} (id={entry.get('id', '?')}): missing fields: {missing}"
            )

        # Check source validity
        source = prov.get("source", "")
        if source and source not in VALID_SOURCES:
            report["invalid_source"] += 1
            report["issues"].append(
                f"Pair {i} (id={entry.get('id', '?')}): invalid source '{source}'"
            )

    report["coverage"] = (
        report["with_provenance"] / report["total"] * 100 if report["total"] > 0 else 0
    )
    return report

def provenance_dashboard(path) -> str:
    """Generate a human-readable provenance dashboard for a dataset.

    Shows:
    - Pair count by model over time
    - Pair count by source type
    - Provenance coverage
    - Model distribution
    """
    path = Path(path)
    if not path.exists():
        return f"File not found: {path}"

    entries = load_jsonl(path)
    if not entries:
        return "Empty dataset"

    models = Counter()
    sources = Counter()
    timestamps = []
    with_prov = 0

    for entry in entries:
        prov = entry.get("provenance")
        if prov:
            with_prov += 1
            models[prov.get("model", "unknown")] += 1
            sources[prov.get("source", "unknown")] += 1
            ts = prov.get("timestamp", "")
            if ts:
                timestamps.append(ts[:10])  # Date only
        else:
            models["(no provenance)"] += 1
            sources["(no provenance)"] += 1

    coverage = with_prov / len(entries) * 100 if entries else 0

    lines = [
        "=" * 50,
        "PROVENANCE DASHBOARD",
        "=" * 50,
        f"Total pairs: {len(entries)}",
        f"Provenance coverage: {coverage:.1f}% ({with_prov}/{len(entries)})",
        "",
        "--- By Model ---",
    ]
    for model, count in models.most_common():
        pct = count / len(entries) * 100
        lines.append(f" {model:<30} {count:>6} ({pct:.1f}%)")

    lines.append("")
    lines.append("--- By Source ---")
    for source, count in sources.most_common():
        pct = count / len(entries) * 100
        lines.append(f" {source:<20} {count:>6} ({pct:.1f}%)")

    if timestamps:
        dates = Counter(timestamps)
        lines.append("")
        lines.append("--- By Date (top 10) ---")
        for date, count in dates.most_common(10):
            lines.append(f" {date:<12} {count:>6}")

    return "\n".join(lines)

def backfill_provenance(
    path,
    source: str = "backfill",
    model: str = "unknown",
    output_path: Optional[str] = None,
) -> dict:
    """Add provenance to all pairs missing it.

    Args:
        path: Input JSONL file
        source: Source type to use for backfilled pairs
        model: Model name to use for backfilled pairs
        output_path: Output path (defaults to overwriting input)

    Returns:
        Stats dict
    """
    entries = load_jsonl(path)
    stats = {"total": len(entries), "backfilled": 0, "already_had": 0}

    for entry in entries:
        if "provenance" not in entry:
            session_id = entry.get("id", f"backfill-{stats['backfilled']}")
            entry["provenance"] = make_provenance(
                source=source,
                source_session_id=session_id,
                model=model,
            )
            stats["backfilled"] += 1
        else:
            stats["already_had"] += 1

    out = Path(output_path) if output_path else Path(path)
    save_jsonl(out, entries)
    stats["output"] = str(out)
    return stats

if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Provenance tracking for training data")
sub = parser.add_subparsers(dest="command")
# validate
p_validate = sub.add_parser("validate", help="Validate provenance in a dataset")
p_validate.add_argument("input", help="Input JSONL file")
p_validate.add_argument("--json", action="store_true", help="Output as JSON")
# dashboard
p_dash = sub.add_parser("dashboard", help="Show provenance dashboard")
p_dash.add_argument("input", help="Input JSONL file")
# backfill
p_back = sub.add_parser("backfill", help="Add provenance to pairs missing it")
p_back.add_argument("input", help="Input JSONL file")
p_back.add_argument("--source", default="backfill", help="Source type")
p_back.add_argument("--model", default="unknown", help="Model name")
p_back.add_argument("--output", "-o", help="Output path (default: overwrite)")
args = parser.parse_args()
if args.command == "validate":
report = validate_provenance(args.input)
if args.json:
print(json.dumps(report, indent=2))
else:
print(f"Provenance Validation: {args.input}")
print(f" Total: {report['total']}")
print(f" With provenance: {report['with_provenance']}")
print(f" Missing provenance: {report['missing_provenance']}")
print(f" Missing fields: {report['missing_fields']}")
print(f" Invalid source: {report['invalid_source']}")
print(f" Coverage: {report.get('coverage', 0):.1f}%")
if report["issues"]:
print(f"\n Issues ({len(report['issues'])}):")
for issue in report["issues"][:20]:
print(f" {issue}")
elif args.command == "dashboard":
print(provenance_dashboard(args.input))
elif args.command == "backfill":
stats = backfill_provenance(args.input, args.source, args.model, args.output)
print(f"Backfill complete:")
print(f" Total: {stats['total']}")
print(f" Backfilled: {stats['backfilled']}")
print(f" Already had provenance: {stats['already_had']}")
print(f" Output: {stats['output']}")
else:
parser.print_help()
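
Taken together, the module supports a simple check-and-repair loop; a minimal sketch, assuming it runs from the training/ directory with the same file names the Makefile targets use:

from training_pair_provenance import (
    validate_provenance,
    backfill_provenance,
    provenance_dashboard,
)

path = "data/merged_training_data.jsonl"
report = validate_provenance(path)
if report.get("missing_provenance"):
    # Stamp pairs that predate provenance tracking, then re-inspect
    backfill_provenance(path, source="backfill", model="unknown")
print(provenance_dashboard(path))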