Compare commits
6 Commits
step35/595
...
fix/752-pr
| Author | SHA1 | Date | |
|---|---|---|---|
| 65bc4f8089 | |||
| dd13c5302f | |||
| 6ea7b4245f | |||
| b87dcfc676 | |||
| b5f480da47 | |||
| 3b6ff9038e |
@@ -22,3 +22,17 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
cd training/data/scene-descriptions
|
cd training/data/scene-descriptions
|
||||||
python3 validate.py *.jsonl
|
python3 validate.py *.jsonl
|
||||||
|
- name: Validate training data provenance
|
||||||
|
run: |
|
||||||
|
cd training
|
||||||
|
python3 -c "
|
||||||
|
from training_pair_provenance import validate_provenance
|
||||||
|
import json, sys, glob
|
||||||
|
issues = 0
|
||||||
|
for f in glob.glob('data/*.jsonl'):
|
||||||
|
report = validate_provenance(f)
|
||||||
|
print(f'{f}: {report["coverage"]:.0f}% coverage ({report["with_provenance"]}/{report["total"]})')
|
||||||
|
if report['missing_provenance'] > 0:
|
||||||
|
print(f' WARNING: {report["missing_provenance"]} pairs missing provenance')
|
||||||
|
sys.exit(0)
|
||||||
|
"
|
||||||
|
|||||||
@@ -99,3 +99,19 @@ convert: ## Convert merged dataset to MLX format (train/valid split)
|
|||||||
help: ## Show this help
|
help: ## Show this help
|
||||||
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | \
|
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | \
|
||||||
awk 'BEGIN {FS = ":.*?## "}; {printf " \033[36m%-16s\033[0m %s\n", $$1, $$2}'
|
awk 'BEGIN {FS = ":.*?## "}; {printf " \033[36m%-16s\033[0m %s\n", $$1, $$2}'
|
||||||
|
|
||||||
|
# ── Provenance ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
provenance-validate: ## Validate provenance metadata on all training data
|
||||||
|
@python3 training_pair_provenance.py validate data/curated_dataset.jsonl
|
||||||
|
@python3 training_pair_provenance.py validate data/merged_training_data.jsonl
|
||||||
|
|
||||||
|
provenance-dashboard: ## Show provenance dashboard for training data
|
||||||
|
@python3 training_pair_provenance.py dashboard data/merged_training_data.jsonl
|
||||||
|
|
||||||
|
provenance-backfill: ## Add provenance to pairs missing it
|
||||||
|
@python3 training_pair_provenance.py backfill data/curated_dataset.jsonl --source backfill --model timmy-curated
|
||||||
|
@python3 training_pair_provenance.py backfill data/merged_training_data.jsonl --source backfill --model unknown
|
||||||
|
|
||||||
|
provenance-test: ## Run provenance tracking tests
|
||||||
|
@python3 -m pytest tests/test_provenance.py -v
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ Output: ShareGPT JSONL format matching the training pipeline.
|
|||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
SYSTEM_PROMPT = """# SOUL.md
|
SYSTEM_PROMPT = """# SOUL.md
|
||||||
|
|
||||||
@@ -49,7 +50,10 @@ I am a small model on someone's machine. I cannot save anyone. But I can refuse
|
|||||||
|
|
||||||
|
|
||||||
def make_session(session_id, conversations):
|
def make_session(session_id, conversations):
|
||||||
"""Create a session in the same format as the training data."""
|
"""Create a session in the same format as the training data.
|
||||||
|
|
||||||
|
Includes provenance metadata for tracking curated data origin.
|
||||||
|
"""
|
||||||
return {
|
return {
|
||||||
"id": session_id,
|
"id": session_id,
|
||||||
"model": "timmy-curated",
|
"model": "timmy-curated",
|
||||||
@@ -57,6 +61,13 @@ def make_session(session_id, conversations):
|
|||||||
"message_count": len(conversations),
|
"message_count": len(conversations),
|
||||||
"tool_call_count": 0,
|
"tool_call_count": 0,
|
||||||
"conversations": [{"from": "system", "value": SYSTEM_PROMPT}] + conversations,
|
"conversations": [{"from": "system", "value": SYSTEM_PROMPT}] + conversations,
|
||||||
|
# Provenance metadata — tracks source of this curated pair
|
||||||
|
"provenance": {
|
||||||
|
"source": "curated",
|
||||||
|
"source_session_id": session_id,
|
||||||
|
"model": "timmy-curated",
|
||||||
|
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -11,10 +11,14 @@ Applies quality filtering:
|
|||||||
- Skip cycles where the model just echoed the perception
|
- Skip cycles where the model just echoed the perception
|
||||||
- Keep cycles with genuine thought, discovery, or action
|
- Keep cycles with genuine thought, discovery, or action
|
||||||
|
|
||||||
|
Provenance:
|
||||||
|
- Each ingested pair gets provenance metadata attached
|
||||||
|
- Tracks source_session_id, model, timestamp, source="trajectory"
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
python ingest_nexus_trajectories.py \\
|
python ingest_trajectories.py \
|
||||||
--trajectories ~/.nexus/trajectories/ \\
|
--trajectories ~/.nexus/trajectories/ \
|
||||||
--curated ../data/curated_dataset.jsonl \\
|
--curated ../data/curated_dataset.jsonl \
|
||||||
--output ../data/merged_training_data.jsonl
|
--output ../data/merged_training_data.jsonl
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -23,6 +27,30 @@ import json
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from difflib import SequenceMatcher
|
from difflib import SequenceMatcher
|
||||||
|
|
||||||
|
try:
|
||||||
|
from training_pair_provenance import attach_provenance, extract_trajectory_provenance
|
||||||
|
except ImportError:
|
||||||
|
# Fallback: inline provenance for standalone use
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
def attach_provenance(pair, source, source_session_id, model, timestamp=None, extras=None):
|
||||||
|
pair["provenance"] = {
|
||||||
|
"source": source,
|
||||||
|
"source_session_id": source_session_id,
|
||||||
|
"model": model,
|
||||||
|
"timestamp": timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||||
|
}
|
||||||
|
if extras:
|
||||||
|
pair["provenance"].update(extras)
|
||||||
|
return pair
|
||||||
|
|
||||||
|
def extract_trajectory_provenance(entry):
|
||||||
|
return {
|
||||||
|
"source_session_id": entry.get("id") or entry.get("session_id") or "unknown",
|
||||||
|
"model": entry.get("model", "unknown"),
|
||||||
|
"timestamp": entry.get("started_at") or entry.get("timestamp") or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def load_jsonl(path: Path) -> list[dict]:
|
def load_jsonl(path: Path) -> list[dict]:
|
||||||
"""Load a JSONL file."""
|
"""Load a JSONL file."""
|
||||||
@@ -87,6 +115,7 @@ def merge_datasets(
|
|||||||
"trajectory_files": 0,
|
"trajectory_files": 0,
|
||||||
"trajectory_raw": 0,
|
"trajectory_raw": 0,
|
||||||
"trajectory_quality": 0,
|
"trajectory_quality": 0,
|
||||||
|
"provenance_attached": 0,
|
||||||
"total_output": 0,
|
"total_output": 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -105,6 +134,16 @@ def merge_datasets(
|
|||||||
|
|
||||||
for cycle in cycles:
|
for cycle in cycles:
|
||||||
if is_quality_cycle(cycle, min_thought_len):
|
if is_quality_cycle(cycle, min_thought_len):
|
||||||
|
# Extract provenance from trajectory entry
|
||||||
|
prov = extract_trajectory_provenance(cycle)
|
||||||
|
cycle = attach_provenance(
|
||||||
|
cycle,
|
||||||
|
source="trajectory",
|
||||||
|
source_session_id=prov["source_session_id"],
|
||||||
|
model=prov["model"],
|
||||||
|
timestamp=prov["timestamp"],
|
||||||
|
)
|
||||||
|
stats["provenance_attached"] += 1
|
||||||
quality_trajectories.append(cycle)
|
quality_trajectories.append(cycle)
|
||||||
|
|
||||||
stats["trajectory_quality"] = len(quality_trajectories)
|
stats["trajectory_quality"] = len(quality_trajectories)
|
||||||
@@ -165,6 +204,7 @@ def main():
|
|||||||
print(f" Trajectory files: {stats['trajectory_files']}")
|
print(f" Trajectory files: {stats['trajectory_files']}")
|
||||||
print(f" Raw cycles: {stats['trajectory_raw']}")
|
print(f" Raw cycles: {stats['trajectory_raw']}")
|
||||||
print(f" Quality cycles: {stats['trajectory_quality']}")
|
print(f" Quality cycles: {stats['trajectory_quality']}")
|
||||||
|
print(f" Provenance attached: {stats['provenance_attached']}")
|
||||||
print(f" Total training data: {stats['total_output']}")
|
print(f" Total training data: {stats['total_output']}")
|
||||||
print(f"\nOutput: {args.output}")
|
print(f"\nOutput: {args.output}")
|
||||||
|
|
||||||
|
|||||||
240
training/tests/test_provenance.py
Normal file
240
training/tests/test_provenance.py
Normal file
@@ -0,0 +1,240 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Tests for training_pair_provenance.py"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
# Adjust import path
|
||||||
|
import sys
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||||||
|
from training_pair_provenance import (
|
||||||
|
make_provenance,
|
||||||
|
attach_provenance,
|
||||||
|
extract_trajectory_provenance,
|
||||||
|
pair_fingerprint,
|
||||||
|
validate_provenance,
|
||||||
|
provenance_dashboard,
|
||||||
|
backfill_provenance,
|
||||||
|
load_jsonl,
|
||||||
|
save_jsonl,
|
||||||
|
VALID_SOURCES,
|
||||||
|
REQUIRED_FIELDS,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMakeProvenance(unittest.TestCase):
|
||||||
|
def test_creates_valid_provenance(self):
|
||||||
|
prov = make_provenance(
|
||||||
|
source="curated",
|
||||||
|
source_session_id="test-001",
|
||||||
|
model="timmy-curated",
|
||||||
|
)
|
||||||
|
for field in REQUIRED_FIELDS:
|
||||||
|
self.assertIn(field, prov)
|
||||||
|
self.assertEqual(prov["source"], "curated")
|
||||||
|
self.assertEqual(prov["source_session_id"], "test-001")
|
||||||
|
self.assertEqual(prov["model"], "timmy-curated")
|
||||||
|
|
||||||
|
def test_rejects_invalid_source(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
make_provenance(source="bogus", source_session_id="x", model="y")
|
||||||
|
|
||||||
|
def test_accepts_all_valid_sources(self):
|
||||||
|
for source in VALID_SOURCES:
|
||||||
|
prov = make_provenance(source=source, source_session_id="x", model="y")
|
||||||
|
self.assertEqual(prov["source"], source)
|
||||||
|
|
||||||
|
def test_includes_extras(self):
|
||||||
|
prov = make_provenance(
|
||||||
|
source="curated",
|
||||||
|
source_session_id="x",
|
||||||
|
model="y",
|
||||||
|
extras={"custom_field": "value"},
|
||||||
|
)
|
||||||
|
self.assertEqual(prov["custom_field"], "value")
|
||||||
|
|
||||||
|
def test_uses_provided_timestamp(self):
|
||||||
|
ts = "2026-01-01T00:00:00Z"
|
||||||
|
prov = make_provenance(source="curated", source_session_id="x", model="y", timestamp=ts)
|
||||||
|
self.assertEqual(prov["timestamp"], ts)
|
||||||
|
|
||||||
|
def test_generates_timestamp_if_not_provided(self):
|
||||||
|
prov = make_provenance(source="curated", source_session_id="x", model="y")
|
||||||
|
self.assertIn("T", prov["timestamp"])
|
||||||
|
self.assertIn("Z", prov["timestamp"])
|
||||||
|
|
||||||
|
|
||||||
|
class TestAttachProvenance(unittest.TestCase):
|
||||||
|
def test_attaches_to_pair(self):
|
||||||
|
pair = {"id": "test", "conversations": []}
|
||||||
|
result = attach_provenance(pair, source="curated", source_session_id="s1", model="m1")
|
||||||
|
self.assertIn("provenance", result)
|
||||||
|
self.assertEqual(result["provenance"]["source"], "curated")
|
||||||
|
|
||||||
|
def test_does_not_overwrite_existing(self):
|
||||||
|
pair = {
|
||||||
|
"id": "test",
|
||||||
|
"provenance": {"source": "original", "source_session_id": "old", "model": "x"},
|
||||||
|
}
|
||||||
|
result = attach_provenance(pair, source="new", source_session_id="new", model="y")
|
||||||
|
self.assertEqual(result["provenance"]["source"], "original")
|
||||||
|
|
||||||
|
def test_overwrites_with_force(self):
|
||||||
|
pair = {
|
||||||
|
"id": "test",
|
||||||
|
"provenance": {"source": "curated", "source_session_id": "old", "model": "x", "timestamp": "t"},
|
||||||
|
}
|
||||||
|
result = attach_provenance(
|
||||||
|
pair, source="backfill", source_session_id="new", model="y", extras={"force": True}
|
||||||
|
)
|
||||||
|
self.assertEqual(result["provenance"]["source"], "backfill")
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractTrajectoryProvenance(unittest.TestCase):
|
||||||
|
def test_extracts_from_trajectory(self):
|
||||||
|
entry = {
|
||||||
|
"id": "session-123",
|
||||||
|
"model": "hermes3:latest",
|
||||||
|
"started_at": "2026-04-14T10:00:00",
|
||||||
|
}
|
||||||
|
prov = extract_trajectory_provenance(entry)
|
||||||
|
self.assertEqual(prov["source_session_id"], "session-123")
|
||||||
|
self.assertEqual(prov["model"], "hermes3:latest")
|
||||||
|
self.assertEqual(prov["timestamp"], "2026-04-14T10:00:00")
|
||||||
|
|
||||||
|
def test_uses_defaults_for_missing(self):
|
||||||
|
entry = {}
|
||||||
|
prov = extract_trajectory_provenance(entry)
|
||||||
|
self.assertEqual(prov["source_session_id"], "unknown")
|
||||||
|
self.assertEqual(prov["model"], "unknown")
|
||||||
|
self.assertIn("T", prov["timestamp"])
|
||||||
|
|
||||||
|
|
||||||
|
class TestPairFingerprint(unittest.TestCase):
|
||||||
|
def test_same_content_same_hash(self):
|
||||||
|
pair1 = {"conversations": [{"from": "human", "value": "hi"}, {"from": "gpt", "value": "hello"}]}
|
||||||
|
pair2 = {"conversations": [{"from": "human", "value": "hi"}, {"from": "gpt", "value": "hello"}]}
|
||||||
|
self.assertEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))
|
||||||
|
|
||||||
|
def test_different_content_different_hash(self):
|
||||||
|
pair1 = {"conversations": [{"from": "human", "value": "hi"}]}
|
||||||
|
pair2 = {"conversations": [{"from": "human", "value": "bye"}]}
|
||||||
|
self.assertNotEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))
|
||||||
|
|
||||||
|
def test_ignores_provenance_in_hash(self):
|
||||||
|
pair1 = {
|
||||||
|
"conversations": [{"from": "human", "value": "hi"}],
|
||||||
|
"provenance": {"source": "a"},
|
||||||
|
}
|
||||||
|
pair2 = {
|
||||||
|
"conversations": [{"from": "human", "value": "hi"}],
|
||||||
|
"provenance": {"source": "b"},
|
||||||
|
}
|
||||||
|
self.assertEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))
|
||||||
|
|
||||||
|
def test_ignores_system_prompt(self):
|
||||||
|
pair1 = {
|
||||||
|
"conversations": [
|
||||||
|
{"from": "system", "value": "prompt A"},
|
||||||
|
{"from": "human", "value": "hi"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
pair2 = {
|
||||||
|
"conversations": [
|
||||||
|
{"from": "system", "value": "prompt B"},
|
||||||
|
{"from": "human", "value": "hi"},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
self.assertEqual(pair_fingerprint(pair1), pair_fingerprint(pair2))
|
||||||
|
|
||||||
|
|
||||||
|
class TestValidateProvenance(unittest.TestCase):
|
||||||
|
def _write_jsonl(self, entries):
|
||||||
|
f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
|
||||||
|
for entry in entries:
|
||||||
|
f.write(json.dumps(entry) + "\n")
|
||||||
|
f.close()
|
||||||
|
return f.name
|
||||||
|
|
||||||
|
def test_all_valid(self):
|
||||||
|
path = self._write_jsonl([
|
||||||
|
{"id": "1", "provenance": {"source": "curated", "source_session_id": "s1", "model": "m1", "timestamp": "2026-01-01T00:00:00Z"}},
|
||||||
|
{"id": "2", "provenance": {"source": "trajectory", "source_session_id": "s2", "model": "m2", "timestamp": "2026-01-01T00:00:00Z"}},
|
||||||
|
])
|
||||||
|
report = validate_provenance(path)
|
||||||
|
self.assertEqual(report["missing_provenance"], 0)
|
||||||
|
self.assertEqual(report["missing_fields"], 0)
|
||||||
|
self.assertEqual(report["coverage"], 100.0)
|
||||||
|
|
||||||
|
def test_missing_provenance(self):
|
||||||
|
path = self._write_jsonl([
|
||||||
|
{"id": "1", "conversations": []},
|
||||||
|
])
|
||||||
|
report = validate_provenance(path)
|
||||||
|
self.assertEqual(report["missing_provenance"], 1)
|
||||||
|
self.assertEqual(report["coverage"], 0.0)
|
||||||
|
|
||||||
|
def test_missing_fields(self):
|
||||||
|
path = self._write_jsonl([
|
||||||
|
{"id": "1", "provenance": {"source": "curated"}}, # missing session_id, model, timestamp
|
||||||
|
])
|
||||||
|
report = validate_provenance(path)
|
||||||
|
self.assertEqual(report["missing_fields"], 1)
|
||||||
|
|
||||||
|
def test_invalid_source(self):
|
||||||
|
path = self._write_jsonl([
|
||||||
|
{"id": "1", "provenance": {"source": "bogus", "source_session_id": "s1", "model": "m1", "timestamp": "2026-01-01T00:00:00Z"}},
|
||||||
|
])
|
||||||
|
report = validate_provenance(path)
|
||||||
|
self.assertEqual(report["invalid_source"], 1)
|
||||||
|
|
||||||
|
|
||||||
|
class TestBackfillProvenance(unittest.TestCase):
|
||||||
|
def _write_jsonl(self, entries):
|
||||||
|
f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
|
||||||
|
for entry in entries:
|
||||||
|
f.write(json.dumps(entry) + "\n")
|
||||||
|
f.close()
|
||||||
|
return f.name
|
||||||
|
|
||||||
|
def test_backfills_missing(self):
|
||||||
|
path = self._write_jsonl([
|
||||||
|
{"id": "1", "conversations": []},
|
||||||
|
{"id": "2", "conversations": [], "provenance": {"source": "existing", "source_session_id": "s", "model": "m", "timestamp": "t"}},
|
||||||
|
])
|
||||||
|
out = tempfile.NamedTemporaryFile(suffix=".jsonl", delete=False).name
|
||||||
|
stats = backfill_provenance(path, source="backfill", model="test-model", output_path=out)
|
||||||
|
self.assertEqual(stats["backfilled"], 1)
|
||||||
|
self.assertEqual(stats["already_had"], 1)
|
||||||
|
|
||||||
|
entries = load_jsonl(out)
|
||||||
|
self.assertEqual(entries[0]["provenance"]["source"], "backfill")
|
||||||
|
self.assertEqual(entries[1]["provenance"]["source"], "existing")
|
||||||
|
|
||||||
|
|
||||||
|
class TestDashboard(unittest.TestCase):
|
||||||
|
def test_generates_output(self):
|
||||||
|
f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
|
||||||
|
for i in range(5):
|
||||||
|
f.write(json.dumps({
|
||||||
|
"id": str(i),
|
||||||
|
"provenance": {
|
||||||
|
"source": "curated" if i < 3 else "trajectory",
|
||||||
|
"source_session_id": f"s{i}",
|
||||||
|
"model": "timmy-curated" if i < 3 else "hermes3",
|
||||||
|
"timestamp": f"2026-04-{14+i:02d}T00:00:00Z",
|
||||||
|
},
|
||||||
|
}) + "\n")
|
||||||
|
f.close()
|
||||||
|
output = provenance_dashboard(f.name)
|
||||||
|
self.assertIn("PROVENANCE DASHBOARD", output)
|
||||||
|
self.assertIn("timmy-curated", output)
|
||||||
|
self.assertIn("hermes3", output)
|
||||||
|
self.assertIn("curated", output)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
397
training/training_pair_provenance.py
Normal file
397
training/training_pair_provenance.py
Normal file
@@ -0,0 +1,397 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
training_pair_provenance.py — Provenance tracking for training data pairs.
|
||||||
|
|
||||||
|
Every training pair should carry metadata about where it came from:
|
||||||
|
- Which session/trajectory produced it
|
||||||
|
- Which model generated it
|
||||||
|
- When it was created
|
||||||
|
- What source type (curated, trajectory, augmentation)
|
||||||
|
|
||||||
|
This module provides utilities to:
|
||||||
|
1. Attach provenance metadata to training pairs
|
||||||
|
2. Validate that provenance exists
|
||||||
|
3. Generate provenance statistics/dashboards
|
||||||
|
4. Backfill provenance on existing pairs
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
from training_pair_provenance import attach_provenance, validate_provenance, provenance_dashboard
|
||||||
|
|
||||||
|
# Attach provenance to a pair
|
||||||
|
pair = attach_provenance(pair, source="trajectory", session_id="abc123", model="hermes3:latest")
|
||||||
|
|
||||||
|
# Validate provenance on a dataset
|
||||||
|
report = validate_provenance("data/curated_dataset.jsonl")
|
||||||
|
|
||||||
|
# Generate dashboard
|
||||||
|
print(provenance_dashboard("data/merged_training_data.jsonl"))
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import hashlib
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
from collections import Counter
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
|
||||||
|
# === Required provenance fields ===
# Every provenance dict must carry all of these keys; validate_provenance
# counts a pair under "missing_fields" when any are absent.
REQUIRED_FIELDS = ["source", "source_session_id", "model", "timestamp"]

# === Valid source types ===
# Closed set of accepted "source" values; make_provenance raises ValueError
# for anything else, and validate_provenance flags it as "invalid_source".
VALID_SOURCES = {"curated", "trajectory", "augmentation", "backfill", "manual"}
|
||||||
|
|
||||||
|
|
||||||
|
def make_provenance(
    source: str,
    source_session_id: str,
    model: str,
    timestamp: Optional[str] = None,
    extras: Optional[dict] = None,
) -> dict:
    """Build the provenance metadata dict attached to a training pair.

    Args:
        source: Source type; must be a member of VALID_SOURCES.
        source_session_id: Unique ID of the originating session/trajectory.
        model: Name of the model that generated the content.
        timestamp: ISO8601 timestamp; defaults to the current UTC time.
        extras: Optional additional metadata merged into the result.

    Returns:
        Provenance dict ready to attach to a training pair.

    Raises:
        ValueError: If *source* is not a recognized source type.
    """
    if source not in VALID_SOURCES:
        raise ValueError(f"Invalid source '{source}'. Must be one of: {VALID_SOURCES}")

    stamp = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    record = {
        "source": source,
        "source_session_id": source_session_id,
        "model": model,
        "timestamp": stamp,
    }
    if extras:
        record.update(extras)
    return record
|
||||||
|
|
||||||
|
|
||||||
|
def attach_provenance(
    pair: dict,
    source: str,
    source_session_id: str,
    model: str,
    timestamp: Optional[str] = None,
    extras: Optional[dict] = None,
) -> dict:
    """Attach provenance metadata to a training pair (mutates and returns it).

    Existing provenance is left untouched unless the caller passes
    ``extras={"force": True}``; the ``force`` flag itself is never stored
    in the resulting provenance dict.

    Args:
        pair: Training pair dict (ShareGPT format).
        source: Source type.
        source_session_id: Session/trajectory ID.
        model: Model name.
        timestamp: ISO8601 timestamp.
        extras: Additional metadata (may carry the ``force`` control flag).

    Returns:
        The pair dict with provenance attached.
    """
    force_requested = bool(extras and extras.get("force"))
    if "provenance" in pair and not force_requested:
        return pair

    # Strip the control flag so it does not leak into stored metadata.
    remaining = {k: v for k, v in (extras or {}).items() if k != "force"}

    pair["provenance"] = make_provenance(
        source=source,
        source_session_id=source_session_id,
        model=model,
        timestamp=timestamp,
        extras=remaining or None,
    )
    return pair
|
||||||
|
|
||||||
|
|
||||||
|
def extract_trajectory_provenance(trajectory_entry: dict) -> dict:
    """Pull provenance fields out of a raw trajectory JSONL entry.

    Trajectory entries vary in shape; IDs may live under ``id`` or
    ``session_id`` and timestamps under ``started_at`` / ``timestamp`` /
    ``created_at``. Missing IDs and models fall back to "unknown"; a
    missing timestamp falls back to the current UTC time.

    Returns:
        Dict with source_session_id, model, and timestamp.
    """
    session_id = (
        trajectory_entry.get("id")
        or trajectory_entry.get("session_id")
        or "unknown"
    )
    stamp = (
        trajectory_entry.get("started_at")
        or trajectory_entry.get("timestamp")
        or trajectory_entry.get("created_at")
        or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    )
    return {
        "source_session_id": session_id,
        "model": trajectory_entry.get("model", "unknown"),
        "timestamp": stamp,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def pair_fingerprint(pair: dict) -> str:
    """Return a short, stable content hash for a training pair.

    Only non-system conversation turns contribute, so the fingerprint is
    unaffected by provenance metadata or system-prompt differences: the
    same content always hashes the same regardless of where it came from.
    Used for deduplication and tracking.
    """
    turns = [
        f"{turn.get('from', '')}:{turn.get('value', '')}"
        for turn in pair.get("conversations", [])
        if turn.get("from") != "system"  # system prompt excluded on purpose
    ]
    digest = hashlib.sha256("|".join(turns).encode()).hexdigest()
    return digest[:16]
|
||||||
|
|
||||||
|
|
||||||
|
def load_jsonl(path) -> list[dict]:
    """Parse a JSONL file into a list of dicts, ignoring blank lines."""
    with open(Path(path)) as handle:
        return [json.loads(text) for line in handle if (text := line.strip())]
|
||||||
|
|
||||||
|
|
||||||
|
def save_jsonl(path, entries: list[dict]):
    """Write entries to *path* as JSONL, creating parent dirs as needed."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w") as handle:
        handle.writelines(json.dumps(entry) + "\n" for entry in entries)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_provenance(path) -> dict:
    """Validate provenance metadata on all pairs in a JSONL file.

    Returns a report dict with:
    - total: total pairs
    - with_provenance: pairs that have provenance
    - missing_provenance: pairs without provenance
    - missing_fields: pairs with provenance but missing required fields
    - invalid_source: pairs with unrecognized source type
    - coverage: percent of pairs carrying provenance
    - issues: list of specific issue descriptions
    - error: present only when the file could not be found
    """
    path = Path(path)
    # Always build the full report skeleton. The previous version returned
    # only {"error", "total"} for a missing file, which made every caller
    # that indexes report["coverage"] / report["with_provenance"] /
    # report["issues"] (CI workflow, __main__ CLI) crash with KeyError.
    report = {
        "total": 0,
        "with_provenance": 0,
        "missing_provenance": 0,
        "missing_fields": 0,
        "invalid_source": 0,
        "issues": [],
        "coverage": 0,
    }
    if not path.exists():
        report["error"] = f"File not found: {path}"
        return report

    entries = load_jsonl(path)
    report["total"] = len(entries)

    for i, entry in enumerate(entries):
        prov = entry.get("provenance")
        if not prov:
            report["missing_provenance"] += 1
            report["issues"].append(f"Pair {i} (id={entry.get('id', '?')}): no provenance")
            continue

        report["with_provenance"] += 1

        # Check required fields
        missing = [f for f in REQUIRED_FIELDS if f not in prov]
        if missing:
            report["missing_fields"] += 1
            report["issues"].append(
                f"Pair {i} (id={entry.get('id', '?')}): missing fields: {missing}"
            )

        # Check source validity (empty source already counted as missing field)
        source = prov.get("source", "")
        if source and source not in VALID_SOURCES:
            report["invalid_source"] += 1
            report["issues"].append(
                f"Pair {i} (id={entry.get('id', '?')}): invalid source '{source}'"
            )

    report["coverage"] = (
        report["with_provenance"] / report["total"] * 100 if report["total"] > 0 else 0
    )
    return report
|
||||||
|
|
||||||
|
|
||||||
|
def provenance_dashboard(path) -> str:
    """Render a human-readable provenance summary for a JSONL dataset.

    Reports total pair count, provenance coverage, and breakdowns by
    model, by source type, and by date (top 10 dates). Pairs without
    provenance are bucketed under "(no provenance)".
    """
    path = Path(path)
    if not path.exists():
        return f"File not found: {path}"

    entries = load_jsonl(path)
    if not entries:
        return "Empty dataset"

    model_counts = Counter()
    source_counts = Counter()
    dates_seen = []
    covered = 0

    for entry in entries:
        prov = entry.get("provenance")
        if not prov:
            model_counts["(no provenance)"] += 1
            source_counts["(no provenance)"] += 1
            continue
        covered += 1
        model_counts[prov.get("model", "unknown")] += 1
        source_counts[prov.get("source", "unknown")] += 1
        stamp = prov.get("timestamp", "")
        if stamp:
            dates_seen.append(stamp[:10])  # Date only

    coverage = covered / len(entries) * 100 if entries else 0
    total = len(entries)

    lines = [
        "=" * 50,
        "PROVENANCE DASHBOARD",
        "=" * 50,
        f"Total pairs: {total}",
        f"Provenance coverage: {coverage:.1f}% ({covered}/{total})",
        "",
        "--- By Model ---",
    ]
    for model, count in model_counts.most_common():
        pct = count / total * 100
        lines.append(f" {model:<30} {count:>6} ({pct:.1f}%)")

    lines += ["", "--- By Source ---"]
    for source, count in source_counts.most_common():
        pct = count / total * 100
        lines.append(f" {source:<20} {count:>6} ({pct:.1f}%)")

    if dates_seen:
        lines += ["", "--- By Date (top 10) ---"]
        for date, count in Counter(dates_seen).most_common(10):
            lines.append(f" {date:<12} {count:>6}")

    return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def backfill_provenance(
    path,
    source: str = "backfill",
    model: str = "unknown",
    output_path: Optional[str] = None,
) -> dict:
    """Attach provenance to every pair in *path* that lacks it.

    Pairs that already carry provenance are left untouched.

    Args:
        path: Input JSONL file.
        source: Source type recorded on backfilled pairs.
        model: Model name recorded on backfilled pairs.
        output_path: Where to write the result (defaults to overwriting
            the input file).

    Returns:
        Stats dict with total / backfilled / already_had / output keys.
    """
    entries = load_jsonl(path)
    stats = {"total": len(entries), "backfilled": 0, "already_had": 0}

    for entry in entries:
        if "provenance" in entry:
            stats["already_had"] += 1
            continue
        # Fall back to a synthetic ID when the pair has no "id" field.
        fallback_id = entry.get("id", f"backfill-{stats['backfilled']}")
        entry["provenance"] = make_provenance(
            source=source,
            source_session_id=fallback_id,
            model=model,
        )
        stats["backfilled"] += 1

    destination = Path(output_path) if output_path else Path(path)
    save_jsonl(destination, entries)
    stats["output"] = str(destination)
    return stats
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Provenance tracking for training data")
    sub = parser.add_subparsers(dest="command")

    # validate
    p_validate = sub.add_parser("validate", help="Validate provenance in a dataset")
    p_validate.add_argument("input", help="Input JSONL file")
    p_validate.add_argument("--json", action="store_true", help="Output as JSON")

    # dashboard
    p_dash = sub.add_parser("dashboard", help="Show provenance dashboard")
    p_dash.add_argument("input", help="Input JSONL file")

    # backfill
    p_back = sub.add_parser("backfill", help="Add provenance to pairs missing it")
    p_back.add_argument("input", help="Input JSONL file")
    p_back.add_argument("--source", default="backfill", help="Source type")
    p_back.add_argument("--model", default="unknown", help="Model name")
    p_back.add_argument("--output", "-o", help="Output path (default: overwrite)")

    args = parser.parse_args()

    if args.command == "validate":
        report = validate_provenance(args.input)
        if args.json:
            print(json.dumps(report, indent=2))
        else:
            print(f"Provenance Validation: {args.input}")
            # validate_provenance may return a partial dict containing only
            # {"error", "total"} when the input file is missing; use .get()
            # so the CLI reports the error instead of crashing with KeyError.
            if "error" in report:
                print(f" ERROR: {report['error']}")
            print(f" Total: {report['total']}")
            print(f" With provenance: {report.get('with_provenance', 0)}")
            print(f" Missing provenance: {report.get('missing_provenance', 0)}")
            print(f" Missing fields: {report.get('missing_fields', 0)}")
            print(f" Invalid source: {report.get('invalid_source', 0)}")
            print(f" Coverage: {report.get('coverage', 0):.1f}%")
            if report.get("issues"):
                print(f"\n Issues ({len(report['issues'])}):")
                for issue in report["issues"][:20]:
                    print(f" {issue}")

    elif args.command == "dashboard":
        print(provenance_dashboard(args.input))

    elif args.command == "backfill":
        stats = backfill_provenance(args.input, args.source, args.model, args.output)
        # Plain string: the original used an f-string with no placeholders.
        print("Backfill complete:")
        print(f" Total: {stats['total']}")
        print(f" Backfilled: {stats['backfilled']}")
        print(f" Already had provenance: {stats['already_had']}")
        print(f" Output: {stats['output']}")

    else:
        parser.print_help()
|
||||||
Reference in New Issue
Block a user