Compare commits
2 Commits
fix/621
...
burn/687-1
| Author | SHA1 | Date | |
|---|---|---|---|
| 106da492e2 | |||
| ea51f44866 |
@@ -1,568 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
orchestrator.py — Shared Pipeline Orchestrator
|
||||
|
||||
SQLite-backed job queue with parallel workers, token budget tracking,
|
||||
checkpoint resume, rate limiting, and error retry.
|
||||
|
||||
All 5 pipelines use this orchestrator for consistent execution.
|
||||
|
||||
Usage:
|
||||
python3 orchestrator.py --pipeline training_factory --jobs jobs.jsonl
|
||||
python3 orchestrator.py --pipeline adversary --jobs jobs.jsonl --workers 5
|
||||
python3 orchestrator.py --status
|
||||
python3 orchestrator.py --resume training_factory
|
||||
python3 orchestrator.py --report training_factory
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import sqlite3
|
||||
import hashlib
|
||||
import threading
|
||||
import signal
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Dict, Any, Callable
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
DB_PATH = Path.home() / ".hermes" / "pipeline" / "orchestrator.db"
|
||||
REPORT_DIR = Path.home() / ".hermes" / "pipeline" / "reports"
|
||||
|
||||
# ============================================================
|
||||
# Data Structures
|
||||
# ============================================================
|
||||
|
||||
@dataclass
|
||||
class JobStatus:
|
||||
PENDING = "pending"
|
||||
RUNNING = "running"
|
||||
COMPLETED = "completed"
|
||||
FAILED = "failed"
|
||||
RETRYING = "retrying"
|
||||
SKIPPED = "skipped"
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineStats:
|
||||
"""Runtime statistics for a pipeline run."""
|
||||
pipeline: str
|
||||
total_jobs: int = 0
|
||||
completed: int = 0
|
||||
failed: int = 0
|
||||
skipped: int = 0
|
||||
tokens_used: int = 0
|
||||
tokens_budget: int = 5_000_000
|
||||
elapsed_seconds: float = 0.0
|
||||
start_time: str = ""
|
||||
jobs_per_minute: float = 0.0
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"pipeline": self.pipeline,
|
||||
"total_jobs": self.total_jobs,
|
||||
"completed": self.completed,
|
||||
"failed": self.failed,
|
||||
"skipped": self.skipped,
|
||||
"tokens_used": self.tokens_used,
|
||||
"tokens_budget": self.tokens_budget,
|
||||
"elapsed_seconds": round(self.elapsed_seconds, 1),
|
||||
"start_time": self.start_time,
|
||||
"jobs_per_minute": round(self.jobs_per_minute, 2),
|
||||
}
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Database
|
||||
# ============================================================
|
||||
|
||||
def get_db():
|
||||
"""Get SQLite database connection."""
|
||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(DB_PATH), timeout=30, check_same_thread=False)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
_init_db(conn)
|
||||
return conn
|
||||
|
||||
|
||||
def _init_db(conn):
|
||||
"""Initialize database schema."""
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
pipeline TEXT NOT NULL,
|
||||
job_key TEXT NOT NULL,
|
||||
payload TEXT NOT NULL,
|
||||
status TEXT DEFAULT 'pending',
|
||||
attempts INTEGER DEFAULT 0,
|
||||
max_attempts INTEGER DEFAULT 3,
|
||||
tokens_used INTEGER DEFAULT 0,
|
||||
error TEXT,
|
||||
result TEXT,
|
||||
checkpoint TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
started_at TEXT,
|
||||
completed_at TEXT,
|
||||
UNIQUE(pipeline, job_key)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_pipeline_status ON jobs(pipeline, status);
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_pipeline_key ON jobs(pipeline, job_key);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS pipeline_runs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
pipeline TEXT NOT NULL,
|
||||
started_at TEXT DEFAULT (datetime('now')),
|
||||
completed_at TEXT,
|
||||
total_jobs INTEGER DEFAULT 0,
|
||||
completed INTEGER DEFAULT 0,
|
||||
failed INTEGER DEFAULT 0,
|
||||
tokens_used INTEGER DEFAULT 0,
|
||||
report TEXT
|
||||
);
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Job Queue
|
||||
# ============================================================
|
||||
|
||||
class JobQueue:
|
||||
"""SQLite-backed job queue."""
|
||||
|
||||
def __init__(self, pipeline: str, conn=None):
|
||||
self.pipeline = pipeline
|
||||
self.conn = conn or get_db()
|
||||
|
||||
def enqueue(self, job_key: str, payload: dict, max_attempts: int = 3):
|
||||
"""Add a job to the queue (skip if already exists)."""
|
||||
try:
|
||||
self.conn.execute(
|
||||
"INSERT INTO jobs (pipeline, job_key, payload, max_attempts) VALUES (?, ?, ?, ?)",
|
||||
(self.pipeline, job_key, json.dumps(payload), max_attempts),
|
||||
)
|
||||
self.conn.commit()
|
||||
return True
|
||||
except sqlite3.IntegrityError:
|
||||
# Already exists — check if it needs retry
|
||||
row = self.conn.execute(
|
||||
"SELECT status FROM jobs WHERE pipeline=? AND job_key=?",
|
||||
(self.pipeline, job_key),
|
||||
).fetchone()
|
||||
if row and row[0] == "failed":
|
||||
# Reset for retry
|
||||
self.conn.execute(
|
||||
"UPDATE jobs SET status='pending', attempts=0, error=NULL WHERE pipeline=? AND job_key=?",
|
||||
(self.pipeline, job_key),
|
||||
)
|
||||
self.conn.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
def enqueue_batch(self, jobs: List[dict], key_field: str = "id"):
|
||||
"""Enqueue multiple jobs. Returns (added, skipped) counts."""
|
||||
added = 0
|
||||
skipped = 0
|
||||
for job in jobs:
|
||||
key = str(job.get(key_field, hashlib.md5(json.dumps(job).encode()).hexdigest()[:12]))
|
||||
if self.enqueue(key, job):
|
||||
added += 1
|
||||
else:
|
||||
skipped += 1
|
||||
return added, skipped
|
||||
|
||||
def claim_next(self) -> Optional[dict]:
|
||||
"""Claim the next pending job (atomic)."""
|
||||
row = self.conn.execute(
|
||||
"""UPDATE jobs SET status='running', started_at=datetime('now')
|
||||
WHERE id = (
|
||||
SELECT id FROM jobs WHERE pipeline=? AND status IN ('pending', 'retrying')
|
||||
ORDER BY attempts ASC, created_at ASC LIMIT 1
|
||||
) RETURNING *""",
|
||||
(self.pipeline,),
|
||||
).fetchone()
|
||||
|
||||
if not row:
|
||||
return None
|
||||
|
||||
cols = [d[1] for d in self.conn.execute("PRAGMA table_info(jobs)").fetchall()]
|
||||
return dict(zip(cols, row))
|
||||
|
||||
def complete(self, job_key: str, result: dict, tokens_used: int = 0):
|
||||
"""Mark a job as completed."""
|
||||
self.conn.execute(
|
||||
"""UPDATE jobs SET status='completed', completed_at=datetime('now'),
|
||||
result=?, tokens_used=? WHERE pipeline=? AND job_key=?""",
|
||||
(json.dumps(result), tokens_used, self.pipeline, job_key),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def fail(self, job_key: str, error: str, retry: bool = True):
|
||||
"""Mark a job as failed, optionally retry."""
|
||||
row = self.conn.execute(
|
||||
"SELECT attempts, max_attempts FROM jobs WHERE pipeline=? AND job_key=?",
|
||||
(self.pipeline, job_key),
|
||||
).fetchone()
|
||||
|
||||
if not row:
|
||||
return
|
||||
|
||||
attempts, max_attempts = row
|
||||
new_attempts = attempts + 1
|
||||
|
||||
if retry and new_attempts < max_attempts:
|
||||
# Exponential backoff: 2^attempts seconds
|
||||
delay = min(2 ** new_attempts, 60)
|
||||
self.conn.execute(
|
||||
"""UPDATE jobs SET status='retrying', attempts=?, error=?
|
||||
WHERE pipeline=? AND job_key=?""",
|
||||
(new_attempts, error, self.pipeline, job_key),
|
||||
)
|
||||
else:
|
||||
self.conn.execute(
|
||||
"""UPDATE jobs SET status='failed', attempts=?, error=?,
|
||||
completed_at=datetime('now') WHERE pipeline=? AND job_key=?""",
|
||||
(new_attempts, error, self.pipeline, job_key),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def save_checkpoint(self, job_key: str, checkpoint: dict):
|
||||
"""Save progress checkpoint for resume."""
|
||||
self.conn.execute(
|
||||
"UPDATE jobs SET checkpoint=? WHERE pipeline=? AND job_key=?",
|
||||
(json.dumps(checkpoint), self.pipeline, job_key),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def get_checkpoint(self, job_key: str) -> Optional[dict]:
|
||||
"""Get saved checkpoint."""
|
||||
row = self.conn.execute(
|
||||
"SELECT checkpoint FROM jobs WHERE pipeline=? AND job_key=?",
|
||||
(self.pipeline, job_key),
|
||||
).fetchone()
|
||||
if row and row[0]:
|
||||
return json.loads(row[0])
|
||||
return None
|
||||
|
||||
def stats(self) -> dict:
|
||||
"""Get queue statistics."""
|
||||
rows = self.conn.execute(
|
||||
"""SELECT status, COUNT(*), COALESCE(SUM(tokens_used), 0)
|
||||
FROM jobs WHERE pipeline=? GROUP BY status""",
|
||||
(self.pipeline,),
|
||||
).fetchall()
|
||||
|
||||
result = {"total": 0, "tokens_used": 0}
|
||||
for status, count, tokens in rows:
|
||||
result[status] = count
|
||||
result["total"] += count
|
||||
result["tokens_used"] += tokens
|
||||
return result
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Orchestrator
|
||||
# ============================================================
|
||||
|
||||
class Orchestrator:
|
||||
"""
|
||||
Shared orchestrator for all pipelines.
|
||||
|
||||
Features:
|
||||
- Parallel worker pool (configurable)
|
||||
- Token budget tracking
|
||||
- Checkpoint resume
|
||||
- Rate limiting
|
||||
- Error retry with exponential backoff
|
||||
- Final report generation
|
||||
"""
|
||||
|
||||
def __init__(self, pipeline: str, workers: int = 10, token_budget: int = 5_000_000):
|
||||
self.pipeline = pipeline
|
||||
self.workers = workers
|
||||
self.token_budget = token_budget
|
||||
self.queue = JobQueue(pipeline)
|
||||
self.conn = self.queue.conn
|
||||
self._shutdown = False
|
||||
self._stats = PipelineStats(pipeline=pipeline, tokens_budget=token_budget)
|
||||
self._rate_limit_delay = 0.1 # seconds between jobs
|
||||
self._response_cache: Dict[str, dict] = {}
|
||||
|
||||
signal.signal(signal.SIGINT, self._handle_signal)
|
||||
signal.signal(signal.SIGTERM, self._handle_signal)
|
||||
|
||||
def _handle_signal(self, signum, frame):
|
||||
"""Graceful shutdown on signal."""
|
||||
print(f"\nReceived signal {signum}. Shutting down gracefully...")
|
||||
self._shutdown = True
|
||||
|
||||
def load_jobs(self, jobs_path: str, key_field: str = "id"):
|
||||
"""Load jobs from a JSONL file into the queue."""
|
||||
jobs = []
|
||||
with open(jobs_path) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
jobs.append(json.loads(line))
|
||||
|
||||
added, skipped = self.queue.enqueue_batch(jobs, key_field)
|
||||
print(f"Loaded: {added} new, {skipped} existing")
|
||||
|
||||
def run(self, job_handler: Callable[[dict], dict] = None):
|
||||
"""
|
||||
Run the orchestrator. Processes all pending jobs with parallel workers.
|
||||
|
||||
Args:
|
||||
job_handler: function(job_payload) -> dict with 'tokens_used' key
|
||||
"""
|
||||
start = time.time()
|
||||
self._stats.start_time = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
# Record run
|
||||
self.conn.execute(
|
||||
"INSERT INTO pipeline_runs (pipeline, started_at) VALUES (?, ?)",
|
||||
(self.pipeline, self._stats.start_time),
|
||||
)
|
||||
run_id = self.conn.execute("SELECT last_insert_rowid()").fetchone()[0]
|
||||
self.conn.commit()
|
||||
|
||||
stats = self.queue.stats()
|
||||
self._stats.total_jobs = stats.get("pending", 0) + stats.get("retrying", 0)
|
||||
print(f"\nPipeline: {self.pipeline}")
|
||||
print(f"Jobs: {self._stats.total_jobs} pending | Workers: {self.workers} | Budget: {self.token_budget:,} tokens")
|
||||
print()
|
||||
|
||||
if self._stats.total_jobs == 0:
|
||||
print("No jobs to process.")
|
||||
return
|
||||
|
||||
completed = 0
|
||||
failed = 0
|
||||
skipped = 0
|
||||
tokens_used = 0
|
||||
|
||||
with ThreadPoolExecutor(max_workers=self.workers) as executor:
|
||||
futures = {}
|
||||
|
||||
while not self._shutdown:
|
||||
# Check token budget
|
||||
if tokens_used >= self.token_budget:
|
||||
print(f"Token budget exhausted ({tokens_used:,}/{self.token_budget:,})")
|
||||
break
|
||||
|
||||
# Fill worker pool
|
||||
while len(futures) < self.workers and not self._shutdown:
|
||||
job = self.queue.claim_next()
|
||||
if not job:
|
||||
break
|
||||
|
||||
# Check response cache (zero-token retries)
|
||||
job_key = job["job_key"]
|
||||
payload = json.loads(job["payload"])
|
||||
cache_key = hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()
|
||||
|
||||
if cache_key in self._response_cache:
|
||||
result = self._response_cache[cache_key]
|
||||
self.queue.complete(job_key, result, tokens_used=0)
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
# Submit to worker
|
||||
future = executor.submit(self._process_job, job, job_handler)
|
||||
futures[future] = job
|
||||
|
||||
# Rate limiting
|
||||
time.sleep(self._rate_limit_delay)
|
||||
|
||||
if not futures:
|
||||
break
|
||||
|
||||
# Collect results
|
||||
done = []
|
||||
for future in as_completed(futures, timeout=1):
|
||||
job = futures[future]
|
||||
try:
|
||||
result = future.result()
|
||||
if result.get("success"):
|
||||
tokens = result.get("tokens_used", 0)
|
||||
tokens_used += tokens
|
||||
self.queue.complete(job["job_key"], result, tokens_used=tokens)
|
||||
completed += 1
|
||||
else:
|
||||
error = result.get("error", "unknown error")
|
||||
self.queue.fail(job["job_key"], error, retry=True)
|
||||
failed += 1
|
||||
except Exception as e:
|
||||
self.queue.fail(job["job_key"], str(e), retry=True)
|
||||
failed += 1
|
||||
|
||||
done.append(future)
|
||||
|
||||
# Progress update
|
||||
total = completed + failed + skipped
|
||||
if total % 10 == 0:
|
||||
elapsed = time.time() - start
|
||||
rate = completed / (elapsed / 60) if elapsed > 0 else 0
|
||||
print(f" Progress: {total}/{self._stats.total_jobs} | "
|
||||
f"completed={completed} failed={failed} | "
|
||||
f"tokens={tokens_used:,} | "
|
||||
f"{rate:.1f}/min")
|
||||
|
||||
for f in done:
|
||||
del futures[f]
|
||||
|
||||
# Final report
|
||||
elapsed = time.time() - start
|
||||
self._stats.completed = completed
|
||||
self._stats.failed = failed
|
||||
self._stats.skipped = skipped
|
||||
self._stats.tokens_used = tokens_used
|
||||
self._stats.elapsed_seconds = elapsed
|
||||
self._stats.jobs_per_minute = completed / (elapsed / 60) if elapsed > 0 else 0
|
||||
|
||||
# Save run
|
||||
self.conn.execute(
|
||||
"""UPDATE pipeline_runs SET completed_at=?, total_jobs=?, completed=?,
|
||||
failed=?, tokens_used=?, report=? WHERE id=?""",
|
||||
(datetime.now(timezone.utc).isoformat(), self._stats.total_jobs,
|
||||
completed, failed, tokens_used, json.dumps(self._stats.to_dict()), run_id),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
# Print report
|
||||
print(f"\n{'='*50}")
|
||||
print(f"Pipeline: {self.pipeline}")
|
||||
print(f"Completed: {completed}/{self._stats.total_jobs}")
|
||||
print(f"Failed: {failed}")
|
||||
print(f"Skipped (cached): {skipped}")
|
||||
print(f"Tokens: {tokens_used:,}/{self.token_budget:,}")
|
||||
print(f"Time: {elapsed:.1f}s ({self._stats.jobs_per_minute:.1f}/min)")
|
||||
print(f"{'='*50}")
|
||||
|
||||
# Save report file
|
||||
self._save_report()
|
||||
|
||||
def _process_job(self, job: dict, handler: Callable = None) -> dict:
|
||||
"""Process a single job."""
|
||||
payload = json.loads(job["payload"])
|
||||
job_key = job["job_key"]
|
||||
checkpoint = self.queue.get_checkpoint(job_key)
|
||||
|
||||
if handler:
|
||||
try:
|
||||
result = handler(payload, checkpoint=checkpoint)
|
||||
return result or {"success": True, "tokens_used": 0}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
else:
|
||||
# Default handler: just mark as complete
|
||||
return {"success": True, "tokens_used": 0}
|
||||
|
||||
def _save_report(self):
|
||||
"""Save pipeline run report."""
|
||||
REPORT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
path = REPORT_DIR / f"{self.pipeline}_{ts}.json"
|
||||
with open(path, "w") as f:
|
||||
json.dump(self._stats.to_dict(), f, indent=2)
|
||||
print(f"Report: {path}")
|
||||
|
||||
def resume(self):
|
||||
"""Resume failed/retrying jobs from a previous run."""
|
||||
stats = self.queue.stats()
|
||||
retrying = stats.get("retrying", 0)
|
||||
failed = stats.get("failed", 0)
|
||||
print(f"Resume {self.pipeline}: {retrying} retrying, {failed} failed to reset")
|
||||
|
||||
# Reset failed jobs to pending for retry
|
||||
self.conn.execute(
|
||||
"UPDATE jobs SET status='pending', attempts=0 WHERE pipeline=? AND status='failed'",
|
||||
(self.pipeline,),
|
||||
)
|
||||
self.conn.execute(
|
||||
"UPDATE jobs SET status='pending' WHERE pipeline=? AND status='retrying'",
|
||||
(self.pipeline,),
|
||||
)
|
||||
self.conn.commit()
|
||||
|
||||
def status(self):
|
||||
"""Show pipeline status."""
|
||||
stats = self.queue.stats()
|
||||
print(f"\nPipeline: {self.pipeline}")
|
||||
for k, v in sorted(stats.items()):
|
||||
print(f" {k}: {v}")
|
||||
|
||||
|
||||
# ============================================================
|
||||
# CLI
|
||||
# ============================================================
|
||||
|
||||
def show_all_status():
|
||||
"""Show status of all pipelines."""
|
||||
conn = get_db()
|
||||
pipelines = conn.execute(
|
||||
"SELECT DISTINCT pipeline FROM jobs ORDER BY pipeline"
|
||||
).fetchall()
|
||||
|
||||
if not pipelines:
|
||||
print("No pipelines in database.")
|
||||
return
|
||||
|
||||
print(f"\nAll Pipeline Status")
|
||||
print(f"{'='*60}")
|
||||
|
||||
for (pipeline,) in pipelines:
|
||||
queue = JobQueue(pipeline, conn)
|
||||
stats = queue.stats()
|
||||
total = stats.get("total", 0)
|
||||
pending = stats.get("pending", 0)
|
||||
running = stats.get("running", 0)
|
||||
completed = stats.get("completed", 0)
|
||||
failed = stats.get("failed", 0)
|
||||
tokens = stats.get("tokens_used", 0)
|
||||
print(f" {pipeline:25} total={total:4} pending={pending:3} running={running:2} "
|
||||
f"completed={completed:4} failed={failed:3} tokens={tokens:,}")
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Shared Pipeline Orchestrator")
|
||||
parser.add_argument("--pipeline", "-p", help="Pipeline name")
|
||||
parser.add_argument("--jobs", "-j", help="Jobs JSONL file to load")
|
||||
parser.add_argument("--workers", "-w", type=int, default=10, help="Parallel workers")
|
||||
parser.add_argument("--budget", "-b", type=int, default=5_000_000, help="Token budget")
|
||||
parser.add_argument("--status", action="store_true", help="Show status")
|
||||
parser.add_argument("--resume", action="store_true", help="Resume failed jobs")
|
||||
parser.add_argument("--key-field", default="id", help="Job key field name")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.status:
|
||||
if args.pipeline:
|
||||
orch = Orchestrator(args.pipeline)
|
||||
orch.status()
|
||||
else:
|
||||
show_all_status()
|
||||
return
|
||||
|
||||
if not args.pipeline:
|
||||
parser.error("--pipeline is required")
|
||||
|
||||
orch = Orchestrator(args.pipeline, workers=args.workers, token_budget=args.budget)
|
||||
|
||||
if args.jobs:
|
||||
orch.load_jobs(args.jobs, key_field=args.key_field)
|
||||
|
||||
if args.resume:
|
||||
orch.resume()
|
||||
|
||||
if args.jobs or args.resume:
|
||||
orch.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
389
scripts/filter_training_data.py
Normal file
389
scripts/filter_training_data.py
Normal file
@@ -0,0 +1,389 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Training Data Quality Filter (#687)
|
||||
|
||||
Scores and removes low-quality training pairs from JSONL files.
|
||||
Supports: ShareGPT format, preference pairs, generic JSONL.
|
||||
|
||||
Usage:
|
||||
python3 scripts/filter_training_data.py <input.jsonl> [--output filtered.jsonl]
|
||||
python3 scripts/filter_training_data.py training/data/preference_pairs.jsonl
|
||||
python3 scripts/filter_training_data.py training/data/curated_dataset.jsonl --threshold 0.3
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import ast
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
# ============================================================
|
||||
# QUALITY SCORING
|
||||
# ============================================================
|
||||
|
||||
# Generic filler phrases that indicate low-quality responses
|
||||
FILLER_PHRASES = [
|
||||
"as an ai", "i'm an ai", "as a language model", "i don't have personal",
|
||||
"i cannot", "i can't", "it's important to note", "please note that",
|
||||
"in conclusion", "to summarize", "in summary", "hope this helps",
|
||||
"let me know if", "feel free to", "i'd be happy to", "certainly!",
|
||||
"of course!", "absolutely!", "great question!", "that's a great",
|
||||
"i understand your", "i appreciate your", "thank you for asking",
|
||||
"it depends", "there are many ways", "various factors",
|
||||
]
|
||||
|
||||
# Vague/generic short responses
|
||||
VAGUE_RESPONSES = [
|
||||
"ok", "okay", "sure", "yes", "no", "maybe", "idk", "i don't know",
|
||||
"thanks", "thank you", "got it", "understood", "right", "correct",
|
||||
"hello", "hi", "hey", "goodbye", "bye",
|
||||
]
|
||||
|
||||
CODE_BLOCK_PATTERN = re.compile(r"```(?:\w+)?\n(.+?)```", re.DOTALL)
|
||||
INLINE_CODE_PATTERN = re.compile(r"`([^`]+)`")
|
||||
|
||||
|
||||
def detect_format(record: dict) -> str:
|
||||
"""Detect the training data format of a record."""
|
||||
if "conversations" in record:
|
||||
return "sharegpt"
|
||||
if "prompt" in record and "chosen" in record:
|
||||
return "preference"
|
||||
if "scene" in record and "lyric_line" in record:
|
||||
return "scene"
|
||||
if "terse" in record and "rich" in record:
|
||||
return "pairs"
|
||||
return "generic"
|
||||
|
||||
|
||||
def extract_text_fields(record: dict, fmt: str) -> Tuple[str, str]:
|
||||
"""Extract (input_text, output_text) from a record based on format."""
|
||||
if fmt == "sharegpt":
|
||||
convs = record.get("conversations", [])
|
||||
human_msgs = [c["value"] for c in convs if c.get("from") == "human"]
|
||||
gpt_msgs = [c["value"] for c in convs if c.get("from") == "gpt"]
|
||||
input_text = human_msgs[-1] if human_msgs else ""
|
||||
output_text = gpt_msgs[-1] if gpt_msgs else ""
|
||||
return input_text, output_text
|
||||
|
||||
elif fmt == "preference":
|
||||
return record.get("prompt", ""), record.get("chosen", "")
|
||||
|
||||
elif fmt == "scene":
|
||||
return record.get("lyric_line", ""), record.get("scene", {}).get("description", "")
|
||||
|
||||
elif fmt == "pairs":
|
||||
return record.get("terse", ""), record.get("rich", "")
|
||||
|
||||
else:
|
||||
# Generic: try common field names
|
||||
input_text = record.get("input", record.get("prompt", record.get("question", "")))
|
||||
output_text = record.get("output", record.get("response", record.get("answer", "")))
|
||||
return str(input_text), str(output_text)
|
||||
|
||||
|
||||
def score_specificity(text: str) -> float:
|
||||
"""Score 0-1 how specific/detailed a response is vs generic filler."""
|
||||
if not text or not text.strip():
|
||||
return 0.0
|
||||
|
||||
text_lower = text.lower().strip()
|
||||
score = 0.5 # baseline
|
||||
|
||||
# Penalize filler phrases
|
||||
filler_count = sum(1 for phrase in FILLER_PHRASES if phrase in text_lower)
|
||||
score -= filler_count * 0.08
|
||||
|
||||
# Penalize very short responses
|
||||
word_count = len(text.split())
|
||||
if word_count < 5:
|
||||
score -= 0.3
|
||||
elif word_count < 10:
|
||||
score -= 0.15
|
||||
elif word_count > 30:
|
||||
score += 0.1 # longer responses tend to be more detailed
|
||||
|
||||
# Penalize vague single-word responses
|
||||
if text_lower.strip() in VAGUE_RESPONSES:
|
||||
score -= 0.4
|
||||
|
||||
# Reward specificity indicators
|
||||
specificity_markers = [
|
||||
r"\d+", # numbers
|
||||
r"```", # code blocks
|
||||
r"https?://", # URLs
|
||||
r"\$\{", r"\w+\.\w+", # code-like patterns
|
||||
r"(?:specifically|exactly|precisely|in particular)",
|
||||
r"(?:step \d|first,|second,|third,|finally,)",
|
||||
]
|
||||
for pattern in specificity_markers:
|
||||
if re.search(pattern, text):
|
||||
score += 0.05
|
||||
|
||||
# Reward code presence
|
||||
if "```" in text:
|
||||
score += 0.15
|
||||
|
||||
return max(0.0, min(1.0, score))
|
||||
|
||||
|
||||
def score_length_ratio(input_text: str, output_text: str) -> float:
|
||||
"""Score 0-1 based on reasonable length ratio between input and output."""
|
||||
in_len = len(input_text.split())
|
||||
out_len = len(output_text.split())
|
||||
|
||||
if in_len == 0 and out_len == 0:
|
||||
return 0.0
|
||||
if out_len == 0:
|
||||
return 0.0
|
||||
|
||||
# Ideal ratio: output 0.5x to 10x input length
|
||||
# Too short output for long input = bad
|
||||
# Too long output for short input = acceptable (detailed answer)
|
||||
if in_len > 0:
|
||||
ratio = out_len / in_len
|
||||
else:
|
||||
ratio = out_len / 10 # normalize when no input
|
||||
|
||||
if ratio < 0.05:
|
||||
return 0.1 # output way too short
|
||||
elif ratio < 0.2:
|
||||
return 0.3
|
||||
elif ratio < 0.5:
|
||||
return 0.6
|
||||
elif ratio <= 15:
|
||||
return 1.0 # sweet spot
|
||||
elif ratio <= 50:
|
||||
return 0.8
|
||||
else:
|
||||
return 0.5 # extremely long output, maybe noise
|
||||
|
||||
|
||||
def score_code_correctness(text: str) -> float:
|
||||
"""Score 0-1 for code correctness if code blocks are present."""
|
||||
code_blocks = CODE_BLOCK_PATTERN.findall(text)
|
||||
|
||||
if not code_blocks:
|
||||
return 1.0 # no code, not penalized
|
||||
|
||||
total = len(code_blocks)
|
||||
valid = 0
|
||||
|
||||
for code in code_blocks:
|
||||
# Try Python syntax check
|
||||
try:
|
||||
ast.parse(code)
|
||||
valid += 1
|
||||
continue
|
||||
except SyntaxError:
|
||||
pass
|
||||
|
||||
# Try JavaScript basic check (balanced braces/parens)
|
||||
if _check_brackets_balanced(code):
|
||||
valid += 0.8
|
||||
continue
|
||||
|
||||
# JSON check
|
||||
try:
|
||||
json.loads(code)
|
||||
valid += 1
|
||||
continue
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
|
||||
# Shell/YAML: just check it's not empty garbage
|
||||
if len(code.strip()) > 10 and "\n" in code:
|
||||
valid += 0.5
|
||||
|
||||
return valid / total if total > 0 else 1.0
|
||||
|
||||
|
||||
def _check_brackets_balanced(code: str) -> bool:
|
||||
"""Check if brackets are balanced in code."""
|
||||
stack = []
|
||||
pairs = {"(": ")", "[": "]", "{": "}"}
|
||||
for ch in code:
|
||||
if ch in pairs:
|
||||
stack.append(pairs[ch])
|
||||
elif ch in pairs.values():
|
||||
if not stack or stack[-1] != ch:
|
||||
return False
|
||||
stack.pop()
|
||||
return len(stack) == 0
|
||||
|
||||
|
||||
def score_record(record: dict, fmt: str) -> Dict[str, float]:
|
||||
"""Score a single training record. Returns dict of component scores."""
|
||||
input_text, output_text = extract_text_fields(record, fmt)
|
||||
|
||||
specificity = score_specificity(output_text)
|
||||
length_ratio = score_length_ratio(input_text, output_text)
|
||||
code_correctness = score_code_correctness(output_text)
|
||||
|
||||
# Weighted composite
|
||||
composite = (
|
||||
specificity * 0.45 +
|
||||
length_ratio * 0.25 +
|
||||
code_correctness * 0.30
|
||||
)
|
||||
|
||||
return {
|
||||
"specificity": round(specificity, 3),
|
||||
"length_ratio": round(length_ratio, 3),
|
||||
"code_correctness": round(code_correctness, 3),
|
||||
"composite": round(composite, 3),
|
||||
}
|
||||
|
||||
|
||||
# ============================================================
|
||||
# FILTERING
|
||||
# ============================================================
|
||||
|
||||
def filter_jsonl(
|
||||
input_path: str,
|
||||
output_path: Optional[str] = None,
|
||||
threshold: float = 0.3,
|
||||
dry_run: bool = False,
|
||||
verbose: bool = False,
|
||||
) -> Dict[str, Any]:
|
||||
"""Filter a JSONL file, removing low-quality records."""
|
||||
|
||||
if output_path is None:
|
||||
stem = Path(input_path).stem
|
||||
output_path = str(Path(input_path).parent / f"{stem}_filtered.jsonl")
|
||||
|
||||
records = []
|
||||
with open(input_path, "r", encoding="utf-8") as f:
|
||||
for i, line in enumerate(f):
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
records.append(json.loads(line))
|
||||
except json.JSONDecodeError as e:
|
||||
print(f" [WARN] Line {i+1}: invalid JSON, skipping: {e}", file=sys.stderr)
|
||||
|
||||
if not records:
|
||||
return {"error": "No valid records found", "total": 0}
|
||||
|
||||
# Detect format from first record
|
||||
fmt = detect_format(records[0])
|
||||
print(f" Detected format: {fmt}")
|
||||
print(f" Total records: {len(records)}")
|
||||
|
||||
# Score all records
|
||||
scored = []
|
||||
for i, record in enumerate(records):
|
||||
scores = score_record(record, fmt)
|
||||
scored.append((record, scores, i))
|
||||
|
||||
# Sort by composite score
|
||||
scored.sort(key=lambda x: x[1]["composite"])
|
||||
|
||||
# Filter
|
||||
kept = [(r, s, i) for r, s, i in scored if s["composite"] >= threshold]
|
||||
removed = [(r, s, i) for r, s, i in scored if s["composite"] < threshold]
|
||||
|
||||
# Report
|
||||
report = {
|
||||
"input_file": input_path,
|
||||
"output_file": output_path,
|
||||
"format": fmt,
|
||||
"total_records": len(records),
|
||||
"kept": len(kept),
|
||||
"removed": len(removed),
|
||||
"threshold": threshold,
|
||||
"removal_rate": f"{len(removed) / len(records) * 100:.1f}%",
|
||||
"score_distribution": {
|
||||
"min": scored[0][1]["composite"] if scored else 0,
|
||||
"max": scored[-1][1]["composite"] if scored else 0,
|
||||
"median": scored[len(scored)//2][1]["composite"] if scored else 0,
|
||||
"mean": round(sum(s["composite"] for _, s, _ in scored) / len(scored), 3) if scored else 0,
|
||||
},
|
||||
"removed_score_breakdown": {
|
||||
"specificity_below_0.3": sum(1 for _, s, _ in removed if s["specificity"] < 0.3),
|
||||
"length_ratio_below_0.3": sum(1 for _, s, _ in removed if s["length_ratio"] < 0.3),
|
||||
"code_correctness_below_0.5": sum(1 for _, s, _ in removed if s["code_correctness"] < 0.5),
|
||||
},
|
||||
}
|
||||
|
||||
# Show worst offenders if verbose
|
||||
if verbose and removed:
|
||||
print(f"\n Worst 5 records (by composite score):")
|
||||
for r, s, i in removed[:5]:
|
||||
_, output_text = extract_text_fields(r, fmt)
|
||||
preview = output_text[:80].replace("\n", " ") if output_text else "(empty)"
|
||||
print(f" [{s['composite']:.3f}] {preview}...")
|
||||
|
||||
# Write output (unless dry run)
|
||||
if not dry_run:
|
||||
# Preserve original order, only keeping filtered records
|
||||
kept_indices = {i for _, _, i in kept}
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
for i, record in enumerate(records):
|
||||
if i in kept_indices:
|
||||
f.write(json.dumps(record, ensure_ascii=False) + "\n")
|
||||
print(f"\n Written: {output_path}")
|
||||
|
||||
return report
|
||||
|
||||
|
||||
# ============================================================
|
||||
# CLI
|
||||
# ============================================================
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Training data quality filter — remove low-quality pairs (#687)"
|
||||
)
|
||||
parser.add_argument("input", help="Input JSONL file path")
|
||||
parser.add_argument("--output", "-o", help="Output file path (default: <input>_filtered.jsonl)")
|
||||
parser.add_argument("--threshold", "-t", type=float, default=0.3,
|
||||
help="Minimum composite score to keep (default: 0.3)")
|
||||
parser.add_argument("--dry-run", "-n", action="store_true",
|
||||
help="Score only, don't write output")
|
||||
parser.add_argument("--verbose", "-v", action="store_true",
|
||||
help="Show worst offenders")
|
||||
parser.add_argument("--report-json", "-j", help="Write report as JSON to file")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not os.path.exists(args.input):
|
||||
print(f"Error: {args.input} not found", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Filtering: {args.input}")
|
||||
print(f"Threshold: {args.threshold}")
|
||||
print()
|
||||
|
||||
report = filter_jsonl(
|
||||
args.input,
|
||||
output_path=args.output,
|
||||
threshold=args.threshold,
|
||||
dry_run=args.dry_run,
|
||||
verbose=args.verbose,
|
||||
)
|
||||
|
||||
print(f"\n{'=' * 50}")
|
||||
print(f" RESULTS")
|
||||
print(f"{'=' * 50}")
|
||||
print(f" Format: {report['format']}")
|
||||
print(f" Total: {report['total_records']}")
|
||||
print(f" Kept: {report['kept']}")
|
||||
print(f" Removed: {report['removed']} ({report['removal_rate']})")
|
||||
print(f" Threshold: {report['threshold']}")
|
||||
print(f" Score range: {report['score_distribution']['min']:.3f} - {report['score_distribution']['max']:.3f}")
|
||||
print(f" Mean score: {report['score_distribution']['mean']:.3f}")
|
||||
|
||||
if args.report_json:
|
||||
with open(args.report_json, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(f"\n Report saved: {args.report_json}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
192
tests/test_filter_training_data.py
Normal file
192
tests/test_filter_training_data.py
Normal file
@@ -0,0 +1,192 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for training data quality filter (#687).
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
# Import from the script
|
||||
import sys
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
|
||||
from filter_training_data import (
|
||||
detect_format,
|
||||
extract_text_fields,
|
||||
score_specificity,
|
||||
score_length_ratio,
|
||||
score_code_correctness,
|
||||
score_record,
|
||||
filter_jsonl,
|
||||
FILLER_PHRASES,
|
||||
VAGUE_RESPONSES,
|
||||
)
|
||||
|
||||
|
||||
class TestFormatDetection(unittest.TestCase):
|
||||
def test_sharegpt_format(self):
|
||||
record = {"conversations": [{"from": "human", "value": "hi"}]}
|
||||
self.assertEqual(detect_format(record), "sharegpt")
|
||||
|
||||
def test_preference_format(self):
|
||||
record = {"prompt": "do X", "chosen": "done", "rejected": "no"}
|
||||
self.assertEqual(detect_format(record), "preference")
|
||||
|
||||
def test_scene_format(self):
|
||||
record = {"lyric_line": "test", "scene": {"description": "desc"}}
|
||||
self.assertEqual(detect_format(record), "scene")
|
||||
|
||||
def test_pairs_format(self):
|
||||
record = {"terse": "short", "rich": "detailed"}
|
||||
self.assertEqual(detect_format(record), "pairs")
|
||||
|
||||
def test_generic_format(self):
|
||||
record = {"input": "q", "output": "a"}
|
||||
self.assertEqual(detect_format(record), "generic")
|
||||
|
||||
|
||||
class TestExtractTextFields(unittest.TestCase):
|
||||
def test_sharegpt_extraction(self):
|
||||
record = {
|
||||
"conversations": [
|
||||
{"from": "system", "value": "system prompt"},
|
||||
{"from": "human", "value": "hello"},
|
||||
{"from": "gpt", "value": "hi there"},
|
||||
]
|
||||
}
|
||||
inp, out = extract_text_fields(record, "sharegpt")
|
||||
self.assertEqual(inp, "hello")
|
||||
self.assertEqual(out, "hi there")
|
||||
|
||||
def test_preference_extraction(self):
|
||||
record = {"prompt": "question", "chosen": "good answer"}
|
||||
inp, out = extract_text_fields(record, "preference")
|
||||
self.assertEqual(inp, "question")
|
||||
self.assertEqual(out, "good answer")
|
||||
|
||||
|
||||
class TestSpecificityScoring(unittest.TestCase):
|
||||
def test_empty_text(self):
|
||||
self.assertEqual(score_specificity(""), 0.0)
|
||||
|
||||
def test_filler_heavy(self):
|
||||
text = "As an AI, I cannot provide that. It's important to note that I'm an AI."
|
||||
score = score_specificity(text)
|
||||
self.assertLess(score, 0.3)
|
||||
|
||||
def test_vague_response(self):
|
||||
score = score_specificity("ok")
|
||||
self.assertLess(score, 0.2)
|
||||
|
||||
def test_specific_response(self):
|
||||
text = "Here are the steps:\n1. First, install Python 3.12\n2. Run `pip install numpy`\n3. Execute main.py"
|
||||
score = score_specificity(text)
|
||||
self.assertGreater(score, 0.5)
|
||||
|
||||
def test_code_response(self):
|
||||
text = "Use this:\n```python\ndef hello():\n print('world')\n```"
|
||||
score = score_specificity(text)
|
||||
self.assertGreater(score, 0.6)
|
||||
|
||||
|
||||
class TestLengthRatio(unittest.TestCase):
|
||||
def test_both_empty(self):
|
||||
self.assertEqual(score_length_ratio("", ""), 0.0)
|
||||
|
||||
def test_empty_output(self):
|
||||
self.assertEqual(score_length_ratio("hello world", ""), 0.0)
|
||||
|
||||
def test_good_ratio(self):
|
||||
score = score_length_ratio("short question", "This is a reasonable length answer that addresses the question.")
|
||||
self.assertGreater(score, 0.7)
|
||||
|
||||
def test_too_short_output(self):
|
||||
score = score_length_ratio("This is a very long question with many words that expects a detailed answer", "ok")
|
||||
self.assertLess(score, 0.5)
|
||||
|
||||
|
||||
class TestCodeCorrectness(unittest.TestCase):
|
||||
def test_no_code(self):
|
||||
self.assertEqual(score_code_correctness("plain text"), 1.0)
|
||||
|
||||
def test_valid_python(self):
|
||||
text = "```python\ndef foo():\n return 42\n```"
|
||||
self.assertEqual(score_code_correctness(text), 1.0)
|
||||
|
||||
def test_invalid_python(self):
|
||||
text = "```python\ndef foo(\n return 42\n```"
|
||||
score = score_code_correctness(text)
|
||||
self.assertLess(score, 1.0)
|
||||
|
||||
def test_valid_json(self):
|
||||
text = "```json\n{\"key\": \"value\"}\n```"
|
||||
self.assertEqual(score_code_correctness(text), 1.0)
|
||||
|
||||
|
||||
class TestFilterJsonl(unittest.TestCase):
|
||||
def _write_temp_jsonl(self, records):
|
||||
f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
|
||||
for r in records:
|
||||
f.write(json.dumps(r) + "\n")
|
||||
f.close()
|
||||
return f.name
|
||||
|
||||
def test_filter_removes_low_quality(self):
|
||||
records = [
|
||||
{"conversations": [
|
||||
{"from": "human", "value": "How do I sort a list in Python?"},
|
||||
{"from": "gpt", "value": "Use `sorted()` or `list.sort()`.\n```python\nnums = [3,1,2]\nnums.sort()\nprint(nums) # [1, 2, 3]\n```"},
|
||||
]},
|
||||
{"conversations": [
|
||||
{"from": "human", "value": "What is Python?"},
|
||||
{"from": "gpt", "value": "ok"},
|
||||
]},
|
||||
{"conversations": [
|
||||
{"from": "human", "value": "Tell me about databases."},
|
||||
{"from": "gpt", "value": "As an AI, I cannot. It's important to note."},
|
||||
]},
|
||||
]
|
||||
path = self._write_temp_jsonl(records)
|
||||
try:
|
||||
report = filter_jsonl(path, threshold=0.3)
|
||||
self.assertEqual(report["total_records"], 3)
|
||||
self.assertGreater(report["kept"], 0)
|
||||
self.assertGreater(report["removed"], 0)
|
||||
self.assertEqual(report["format"], "sharegpt")
|
||||
finally:
|
||||
os.unlink(path)
|
||||
if os.path.exists(report.get("output_file", "")):
|
||||
os.unlink(report["output_file"])
|
||||
|
||||
def test_dry_run_no_output(self):
|
||||
records = [
|
||||
{"prompt": "test", "chosen": "good detailed answer with code: `print(1)`", "rejected": "no"},
|
||||
]
|
||||
path = self._write_temp_jsonl(records)
|
||||
try:
|
||||
out_path = path.replace(".jsonl", "_filtered.jsonl")
|
||||
report = filter_jsonl(path, threshold=0.3, dry_run=True)
|
||||
self.assertFalse(os.path.exists(out_path))
|
||||
self.assertEqual(report["total_records"], 1)
|
||||
finally:
|
||||
os.unlink(path)
|
||||
|
||||
def test_preference_format(self):
|
||||
records = [
|
||||
{"prompt": "Write a function", "chosen": "```python\ndef f(): pass\n```", "rejected": ""},
|
||||
{"prompt": "Hi", "chosen": "ok", "rejected": "no"},
|
||||
]
|
||||
path = self._write_temp_jsonl(records)
|
||||
try:
|
||||
report = filter_jsonl(path, threshold=0.3)
|
||||
self.assertEqual(report["format"], "preference")
|
||||
self.assertEqual(report["total_records"], 2)
|
||||
finally:
|
||||
os.unlink(path)
|
||||
if os.path.exists(report.get("output_file", "")):
|
||||
os.unlink(report["output_file"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user