Compare commits

...

6 Commits

6 changed files with 427 additions and 10 deletions

View File

@@ -94,7 +94,22 @@ convert: ## Convert merged dataset to MLX format (train/valid split)
# ── Helpers ───────────────────────────────────────────────────────────
.PHONY: train-cloud train-local eval eval-baseline vibes ingest curated convert help
provenance-check: ## Validate provenance metadata on all training data
	python training_pair_provenance.py \
		--validate data/curated_dataset.jsonl \
		--validate data/merged_training_data.jsonl || true
	@python -c "from training_pair_provenance import ProvenanceTracker, load_jsonl; t=ProvenanceTracker(); [t._track(p) for p in load_jsonl('data/curated_dataset.jsonl')]; print(t.report_text())"
provenance-backfill: ## Backfill provenance metadata on all training data
	python -c "from training_pair_provenance import ProvenanceTracker; t=ProvenanceTracker(); print('Backfilled curated:', t.backfill_file('data/curated_dataset.jsonl', source='backfill', model='timmy-curated')); print('Backfilled merged:', t.backfill_file('data/merged_training_data.jsonl', source='backfill', model='unknown'))"
provenance-dashboard: ## Print provenance dashboard
	python -c "from pathlib import Path; from training_pair_provenance import provenance_dashboard; print(provenance_dashboard(Path('data/merged_training_data.jsonl')))"
provenance-test: ## Run provenance validation tests
	python -m unittest training/tests/test_provenance.py -v
.PHONY: train-cloud train-local eval eval-baseline vibes ingest curated convert provenance-check provenance-backfill provenance-dashboard provenance-test help
help: ## Show this help
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | \

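The new targets are thin wrappers around the provenance module's Python API. For reference, a minimal sketch of the same checks driven directly from Python, assuming the default data paths used in the Makefile above:

from pathlib import Path
from training_pair_provenance import ProvenanceTracker, provenance_dashboard

tracker = ProvenanceTracker()
# validate_file() returns {"valid": n, "invalid": n, "errors": [...]}
results = tracker.validate_file(Path("data/merged_training_data.jsonl"))
print(f"valid: {results['valid']}  invalid: {results['invalid']}")
for err in results["errors"][:5]:  # show the first few offending lines, if any
    print(err)
print(provenance_dashboard(Path("data/merged_training_data.jsonl")))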
View File

@@ -12,6 +12,8 @@ import json
import time
from pathlib import Path
from training_pair_provenance import ProvenanceTracker
SYSTEM_PROMPT = """# SOUL.md
## Inscription 1 — The Immutable Conscience
@@ -241,31 +243,60 @@ def main():
output_path = Path("data/curated_dataset.jsonl")
output_path.parent.mkdir(parents=True, exist_ok=True)
# Annotate provenance on all exemplars
tracker = ProvenanceTracker()
annotated = []
for exemplar in EXEMPLARS:
exemplar = tracker.annotate(
exemplar,
source="curated",
model="timmy-curated",
session_id=exemplar["id"],
timestamp=exemplar.get("started_at", ""),
)
annotated.append(exemplar)
# Validate
invalid = 0
for pair in annotated:
errs = tracker.validate(pair)
if errs:
print(f"WARN: {pair['id']} provenance invalid: {errs}")
invalid += 1
with open(output_path, "w") as f:
for exemplar in EXEMPLARS:
for exemplar in annotated:
f.write(json.dumps(exemplar) + "\n")
print(f"Curated dataset: {len(EXEMPLARS)} exemplar conversations")
print(f"Curated dataset: {len(annotated)} exemplar conversations")
print(f"Saved to: {output_path}")
# Stats
total_turns = sum(len(e["conversations"]) for e in EXEMPLARS)
total_turns = sum(len(e["conversations"]) for e in annotated)
total_chars = sum(
sum(len(c["value"]) for c in e["conversations"])
for e in EXEMPLARS
for e in annotated
)
print(f"Total turns: {total_turns}")
print(f"Total chars: {total_chars:,}")
# Category breakdown
categories = {}
for e in EXEMPLARS:
for e in annotated:
cat = e["id"].split("_")[1]
categories[cat] = categories.get(cat, 0) + 1
print("\nBy category:")
for cat, count in sorted(categories.items()):
print(f" {cat}: {count}")
# Provenance report
print(f"\n{tracker.report_text()}")
if invalid > 0:
print(f"\nWARNING: {invalid} pairs have invalid provenance!")
else:
print(f"\nAll {len(annotated)} pairs have valid provenance ✓")
if __name__ == "__main__":
main()

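With annotate() adding exactly the five required fields (source, model, source_session_id, timestamp, approved), each exemplar written to curated_dataset.jsonl ends up shaped roughly like this illustrative line (field values hypothetical):

{"id": "ex_reflection_001", "conversations": [{"from": "human", "value": "..."}], "provenance": {"source": "curated", "model": "timmy-curated", "source_session_id": "ex_reflection_001", "timestamp": "2026-03-15T12:34:56Z", "approved": true}}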
View File

@@ -11,10 +11,12 @@ Applies quality filtering:
- Skip cycles where the model just echoed the perception
- Keep cycles with genuine thought, discovery, or action
Adds provenance metadata to every ingested pair.
Usage:
python ingest_nexus_trajectories.py \\
--trajectories ~/.nexus/trajectories/ \\
--curated ../data/curated_dataset.jsonl \\
python ingest_trajectories.py \
--trajectories ~/.nexus/trajectories/ \
--curated ../data/curated_dataset.jsonl \
--output ../data/merged_training_data.jsonl
"""
@@ -23,6 +25,8 @@ import json
from pathlib import Path
from difflib import SequenceMatcher
from training_pair_provenance import ProvenanceTracker
def load_jsonl(path: Path) -> list[dict]:
"""Load a JSONL file."""
@@ -75,19 +79,36 @@ def is_quality_cycle(cycle: dict, min_thought_len: int = 30) -> bool:
return True
def _extract_session_id(path: Path) -> str:
"""Extract session ID from trajectory filename."""
stem = path.stem
# trajectory_20260315_123456_abc123.jsonl -> 20260315_123456_abc123
if stem.startswith("trajectory_"):
return stem[len("trajectory_"):]
return stem
def _extract_model(cycle: dict) -> str:
"""Extract model name from cycle metadata."""
return cycle.get("model", "unknown")
def merge_datasets(
trajectory_dir: Path,
curated_path: Path,
output_path: Path,
min_thought_len: int = 30,
) -> dict:
"""Merge nexus trajectories with curated dataset."""
"""Merge nexus trajectories with curated dataset. Annotates provenance."""
tracker = ProvenanceTracker()
stats = {
"curated_count": 0,
"trajectory_files": 0,
"trajectory_raw": 0,
"trajectory_quality": 0,
"total_output": 0,
"provenance_valid": 0,
"provenance_invalid": 0,
}
# Load curated
@@ -95,17 +116,32 @@ def merge_datasets(
if curated_path.exists():
curated = load_jsonl(curated_path)
stats["curated_count"] = len(curated)
curated = [
tracker.annotate(pair, source="curated", model="timmy-curated", session_id=pair.get("id", "curated"))
for pair in curated
]
# Load and filter trajectories
quality_trajectories = []
for traj_file in sorted(trajectory_dir.glob("trajectory_*.jsonl")):
stats["trajectory_files"] += 1
session_id = _extract_session_id(traj_file)
cycles = load_jsonl(traj_file)
stats["trajectory_raw"] += len(cycles)
for cycle in cycles:
if is_quality_cycle(cycle, min_thought_len):
model = _extract_model(cycle)
cycle = tracker.annotate(
cycle,
source="trajectory",
model=model,
session_id=session_id,
timestamp=cycle.get("started_at", ""),
)
quality_trajectories.append(cycle)
else:
cycle = tracker.exclude(cycle, "quality_filter")
stats["trajectory_quality"] = len(quality_trajectories)
@@ -113,12 +149,21 @@ def merge_datasets(
merged = curated + quality_trajectories
stats["total_output"] = len(merged)
# Validate all merged pairs
for pair in merged:
errs = tracker.validate(pair)
if errs:
stats["provenance_invalid"] += 1
else:
stats["provenance_valid"] += 1
# Write
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
for entry in merged:
f.write(json.dumps(entry) + "\n")
print(tracker.report_text())
return stats
@@ -166,6 +211,8 @@ def main():
print(f" Raw cycles: {stats['trajectory_raw']}")
print(f" Quality cycles: {stats['trajectory_quality']}")
print(f" Total training data: {stats['total_output']}")
print(f" Provenance valid: {stats['provenance_valid']}")
print(f" Provenance invalid: {stats['provenance_invalid']}")
print(f"\nOutput: {args.output}")

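For reference, a minimal sketch of calling merge_datasets() programmatically instead of through the CLI (paths are the docstring defaults; aborting on invalid provenance is an assumption, not behavior this diff adds):

from pathlib import Path

stats = merge_datasets(
    trajectory_dir=Path.home() / ".nexus" / "trajectories",
    curated_path=Path("../data/curated_dataset.jsonl"),
    output_path=Path("../data/merged_training_data.jsonl"),
)
if stats["provenance_invalid"] > 0:
    raise SystemExit(f"{stats['provenance_invalid']} pairs failed provenance validation")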
View File

@@ -0,0 +1 @@
# Provenance tests

View File

@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""Tests for training_pair_provenance.py"""
import json, tempfile, unittest
from pathlib import Path
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from training.training_pair_provenance import ProvenanceTracker, load_jsonl, write_jsonl
class TestAnnotate(unittest.TestCase):
def test_annotate_adds_required_fields(self):
t = ProvenanceTracker()
p = {"conversations": [{"from": "human", "value": "hi"}]}
r = t.annotate(p, source="trajectory", model="hermes4:14b", session_id="s1")
m = r["provenance"]
self.assertEqual(m["source"], "trajectory")
self.assertEqual(m["model"], "hermes4:14b")
self.assertTrue(m["approved"])
self.assertNotEqual(m["timestamp"], "")
def test_exclude_sets_flag(self):
t = ProvenanceTracker()
p = {"conversations": []}
t.annotate(p, source="trajectory", model="hermes4:14b")
r = t.exclude(p, "quality_filter")
self.assertTrue(r["provenance"]["excluded"])
self.assertFalse(r["provenance"]["approved"])
def test_backfill_adds_provenance(self):
t = ProvenanceTracker()
p = {"conversations": []}
r = t.backfill(p, source="backfill", model="unknown")
self.assertEqual(r["provenance"]["source"], "backfill")
class TestValidate(unittest.TestCase):
def test_valid_pair(self):
t = ProvenanceTracker()
p = {"provenance": {"source": "curated", "model": "timmy-curated", "timestamp": "2026-01-01", "source_session_id": "c1", "approved": True}}
self.assertEqual(t.validate(p), [])
def test_missing_provenance(self):
t = ProvenanceTracker()
self.assertTrue(any("missing" in e for e in t.validate({"conversations": []})))
def test_missing_field(self):
t = ProvenanceTracker()
p = {"provenance": {"source": "curated", "model": "timmy-curated", "timestamp": "2026-01-01", "approved": True}}
self.assertTrue(any("source_session_id" in e for e in t.validate(p)))
def test_excluded_no_reason(self):
t = ProvenanceTracker()
p = {"provenance": {"source": "curated", "model": "timmy-curated", "timestamp": "2026-01-01", "source_session_id": "c1", "approved": True, "excluded": True}}
self.assertTrue(any("exclusion_reason" in e for e in t.validate(p)))
class TestReport(unittest.TestCase):
def test_report_counts(self):
t = ProvenanceTracker()
for i in range(5):
t.annotate({"conversations": []}, source="trajectory", model="hermes4:14b", session_id=f"s{i}")
for i in range(3):
t.annotate({"conversations": []}, source="curated", model="timmy-curated", session_id=f"c{i}")
r = t.report()
self.assertEqual(r["total"], 8)
self.assertEqual(r["approved"], 8)
self.assertEqual(r["by_source"]["trajectory"], 5)
self.assertEqual(r["by_source"]["curated"], 3)
class TestBackfillFile(unittest.TestCase):
def test_round_trip(self):
t = ProvenanceTracker()
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
for i in range(3):
f.write(json.dumps({"conversations": [{"from": "human", "value": f"p{i}"}]}) + "
")
p = Path(f.name)
try:
cnt = t.backfill_file(p, source="backfill", model="unknown")
self.assertEqual(cnt, 3)
loaded = load_jsonl(p)
for pair in loaded:
self.assertEqual(pair["provenance"]["source"], "backfill")
finally:
p.unlink()
if __name__ == "__main__":
unittest.main(verbosity=2)

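write_jsonl is imported above but never exercised; a sketch of one more test (not in this diff) that would cover it together with validate_file:

class TestValidateFile(unittest.TestCase):
    def test_annotated_file_is_valid(self):
        t = ProvenanceTracker()
        pairs = [t.annotate({"conversations": []}, source="curated", model="timmy-curated", session_id=f"c{i}") for i in range(2)]
        with tempfile.TemporaryDirectory() as d:
            path = Path(d) / "pairs.jsonl"
            write_jsonl(path, pairs)
            res = t.validate_file(path)
            self.assertEqual(res["valid"], 2)
            self.assertEqual(res["invalid"], 0)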
View File

@@ -0,0 +1,233 @@
#!/usr/bin/env python3
"""Training pair provenance tracking.
Every training pair in the pipeline must carry provenance metadata:
- source_session_id: origin session identifier
- source: trajectory|curated|backfill
- model: model that produced the assistant turn
- timestamp: ISO 8601 when the pair was captured
- excluded: bool + reason if filtered out during quality screening
- approved: bool (default True for accepted pairs)
Usage:
from training_pair_provenance import ProvenanceTracker
tracker = ProvenanceTracker()
for pair in pairs:
pair = tracker.annotate(pair, source="trajectory", model="hermes4:14b")
tracker.report()
"""
import json
import time
from dataclasses import dataclass
from pathlib import Path
REQUIRED_FIELDS = ("source_session_id", "source", "model", "timestamp", "approved")
@dataclass
class ProvenanceMeta:
source_session_id: str = ""
source: str = "" # trajectory | curated | backfill
model: str = ""
timestamp: str = ""
excluded: bool = False
exclusion_reason: str = ""
approved: bool = True
class ProvenanceTracker:
"""Annotate and validate provenance metadata on training pairs."""
def __init__(self):
self._stats = {
"total": 0,
"approved": 0,
"excluded": 0,
"missing_provenance": 0,
"by_source": {},
"by_model": {},
}
# ── annotation ─────────────────────────────────────────────
def annotate(
self,
pair: dict,
*,
source: str,
model: str,
session_id: str = "",
timestamp: str = "",
) -> dict:
"""Attach provenance metadata to a training pair dict."""
meta = pair.get("provenance", {})
meta.setdefault("source", source)
meta.setdefault("model", model)
if session_id:
meta.setdefault("source_session_id", session_id)
if timestamp:
meta.setdefault("timestamp", timestamp)
if "timestamp" not in meta:
meta["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
if "approved" not in meta:
meta["approved"] = True
pair["provenance"] = meta
self._track(pair)
return pair
def exclude(self, pair: dict, reason: str) -> dict:
"""Mark pair as excluded with a reason."""
meta = pair.get("provenance", {})
meta["excluded"] = True
meta["exclusion_reason"] = reason
meta["approved"] = False
pair["provenance"] = meta
return pair
# ── backfill ───────────────────────────────────────────────
def backfill(
self,
pair: dict,
*,
source: str,
model: str,
) -> dict:
"""Add provenance to a pair that has none (historical data)."""
meta = pair.get("provenance")
if meta and meta.get("source") and meta.get("model"):
return pair # already has provenance
return self.annotate(pair, source=source, model=model, session_id="backfill")
def backfill_file(self, path: Path, *, source: str, model: str) -> int:
"""Backfill provenance on an entire JSONL file. Returns count updated."""
pairs = []
count = 0
with open(path) as f:
for line in f:
line = line.strip()
if line:
pair = json.loads(line)
pair = self.backfill(pair, source=source, model=model)
pairs.append(pair)
count += 1
with open(path, "w") as f:
for pair in pairs:
f.write(json.dumps(pair) + "\n")
return count
# ── validation ─────────────────────────────────────────────
def validate(self, pair: dict) -> list[str]:
"""Return list of validation errors for a pair. Empty = valid."""
errors = []
meta = pair.get("provenance")
if not meta:
return ["missing provenance metadata"]
for field_name in REQUIRED_FIELDS:
if field_name not in meta:
errors.append(f"provenance missing field: {field_name}")
if meta.get("excluded") and not meta.get("exclusion_reason"):
errors.append("excluded pair missing exclusion_reason")
return errors
def validate_file(self, path: Path) -> dict:
"""Validate all pairs in a JSONL file. Returns {valid, invalid, errors}."""
results = {"valid": 0, "invalid": 0, "errors": []}
with open(path) as f:
for i, line in enumerate(f, 1):
line = line.strip()
if line:
pair = json.loads(line)
errs = self.validate(pair)
if errs:
results["invalid"] += 1
results["errors"].append({"line": i, "errors": errs})
else:
results["valid"] += 1
return results
# ── reporting ──────────────────────────────────────────────
def _track(self, pair: dict):
meta = pair.get("provenance", {})
self._stats["total"] += 1
if meta.get("approved"):
self._stats["approved"] += 1
if meta.get("excluded"):
self._stats["excluded"] += 1
source = meta.get("source", "unknown")
self._stats["by_source"][source] = self._stats["by_source"].get(source, 0) + 1
model = meta.get("model", "unknown")
self._stats["by_model"][model] = self._stats["by_model"].get(model, 0) + 1
def report(self) -> dict:
"""Return provenance statistics dict."""
return dict(self._stats)
def report_text(self) -> str:
"""Return human-readable provenance report."""
s = self._stats
lines = [
"Provenance Report",
"=" * 40,
f" Total pairs: {s['total']}",
f" Approved: {s['approved']}",
f" Excluded: {s['excluded']}",
"",
" By source:",
]
for source, count in sorted(s["by_source"].items()):
lines.append(f" {source:20s} {count}")
lines.append(" By model:")
for model, count in sorted(s["by_model"].items()):
lines.append(f" {model:20s} {count}")
return "\n".join(lines)
def load_jsonl(path: Path) -> list[dict]:
entries = []
with open(path) as f:
for line in f:
line = line.strip()
if line:
entries.append(json.loads(line))
return entries
def write_jsonl(path: Path, pairs: list[dict]):
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
for pair in pairs:
f.write(json.dumps(pair) + "\n")
def provenance_dashboard(path: Path) -> str:
"""Generate a provenance dashboard from a JSONL file."""
pairs = load_jsonl(path)
tracker = ProvenanceTracker()
for pair in pairs:
tracker._track(pair)
report = tracker.report()
covered = report["approved"] + report["excluded"]
coverage = f"{covered * 100 // report['total']}%" if report["total"] else "0%"
lines = [
"╔══════════════════════════════════════╗",
"║ Training Provenance Dashboard ║",
"╠══════════════════════════════════════╣",
f"║ Total pairs: {report['total']:>12}",
f"║ Approved: {report['approved']:>12}",
f"║ Excluded: {report['excluded']:>12}",
f"║ Provenance coverage: {coverage:>12} ║",
"╠══════════════════════════════════════╣",
"║ By Source ║",
]
for source, count in sorted(report["by_source"].items()):
lines.append(f"{source:20s} {count:>8}")
lines.append("╠══════════════════════════════════════╣")
lines.append("║ By Model ║")
for model, count in sorted(report["by_model"].items()):
lines.append(f"{model:20s} {count:>8}")
lines.append("╚══════════════════════════════════════╝")
return "\n".join(lines)