Compare commits
3 Commits
fix/624
...
burn/691-1
| Author | SHA1 | Date | |
|---|---|---|---|
| f05707254e | |||
| 7101a0d5e5 | |||
| 5763a148c2 |
@@ -1,331 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
nightly_scheduler.py — Nightly Pipeline Scheduler
|
||||
|
||||
Auto-starts batch pipelines when inference is available, respecting
|
||||
priority ordering, token budgets, and peak-hour pausing.
|
||||
|
||||
Usage:
|
||||
python3 nightly_scheduler.py # run scheduler
|
||||
python3 nightly_scheduler.py --check # dry-run: show what would start
|
||||
python3 nightly_scheduler.py --status # show pipeline status
|
||||
python3 nightly_scheduler.py --reset # reset daily budget
|
||||
|
||||
Crontab:
|
||||
# Run every 30 minutes during off-peak hours (10pm-6am)
|
||||
*/30 0-5,22-23 * * * cd /path/to/timmy-config && python3 pipeline/nightly_scheduler.py >> ~/.hermes/pipeline-logs/nightly.log 2>&1
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
# --- Configuration ----------------------------------------------------------

# Persistent scheduler state (budget counters, per-pipeline status).
STATE_FILE = Path.home() / ".hermes" / "pipeline_state.json"
# Root directory for per-pipeline log files.
LOG_DIR = Path.home() / ".hermes" / "pipeline-logs"
DAILY_TOKEN_BUDGET = 5_000_000  # 5M tokens per day
PEAK_HOURS = list(range(8, 22))  # 8am-10pm = peak interactive usage
CHECK_INTERVAL = 1800  # 30 minutes

# Inference endpoints probed before any pipeline is started.
INFERENCE_ENDPOINTS = [
    {"name": "local_ollama", "url": "http://localhost:11434/v1/models", "type": "local"},
    {"name": "runpod", "url": "https://8lfr3j47a5r3gn-11434.proxy.runpod.net/v1/models", "type": "gpu"},
    {"name": "openrouter", "url": "https://openrouter.ai/api/v1/models", "type": "cloud"},
]

# Pipeline priority order (highest first).
PIPELINE_PRIORITY = [
    {"name": "playground_factory", "script": "pipeline/playground_factory.py", "priority": 1},
    {"name": "training_factory", "script": "pipeline/training_factory.py", "priority": 2},
    {"name": "knowledge_mine", "script": "pipeline/knowledge_mine.py", "priority": 3},
    {"name": "adversary", "script": "pipeline/adversary_runner.py", "priority": 4},
    {"name": "codebase_genome", "script": "pipeline/codebase_genome.py", "priority": 5},
]

# Dependency rules: a pipeline is eligible only once every listed dependency
# is "running" or "completed" (enforced by check_dependencies).
DEPENDENCY_RULES = {
    "playground_factory": [],                # no deps, start immediately
    "training_factory": [],                  # no deps, start in parallel
    "knowledge_mine": ["training_factory"],  # start after training is running
    "adversary": ["knowledge_mine"],         # start after knowledge mining is running
    "codebase_genome": [],                   # continuous, one repo per night
}
|
||||
|
||||
|
||||
def load_state():
    """Return the persisted scheduler state, or a fresh default.

    Reads STATE_FILE when it exists; otherwise returns a new dict with
    every key the rest of the scheduler expects.
    """
    if not STATE_FILE.exists():
        return {
            "last_run": None,
            "daily_tokens_used": 0,
            "budget_reset_date": None,
            "pipelines": {},
            "active_sessions": [],
        }
    with open(STATE_FILE) as fh:
        return json.load(fh)
|
||||
|
||||
|
||||
def save_state(state):
    """Persist *state* to STATE_FILE as pretty-printed JSON.

    Creates the parent directory on first use.
    """
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(STATE_FILE, "w") as fh:
        json.dump(state, fh, indent=2)
|
||||
|
||||
|
||||
def check_provider(endpoint):
    """Return True if *endpoint* answers its models probe with HTTP 200.

    Any failure — timeout, DNS error, connection refused, HTTP error —
    is reported as "unavailable" (False) rather than raised.
    """
    try:
        probe = urllib.request.Request(
            endpoint["url"], headers={"Authorization": "Bearer ollama"}
        )
        with urllib.request.urlopen(probe, timeout=10) as response:
            return response.status == 200
    except Exception:
        return False
|
||||
|
||||
|
||||
def get_available_providers():
    """Probe every configured endpoint and return the names of live ones."""
    return [ep["name"] for ep in INFERENCE_ENDPOINTS if check_provider(ep)]
|
||||
|
||||
|
||||
def is_peak_hours():
    """True when the current local hour falls inside the peak window."""
    return datetime.now().hour in PEAK_HOURS
|
||||
|
||||
|
||||
def check_token_budget(state):
    """Roll the budget over on a new day, then report whether budget remains.

    Mutates and persists *state* when the calendar date has changed since
    the last reset. Returns True while daily_tokens_used is under
    DAILY_TOKEN_BUDGET.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    if state.get("budget_reset_date") != today:
        # First check of a new day: zero the counter and stamp the date.
        state["daily_tokens_used"] = 0
        state["budget_reset_date"] = today
        save_state(state)
    return state["daily_tokens_used"] < DAILY_TOKEN_BUDGET
|
||||
|
||||
|
||||
def get_pipeline_status(state, pipeline_name):
    """Return the stored status record for *pipeline_name*.

    Unknown pipelines get a synthetic "not_started" record so callers can
    read status fields without guarding against missing keys.
    """
    default_record = {
        "status": "not_started",
        "last_run": None,
        "last_success": None,
        "progress": 0,
    }
    return state.get("pipelines", {}).get(pipeline_name, default_record)
|
||||
|
||||
|
||||
def check_dependencies(state, pipeline_name):
    """True when every dependency of *pipeline_name* is running or completed.

    Pipelines absent from DEPENDENCY_RULES have no dependencies and are
    always eligible.
    """
    return all(
        get_pipeline_status(state, dep)["status"] in ("running", "completed")
        for dep in DEPENDENCY_RULES.get(pipeline_name, [])
    )
|
||||
|
||||
|
||||
def start_pipeline(pipeline, state, dry_run=False):
    """Start a pipeline process in the background.

    Args:
        pipeline: an entry from PIPELINE_PRIORITY ({"name", "script", "priority"}).
        state: mutable scheduler state; updated and persisted on every path.
        dry_run: when True, only log what would run and start nothing.

    Returns:
        True if the pipeline was started (or would be, in dry-run mode),
        False on a missing script or launch failure.
    """
    name = pipeline["name"]
    script = pipeline["script"]

    log(f"Starting pipeline: {name}")

    if dry_run:
        log(f" DRY RUN — would run: python3 {script}")
        return True

    # Tolerate legacy state files that predate the "pipelines" key
    # (previously a plain state["pipelines"][name] could raise KeyError).
    pipelines = state.setdefault("pipelines", {})

    script_path = Path(script)
    if not script_path.exists():
        log(f" Script not found: {script_path}")
        # Record the attempt so status reporting shows the failure.
        pipelines[name] = {
            "status": "script_missing",
            "last_run": datetime.now(timezone.utc).isoformat(),
            "progress": 0,
        }
        save_state(state)
        return False

    # Run the pipeline script detached, logging to a timestamped file.
    import subprocess
    log_dir = LOG_DIR / name
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

    try:
        # Open the log explicitly and close our copy after Popen: the child
        # inherits the descriptor, so the parent keeping it open leaked one
        # fd per launched pipeline in the original (inline open() was never
        # closed).
        with open(log_file, "w") as log_fh:
            proc = subprocess.Popen(
                ["python3", str(script_path)],
                stdout=log_fh,
                stderr=subprocess.STDOUT,
                cwd=str(Path(script).parent.parent),
            )

        pipelines[name] = {
            "status": "running",
            "pid": proc.pid,
            "last_run": datetime.now(timezone.utc).isoformat(),
            "log_file": str(log_file),
            "progress": 0,
        }
        save_state(state)
        log(f" Started PID {proc.pid}, log: {log_file}")
        return True
    except Exception as e:
        log(f" Failed to start: {e}")
        pipelines[name] = {
            "status": "failed",
            "last_run": datetime.now(timezone.utc).isoformat(),
            "error": str(e),
        }
        save_state(state)
        return False
|
||||
|
||||
|
||||
def check_running_pipelines(state):
    """Poll pipelines marked "running" and mark exited ones "completed".

    Uses os.kill(pid, 0) purely as an existence probe (signal 0 sends
    nothing). ProcessLookupError means the process exited. NOTE(review):
    if the OS reused the PID the pipeline would wrongly stay "running" —
    presumably acceptable for a 30-minute poll cycle.
    """
    for name, info in state.get("pipelines", {}).items():
        if info.get("status") != "running":
            continue
        pid = info.get("pid")
        if not pid:
            continue
        try:
            os.kill(pid, 0)  # existence check only
        except ProcessLookupError:
            # Process finished.
            info["status"] = "completed"
            info["completed_at"] = datetime.now(timezone.utc).isoformat()
            log(f"Pipeline {name} completed (PID {pid} exited)")
            save_state(state)
        except PermissionError:
            # PID exists but belongs to another user: still running. The
            # original let this propagate and crash the scheduler pass.
            pass
|
||||
|
||||
|
||||
def run_scheduler(dry_run=False, check_only=False):
    """One scheduler pass: gate on peak hours, budget, and provider
    availability, then start at most one ready pipeline in priority order.

    Args:
        dry_run: log what would start without spawning processes.
        check_only: report readiness only; never start anything.
    """
    state = load_state()

    log("=" * 50)
    log(f"Pipeline Scheduler — {datetime.now().isoformat()}")
    log(f"Mode: {'CHECK' if check_only else 'DRY RUN' if dry_run else 'LIVE'}")

    # Record this pass so `--status` shows a meaningful "Last run"
    # (previously last_run was initialized in load_state but never written).
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    save_state(state)

    # Gate 1: never compete with interactive usage.
    if is_peak_hours():
        log("Peak hours detected. Pausing pipeline starts.")
        log("Pipelines will resume at 10pm.")
        return

    # Gate 2: daily token budget.
    if not check_token_budget(state):
        log(f"Daily token budget exhausted ({state['daily_tokens_used']}/{DAILY_TOKEN_BUDGET})")
        return
    log(f"Token budget: {state['daily_tokens_used']}/{DAILY_TOKEN_BUDGET}")

    # Gate 3: at least one inference provider must answer.
    providers = get_available_providers()
    if not providers:
        log("No inference providers available. Skipping.")
        return
    log(f"Available providers: {', '.join(providers)}")

    # Reap finished pipelines before deciding what to start.
    check_running_pipelines(state)

    # Find the next pipeline to start, highest priority first.
    started = 0
    for pipeline in sorted(PIPELINE_PRIORITY, key=lambda p: p["priority"]):
        name = pipeline["name"]
        status = get_pipeline_status(state, name)

        # Skip if already running or completed.
        if status["status"] in ("running", "completed"):
            log(f" {name}: {status['status']} (skipping)")
            continue

        # Skip if dependencies are not yet satisfied.
        if not check_dependencies(state, name):
            deps = DEPENDENCY_RULES.get(name, [])
            log(f" {name}: waiting for dependencies: {deps}")
            continue

        if check_only:
            log(f" {name}: READY to start (priority {pipeline['priority']})")
        else:
            if start_pipeline(pipeline, state, dry_run):
                started += 1
                # Only start one pipeline per run to avoid overload.
                if started >= 1:
                    log("Started 1 pipeline. Will check again next cycle.")
                    break

    if started == 0 and not check_only:
        log("No pipelines to start. All are running, completed, or blocked.")

    log("=" * 50)
|
||||
|
||||
|
||||
def show_status():
    """Print a human-readable status line for every configured pipeline."""
    state = load_state()

    header_time = datetime.now().strftime('%Y-%m-%d %H:%M')
    print(f"\nPipeline Status — {header_time}")
    print(f"Token budget: {state.get('daily_tokens_used', 0)}/{DAILY_TOKEN_BUDGET}")
    print(f"Last run: {state.get('last_run', 'never')}")
    print()

    icons = {
        "running": "●",
        "completed": "✓",
        "failed": "✗",
        "not_started": "○",
        "script_missing": "?",
    }
    for pipeline in sorted(PIPELINE_PRIORITY, key=lambda p: p["priority"]):
        record = get_pipeline_status(state, pipeline["name"])
        st = record["status"]
        last = (record.get("last_run") or "never")[:19]
        print(f" {icons.get(st, '?')} {pipeline['name']:25} {st:15} last={last}")
|
||||
|
||||
|
||||
def reset_budget():
    """Zero the daily token counter and stamp today's date."""
    state = load_state()
    state["daily_tokens_used"] = 0
    state["budget_reset_date"] = datetime.now().strftime("%Y-%m-%d")
    save_state(state)
    print("Budget reset.")
|
||||
|
||||
|
||||
def log(msg):
    """Echo *msg* to stdout and append it to LOG_DIR/nightly.log."""
    line = f"[{datetime.now().strftime('%H:%M:%S')}] {msg}"
    print(line)
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    with open(LOG_DIR / "nightly.log", "a") as fh:
        fh.write(line + "\n")
|
||||
|
||||
|
||||
def main():
    """CLI entry point: dispatch to status, reset, or a scheduler pass."""
    import argparse

    parser = argparse.ArgumentParser(description="Nightly Pipeline Scheduler")
    parser.add_argument("--check", action="store_true", help="Dry-run: show what would start")
    parser.add_argument("--status", action="store_true", help="Show pipeline status")
    parser.add_argument("--reset", action="store_true", help="Reset daily token budget")
    parser.add_argument("--dry-run", action="store_true", help="Dry-run mode")
    args = parser.parse_args()

    if args.status:
        show_status()
    elif args.reset:
        reset_budget()
    else:
        run_scheduler(dry_run=args.dry_run or args.check, check_only=args.check)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -75,3 +75,69 @@ The data (curated exemplars, preference pairs, trained weights) is proprietary.
|
||||
### Key Insight
|
||||
|
||||
The base model's RLHF priors override LoRA on crisis/faith — the most important parts of SOUL.md. Fix: inference-time grounding (inject SOUL.md crisis protocol) + larger pure-Timmy corpus over time.
|
||||
|
||||
|
||||
## Training Pair Provenance Tracking
|
||||
|
||||
Tracks the provenance of training pairs for quality filtering and reporting.
|
||||
|
||||
### Features
|
||||
|
||||
- **Metadata tracking**: Each pair gets provenance metadata:
|
||||
- `source_session_id`: Which session generated the pair
|
||||
- `model`: Which model generated it
|
||||
- `timestamp`: When it was generated
|
||||
- `source`: Source type (curated, trajectory, etc.)
|
||||
- `content_hash`: For deduplication
|
||||
|
||||
- **Filtering**: Filter pairs by provenance criteria:
|
||||
- Exclude specific models (e.g., Anthropic models)
|
||||
- Exclude specific sources
|
||||
- Filter by timestamp range
|
||||
|
||||
- **Reporting**: Generate reports showing:
|
||||
- Pair count by source model
|
||||
- Pair count by source type
|
||||
- Exclusion statistics
|
||||
|
||||
### Usage
|
||||
|
||||
```bash
|
||||
# Add provenance to existing dataset
|
||||
python3 training_pair_provenance.py --input data/curated_dataset.jsonl --output data/curated_with_provenance.jsonl
|
||||
|
||||
# Filter out Anthropic-sourced pairs
|
||||
python3 training_pair_provenance.py --input data/curated_dataset.jsonl --filter exclude_anthropic
|
||||
|
||||
# Generate provenance report
|
||||
python3 training_pair_provenance.py --input data/curated_dataset.jsonl --report
|
||||
|
||||
# JSON report
|
||||
python3 training_pair_provenance.py --input data/curated_dataset.jsonl --report --json
|
||||
```
|
||||
|
||||
### Integration
|
||||
|
||||
The provenance tracker can be integrated into existing pipelines:
|
||||
|
||||
```python
|
||||
from training_pair_provenance import ProvenanceTracker
|
||||
|
||||
tracker = ProvenanceTracker()
|
||||
|
||||
# Process pairs
|
||||
for pair in pairs:
|
||||
processed = tracker.process_pair(pair)
|
||||
|
||||
# Filter
|
||||
filtered = tracker.filter_by_provenance(processed_pairs, exclude_models=["anthropic/claude-3-opus"])
|
||||
|
||||
# Report
|
||||
print(tracker.generate_report())
|
||||
```
|
||||
|
||||
### Testing
|
||||
|
||||
```bash
|
||||
python3 -m pytest training/test_training_pair_provenance.py -v
|
||||
```
|
||||
|
||||
157
training/test_training_pair_provenance.py
Normal file
157
training/test_training_pair_provenance.py
Normal file
@@ -0,0 +1,157 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for Training Pair Provenance Tracking
|
||||
"""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
|
||||
from training_pair_provenance import ProvenanceTracker, load_jsonl, save_jsonl
|
||||
|
||||
|
||||
class TestProvenanceTracker:
    """Unit tests for the ProvenanceTracker class."""

    def test_init(self):
        """A fresh tracker starts with every counter at zero."""
        tracker = ProvenanceTracker()
        for key in ("total_pairs", "pairs_with_provenance", "pairs_without_provenance"):
            assert tracker.stats[key] == 0

    def test_generate_pair_id(self):
        """Identical content yields the same 16-character ID."""
        tracker = ProvenanceTracker()
        sample = {"prompt": "test", "chosen": "response", "rejected": "bad"}

        first = tracker.generate_pair_id(sample)
        second = tracker.generate_pair_id(sample)

        assert first == second
        assert len(first) == 16

    def test_add_provenance(self):
        """add_provenance populates every expected metadata field."""
        tracker = ProvenanceTracker()
        sample = {"prompt": "test", "chosen": "response", "rejected": "bad"}

        enriched = tracker.add_provenance(sample, source_session_id="session123", model="test-model")
        meta = enriched["provenance"]

        assert meta["source_session_id"] == "session123"
        assert meta["model"] == "test-model"
        assert "timestamp" in meta
        assert meta["source"] == "curated"
        assert "content_hash" in meta

    def test_extract_provenance_from_existing(self):
        """Existing id/model/started_at fields map into provenance."""
        tracker = ProvenanceTracker()
        sample = {
            "id": "session456",
            "model": "claude-3-opus",
            "started_at": "2024-01-01T00:00:00Z",
            "conversations": [{"from": "human", "value": "test"}],
        }

        meta = tracker.extract_provenance_from_existing(sample)

        assert meta["source_session_id"] == "session456"
        assert meta["model"] == "claude-3-opus"
        assert meta["timestamp"] == "2024-01-01T00:00:00Z"
        assert meta["source"] == "curated"
        assert "content_hash" in meta

    def test_process_pair(self):
        """Processing a bare pair counts it and attaches provenance."""
        tracker = ProvenanceTracker()

        result = tracker.process_pair({"id": "test123", "model": "test-model", "conversations": []})

        assert tracker.stats["total_pairs"] == 1
        assert tracker.stats["pairs_without_provenance"] == 1
        assert "provenance" in result

    def test_filter_by_provenance(self):
        """Excluded models are dropped and counted in stats."""
        tracker = ProvenanceTracker()
        candidates = [
            {"provenance": {"model": "anthropic/claude-3-opus"}},
            {"provenance": {"model": "gpt-4"}},
            {"provenance": {"model": "anthropic/claude-3-sonnet"}},
        ]

        survivors = tracker.filter_by_provenance(
            candidates,
            exclude_models=["anthropic/claude-3-opus", "anthropic/claude-3-sonnet"],
        )

        assert len(survivors) == 1
        assert survivors[0]["provenance"]["model"] == "gpt-4"
        assert tracker.stats["excluded"] == 2

    def test_generate_report(self):
        """The report renders totals and per-model counts."""
        tracker = ProvenanceTracker()
        tracker.stats = {
            "total_pairs": 10,
            "pairs_with_provenance": 8,
            "pairs_without_provenance": 2,
            "by_model": {"gpt-4": 5, "claude-3": 3},
            "by_source": {"curated": 8},
            "excluded": 0,
        }

        report = tracker.generate_report()

        for snippet in ("Total pairs: 10", "Pairs with provenance: 8", "gpt-4: 5"):
            assert snippet in report
|
||||
|
||||
|
||||
class TestJsonlFunctions:
    """Round-trip tests for the JSONL load/save helpers."""

    def test_load_jsonl(self):
        """load_jsonl returns one dict per non-empty line."""
        records = [
            {"id": "1", "value": "test1"},
            {"id": "2", "value": "test2"},
            {"id": "3", "value": "test3"},
        ]
        with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as handle:
            handle.writelines(json.dumps(rec) + "\n" for rec in records)
            temp_path = Path(handle.name)

        try:
            loaded = load_jsonl(temp_path)
            assert len(loaded) == 3
            assert loaded[0]["id"] == "1"
            assert loaded[2]["value"] == "test3"
        finally:
            temp_path.unlink()

    def test_save_jsonl(self):
        """save_jsonl writes one JSON document per line."""
        records = [
            {"id": "1", "value": "test1"},
            {"id": "2", "value": "test2"},
        ]

        with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as handle:
            temp_path = Path(handle.name)

        try:
            save_jsonl(records, temp_path)

            with open(temp_path) as fh:
                lines = fh.readlines()

            assert len(lines) == 2
            assert json.loads(lines[0])["id"] == "1"
            assert json.loads(lines[1])["value"] == "test2"
        finally:
            temp_path.unlink()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
281
training/training_pair_provenance.py
Normal file
281
training/training_pair_provenance.py
Normal file
@@ -0,0 +1,281 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Training Pair Provenance Tracking
|
||||
|
||||
Adds provenance metadata to training pairs for quality filtering and reporting.
|
||||
Tracks source session, model, timestamp, and other metadata.
|
||||
|
||||
Usage:
|
||||
python3 training_pair_provenance.py --input data/curated_dataset.jsonl --output data/curated_with_provenance.jsonl
|
||||
python3 training_pair_provenance.py --input data/curated_dataset.jsonl --filter exclude_anthropic
|
||||
python3 training_pair_provenance.py --input data/curated_dataset.jsonl --report
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import hashlib
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
class ProvenanceTracker:
    """Track provenance metadata for training pairs and report statistics."""

    # Models excluded by default when filtering (overridable per call).
    EXCLUDED_MODELS = {"anthropic/claude-3-opus", "anthropic/claude-3-sonnet", "anthropic/claude-3-haiku"}

    def __init__(self):
        # Running statistics updated by process_pair / filter_by_provenance.
        self.stats = {
            "total_pairs": 0,
            "pairs_with_provenance": 0,
            "pairs_without_provenance": 0,
            "by_model": {},
            "by_source": {},
            "excluded": 0,
        }

    def generate_pair_id(self, pair: Dict[str, Any]) -> str:
        """Return a deterministic 16-hex-char ID derived from the pair's content."""
        serialized = json.dumps(pair, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()[:16]

    def add_provenance(self, pair: Dict[str, Any],
                       source_session_id: Optional[str] = None,
                       model: Optional[str] = None,
                       source: str = "curated") -> Dict[str, Any]:
        """Attach provenance metadata to *pair* in place and return it.

        Explicit arguments win; otherwise values fall back to the pair's own
        "id"/"model" fields. Timestamp and content hash are set only when
        absent, so re-processing a pair is idempotent.
        """
        # Ensure every pair carries a stable ID.
        if "id" not in pair:
            pair["id"] = self.generate_pair_id(pair)

        provenance = pair.setdefault("provenance", {})

        # Source session ID: explicit argument, else the pair's own ID.
        if source_session_id:
            provenance["source_session_id"] = source_session_id
        elif "id" in pair:
            provenance["source_session_id"] = pair["id"]

        # Model: explicit argument, else the pair's existing field.
        if model:
            provenance["model"] = model
        elif "model" in pair:
            provenance["model"] = pair["model"]

        provenance.setdefault("timestamp", datetime.now(timezone.utc).isoformat())

        provenance["source"] = source

        # Content hash over the conversations, for deduplication.
        if "content_hash" not in provenance:
            digest_input = json.dumps(pair.get("conversations", []), sort_keys=True)
            provenance["content_hash"] = hashlib.sha256(digest_input.encode()).hexdigest()[:32]

        return pair

    def extract_provenance_from_existing(self, pair: Dict[str, Any]) -> Dict[str, Any]:
        """Build a provenance dict from a pair's pre-existing fields."""
        provenance: Dict[str, Any] = {}

        # Map the pair's own fields into provenance keys when present.
        for src_field, dest_field in (("id", "source_session_id"),
                                      ("model", "model"),
                                      ("started_at", "timestamp")):
            if src_field in pair:
                provenance[dest_field] = pair[src_field]

        provenance["source"] = "curated"

        # Content hash over the conversations, for deduplication.
        digest_input = json.dumps(pair.get("conversations", []), sort_keys=True)
        provenance["content_hash"] = hashlib.sha256(digest_input.encode()).hexdigest()[:32]

        return provenance

    def process_pair(self, pair: Dict[str, Any],
                     add_provenance: bool = True) -> Dict[str, Any]:
        """Count *pair* in the stats, attaching provenance when missing."""
        self.stats["total_pairs"] += 1

        if "provenance" in pair:
            self.stats["pairs_with_provenance"] += 1
            provenance = pair["provenance"]
        else:
            self.stats["pairs_without_provenance"] += 1
            if add_provenance:
                # Derive provenance from whatever fields the pair already has.
                provenance = self.extract_provenance_from_existing(pair)
                pair["provenance"] = provenance
            else:
                provenance = {}

        # Per-model and per-source tallies.
        for bucket, key in (("by_model", provenance.get("model", "unknown")),
                            ("by_source", provenance.get("source", "unknown"))):
            counts = self.stats[bucket]
            counts[key] = counts.get(key, 0) + 1

        return pair

    def filter_by_provenance(self, pairs: List[Dict[str, Any]],
                             exclude_models: Optional[List[str]] = None,
                             exclude_sources: Optional[List[str]] = None,
                             min_timestamp: Optional[str] = None,
                             max_timestamp: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return the pairs that pass every exclusion criterion.

        When *exclude_models* is None the class-level EXCLUDED_MODELS set is
        used. Each rejected pair increments stats["excluded"].
        """
        if exclude_models is None:
            exclude_models = list(self.EXCLUDED_MODELS)

        kept = []
        for pair in pairs:
            provenance = pair.get("provenance", {})
            model = provenance.get("model", "")
            source = provenance.get("source", "")
            timestamp = provenance.get("timestamp", "")

            rejected = (
                model in exclude_models
                or bool(exclude_sources and source in exclude_sources)
                or bool(min_timestamp and timestamp < min_timestamp)
                or bool(max_timestamp and timestamp > max_timestamp)
            )
            if rejected:
                self.stats["excluded"] += 1
            else:
                kept.append(pair)

        return kept

    def generate_report(self) -> str:
        """Render the accumulated statistics as a plain-text report."""
        descending_count = lambda item: item[1]

        lines = [
            "=== Training Pair Provenance Report ===",
            f"Total pairs: {self.stats['total_pairs']}",
            f"Pairs with provenance: {self.stats['pairs_with_provenance']}",
            f"Pairs without provenance: {self.stats['pairs_without_provenance']}",
            f"Excluded pairs: {self.stats['excluded']}",
            "",
            "=== Pairs by Model ===",
        ]
        lines.extend(f"  {model}: {count}"
                     for model, count in sorted(self.stats["by_model"].items(),
                                                key=descending_count, reverse=True))
        lines.append("")
        lines.append("=== Pairs by Source ===")
        lines.extend(f"  {source}: {count}"
                     for source, count in sorted(self.stats["by_source"].items(),
                                                 key=descending_count, reverse=True))

        return "\n".join(lines)
|
||||
|
||||
|
||||
def load_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Parse a JSONL file, skipping blank lines, and return its records."""
    with open(path) as fh:
        return [json.loads(raw) for raw in map(str.strip, fh) if raw]
|
||||
|
||||
|
||||
def save_jsonl(entries: List[Dict[str, Any]], path: Path):
    """Write each entry as one JSON document per line, overwriting *path*."""
    with open(path, "w") as fh:
        fh.writelines(json.dumps(entry) + "\n" for entry in entries)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: load pairs, attach provenance, filter, and report."""
    parser = argparse.ArgumentParser(description="Training Pair Provenance Tracking")
    parser.add_argument("--input", required=True, help="Input JSONL file")
    parser.add_argument("--output", help="Output JSONL file (with provenance added)")
    parser.add_argument("--filter", choices=["exclude_anthropic", "exclude_openai", "custom"],
                        help="Apply filter")
    parser.add_argument("--exclude-models", nargs="+", help="Models to exclude")
    parser.add_argument("--exclude-sources", nargs="+", help="Sources to exclude")
    parser.add_argument("--report", action="store_true", help="Generate report only")
    parser.add_argument("--json", action="store_true", help="Output report as JSON")

    args = parser.parse_args()

    # Load input
    pairs = load_jsonl(Path(args.input))
    print(f"Loaded {len(pairs)} pairs from {args.input}")

    tracker = ProvenanceTracker()

    # Attach provenance (and accumulate stats) for every pair.
    processed_pairs = [tracker.process_pair(pair, add_provenance=True) for pair in pairs]

    # Apply filters if requested.
    if args.filter:
        exclude_models = []
        if args.filter == "exclude_anthropic":
            exclude_models = list(ProvenanceTracker.EXCLUDED_MODELS)
        if args.exclude_models:
            # --exclude-models augments the named filter's list (and is the
            # only model source for "exclude_openai"/"custom").
            exclude_models.extend(args.exclude_models)
        if not exclude_models and not args.exclude_sources:
            # Previously "exclude_openai"/"custom" with no --exclude-models
            # silently filtered nothing; surface the no-op instead.
            print(f"Warning: filter '{args.filter}' has no exclusion list; "
                  "pass --exclude-models/--exclude-sources")

        processed_pairs = tracker.filter_by_provenance(
            processed_pairs,
            exclude_models=exclude_models,
            exclude_sources=args.exclude_sources
        )
        print(f"After filtering: {len(processed_pairs)} pairs")

    # Output
    if args.report:
        # Report only: machine-readable stats or the text report.
        if args.json:
            print(json.dumps(tracker.stats, indent=2))
        else:
            print(tracker.generate_report())
    elif args.output:
        # Save with provenance, then show the report.
        save_jsonl(processed_pairs, Path(args.output))
        print(f"Saved {len(processed_pairs)} pairs to {args.output}")
        print(tracker.generate_report())
    else:
        # Default: just print the report.
        print(tracker.generate_report())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user