Compare commits
6 Commits
issue-750-
...
fix/752-pr
| Author | SHA1 | Date | |
|---|---|---|---|
| 18a7dfc27e | |||
| 2b607f4eaf | |||
| 08c2c5b945 | |||
| 38eea190cb | |||
| f6e696f519 | |||
| 172ab0ec61 |
@@ -94,7 +94,22 @@ convert: ## Convert merged dataset to MLX format (train/valid split)
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
.PHONY: train-cloud train-local eval eval-baseline vibes ingest curated convert help
|
||||
provenance-check: ## Validate provenance metadata on all training data
|
||||
\tpython training_pair_provenance.py \
|
||||
\t\t--validate data/curated_dataset.jsonl \
|
||||
\t\t--validate data/merged_training_data.jsonl || true
|
||||
\t@python -c "from training_pair_provenance import ProvenanceTracker, load_jsonl; t=ProvenanceTracker(); [t._track(p) for p in load_jsonl('data/curated_dataset.jsonl')]; print(t.report_text())"
|
||||
|
||||
provenance-backfill: ## Backfill provenance metadata on all training data
|
||||
\tpython -c "from training_pair_provenance import ProvenanceTracker; t=ProvenanceTracker(); print(f'Backfilled curated: {t.backfill_file(\\'data/curated_dataset.jsonl\\', source=\\'backfill\\', model=\\'timmy-curated\\')}'); print(f'Backfilled merged: {t.backfill_file(\\'data/merged_training_data.jsonl\\', source=\\'backfill\\', model=\\'unknown\\')}')"
|
||||
|
||||
provenance-dashboard: ## Print provenance dashboard
|
||||
\tpython -c "from training_pair_provenance import ProvenanceTracker, load_jsonl, provenance_dashboard; from pathlib import Path; import sys; p=Path(sys.argv[1]) if len(sys.argv)>1 else Path('data/merged_training_data.jsonl'); print(provenance_dashboard(p))"
|
||||
|
||||
provenance-test: ## Run provenance validation tests
|
||||
\tpython -m unittest training/tests/test_provenance.py -v
|
||||
|
||||
.PHONY: train-cloud train-local eval eval-baseline vibes ingest curated convert provenance-check provenance-backfill provenance-dashboard provenance-test help
|
||||
|
||||
help: ## Show this help
|
||||
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | \
|
||||
|
||||
@@ -12,6 +12,8 @@ import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from training_pair_provenance import ProvenanceTracker
|
||||
|
||||
SYSTEM_PROMPT = """# SOUL.md
|
||||
|
||||
## Inscription 1 — The Immutable Conscience
|
||||
@@ -241,31 +243,60 @@ def main():
|
||||
output_path = Path("data/curated_dataset.jsonl")
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Annotate provenance on all exemplars
|
||||
tracker = ProvenanceTracker()
|
||||
annotated = []
|
||||
for exemplar in EXEMPLARS:
|
||||
exemplar = tracker.annotate(
|
||||
exemplar,
|
||||
source="curated",
|
||||
model="timmy-curated",
|
||||
session_id=exemplar["id"],
|
||||
timestamp=exemplar.get("started_at", ""),
|
||||
)
|
||||
annotated.append(exemplar)
|
||||
|
||||
# Validate
|
||||
invalid = 0
|
||||
for pair in annotated:
|
||||
errs = tracker.validate(pair)
|
||||
if errs:
|
||||
print(f"WARN: {pair['id']} provenance invalid: {errs}")
|
||||
invalid += 1
|
||||
|
||||
with open(output_path, "w") as f:
|
||||
for exemplar in EXEMPLARS:
|
||||
for exemplar in annotated:
|
||||
f.write(json.dumps(exemplar) + "\n")
|
||||
|
||||
print(f"Curated dataset: {len(EXEMPLARS)} exemplar conversations")
|
||||
print(f"Curated dataset: {len(annotated)} exemplar conversations")
|
||||
print(f"Saved to: {output_path}")
|
||||
|
||||
# Stats
|
||||
total_turns = sum(len(e["conversations"]) for e in EXEMPLARS)
|
||||
total_turns = sum(len(e["conversations"]) for e in annotated)
|
||||
total_chars = sum(
|
||||
sum(len(c["value"]) for c in e["conversations"])
|
||||
for e in EXEMPLARS
|
||||
for e in annotated
|
||||
)
|
||||
print(f"Total turns: {total_turns}")
|
||||
print(f"Total chars: {total_chars:,}")
|
||||
|
||||
# Category breakdown
|
||||
categories = {}
|
||||
for e in EXEMPLARS:
|
||||
for e in annotated:
|
||||
cat = e["id"].split("_")[1]
|
||||
categories[cat] = categories.get(cat, 0) + 1
|
||||
print("\nBy category:")
|
||||
for cat, count in sorted(categories.items()):
|
||||
print(f" {cat}: {count}")
|
||||
|
||||
# Provenance report
|
||||
print(f"\n{tracker.report_text()}")
|
||||
|
||||
if invalid > 0:
|
||||
print(f"\nWARNING: {invalid} pairs have invalid provenance!")
|
||||
else:
|
||||
print(f"\nAll {len(annotated)} pairs have valid provenance ✓")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -11,10 +11,12 @@ Applies quality filtering:
|
||||
- Skip cycles where the model just echoed the perception
|
||||
- Keep cycles with genuine thought, discovery, or action
|
||||
|
||||
Adds provenance metadata to every ingested pair.
|
||||
|
||||
Usage:
|
||||
python ingest_nexus_trajectories.py \\
|
||||
--trajectories ~/.nexus/trajectories/ \\
|
||||
--curated ../data/curated_dataset.jsonl \\
|
||||
python ingest_trajectories.py \
|
||||
--trajectories ~/.nexus/trajectories/ \
|
||||
--curated ../data/curated_dataset.jsonl \
|
||||
--output ../data/merged_training_data.jsonl
|
||||
"""
|
||||
|
||||
@@ -23,6 +25,8 @@ import json
|
||||
from pathlib import Path
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
from training_pair_provenance import ProvenanceTracker
|
||||
|
||||
|
||||
def load_jsonl(path: Path) -> list[dict]:
|
||||
"""Load a JSONL file."""
|
||||
@@ -75,19 +79,36 @@ def is_quality_cycle(cycle: dict, min_thought_len: int = 30) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def _extract_session_id(path: Path) -> str:
|
||||
"""Extract session ID from trajectory filename."""
|
||||
stem = path.stem
|
||||
# trajectory_20260315_123456_abc123.jsonl -> 20260315_123456_abc123
|
||||
if stem.startswith("trajectory_"):
|
||||
return stem[len("trajectory_"):]
|
||||
return stem
|
||||
|
||||
|
||||
def _extract_model(cycle: dict) -> str:
|
||||
"""Extract model name from cycle metadata."""
|
||||
return cycle.get("model", "unknown")
|
||||
|
||||
|
||||
def merge_datasets(
|
||||
trajectory_dir: Path,
|
||||
curated_path: Path,
|
||||
output_path: Path,
|
||||
min_thought_len: int = 30,
|
||||
) -> dict:
|
||||
"""Merge nexus trajectories with curated dataset."""
|
||||
"""Merge nexus trajectories with curated dataset. Annotates provenance."""
|
||||
tracker = ProvenanceTracker()
|
||||
stats = {
|
||||
"curated_count": 0,
|
||||
"trajectory_files": 0,
|
||||
"trajectory_raw": 0,
|
||||
"trajectory_quality": 0,
|
||||
"total_output": 0,
|
||||
"provenance_valid": 0,
|
||||
"provenance_invalid": 0,
|
||||
}
|
||||
|
||||
# Load curated
|
||||
@@ -95,17 +116,32 @@ def merge_datasets(
|
||||
if curated_path.exists():
|
||||
curated = load_jsonl(curated_path)
|
||||
stats["curated_count"] = len(curated)
|
||||
curated = [
|
||||
tracker.annotate(pair, source="curated", model="timmy-curated", session_id=pair.get("id", "curated"))
|
||||
for pair in curated
|
||||
]
|
||||
|
||||
# Load and filter trajectories
|
||||
quality_trajectories = []
|
||||
for traj_file in sorted(trajectory_dir.glob("trajectory_*.jsonl")):
|
||||
stats["trajectory_files"] += 1
|
||||
session_id = _extract_session_id(traj_file)
|
||||
cycles = load_jsonl(traj_file)
|
||||
stats["trajectory_raw"] += len(cycles)
|
||||
|
||||
for cycle in cycles:
|
||||
if is_quality_cycle(cycle, min_thought_len):
|
||||
model = _extract_model(cycle)
|
||||
cycle = tracker.annotate(
|
||||
cycle,
|
||||
source="trajectory",
|
||||
model=model,
|
||||
session_id=session_id,
|
||||
timestamp=cycle.get("started_at", ""),
|
||||
)
|
||||
quality_trajectories.append(cycle)
|
||||
else:
|
||||
cycle = tracker.exclude(cycle, "quality_filter")
|
||||
|
||||
stats["trajectory_quality"] = len(quality_trajectories)
|
||||
|
||||
@@ -113,12 +149,21 @@ def merge_datasets(
|
||||
merged = curated + quality_trajectories
|
||||
stats["total_output"] = len(merged)
|
||||
|
||||
# Validate all merged pairs
|
||||
for pair in merged:
|
||||
errs = tracker.validate(pair)
|
||||
if errs:
|
||||
stats["provenance_invalid"] += 1
|
||||
else:
|
||||
stats["provenance_valid"] += 1
|
||||
|
||||
# Write
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(output_path, "w") as f:
|
||||
for entry in merged:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
|
||||
print(tracker.report_text())
|
||||
return stats
|
||||
|
||||
|
||||
@@ -166,6 +211,8 @@ def main():
|
||||
print(f" Raw cycles: {stats['trajectory_raw']}")
|
||||
print(f" Quality cycles: {stats['trajectory_quality']}")
|
||||
print(f" Total training data: {stats['total_output']}")
|
||||
print(f" Provenance valid: {stats['provenance_valid']}")
|
||||
print(f" Provenance invalid: {stats['provenance_invalid']}")
|
||||
print(f"\nOutput: {args.output}")
|
||||
|
||||
|
||||
|
||||
@@ -1,162 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fix Training Data Code Block Indentation
|
||||
Issue #750: Training data code blocks have inconsistent indentation
|
||||
|
||||
Normalizes code block indentation in JSONL training data files using textwrap.dedent.
|
||||
|
||||
Usage:
|
||||
python3 fix_training_indentation.py --input data.jsonl
|
||||
python3 fix_training_indentation.py --input data.jsonl --output fixed.jsonl
|
||||
python3 fix_training_indentation.py --input data.jsonl --dry-run
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def fix_code_block_indentation(text):
    """
    Normalize indentation inside fenced code blocks found in *text*.

    Handles ```python / ```bash / bare ``` fences; empty blocks are left
    untouched. Returns a ``(fixed_text, fix_count)`` tuple where
    ``fix_count`` is the number of blocks whose content changed.
    """
    if not text or '```' not in text:
        return text, 0

    # Fences: optional language word after the backticks, body up to the
    # next closing fence (non-greedy, DOTALL so bodies may span lines).
    fence_re = re.compile(
        r'(```(?:\w+)?\n)(.*?)(```)',
        re.DOTALL
    )
    count = 0

    def _normalize(m):
        nonlocal count
        head, body, tail = m.group(1), m.group(2), m.group(3)

        if not body.strip():
            # Nothing to reindent in an empty block.
            return m.group(0)

        # First pass: strip the common leading whitespace.
        cleaned = textwrap.dedent(body)

        # Second pass: dedent() only removes a *common* prefix, so mixed
        # tabs/spaces can leave residual indentation — measure the minimum
        # indent over non-blank lines and trim it off every line.
        rows = cleaned.split('\n')
        margin = min(
            (len(r) - len(r.lstrip()) for r in rows if r.strip()),
            default=float('inf'),
        )
        if margin and margin != float('inf'):
            rows = [r[margin:] if r.strip() else r for r in rows]
            cleaned = '\n'.join(rows)

        if cleaned != body:
            count += 1
        return head + cleaned + tail

    return fence_re.sub(_normalize, text), count
|
||||
|
||||
|
||||
def process_jsonl_file(input_path, output_path=None, dry_run=False):
    """Process a JSONL file and fix code block indentation.

    Args:
        input_path: Path to the input JSONL file.
        output_path: Destination path; defaults to ``<input>.fixed.jsonl``.
        dry_run: When True, only report what would change, write nothing.

    Returns:
        Tuple of (total code-block fixes, number of entries with fixes).
    """
    input_path = Path(input_path)
    if output_path is None:
        output_path = input_path.with_suffix('.fixed.jsonl')
    else:
        output_path = Path(output_path)

    if not input_path.exists():
        print(f"Error: {input_path} does not exist")
        return 0, 0

    total_entries = 0
    total_fixes = 0
    entries_with_fixes = 0

    with open(input_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    fixed_lines = []

    for i, line in enumerate(lines):
        line = line.strip()
        if not line:
            continue

        try:
            entry = json.loads(line)
        except json.JSONDecodeError as e:
            # Keep unparseable lines verbatim so no data is dropped.
            print(f"Warning: Line {i+1} is not valid JSON: {e}")
            fixed_lines.append(line)
            continue

        # A valid JSON line may hold a non-object (list, string, number);
        # iterating it as a dict below would misbehave or crash, so pass
        # such lines through untouched.
        if not isinstance(entry, dict):
            fixed_lines.append(line)
            continue

        total_entries += 1
        entry_fixes = 0

        # Only top-level string fields are scanned; nested structures
        # (e.g. lists of conversation turns) are left untouched.
        for key in entry:
            if isinstance(entry[key], str):
                fixed_text, fixes = fix_code_block_indentation(entry[key])
                if fixes > 0:
                    entry[key] = fixed_text
                    entry_fixes += fixes

        if entry_fixes > 0:
            entries_with_fixes += 1
            total_fixes += entry_fixes

        fixed_lines.append(json.dumps(entry, ensure_ascii=False))

    if dry_run:
        print(f"DRY RUN: Would fix {total_fixes} code blocks in {entries_with_fixes}/{total_entries} entries")
        return total_fixes, entries_with_fixes

    # Write fixed data
    with open(output_path, 'w', encoding='utf-8') as f:
        for line in fixed_lines:
            f.write(line + '\n')

    print(f"Fixed {total_fixes} code blocks in {entries_with_fixes}/{total_entries} entries")
    print(f"Output: {output_path}")

    return total_fixes, entries_with_fixes
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments and run the indentation fixer."""
    import argparse

    cli = argparse.ArgumentParser(description='Fix training data code block indentation')
    cli.add_argument('--input', required=True, help='Input JSONL file')
    cli.add_argument('--output', default=None, help='Output JSONL file (default: input.fixed.jsonl)')
    cli.add_argument('--dry-run', action='store_true', help='Show what would be fixed without writing')
    opts = cli.parse_args()

    fix_count, _entries = process_jsonl_file(opts.input, opts.output, opts.dry_run)

    if fix_count == 0:
        print("No fixes needed - code blocks are properly indented")
    elif not opts.dry_run:
        print("Done!")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
1
training/tests/__init__.py
Normal file
1
training/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Provenance tests
|
||||
@@ -1,121 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for fix_training_indentation.py
|
||||
"""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
# Import the module
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from fix_training_indentation import fix_code_block_indentation, process_jsonl_file
|
||||
|
||||
|
||||
def test_fix_code_block_indentation():
|
||||
"""Test code block indentation fixing."""
|
||||
|
||||
# Test 1: Python code block with extra indentation
|
||||
text1 = """Here is some code:
|
||||
```python
|
||||
from fastapi import FastAPI
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@app.get("/users/{user_id}")
|
||||
def get_user(user_id: int):
|
||||
return {"user_id": user_id}
|
||||
```
|
||||
"""
|
||||
fixed1, fixes1 = fix_code_block_indentation(text1)
|
||||
assert fixes1 == 1, f"Expected 1 fix, got {fixes1}"
|
||||
assert "from fastapi import FastAPI" in fixed1
|
||||
# Check that indentation is normalized
|
||||
lines = fixed1.split("\n")
|
||||
for line in lines:
|
||||
if "from fastapi" in line:
|
||||
assert line.startswith("from"), f"First line should not have leading spaces: {repr(line)}"
|
||||
break
|
||||
|
||||
# Test 2: Bash code block
|
||||
text2 = """Run these commands:
|
||||
```bash
|
||||
python3 script.py
|
||||
git commit -m "fix"
|
||||
```
|
||||
"""
|
||||
fixed2, fixes2 = fix_code_block_indentation(text2)
|
||||
assert fixes2 == 1, f"Expected 1 fix, got {fixes2}"
|
||||
|
||||
# Test 3: No code block
|
||||
text3 = "This is plain text with no code blocks."
|
||||
fixed3, fixes3 = fix_code_block_indentation(text3)
|
||||
assert fixes3 == 0, f"Expected 0 fixes, got {fixes3}"
|
||||
assert fixed3 == text3
|
||||
|
||||
# Test 4: Empty code block
|
||||
text4 = """Empty:
|
||||
```
|
||||
```
|
||||
"""
|
||||
fixed4, fixes4 = fix_code_block_indentation(text4)
|
||||
assert fixes4 == 0, f"Expected 0 fixes for empty block, got {fixes4}"
|
||||
|
||||
print("All tests passed!")
|
||||
|
||||
|
||||
def test_process_jsonl_file():
|
||||
"""Test processing a JSONL file."""
|
||||
|
||||
# Create test data
|
||||
test_data = [
|
||||
{
|
||||
"prompt": "Write a function",
|
||||
"chosen": "```python\ndef hello():\n print('hello')\n```",
|
||||
"rejected": ""
|
||||
},
|
||||
{
|
||||
"prompt": "Run command",
|
||||
"chosen": "```bash\necho 'test'\n```",
|
||||
"rejected": ""
|
||||
}
|
||||
]
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
|
||||
for entry in test_data:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
input_path = f.name
|
||||
|
||||
try:
|
||||
fixes, entries = process_jsonl_file(input_path, dry_run=True)
|
||||
print(f"Dry run: {fixes} fixes in {entries} entries")
|
||||
|
||||
# Actually fix
|
||||
output_path = input_path.replace('.jsonl', '.fixed.jsonl')
|
||||
fixes, entries = process_jsonl_file(input_path, output_path)
|
||||
print(f"Fixed: {fixes} fixes in {entries} entries")
|
||||
|
||||
# Verify output
|
||||
with open(output_path, 'r') as f:
|
||||
for line in f:
|
||||
entry = json.loads(line)
|
||||
if '```python' in entry.get('chosen', ''):
|
||||
# Check indentation
|
||||
code = entry['chosen']
|
||||
lines = code.split("\n")
|
||||
for line in lines:
|
||||
if "def hello" in line:
|
||||
assert not line.startswith(" "), f"Code should not have extra indentation: {repr(line)}"
|
||||
|
||||
print("JSONL processing test passed!")
|
||||
|
||||
finally:
|
||||
Path(input_path).unlink(missing_ok=True)
|
||||
Path(input_path.replace('.jsonl', '.fixed.jsonl')).unlink(missing_ok=True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_fix_code_block_indentation()
|
||||
test_process_jsonl_file()
|
||||
print("\nAll tests passed!")
|
||||
90
training/tests/test_provenance.py
Normal file
90
training/tests/test_provenance.py
Normal file
@@ -0,0 +1,90 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for training_pair_provenance.py"""
|
||||
import json, tempfile, unittest
|
||||
from pathlib import Path
|
||||
import sys, os
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
from training.training_pair_provenance import ProvenanceTracker, load_jsonl, write_jsonl
|
||||
|
||||
|
||||
class TestAnnotate(unittest.TestCase):
    """annotate()/exclude()/backfill() attach and update provenance metadata."""

    def test_annotate_adds_required_fields(self):
        # A fresh pair gets source/model/approved plus a generated timestamp.
        t = ProvenanceTracker()
        p = {"conversations": [{"from": "human", "value": "hi"}]}
        r = t.annotate(p, source="trajectory", model="hermes4:14b", session_id="s1")
        m = r["provenance"]
        self.assertEqual(m["source"], "trajectory")
        self.assertEqual(m["model"], "hermes4:14b")
        self.assertTrue(m["approved"])
        # annotate() stamps the current UTC time when no timestamp is given.
        self.assertNotEqual(m["timestamp"], "")

    def test_exclude_sets_flag(self):
        # exclude() flips approved -> False and records the excluded flag.
        t = ProvenanceTracker()
        p = {"conversations": []}
        t.annotate(p, source="trajectory", model="hermes4:14b")
        r = t.exclude(p, "quality_filter")
        self.assertTrue(r["provenance"]["excluded"])
        self.assertFalse(r["provenance"]["approved"])

    def test_backfill_adds_provenance(self):
        # backfill() supplies provenance to pairs that have none.
        t = ProvenanceTracker()
        p = {"conversations": []}
        r = t.backfill(p, source="backfill", model="unknown")
        self.assertEqual(r["provenance"]["source"], "backfill")
|
||||
|
||||
|
||||
class TestValidate(unittest.TestCase):
    """validate() reports missing or inconsistent provenance fields."""

    def test_valid_pair(self):
        # A pair carrying every REQUIRED_FIELDS key validates cleanly.
        t = ProvenanceTracker()
        p = {"provenance": {"source": "curated", "model": "timmy-curated", "timestamp": "2026-01-01", "source_session_id": "c1", "approved": True}}
        self.assertEqual(t.validate(p), [])

    def test_missing_provenance(self):
        # No provenance dict at all -> a single "missing" error.
        t = ProvenanceTracker()
        self.assertTrue(any("missing" in e for e in t.validate({"conversations": []})))

    def test_missing_field(self):
        # Dropping one required key is reported by field name.
        t = ProvenanceTracker()
        p = {"provenance": {"source": "curated", "model": "timmy-curated", "timestamp": "2026-01-01", "approved": True}}
        self.assertTrue(any("source_session_id" in e for e in t.validate(p)))

    def test_excluded_no_reason(self):
        # excluded=True without an exclusion_reason is an error.
        t = ProvenanceTracker()
        p = {"provenance": {"source": "curated", "model": "timmy-curated", "timestamp": "2026-01-01", "source_session_id": "c1", "approved": True, "excluded": True}}
        self.assertTrue(any("exclusion_reason" in e for e in t.validate(p)))
|
||||
|
||||
|
||||
class TestReport(unittest.TestCase):
    """report() aggregates counts across pairs fed through annotate()."""

    def test_report_counts(self):
        t = ProvenanceTracker()
        # 5 trajectory pairs + 3 curated pairs -> 8 total, all approved.
        for i in range(5):
            t.annotate({"conversations": []}, source="trajectory", model="hermes4:14b", session_id=f"s{i}")
        for i in range(3):
            t.annotate({"conversations": []}, source="curated", model="timmy-curated", session_id=f"c{i}")
        r = t.report()
        self.assertEqual(r["total"], 8)
        self.assertEqual(r["approved"], 8)
        self.assertEqual(r["by_source"]["trajectory"], 5)
        self.assertEqual(r["by_source"]["curated"], 3)
|
||||
|
||||
|
||||
class TestBackfillFile(unittest.TestCase):
    """backfill_file() rewrites a JSONL file in place with provenance."""

    def test_round_trip(self):
        t = ProvenanceTracker()
        # Write three bare pairs, backfill the file, then re-read and verify.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            for i in range(3):
                f.write(json.dumps({"conversations": [{"from": "human", "value": f"p{i}"}]}) + "\n")
            p = Path(f.name)
        try:
            cnt = t.backfill_file(p, source="backfill", model="unknown")
            self.assertEqual(cnt, 3)
            loaded = load_jsonl(p)
            for pair in loaded:
                self.assertEqual(pair["provenance"]["source"], "backfill")
        finally:
            # Always remove the temp file, even on assertion failure.
            p.unlink()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
233
training/training_pair_provenance.py
Normal file
233
training/training_pair_provenance.py
Normal file
@@ -0,0 +1,233 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Training pair provenance tracking.
|
||||
|
||||
Every training pair in the pipeline must carry provenance metadata:
|
||||
- source_session_id: origin session identifier
|
||||
- source: trajectory|curated|backfill
|
||||
- model: model that produced the assistant turn
|
||||
- timestamp: ISO 8601 when the pair was captured
|
||||
- excluded: bool + reason if filtered out during quality screening
|
||||
- approved: bool (default True for accepted pairs)
|
||||
|
||||
Usage:
|
||||
from training_pair_provenance import ProvenanceTracker
|
||||
|
||||
tracker = ProvenanceTracker()
|
||||
for pair in pairs:
|
||||
pair = tracker.annotate(pair, source="trajectory", model="hermes4:14b")
|
||||
tracker.report()
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
REQUIRED_FIELDS = ("source_session_id", "source", "model", "timestamp", "approved")
|
||||
|
||||
|
||||
@dataclass
class ProvenanceMeta:
    """Typed schema for the per-pair ``provenance`` dict.

    NOTE(review): within this file ProvenanceTracker builds plain dicts
    rather than instantiating this class — confirm whether it is used
    elsewhere or kept purely as documentation of the expected fields.
    """

    # Origin session identifier (e.g. trajectory session ID).
    source_session_id: str = ""
    source: str = ""  # trajectory | curated | backfill
    # Model that produced the assistant turn.
    model: str = ""
    # ISO 8601 capture time.
    timestamp: str = ""
    # Set True (with exclusion_reason) when filtered by quality screening.
    excluded: bool = False
    exclusion_reason: str = ""
    # Accepted pairs default to approved.
    approved: bool = True
|
||||
|
||||
|
||||
class ProvenanceTracker:
    """Annotate and validate provenance metadata on training pairs.

    Also accumulates running statistics (totals, per-source and per-model
    counts) for every pair passed through annotate()/_track().
    """

    def __init__(self):
        # Running counters; by_source/by_model map name -> pair count.
        self._stats = {
            "total": 0,
            "approved": 0,
            "excluded": 0,
            "missing_provenance": 0,
            "by_source": {},
            "by_model": {},
        }

    # ── annotation ─────────────────────────────────────────────

    def annotate(
        self,
        pair: dict,
        *,
        source: str,
        model: str,
        session_id: str = "",
        timestamp: str = "",
    ) -> dict:
        """Attach provenance metadata to a training pair dict.

        Existing provenance values win (setdefault); a missing timestamp
        falls back to the current UTC time. The pair is mutated in place
        and also returned.

        NOTE(review): annotating the same pair twice runs _track() twice
        and double-counts it in the stats — confirm callers annotate each
        pair at most once.
        """
        meta = pair.get("provenance", {})
        meta.setdefault("source", source)
        meta.setdefault("model", model)
        if session_id:
            meta.setdefault("source_session_id", session_id)
        if timestamp:
            meta.setdefault("timestamp", timestamp)
        if "timestamp" not in meta:
            # No caller-supplied or pre-existing value: stamp with now (UTC).
            meta["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        if "approved" not in meta:
            meta["approved"] = True
        pair["provenance"] = meta
        self._track(pair)
        return pair

    def exclude(self, pair: dict, reason: str) -> dict:
        """Mark pair as excluded with a reason.

        NOTE(review): this does not touch _stats, so a pair annotated and
        then excluded still counts as "approved" in report() — confirm
        whether that is intended.
        """
        meta = pair.get("provenance", {})
        meta["excluded"] = True
        meta["exclusion_reason"] = reason
        meta["approved"] = False
        pair["provenance"] = meta
        return pair

    # ── backfill ───────────────────────────────────────────────

    def backfill(
        self,
        pair: dict,
        *,
        source: str,
        model: str,
    ) -> dict:
        """Add provenance to a pair that has none (historical data)."""
        meta = pair.get("provenance")
        if meta and meta.get("source") and meta.get("model"):
            return pair  # already has provenance
        # Historical pairs have no real session; use the literal "backfill".
        return self.annotate(pair, source=source, model=model, session_id="backfill")

    def backfill_file(self, path: Path, *, source: str, model: str) -> int:
        """Backfill provenance on an entire JSONL file. Returns count updated.

        Reads the whole file into memory, then rewrites it in place; the
        count includes every non-blank line, including pairs that already
        had provenance and were left unchanged.
        """
        pairs = []
        count = 0
        with open(path) as f:
            for line in f:
                line = line.strip()
                if line:  # skip blank lines
                    pair = json.loads(line)
                    pair = self.backfill(pair, source=source, model=model)
                    pairs.append(pair)
                    count += 1
        with open(path, "w") as f:
            for pair in pairs:
                f.write(json.dumps(pair) + "\n")
        return count

    # ── validation ─────────────────────────────────────────────

    def validate(self, pair: dict) -> list[str]:
        """Return list of validation errors for a pair. Empty = valid."""
        errors = []
        meta = pair.get("provenance")
        if not meta:
            return ["missing provenance metadata"]
        # Every pair must carry all REQUIRED_FIELDS keys.
        for field_name in REQUIRED_FIELDS:
            if field_name not in meta:
                errors.append(f"provenance missing field: {field_name}")
        # Excluded pairs must say why they were excluded.
        if meta.get("excluded") and not meta.get("exclusion_reason"):
            errors.append("excluded pair missing exclusion_reason")
        return errors

    def validate_file(self, path: Path) -> dict:
        """Validate all pairs in a JSONL file. Returns {valid, invalid, errors}."""
        results = {"valid": 0, "invalid": 0, "errors": []}
        with open(path) as f:
            for i, line in enumerate(f, 1):  # 1-based line numbers for reporting
                line = line.strip()
                if line:
                    pair = json.loads(line)
                    errs = self.validate(pair)
                    if errs:
                        results["invalid"] += 1
                        results["errors"].append({"line": i, "errors": errs})
                    else:
                        results["valid"] += 1
        return results

    # ── reporting ──────────────────────────────────────────────

    def _track(self, pair: dict):
        # Fold one pair's provenance into the running counters.
        meta = pair.get("provenance", {})
        self._stats["total"] += 1
        if meta.get("approved"):
            self._stats["approved"] += 1
        if meta.get("excluded"):
            self._stats["excluded"] += 1
        source = meta.get("source", "unknown")
        self._stats["by_source"][source] = self._stats["by_source"].get(source, 0) + 1
        model = meta.get("model", "unknown")
        self._stats["by_model"][model] = self._stats["by_model"].get(model, 0) + 1

    def report(self) -> dict:
        """Return provenance statistics dict (shallow copy of the counters)."""
        return dict(self._stats)

    def report_text(self) -> str:
        """Return human-readable provenance report."""
        s = self._stats
        lines = [
            "Provenance Report",
            "=" * 40,
            f" Total pairs: {s['total']}",
            f" Approved: {s['approved']}",
            f" Excluded: {s['excluded']}",
            "",
            " By source:",
        ]
        for source, count in sorted(s["by_source"].items()):
            lines.append(f" {source:20s} {count}")
        lines.append(" By model:")
        for model, count in sorted(s["by_model"].items()):
            lines.append(f" {model:20s} {count}")
        return "\n".join(lines)
|
||||
|
||||
|
||||
def load_jsonl(path: Path) -> list[dict]:
    """Read a JSONL file, returning one parsed object per non-blank line."""
    with open(path) as fh:
        return [json.loads(raw) for raw in (ln.strip() for ln in fh) if raw]
|
||||
|
||||
|
||||
def write_jsonl(path: Path, pairs: list[dict]):
    """Serialize *pairs* to *path* as JSONL, creating parent dirs as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as out:
        out.writelines(json.dumps(entry) + "\n" for entry in pairs)
|
||||
|
||||
|
||||
def provenance_dashboard(path: Path) -> str:
    """Generate a provenance dashboard from a JSONL file.

    Loads every pair in *path*, folds it into a fresh ProvenanceTracker,
    and renders the aggregate counts as a fixed-width box drawing.
    """
    pairs = load_jsonl(path)
    tracker = ProvenanceTracker()
    for pair in pairs:
        tracker._track(pair)
    report = tracker.report()

    # Compute coverage up front. The original inline conditional used
    # backslash escapes (\") inside an f-string expression, which is a
    # SyntaxError before Python 3.12 (PEP 701); it also rendered the
    # zero-total case as the literal '>0%'.
    total = report["total"]
    if total == 0:
        coverage = "0%"
    else:
        coverage = f"{(report['approved'] + report['excluded']) * 100 // total}%"

    lines = [
        "╔══════════════════════════════════════╗",
        "║ Training Provenance Dashboard ║",
        "╠══════════════════════════════════════╣",
        f"║ Total pairs: {report['total']:>12} ║",
        f"║ Approved: {report['approved']:>12} ║",
        f"║ Excluded: {report['excluded']:>12} ║",
        f"║ Provenance coverage:{coverage:>12s} ║",
        "╠══════════════════════════════════════╣",
        "║ By Source ║",
    ]
    for source, count in sorted(report["by_source"].items()):
        lines.append(f"║ {source:20s} {count:>8} ║")
    lines.append("╠══════════════════════════════════════╣")
    lines.append("║ By Model ║")
    for model, count in sorted(report["by_model"].items()):
        lines.append(f"║ {model:20s} {count:>8} ║")
    lines.append("╚══════════════════════════════════════╝")
    return "\n".join(lines)
|
||||
Reference in New Issue
Block a user