Compare commits

...

2 Commits

Author SHA1 Message Date
ae063a8c71 test: training pair provenance tracker (#691)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 24s
PR Checklist / pr-checklist (pull_request) Failing after 3m29s
Smoke Test / smoke (pull_request) Failing after 18s
Validate Config / YAML Lint (pull_request) Failing after 10s
Validate Config / JSON Validate (pull_request) Successful in 12s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m15s
Validate Config / Shell Script Lint (pull_request) Failing after 31s
Validate Config / Cron Syntax Check (pull_request) Successful in 6s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 3s
Validate Config / Playbook Schema Validation (pull_request) Successful in 11s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Comprehensive tests for tag/filter/report commands.
Covers: tagging, dedup, skip logic, error handling,
filtering by model, reporting, and full pipeline.

Closes #691
2026-04-15 14:54:53 +00:00
41afbc2ca9 feat: add training pair provenance tracker (#691)
Tag, filter, and report provenance metadata for JSONL training pairs.
Tracks source_session_id, model, and timestamp per pair.

Closes #691
2026-04-15 14:53:11 +00:00
2 changed files with 629 additions and 0 deletions

View File

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
Training Pair Provenance Tracker — Timmy Foundation
Adds, filters, and reports provenance metadata for JSONL training pairs.
Tracks source_session_id, model, and timestamp for quality auditing.
Usage:
# Tag pairs with provenance
python3 scripts/training_provenance.py tag input.jsonl -o tagged.jsonl \
--session abc123 --model nous/hermes-3
# Filter by model (exclude Anthropic-sourced)
python3 scripts/training_provenance.py filter input.jsonl -o filtered.jsonl \
--exclude-model anthropic
# Report: pair count by source model
python3 scripts/training_provenance.py report input.jsonl
# Pipe support
cat pairs.jsonl | python3 scripts/training_provenance.py report -
"""
import sys
import json
import argparse
from datetime import datetime, timezone
from collections import Counter
from typing import Dict, Any, Optional, List, TextIO
# Canonical names of the provenance fields written under "_provenance".
# NOTE(review): not referenced anywhere else in this module — presumably kept
# for external consumers/importers; confirm before removing.
PROVENANCE_KEYS = ["source_session_id", "source_model", "source_timestamp"]
def tag_pair(pair: Dict[str, Any], session_id: Optional[str] = None,
             model: Optional[str] = None) -> Dict[str, Any]:
    """Attach provenance metadata to *pair* (mutated in place) and return it.

    Existing "_provenance" entries are preserved; session/model are written
    only when given, while the timestamp is always refreshed to now (UTC).
    """
    provenance = {**pair.get("_provenance", {})}
    for key, value in (("source_session_id", session_id),
                       ("source_model", model)):
        if value:
            provenance[key] = value
    provenance["source_timestamp"] = datetime.now(timezone.utc).isoformat()
    pair["_provenance"] = provenance
    return pair
def _open_input(path: str) -> TextIO:
    """Return a readable text stream: stdin for "-", else the opened file."""
    if path == "-":
        return sys.stdin
    return open(path, "r", encoding="utf-8")
def _open_output(path: str) -> TextIO:
    """Return a writable text stream: stdout for "-", else the opened file."""
    if path == "-":
        return sys.stdout
    return open(path, "w", encoding="utf-8")
def stamp_command(input_path: str, output_path: str,
                  session_id: Optional[str], model: Optional[str]) -> Dict[str, Any]:
    """Tag every pair in a JSONL file with provenance metadata.

    Reads `input_path` ("-" for stdin) line by line, attaches/refreshes the
    "_provenance" block on each JSON object via tag_pair(), and writes the
    result to `output_path` ("-" for stdout).

    Returns a summary dict: {"tagged": int, "skipped": int, "errors": int}.
    """
    tagged = 0
    skipped = 0
    errors = 0
    source = _open_input(input_path)
    out = _open_output(output_path)
    try:
        for line in source:
            line = line.strip()
            if not line:
                continue
            try:
                pair = json.loads(line)
            except json.JSONDecodeError:
                # Malformed lines are counted and dropped, not copied through.
                errors += 1
                continue
            # Skip (pass through unchanged) when already tagged with the same
            # model + session — avoids churning timestamps on re-runs.
            existing = pair.get("_provenance", {})
            if (existing.get("source_model") == model
                    and existing.get("source_session_id") == session_id):
                skipped += 1
                out.write(line + "\n")
                continue
            pair = tag_pair(pair, session_id=session_id, model=model)
            out.write(json.dumps(pair, ensure_ascii=False) + "\n")
            tagged += 1
    finally:
        if source is not sys.stdin:
            source.close()
        # BUGFIX: file outputs must be closed (and thereby flushed) — the
        # original never closed `out` at all. stdout is still left open so
        # downstream piping keeps working.
        if out is not sys.stdout:
            out.close()
    return {"tagged": tagged, "skipped": skipped, "errors": errors}
def filter_pairs(input_path: str, output_path: str,
                 include_models: Optional[List[str]] = None,
                 exclude_models: Optional[List[str]] = None) -> Dict[str, Any]:
    """Filter pairs by provenance source model.

    A filter token matches when it is a substring of the pair's
    ``source_model`` — so ``--exclude-model anthropic`` drops
    ``anthropic/claude`` (matching the usage documented at the top of this
    module; the original exact-match comparison could never exclude a
    vendor prefix). Pairs with no provenance count as model "unknown".

    Writes kept pairs to `output_path` when given; returns a summary dict
    with "total", "kept", "filtered_out" and "errors" counts.
    """
    kept: List[Dict[str, Any]] = []
    removed = 0  # only the count is reported; don't retain the pairs
    errors = 0
    source = _open_input(input_path)
    try:
        for raw in source:
            raw = raw.strip()
            if not raw:
                continue
            try:
                pair = json.loads(raw)
            except json.JSONDecodeError:
                errors += 1
                continue
            model = pair.get("_provenance", {}).get("source_model", "unknown")
            should_keep = True
            if include_models:
                should_keep = any(tok in model for tok in include_models)
            if should_keep and exclude_models:
                should_keep = not any(tok in model for tok in exclude_models)
            if should_keep:
                kept.append(pair)
            else:
                removed += 1
    finally:
        if source is not sys.stdin:
            source.close()
    # Write output (skipped entirely when no output path is supplied).
    if output_path:
        out = _open_output(output_path)
        try:
            for pair in kept:
                out.write(json.dumps(pair, ensure_ascii=False) + "\n")
        finally:
            if out is not sys.stdout:
                out.close()
    return {
        "total": len(kept) + removed,
        "kept": len(kept),
        "filtered_out": removed,
        "errors": errors,
    }
def report(input_path: str) -> Dict[str, Any]:
    """Summarize provenance coverage for a JSONL file.

    Returns totals, tagged/untagged counts, the tag rate as a percentage,
    and the most common source models (top 20) and sessions (top 10).
    """
    by_model: Counter = Counter()
    by_session: Counter = Counter()
    total = 0
    tagged = 0
    untagged = 0
    errors = 0
    stream = _open_input(input_path)
    try:
        for raw in stream:
            raw = raw.strip()
            if not raw:
                continue
            try:
                pair = json.loads(raw)
            except json.JSONDecodeError:
                errors += 1
                continue
            total += 1
            prov = pair.get("_provenance", {})
            if not prov:
                untagged += 1
                continue
            tagged += 1
            by_model[prov.get("source_model", "unknown")] += 1
            by_session[prov.get("source_session_id", "unknown")] += 1
    finally:
        if stream is not sys.stdin:
            stream.close()
    return {
        "total": total,
        "tagged": tagged,
        "untagged": untagged,
        # max(total, 1) guards the empty-file case against division by zero.
        "tag_rate": round(tagged / max(total, 1) * 100, 1),
        "by_model": dict(by_model.most_common(20)),
        "by_session": dict(by_session.most_common(10)),
        "errors": errors,
    }
def main():
    """CLI entry point: dispatch to the tag / filter / report subcommands.

    Human-readable summaries go to stderr so stdout stays clean for piped
    JSONL (tag/filter) or the machine-readable JSON report.
    """
    parser = argparse.ArgumentParser(description="Training pair provenance tracking")
    sub = parser.add_subparsers(dest="command", required=True)
    # tag subcommand
    tag_p = sub.add_parser("tag", help="Tag pairs with provenance metadata")
    tag_p.add_argument("input", help="Input JSONL file (use - for stdin)")
    tag_p.add_argument("-o", "--output", default="-", help="Output JSONL file")
    tag_p.add_argument("--session", help="Source session ID")
    tag_p.add_argument("--model", help="Source model name")
    # filter subcommand
    filt_p = sub.add_parser("filter", help="Filter pairs by provenance")
    filt_p.add_argument("input", help="Input JSONL file (use - for stdin)")
    filt_p.add_argument("-o", "--output", default="-", help="Output JSONL file")
    filt_p.add_argument("--include-model", action="append", help="Only include these models")
    filt_p.add_argument("--exclude-model", action="append", help="Exclude these models")
    # report subcommand
    rpt_p = sub.add_parser("report", help="Report provenance statistics")
    rpt_p.add_argument("input", help="Input JSONL file (use - for stdin)")
    args = parser.parse_args()
    if args.command == "tag":
        result = stamp_command(args.input, args.output, args.session, args.model)
        print(f"Tagged: {result['tagged']} Skipped: {result['skipped']} Errors: {result['errors']}", file=sys.stderr)
    elif args.command == "filter":
        result = filter_pairs(
            args.input, args.output,
            include_models=args.include_model,
            exclude_models=args.exclude_model,
        )
        print(f"Total: {result['total']} Kept: {result['kept']} Filtered: {result['filtered_out']}", file=sys.stderr)
    elif args.command == "report":
        result = report(args.input)
        # Plain string literals here: the originals were f-strings with no
        # placeholders (lint F541).
        print("Training Pair Provenance Report", file=sys.stderr)
        print("=" * 40, file=sys.stderr)
        print(f"Total pairs: {result['total']}", file=sys.stderr)
        print(f"Tagged: {result['tagged']} ({result['tag_rate']}%)", file=sys.stderr)
        print(f"Untagged: {result['untagged']}", file=sys.stderr)
        if result["by_model"]:
            print("\nBy source model:", file=sys.stderr)
            for model, count in result["by_model"].items():
                print(f"  {model}: {count}", file=sys.stderr)
        if result["by_session"]:
            print("\nBy source session (top 10):", file=sys.stderr)
            for session, count in result["by_session"].items():
                # Truncate long session IDs for readability.
                session_short = session[:12] + "..." if len(session) > 12 else session
                print(f"  {session_short}: {count}", file=sys.stderr)
        # JSON output to stdout
        print(json.dumps(result, indent=2))
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,363 @@
"""Tests for training pair provenance tracking."""
import json
import os
import subprocess
import sys
import tempfile
import pytest
SCRIPT = os.path.join(os.path.dirname(__file__), "..", "scripts", "training_provenance.py")
def _run(args, stdin=None):
    """Invoke training_provenance.py with `args`; return (stdout, stderr, rc)."""
    proc = subprocess.run(
        [sys.executable, SCRIPT, *args],
        input=stdin,
        capture_output=True,
        text=True,
    )
    return proc.stdout, proc.stderr, proc.returncode
def _make_pairs(count=3, model=None, session=None):
    """Generate `count` JSONL training pairs as a newline-joined string.

    When `model` / `session` are given, each pair carries a matching
    "_provenance" block; by default pairs are emitted untagged (identical
    to the previous behavior — the original accepted these parameters but
    silently ignored them, so tagged fixtures were impossible to build).
    """
    lines = []
    for i in range(count):
        pair = {"terse": f"q{i}", "rich": f"a{i}", "domain": "test"}
        prov = {}
        if model:
            prov["source_model"] = model
        if session:
            prov["source_session_id"] = session
        if prov:
            pair["_provenance"] = prov
        lines.append(json.dumps(pair))
    return "\n".join(lines)
# ── tag command ──────────────────────────────────────────────────
class TestTagCommand:
    """Tests for the `tag` subcommand.

    BUGFIX throughout: the original reopened the output file as `f`, which
    rebound the outer tempfile handle, so the finally-clause
    `os.unlink(f.name)` deleted the OUTPUT path and leaked the input temp
    file on disk. Distinct handle names (`src` / `fh`) fix the cleanup.
    """

    def test_tag_adds_provenance_to_each_pair(self):
        pairs = _make_pairs(3)
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write(pairs)
            src.flush()
        out_path = src.name + ".tagged"
        try:
            out, err, rc = _run(["tag", src.name, "-o", out_path,
                                 "--session", "sess-abc", "--model", "nous/hermes-3"])
            assert rc == 0
            with open(out_path) as fh:
                lines = [json.loads(l) for l in fh if l.strip()]
            assert len(lines) == 3
            for pair in lines:
                prov = pair["_provenance"]
                assert prov["source_session_id"] == "sess-abc"
                assert prov["source_model"] == "nous/hermes-3"
                assert "source_timestamp" in prov
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)

    def test_tag_preserves_existing_pair_data(self):
        pairs = '{"terse": "hello", "rich": "world", "domain": "greeting"}\n'
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write(pairs)
            src.flush()
        out_path = src.name + ".tagged"
        try:
            _run(["tag", src.name, "-o", out_path, "--model", "m1"])
            with open(out_path) as fh:
                pair = json.loads(fh.readline())
            assert pair["terse"] == "hello"
            assert pair["rich"] == "world"
            assert pair["domain"] == "greeting"
            assert pair["_provenance"]["source_model"] == "m1"
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)

    def test_tag_skips_already_tagged_same_provenance(self):
        pair = json.dumps({
            "terse": "q", "rich": "a",
            "_provenance": {"source_model": "m1", "source_session_id": "s1"}
        })
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write(pair + "\n")
            src.flush()
        out_path = src.name + ".tagged"
        try:
            _, err, rc = _run(["tag", src.name, "-o", out_path,
                               "--session", "s1", "--model", "m1"])
            assert rc == 0
            assert "Skipped: 1" in err
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)

    def test_tag_overwrites_different_provenance(self):
        pair = json.dumps({
            "terse": "q", "rich": "a",
            "_provenance": {"source_model": "old-model", "source_session_id": "old-sess"}
        })
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write(pair + "\n")
            src.flush()
        out_path = src.name + ".tagged"
        try:
            _, err, rc = _run(["tag", src.name, "-o", out_path,
                               "--session", "new-sess", "--model", "new-model"])
            assert rc == 0
            assert "Tagged: 1" in err
            with open(out_path) as fh:
                tagged = json.loads(fh.readline())
            assert tagged["_provenance"]["source_model"] == "new-model"
            assert tagged["_provenance"]["source_session_id"] == "new-sess"
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)

    def test_tag_skips_blank_lines(self):
        pairs = '{"t":"a","r":"b"}\n\n{"t":"c","r":"d"}\n'
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write(pairs)
            src.flush()
        out_path = src.name + ".tagged"
        try:
            _, err, rc = _run(["tag", src.name, "-o", out_path, "--model", "m1"])
            assert rc == 0
            assert "Tagged: 2" in err
            assert "Errors: 0" in err
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)

    def test_tag_counts_malformed_lines_as_errors(self):
        pairs = '{"t":"a"}\nNOT_JSON\n{"t":"b"}\n'
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write(pairs)
            src.flush()
        out_path = src.name + ".tagged"
        try:
            _, err, rc = _run(["tag", src.name, "-o", out_path, "--model", "m1"])
            assert rc == 0
            assert "Errors: 1" in err
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)
# ── filter command ───────────────────────────────────────────────
class TestFilterCommand:
    """Tests for the `filter` subcommand.

    BUGFIX: as in TestTagCommand, the original rebound the outer tempfile
    handle `f` when reopening the output file, so cleanup deleted the
    wrong path and leaked the input temp file. Fixed with `src` / `fh`.
    """

    def test_filter_exclude_model(self):
        rows = [json.dumps({"t": "q", "r": "a",
                            "_provenance": {"source_model": model}})
                for model in ["nous/hermes-3", "anthropic/claude", "openai/gpt-4"]]
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write("\n".join(rows))
            src.flush()
        out_path = src.name + ".filtered"
        try:
            _, err, rc = _run(["filter", src.name, "-o", out_path,
                               "--exclude-model", "anthropic"])
            assert rc == 0
            assert "Kept: 2" in err
            assert "Filtered: 1" in err
            with open(out_path) as fh:
                kept = [json.loads(l) for l in fh if l.strip()]
            models = [p["_provenance"]["source_model"] for p in kept]
            assert "anthropic/claude" not in models
            assert "nous/hermes-3" in models
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)

    def test_filter_include_model(self):
        rows = [json.dumps({"t": "q", "r": "a",
                            "_provenance": {"source_model": model}})
                for model in ["nous/hermes-3", "anthropic/claude", "openai/gpt-4"]]
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write("\n".join(rows))
            src.flush()
        out_path = src.name + ".filtered"
        try:
            _, err, rc = _run(["filter", src.name, "-o", out_path,
                               "--include-model", "nous/hermes-3"])
            assert rc == 0
            assert "Kept: 1" in err
            with open(out_path) as fh:
                kept = [json.loads(l) for l in fh if l.strip()]
            assert len(kept) == 1
            assert kept[0]["_provenance"]["source_model"] == "nous/hermes-3"
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)

    def test_filter_untreated_pairs_have_unknown_model(self):
        pairs = '{"t":"q","r":"a"}\n'  # no _provenance
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as src:
            src.write(pairs)
            src.flush()
        out_path = src.name + ".filtered"
        try:
            # Exclude "unknown" — should filter out the unprovenanced pair.
            _, err, rc = _run(["filter", src.name, "-o", out_path,
                               "--exclude-model", "unknown"])
            assert rc == 0
            assert "Kept: 0" in err
        finally:
            os.unlink(src.name)
            if os.path.exists(out_path):
                os.unlink(out_path)
# ── report command ───────────────────────────────────────────────
class TestReportCommand:
    """Tests for the `report` subcommand (JSON result on stdout)."""

    def test_report_counts_by_model(self):
        rows = [json.dumps({"t": "q", "r": "a",
                            "_provenance": {"source_model": m, "source_session_id": "s1"}})
                for m in ("nous/hermes-3", "nous/hermes-3", "anthropic/claude")]
        src = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        src.write("\n".join(rows))
        src.flush()
        src.close()
        try:
            out, err, rc = _run(["report", src.name])
            assert rc == 0
            result = json.loads(out)
            assert result["total"] == 3
            assert result["tagged"] == 3
            assert result["untagged"] == 0
            assert result["tag_rate"] == 100.0
            assert result["by_model"]["nous/hermes-3"] == 2
            assert result["by_model"]["anthropic/claude"] == 1
        finally:
            os.unlink(src.name)

    def test_report_distinguishes_tagged_vs_untagged(self):
        content = ('{"t":"q","r":"a"}\n'
                   '{"t":"q2","r":"a2","_provenance":{"source_model":"m1"}}\n')
        src = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        src.write(content)
        src.flush()
        src.close()
        try:
            out, err, rc = _run(["report", src.name])
            assert rc == 0
            result = json.loads(out)
            assert result["total"] == 2
            assert result["tagged"] == 1
            assert result["untagged"] == 1
            assert result["tag_rate"] == 50.0
        finally:
            os.unlink(src.name)

    def test_report_handles_empty_file(self):
        src = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        src.write("")
        src.flush()
        src.close()
        try:
            out, err, rc = _run(["report", src.name])
            assert rc == 0
            result = json.loads(out)
            assert result["total"] == 0
            assert result["tag_rate"] == 0
        finally:
            os.unlink(src.name)

    def test_report_counts_by_session(self):
        rows = [json.dumps({"t": "q", "r": "a",
                            "_provenance": {"source_model": "m1", "source_session_id": sess}})
                for sess in ("sess-a", "sess-a", "sess-b")]
        src = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        src.write("\n".join(rows))
        src.flush()
        src.close()
        try:
            out, _, rc = _run(["report", src.name])
            assert rc == 0
            result = json.loads(out)
            assert result["by_session"]["sess-a"] == 2
            assert result["by_session"]["sess-b"] == 1
        finally:
            os.unlink(src.name)
# ── integration ──────────────────────────────────────────────────
class TestIntegration:
    """End-to-end coverage of the full command pipeline."""

    def test_tag_then_filter_then_report(self):
        """Full pipeline: tag → filter → report."""
        rows = [json.dumps({"terse": f"q{i}", "rich": f"a{i}", "domain": "test"})
                for i in range(3)]
        base = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        base.write("\n".join(rows))
        base.flush()
        base.close()
        tagged_path = base.name + ".tagged"
        filtered_path = base.name + ".filtered"
        try:
            # Step 1: tag everything with one session/model.
            _, _, rc = _run(["tag", base.name, "-o", tagged_path,
                             "--session", "pipe-1", "--model", "nous/hermes-3"])
            assert rc == 0
            # Step 2: drop anything whose model is still "unknown" (untagged).
            _, err2, rc = _run(["filter", tagged_path, "-o", filtered_path,
                                "--exclude-model", "unknown"])
            assert rc == 0
            assert "Kept: 3" in err2
            # Step 3: report on the surviving pairs.
            out, _, rc = _run(["report", filtered_path])
            assert rc == 0
            result = json.loads(out)
            assert result["total"] == 3
            assert result["tagged"] == 3
            assert result["tag_rate"] == 100.0
        finally:
            for path in (base.name, tagged_path, filtered_path):
                if os.path.exists(path):
                    os.unlink(path)