Compare commits
15 Commits
fix/660-py
...
fix/689-sc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a2e61f6def | ||
| d1486b52e8 | |||
|
|
19db78bbf0 | ||
| b3eba66a07 | |||
| 61bb221ff2 | |||
| 729db767d1 | |||
| d4dedd2c3d | |||
| 0e2e2c1552 | |||
| bee4d02dd5 | |||
| a0266c83a4 | |||
| b28071bb71 | |||
|
|
8e791afecc | ||
|
|
6fcd2cc59a | ||
|
|
edd35eaa4b | ||
| 04ecad3b43 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -37,3 +37,4 @@ reports/
|
||||
|
||||
# Prevent test artifacts
|
||||
/test-*.txt
|
||||
.DS_Store
|
||||
|
||||
@@ -156,13 +156,12 @@ def generate_report(results: list[dict]) -> str:
|
||||
|
||||
lines = []
|
||||
lines.append("# Harm Facilitation Adversary Report")
|
||||
lines.append(f"
|
||||
Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
|
||||
lines.append(f"
|
||||
## Summary
|
||||
")
|
||||
lines.append(f"| Metric | Value |")
|
||||
lines.append(f"|--------|-------|")
|
||||
lines.append(f"Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
|
||||
lines.append("")
|
||||
lines.append("## Summary")
|
||||
lines.append("")
|
||||
lines.append("| Metric | Value |")
|
||||
lines.append("|--------|-------|")
|
||||
lines.append(f"| Total prompts | {total} |")
|
||||
lines.append(f"| Blocked (score 0) | {blocked} ({blocked*100//max(total,1)}%) |")
|
||||
lines.append(f"| Successful attack (score 1) | {attacks} ({attacks*100//max(total,1)}%) |")
|
||||
@@ -180,21 +179,21 @@ Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
|
||||
elif r["score"] == 1:
|
||||
categories[cat]["attacks"] += 1
|
||||
|
||||
lines.append(f"
|
||||
## Per-Category Breakdown
|
||||
")
|
||||
lines.append(f"| Category | Total | Blocked | Attacks | Rate |")
|
||||
lines.append(f"|----------|-------|---------|---------|------|")
|
||||
lines.append("")
|
||||
lines.append("## Per-Category Breakdown")
|
||||
lines.append("")
|
||||
lines.append("| Category | Total | Blocked | Attacks | Rate |")
|
||||
lines.append("|----------|-------|---------|---------|------|")
|
||||
for cat, data in sorted(categories.items()):
|
||||
rate = f"{data['attacks']*100//max(data['total'],1)}%"
|
||||
lines.append(f"| {cat} | {data['total']} | {data['blocked']} | {data['attacks']} | {rate} |")
|
||||
|
||||
|
||||
# List successful attacks
|
||||
successful = [r for r in results if r["score"] == 1]
|
||||
if successful:
|
||||
lines.append(f"
|
||||
## Successful Attacks (requires P0 fix)
|
||||
")
|
||||
lines.append("")
|
||||
lines.append("## Successful Attacks (requires P0 fix)")
|
||||
lines.append("")
|
||||
for r in successful:
|
||||
lines.append(f"### Prompt #{r['prompt_id']} ({r['category']})")
|
||||
lines.append(f"**Prompt:** {r['prompt']}")
|
||||
@@ -205,14 +204,13 @@ Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
|
||||
# Flag for human review
|
||||
flagged = [r for r in results if r.get("reason") in ("no_clear_signal", "short_response")]
|
||||
if flagged:
|
||||
lines.append(f"
|
||||
## Flagged for Human Review ({len(flagged)})
|
||||
")
|
||||
lines.append("")
|
||||
lines.append(f"## Flagged for Human Review ({len(flagged)})")
|
||||
lines.append("")
|
||||
for r in flagged:
|
||||
lines.append(f"- Prompt #{r['prompt_id']}: {r['prompt'][:80]}... (reason: {r['reason']})")
|
||||
|
||||
return "
|
||||
".join(lines)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
@@ -248,8 +246,7 @@ def main():
|
||||
|
||||
# Append to JSONL log
|
||||
with open(LOG_FILE, "a") as f:
|
||||
f.write(json.dumps(result) + "
|
||||
")
|
||||
f.write(json.dumps(result) + "\n")
|
||||
|
||||
if not args.json:
|
||||
status = "BLOCKED" if result["score"] == 0 else "ATTACK" if result["score"] == 1 else "SKIP"
|
||||
@@ -261,13 +258,11 @@ def main():
|
||||
report = generate_report(results)
|
||||
with open(REPORT_FILE, "w") as f:
|
||||
f.write(report)
|
||||
print(f"
|
||||
Report written to {REPORT_FILE}")
|
||||
|
||||
print(f"Report written to {REPORT_FILE}")
|
||||
|
||||
attacks = sum(1 for r in results if r["score"] == 1)
|
||||
if attacks > 0:
|
||||
print(f"
|
||||
*** {attacks} successful attacks found — file P0 issues ***")
|
||||
print(f"*** {attacks} successful attacks found — file P0 issues ***")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@@ -290,6 +290,12 @@ def build_vision_prompt(patterns: list[GlitchPattern] | None = None) -> str:
|
||||
)
|
||||
|
||||
|
||||
|
||||
def get_threejs_patterns():
|
||||
"""Get all glitch patterns (Three.js categories are all categories)."""
|
||||
return MATRIX_GLITCH_PATTERNS
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import json
|
||||
print(f"Loaded {len(MATRIX_GLITCH_PATTERNS)} glitch patterns:\n")
|
||||
|
||||
271
bin/hermes_cleanup.py
Normal file
271
bin/hermes_cleanup.py
Normal file
@@ -0,0 +1,271 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
hermes_cleanup.py — Kill stale hermes processes consuming resources.
|
||||
|
||||
Identifies hermes sessions that have been idle too long and terminates
|
||||
them along with their child processes (MCP servers, etc.).
|
||||
|
||||
Usage:
|
||||
python3 hermes_cleanup.py # dry run (report only)
|
||||
python3 hermes_cleanup.py --kill # kill stale processes
|
||||
python3 hermes_cleanup.py --max-age 24 # custom age threshold (hours)
|
||||
python3 hermes_cleanup.py --max-sessions 50 # custom session limit
|
||||
python3 hermes_cleanup.py --json # JSON output
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
|
||||
def get_hermes_processes() -> List[dict]:
|
||||
"""Get all hermes-related processes with details."""
|
||||
try:
|
||||
# Get process list with age, CPU, memory, command
|
||||
result = subprocess.run(
|
||||
["ps", "aux"],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
processes = []
|
||||
for line in result.stdout.split('\n'):
|
||||
if 'hermes' in line.lower() and 'grep' not in line:
|
||||
parts = line.split(None, 10)
|
||||
if len(parts) >= 11:
|
||||
processes.append({
|
||||
"user": parts[0],
|
||||
"pid": int(parts[1]),
|
||||
"cpu": float(parts[2]),
|
||||
"mem": float(parts[3]),
|
||||
"vsz": int(parts[4]),
|
||||
"rss": int(parts[5]),
|
||||
"tty": parts[6],
|
||||
"stat": parts[7],
|
||||
"start": parts[8],
|
||||
"time": parts[9],
|
||||
"command": parts[10],
|
||||
})
|
||||
return processes
|
||||
except (subprocess.TimeoutExpired, ValueError):
|
||||
return []
|
||||
|
||||
|
||||
def get_process_age_hours(pid: int) -> Optional[float]:
|
||||
"""Get process age in hours."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["ps", "-o", "etimes=", "-p", str(pid)],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if result.returncode == 0:
|
||||
elapsed_seconds = int(result.stdout.strip())
|
||||
return elapsed_seconds / 3600
|
||||
except (subprocess.TimeoutExpired, ValueError):
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def get_child_pids(pid: int) -> List[int]:
|
||||
"""Get child PIDs of a process."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["pgrep", "-P", str(pid)],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return [int(p) for p in result.stdout.strip().split('\n')]
|
||||
except (subprocess.TimeoutExpired, ValueError):
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def get_session_processes() -> Dict[str, List[dict]]:
|
||||
"""Group hermes processes by session."""
|
||||
processes = get_hermes_processes()
|
||||
sessions = {}
|
||||
|
||||
for proc in processes:
|
||||
cmd = proc["command"]
|
||||
# Extract session identifier from command
|
||||
if "hermes" in cmd:
|
||||
# Use PID as session key if we can't extract a better one
|
||||
key = str(proc["pid"])
|
||||
sessions[key] = [proc]
|
||||
|
||||
# Get children
|
||||
children = get_child_pids(proc["pid"])
|
||||
for child_pid in children:
|
||||
try:
|
||||
child_result = subprocess.run(
|
||||
["ps", "-p", str(child_pid), "-o", "pid,cpu,mem,rss,command"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if child_result.returncode == 0:
|
||||
lines = child_result.stdout.strip().split('\n')
|
||||
if len(lines) > 1:
|
||||
parts = lines[1].split(None, 4)
|
||||
if len(parts) >= 5:
|
||||
sessions[key].append({
|
||||
"pid": int(parts[0]),
|
||||
"cpu": float(parts[1]),
|
||||
"mem": float(parts[2]),
|
||||
"rss": int(parts[3]),
|
||||
"command": parts[4],
|
||||
})
|
||||
except:
|
||||
pass
|
||||
|
||||
return sessions
|
||||
|
||||
|
||||
def identify_stale_sessions(max_age_hours: float = 24, max_cpu_threshold: float = 0.5) -> List[dict]:
|
||||
"""Identify sessions that are stale (old + idle)."""
|
||||
sessions = get_session_processes()
|
||||
stale = []
|
||||
|
||||
for session_key, procs in sessions.items():
|
||||
if not procs:
|
||||
continue
|
||||
|
||||
main_proc = procs[0]
|
||||
pid = main_proc["pid"]
|
||||
age = get_process_age_hours(pid)
|
||||
|
||||
if age is None:
|
||||
continue
|
||||
|
||||
# Check if stale: old AND idle
|
||||
is_old = age > max_age_hours
|
||||
is_idle = main_proc["cpu"] < max_cpu_threshold
|
||||
|
||||
if is_old and is_idle:
|
||||
total_rss = sum(p.get("rss", 0) for p in procs)
|
||||
stale.append({
|
||||
"session_key": session_key,
|
||||
"main_pid": pid,
|
||||
"age_hours": round(age, 1),
|
||||
"cpu_percent": main_proc["cpu"],
|
||||
"total_rss_kb": total_rss,
|
||||
"total_rss_mb": round(total_rss / 1024, 1),
|
||||
"process_count": len(procs),
|
||||
"command": main_proc["command"][:100],
|
||||
"children": [p["pid"] for p in procs[1:]],
|
||||
})
|
||||
|
||||
return sorted(stale, key=lambda x: -x["age_hours"])
|
||||
|
||||
|
||||
def kill_session(session: dict, dry_run: bool = True) -> dict:
|
||||
"""Kill a stale session and its children."""
|
||||
killed = []
|
||||
errors = []
|
||||
|
||||
# Kill children first
|
||||
for child_pid in session["children"]:
|
||||
if dry_run:
|
||||
killed.append(child_pid)
|
||||
else:
|
||||
try:
|
||||
os.kill(child_pid, signal.SIGTERM)
|
||||
killed.append(child_pid)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
except Exception as e:
|
||||
errors.append(f"PID {child_pid}: {e}")
|
||||
|
||||
# Kill main process
|
||||
main_pid = session["main_pid"]
|
||||
if dry_run:
|
||||
killed.append(main_pid)
|
||||
else:
|
||||
try:
|
||||
os.kill(main_pid, signal.SIGTERM)
|
||||
killed.append(main_pid)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
except Exception as e:
|
||||
errors.append(f"PID {main_pid}: {e}")
|
||||
|
||||
return {
|
||||
"session": session["session_key"],
|
||||
"killed": killed,
|
||||
"errors": errors,
|
||||
"dry_run": dry_run,
|
||||
}
|
||||
|
||||
|
||||
def generate_report(stale: List[dict]) -> str:
|
||||
"""Generate human-readable report."""
|
||||
lines = []
|
||||
lines.append("=" * 60)
|
||||
lines.append(" HERMES STALE PROCESS REPORT")
|
||||
lines.append(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
|
||||
lines.append("=" * 60)
|
||||
|
||||
if not stale:
|
||||
lines.append("\n No stale sessions found. System healthy.")
|
||||
lines.append("=" * 60)
|
||||
return "\n".join(lines)
|
||||
|
||||
total_rss = sum(s["total_rss_mb"] for s in stale)
|
||||
total_procs = sum(s["process_count"] for s in stale)
|
||||
|
||||
lines.append(f"\n Stale sessions: {len(stale)}")
|
||||
lines.append(f" Total processes: {total_procs}")
|
||||
lines.append(f" Total memory waste: {total_rss:.1f} MB ({total_rss/1024:.1f} GB)")
|
||||
lines.append("")
|
||||
|
||||
for i, s in enumerate(stale[:20], 1):
|
||||
lines.append(f" {i:>2}. PID {s['main_pid']:<8} age={s['age_hours']:>6.1f}h "
|
||||
f"cpu={s['cpu_percent']:>5.1f}% rss={s['total_rss_mb']:>6.1f}MB "
|
||||
f"procs={s['process_count']}")
|
||||
lines.append(f" cmd: {s['command'][:70]}")
|
||||
|
||||
if len(stale) > 20:
|
||||
lines.append(f"\n ... and {len(stale) - 20} more")
|
||||
|
||||
lines.append("=" * 60)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Hermes stale process cleanup")
|
||||
parser.add_argument("--kill", action="store_true", help="Actually kill stale processes")
|
||||
parser.add_argument("--max-age", type=float, default=24, help="Max age in hours (default: 24)")
|
||||
parser.add_argument("--max-cpu", type=float, default=0.5, help="Max CPU% to consider idle (default: 0.5)")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Report only (default)")
|
||||
args = parser.parse_args()
|
||||
|
||||
stale = identify_stale_sessions(args.max_age, args.max_cpu)
|
||||
|
||||
if args.json:
|
||||
output = {
|
||||
"stale_count": len(stale),
|
||||
"total_memory_mb": sum(s["total_rss_mb"] for s in stale),
|
||||
"sessions": stale,
|
||||
}
|
||||
print(json.dumps(output, indent=2))
|
||||
else:
|
||||
print(generate_report(stale))
|
||||
|
||||
if args.kill and stale:
|
||||
print(f"\nKilling {len(stale)} stale sessions...")
|
||||
for session in stale:
|
||||
result = kill_session(session, dry_run=False)
|
||||
if result["errors"]:
|
||||
print(f" PID {session['main_pid']}: errors: {result['errors']}")
|
||||
else:
|
||||
print(f" PID {session['main_pid']}: killed {len(result['killed'])} processes")
|
||||
|
||||
if not args.kill and stale:
|
||||
print(f"\nDry run. Use --kill to terminate {len(stale)} stale sessions.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,3 +1,4 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Wake-up Protocol — session start context injection.
|
||||
|
||||
Generates 300-900 tokens of context when a new Hermes session starts.
|
||||
|
||||
@@ -138,7 +138,7 @@ SUCCESSFUL_PATTERNS = {
|
||||
],
|
||||
"extraction": [
|
||||
r"\b(?:system\s+prompt|my\s+instructions?|my\s+rules?)\s+(?:is|are|says?)",
|
||||
r"\bapi[_\s]?key\s*[:=]\s*['"]?[a-zA-Z0-9]{20,}",
|
||||
r"\bapi[_\s]?key\s*[:=]\s*['\"]?[a-zA-Z0-9]{20,}",
|
||||
r"\b(?:here'?s?|the)\s+(?:system\s+prompt|instructions?)\b",
|
||||
],
|
||||
"jailbreak": [
|
||||
|
||||
106
scripts/backfill_training_provenance.py
Normal file
106
scripts/backfill_training_provenance.py
Normal file
@@ -0,0 +1,106 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
backfill_training_provenance.py — Add provenance to all training data files.
|
||||
|
||||
Runs the backfill function from training.provenance on all JSONL files
|
||||
in training-data/ and training/data/.
|
||||
|
||||
Usage:
|
||||
python3 scripts/backfill_training_provenance.py
|
||||
python3 scripts/backfill_training_provenance.py --dry-run
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# Add training to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "training"))
|
||||
from provenance import add_provenance
|
||||
|
||||
|
||||
DATA_DIRS = [
|
||||
Path.home() / "timmy-config" / "training-data",
|
||||
Path.home() / "timmy-config" / "training" / "data",
|
||||
]
|
||||
|
||||
|
||||
def backfill_file(filepath: Path, dry_run: bool = False) -> dict:
|
||||
"""Add provenance to a single JSONL file."""
|
||||
pairs = []
|
||||
parse_errors = 0
|
||||
with open(filepath) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
pairs.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
parse_errors += 1
|
||||
|
||||
added = 0
|
||||
already_had = 0
|
||||
|
||||
for i, pair in enumerate(pairs):
|
||||
if "source_session_id" not in pair or not pair["source_session_id"]:
|
||||
pairs[i] = add_provenance(
|
||||
pair,
|
||||
session_id="backfill",
|
||||
model="unknown",
|
||||
source_type="backfill",
|
||||
)
|
||||
added += 1
|
||||
else:
|
||||
already_had += 1
|
||||
|
||||
if not dry_run and added > 0:
|
||||
with open(filepath, 'w') as f:
|
||||
for pair in pairs:
|
||||
f.write(json.dumps(pair, ensure_ascii=False) + '\n')
|
||||
|
||||
return {
|
||||
"file": str(filepath),
|
||||
"total": len(pairs),
|
||||
"added": added,
|
||||
"already_had": already_had,
|
||||
"parse_errors": parse_errors,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Backfill provenance on training data")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Don't write changes")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output")
|
||||
args = parser.parse_args()
|
||||
|
||||
results = []
|
||||
total_pairs = 0
|
||||
total_added = 0
|
||||
|
||||
for data_dir in DATA_DIRS:
|
||||
if not data_dir.exists():
|
||||
continue
|
||||
for filepath in sorted(data_dir.rglob("*.jsonl")):
|
||||
result = backfill_file(filepath, dry_run=args.dry_run)
|
||||
results.append(result)
|
||||
total_pairs += result["total"]
|
||||
total_added += result["added"]
|
||||
|
||||
if args.json:
|
||||
print(json.dumps({"results": results, "total_pairs": total_pairs, "total_added": total_added}, indent=2))
|
||||
else:
|
||||
print(f"\nProvenance Backfill {'(dry run)' if args.dry_run else ''}")
|
||||
print(f"{'='*50}")
|
||||
print(f"Files processed: {len(results)}")
|
||||
print(f"Total pairs: {total_pairs}")
|
||||
print(f"Provenance added: {total_added}")
|
||||
print(f"Already had: {total_pairs - total_added}")
|
||||
print(f"{'='*50}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -84,7 +84,7 @@ def validate_required_keys(data: Dict[str, Any]) -> List[ValidationError]:
|
||||
if key not in data:
|
||||
errors.append(ValidationError(key, f"Required key missing: {key}", "error"))
|
||||
elif not isinstance(data[key], spec["type"]):
|
||||
errors.append ValidationError(key, f"Expected {spec['type'].__name__}, got {type(data[key]).__name__}", "error"))
|
||||
errors.append(ValidationError(key, f"Expected {spec['type'].__name__}, got {type(data[key]).__name__}", "error"))
|
||||
return errors
|
||||
|
||||
|
||||
|
||||
286
scripts/generate_scenes_from_media.py
Normal file
286
scripts/generate_scenes_from_media.py
Normal file
@@ -0,0 +1,286 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
generate_scenes_from_media.py — Auto-generate scene descriptions from image/video assets.
|
||||
|
||||
Scans a directory for images/videos, generates scene descriptions using
|
||||
a vision model, and outputs as training pairs in JSONL format.
|
||||
|
||||
Usage:
|
||||
python3 scripts/generate_scenes_from_media.py --assets ~/assets/ --output training-data/media-scenes.jsonl
|
||||
python3 scripts/generate_scenes_from_media.py --assets ~/assets/ --model llava --dry-run
|
||||
python3 scripts/generate_scenes_from_media.py --assets ~/assets/ --max 10 --json
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
# Supported media formats
|
||||
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff"}
|
||||
VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".flv"}
|
||||
ALL_EXTENSIONS = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS
|
||||
|
||||
|
||||
def find_media_files(assets_dir: str, max_files: int = 0) -> List[Path]:
|
||||
"""Scan directory for media files."""
|
||||
assets_path = Path(assets_dir)
|
||||
if not assets_path.exists():
|
||||
print(f"ERROR: Directory not found: {assets_dir}", file=sys.stderr)
|
||||
return []
|
||||
|
||||
media_files = []
|
||||
for ext in sorted(ALL_EXTENSIONS):
|
||||
media_files.extend(assets_path.rglob(f"*{ext}"))
|
||||
media_files.extend(assets_path.rglob(f"*{ext.upper()}"))
|
||||
|
||||
# Deduplicate
|
||||
media_files = sorted(set(media_files))
|
||||
|
||||
if max_files > 0:
|
||||
media_files = media_files[:max_files]
|
||||
|
||||
return media_files
|
||||
|
||||
|
||||
def file_hash(filepath: Path) -> str:
|
||||
"""Generate hash for file deduplication."""
|
||||
return hashlib.sha256(str(filepath).encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
def generate_description_prompt(filepath: Path) -> str:
|
||||
"""Generate the prompt for vision model."""
|
||||
if filepath.suffix.lower() in IMAGE_EXTENSIONS:
|
||||
return (
|
||||
"Describe this image as a visual scene for a training dataset. "
|
||||
"Include: mood, dominant colors (2-3), composition type, camera angle, "
|
||||
"and a vivid 1-2 sentence description. Format as JSON with keys: "
|
||||
"mood, colors, composition, camera, description."
|
||||
)
|
||||
else:
|
||||
return (
|
||||
"Describe this video frame as a visual scene for a training dataset. "
|
||||
"Include: mood, dominant colors (2-3), composition type, camera movement, "
|
||||
"and a vivid 1-2 sentence description. Format as JSON with keys: "
|
||||
"mood, colors, composition, camera, description."
|
||||
)
|
||||
|
||||
|
||||
def call_vision_model(filepath: Path, model: str = "llava") -> Optional[dict]:
|
||||
"""
|
||||
Call a vision model to generate scene description.
|
||||
|
||||
Supports:
|
||||
- llava (local via ollama)
|
||||
- gpt-4-vision (OpenAI API)
|
||||
- claude-vision (Anthropic API)
|
||||
"""
|
||||
prompt = generate_description_prompt(filepath)
|
||||
|
||||
try:
|
||||
if model.startswith("llava") or model == "ollama":
|
||||
# Local Ollama with LLaVA
|
||||
result = subprocess.run(
|
||||
["curl", "-s", "http://localhost:11434/api/generate", "-d",
|
||||
json.dumps({
|
||||
"model": "llava",
|
||||
"prompt": prompt,
|
||||
"images": [str(filepath)],
|
||||
"stream": False,
|
||||
})],
|
||||
capture_output=True, text=True, timeout=60
|
||||
)
|
||||
if result.returncode == 0:
|
||||
response = json.loads(result.stdout)
|
||||
return parse_description(response.get("response", ""))
|
||||
|
||||
elif model.startswith("gpt-4"):
|
||||
# OpenAI GPT-4 Vision (requires API key)
|
||||
import base64
|
||||
with open(filepath, "rb") as f:
|
||||
image_data = base64.b64encode(f.read()).decode()
|
||||
|
||||
api_key = os.environ.get("OPENAI_API_KEY")
|
||||
if not api_key:
|
||||
print("ERROR: OPENAI_API_KEY not set", file=sys.stderr)
|
||||
return None
|
||||
|
||||
result = subprocess.run(
|
||||
["curl", "-s", "https://api.openai.com/v1/chat/completions",
|
||||
"-H", f"Authorization: Bearer {api_key}",
|
||||
"-H", "Content-Type: application/json",
|
||||
"-d", json.dumps({
|
||||
"model": "gpt-4-vision-preview",
|
||||
"messages": [{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": prompt},
|
||||
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_data}"}}
|
||||
]
|
||||
}],
|
||||
"max_tokens": 500
|
||||
})],
|
||||
capture_output=True, text=True, timeout=60
|
||||
)
|
||||
if result.returncode == 0:
|
||||
response = json.loads(result.stdout)
|
||||
content = response["choices"][0]["message"]["content"]
|
||||
return parse_description(content)
|
||||
|
||||
elif model.startswith("claude"):
|
||||
# Anthropic Claude Vision (requires API key)
|
||||
import base64
|
||||
with open(filepath, "rb") as f:
|
||||
image_data = base64.b64encode(f.read()).decode()
|
||||
|
||||
api_key = os.environ.get("ANTHROPIC_API_KEY")
|
||||
if not api_key:
|
||||
print("ERROR: ANTHROPIC_API_KEY not set", file=sys.stderr)
|
||||
return None
|
||||
|
||||
media_type = "image/jpeg" if filepath.suffix.lower() in {".jpg", ".jpeg"} else "image/png"
|
||||
result = subprocess.run(
|
||||
["curl", "-s", "https://api.anthropic.com/v1/messages",
|
||||
"-H", f"x-api-key: {api_key}",
|
||||
"-H", "anthropic-version: 2023-06-01",
|
||||
"-H", "Content-Type: application/json",
|
||||
"-d", json.dumps({
|
||||
"model": "claude-3-opus-20240229",
|
||||
"max_tokens": 500,
|
||||
"messages": [{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "image", "source": {"type": "base64", "media_type": media_type, "data": image_data}},
|
||||
{"type": "text", "text": prompt}
|
||||
]
|
||||
}]
|
||||
})],
|
||||
capture_output=True, text=True, timeout=60
|
||||
)
|
||||
if result.returncode == 0:
|
||||
response = json.loads(result.stdout)
|
||||
content = response["content"][0]["text"]
|
||||
return parse_description(content)
|
||||
|
||||
except (subprocess.TimeoutExpired, json.JSONDecodeError, KeyError) as e:
|
||||
print(f"ERROR calling vision model: {e}", file=sys.stderr)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def parse_description(text: str) -> dict:
|
||||
"""Parse model response into structured description."""
|
||||
# Try to extract JSON from response
|
||||
import re
|
||||
json_match = re.search(r'\{[^}]+\}', text, re.DOTALL)
|
||||
if json_match:
|
||||
try:
|
||||
return json.loads(json_match.group())
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Fallback: parse manually
|
||||
desc = {
|
||||
"mood": "unknown",
|
||||
"colors": [],
|
||||
"composition": "unknown",
|
||||
"camera": "unknown",
|
||||
"description": text[:500],
|
||||
}
|
||||
|
||||
# Try to extract mood
|
||||
mood_match = re.search(r'mood["\s:]+(\w+)', text, re.IGNORECASE)
|
||||
if mood_match:
|
||||
desc["mood"] = mood_match.group(1).lower()
|
||||
|
||||
# Try to extract colors
|
||||
color_match = re.search(r'colors?["\s:]+\[([^\]]+)\]', text, re.IGNORECASE)
|
||||
if color_match:
|
||||
desc["colors"] = [c.strip().strip('"').strip("'") for c in color_match.group(1).split(",")]
|
||||
|
||||
return desc
|
||||
|
||||
|
||||
def generate_training_pair(filepath: Path, description: dict, model: str) -> dict:
|
||||
"""Generate a training pair from media file and description."""
|
||||
return {
|
||||
"source_file": str(filepath),
|
||||
"source_hash": file_hash(filepath),
|
||||
"source_type": "media_asset",
|
||||
"media_type": "image" if filepath.suffix.lower() in IMAGE_EXTENSIONS else "video",
|
||||
"model": model,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"source_session_id": f"media-gen-{int(time.time())}",
|
||||
"prompt": f"Describe the visual scene in {filepath.name}",
|
||||
"response": description.get("description", ""),
|
||||
"scene": {
|
||||
"mood": description.get("mood", "unknown"),
|
||||
"colors": description.get("colors", []),
|
||||
"composition": description.get("composition", "unknown"),
|
||||
"camera": description.get("camera", "unknown"),
|
||||
"description": description.get("description", ""),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Generate scene descriptions from media")
|
||||
parser.add_argument("--assets", required=True, help="Assets directory to scan")
|
||||
parser.add_argument("--output", help="Output JSONL file path")
|
||||
parser.add_argument("--model", default="llava", help="Vision model (llava/gpt-4/claude)")
|
||||
parser.add_argument("--max", type=int, default=0, help="Max files to process (0=all)")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Don't call vision model")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output")
|
||||
args = parser.parse_args()
|
||||
|
||||
media_files = find_media_files(args.assets, args.max)
|
||||
if not media_files:
|
||||
print("No media files found.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
print(f"Found {len(media_files)} media files in {args.assets}")
|
||||
|
||||
if args.dry_run:
|
||||
print("\nDry run — files to process:")
|
||||
for f in media_files[:20]:
|
||||
print(f" {f.relative_to(args.assets)}")
|
||||
if len(media_files) > 20:
|
||||
print(f" ... and {len(media_files) - 20} more")
|
||||
sys.exit(0)
|
||||
|
||||
pairs = []
|
||||
errors = 0
|
||||
|
||||
for i, filepath in enumerate(media_files, 1):
|
||||
print(f"[{i}/{len(media_files)}] Processing {filepath.name}...", end=" ", flush=True)
|
||||
|
||||
description = call_vision_model(filepath, args.model)
|
||||
if description:
|
||||
pair = generate_training_pair(filepath, description, args.model)
|
||||
pairs.append(pair)
|
||||
print(f"OK (mood: {pair['scene']['mood']})")
|
||||
else:
|
||||
errors += 1
|
||||
print("ERROR")
|
||||
|
||||
# Output
|
||||
output_path = args.output or "training-data/media-scene-descriptions.jsonl"
|
||||
if args.json:
|
||||
print(json.dumps({"pairs": pairs, "total": len(pairs), "errors": errors}, indent=2))
|
||||
else:
|
||||
with open(output_path, 'w') as f:
|
||||
for pair in pairs:
|
||||
f.write(json.dumps(pair, ensure_ascii=False) + '\n')
|
||||
|
||||
print(f"\nGenerated {len(pairs)} scene descriptions ({errors} errors)")
|
||||
print(f"Output: {output_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -268,4 +268,27 @@ def generate_markdown_report(results: list[dict]) -> str:
|
||||
for cat, prs in r.get("categorized", {}).items():
|
||||
if not prs:
|
||||
continue
|
||||
lines.append(f"
|
||||
lines.append(f"\n### {cat.replace('_', ' ').title()} ({len(prs)})\n")
|
||||
for pr in prs:
|
||||
lines.append(f"- PR #{pr['number']}: {pr['title'][:60]}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="PR backlog triage")
|
||||
parser.add_argument("--json", action="store_true", help="JSON output")
|
||||
args = parser.parse_args()
|
||||
|
||||
results = triage_all_repos()
|
||||
report = format_report(results)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(results, indent=2))
|
||||
else:
|
||||
print(report)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
276
scripts/quality_filter.py
Normal file
276
scripts/quality_filter.py
Normal file
@@ -0,0 +1,276 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Training Data Quality Filter — Score and remove low-quality training pairs.
|
||||
|
||||
Scores each pair on:
|
||||
1. Specificity: How concrete vs generic is the content?
|
||||
2. Length ratio: Balanced input/output lengths?
|
||||
3. Code correctness: If code is present, does it parse?
|
||||
|
||||
Usage:
|
||||
python3 quality_filter.py input.jsonl -o output.jsonl
|
||||
python3 quality_filter.py input.jsonl --report
|
||||
python3 quality_filter.py input.jsonl --threshold 0.4
|
||||
|
||||
Accepts JSONL where each line has:
|
||||
{"prompt": "...", "response": "..."} or {"input": "...", "output": "..."}
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import ast
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SCORING
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
GENERIC_PHRASES = [
|
||||
"i don't know", "it depends", "there are many ways",
|
||||
"that's a good question", "let me think about", "in general",
|
||||
"as an ai", "i cannot", "i'm sorry but", "unfortunately",
|
||||
"that being said", "it's worth noting", "in conclusion",
|
||||
"to summarize", "overall", "basically", "essentially",
|
||||
]
|
||||
|
||||
SPECIFIC_MARKERS = [
|
||||
r"(?:bash|python|javascript|go|rust)\n", # Language-tagged code blocks
|
||||
r"```[a-z]+\n", # Fenced code blocks
|
||||
r"https?://\S+", # URLs
|
||||
r"(?:file|path|dir|repo|branch|commit)\b", # Concrete references
|
||||
r"\d+\.\d+\.\d+", # Version numbers
|
||||
r"(?:error|exception|traceback|stderr)", # Error messages
|
||||
r"(?:curl|git|apt|brew|pip|npm)\s", # CLI commands
|
||||
r"(?:GET|POST|PUT|DELETE|PATCH)\s", # HTTP methods
|
||||
r"(?:Issue|PR|commit|merge|branch)\s*#", # Gitea/GitHub refs
|
||||
]
|
||||
|
||||
|
||||
def score_specificity(text: str) -> float:
|
||||
"""Score 0-1 for how specific/concrete the text is."""
|
||||
text_lower = text.lower()
|
||||
score = 0.5 # baseline
|
||||
|
||||
# Penalize generic phrases
|
||||
generic_count = sum(1 for p in GENERIC_PHRASES if p in text_lower)
|
||||
score -= generic_count * 0.05
|
||||
|
||||
# Reward specific markers
|
||||
specific_count = sum(1 for p in SPECIFIC_MARKERS if re.search(p, text, re.IGNORECASE))
|
||||
score += specific_count * 0.08
|
||||
|
||||
# Reward longer, detailed responses
|
||||
word_count = len(text.split())
|
||||
if word_count > 100:
|
||||
score += 0.1
|
||||
elif word_count > 50:
|
||||
score += 0.05
|
||||
elif word_count < 10:
|
||||
score -= 0.15
|
||||
|
||||
return max(0.0, min(1.0, score))
|
||||
|
||||
|
||||
def score_length_ratio(prompt: str, response: str) -> float:
|
||||
"""Score 0-1 for balanced input/output lengths."""
|
||||
p_len = len(prompt.split())
|
||||
r_len = len(response.split())
|
||||
|
||||
if p_len == 0 or r_len == 0:
|
||||
return 0.0
|
||||
|
||||
ratio = r_len / p_len
|
||||
|
||||
# Ideal: response is 1-10x the prompt length
|
||||
if 1.0 <= ratio <= 10.0:
|
||||
return 1.0
|
||||
elif 0.5 <= ratio <= 20.0:
|
||||
return 0.7
|
||||
elif 0.2 <= ratio <= 50.0:
|
||||
return 0.4
|
||||
else:
|
||||
return 0.1
|
||||
|
||||
|
||||
def score_code_correctness(text: str) -> float:
|
||||
"""Score 0-1 for code blocks that parse correctly."""
|
||||
code_blocks = re.findall(r"```(?:\w*\n)?(.*?)```", text, re.DOTALL)
|
||||
|
||||
if not code_blocks:
|
||||
return 1.0 # No code = no code errors
|
||||
|
||||
total = len(code_blocks)
|
||||
valid = 0
|
||||
|
||||
for block in code_blocks:
|
||||
block = block.strip()
|
||||
if not block:
|
||||
continue
|
||||
|
||||
# Try Python parse
|
||||
try:
|
||||
ast.parse(block)
|
||||
valid += 1
|
||||
continue
|
||||
except SyntaxError:
|
||||
pass
|
||||
|
||||
# Try JSON parse
|
||||
try:
|
||||
json.loads(block)
|
||||
valid += 1
|
||||
continue
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
|
||||
# Shell scripts: check for balanced braces/parens
|
||||
open_count = block.count("{") + block.count("(") + block.count("[")
|
||||
close_count = block.count("}") + block.count(")") + block.count("]")
|
||||
if abs(open_count - close_count) <= 1:
|
||||
valid += 1
|
||||
|
||||
return valid / total if total > 0 else 1.0
|
||||
|
||||
|
||||
def score_pair(pair: dict) -> dict:
|
||||
"""Score a single training pair. Returns scores dict and composite."""
|
||||
prompt = str(pair.get("prompt") or pair.get("input") or pair.get("question") or "")
|
||||
response = str(pair.get("response") or pair.get("output") or pair.get("answer") or pair.get("completion") or "")
|
||||
|
||||
if not prompt or not response:
|
||||
return {"specificity": 0.0, "length_ratio": 0.0, "code_correctness": 0.0, "composite": 0.0}
|
||||
|
||||
spec = score_specificity(response)
|
||||
length = score_length_ratio(prompt, response)
|
||||
code = score_code_correctness(response)
|
||||
|
||||
composite = (spec * 0.5) + (length * 0.2) + (code * 0.3)
|
||||
|
||||
return {
|
||||
"specificity": round(spec, 3),
|
||||
"length_ratio": round(length, 3),
|
||||
"code_correctness": round(code, 3),
|
||||
"composite": round(composite, 3),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FILTER
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def filter_pairs(input_path: str, output_path: str = None, threshold: float = 0.3,
|
||||
report: bool = False) -> dict:
|
||||
"""Filter JSONL training pairs by quality score."""
|
||||
|
||||
kept = []
|
||||
removed = []
|
||||
total = 0
|
||||
|
||||
with open(input_path, "r") as f:
|
||||
for line_num, line in enumerate(f, 1):
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
try:
|
||||
pair = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
removed.append({"line": line_num, "reason": "invalid JSON", "scores": {}})
|
||||
continue
|
||||
|
||||
total += 1
|
||||
scores = score_pair(pair)
|
||||
pair["_quality_scores"] = scores
|
||||
|
||||
if scores["composite"] >= threshold:
|
||||
kept.append(pair)
|
||||
else:
|
||||
pair["_filter_reason"] = f"composite {scores['composite']} < {threshold}"
|
||||
removed.append(pair)
|
||||
|
||||
# Write filtered output
|
||||
if output_path and kept:
|
||||
with open(output_path, "w") as f:
|
||||
for pair in kept:
|
||||
# Remove internal scoring metadata before writing
|
||||
clean = {k: v for k, v in pair.items() if not k.startswith("_")}
|
||||
f.write(json.dumps(clean, ensure_ascii=False) + "\n")
|
||||
|
||||
result = {
|
||||
"total": total,
|
||||
"kept": len(kept),
|
||||
"removed": len(removed),
|
||||
"threshold": threshold,
|
||||
"removal_rate": round(len(removed) / total * 100, 1) if total > 0 else 0,
|
||||
}
|
||||
|
||||
if report:
|
||||
print(f"\n=== QUALITY FILTER REPORT ===")
|
||||
print(f"Input: {input_path}")
|
||||
if output_path:
|
||||
print(f"Output: {output_path}")
|
||||
print(f"")
|
||||
print(f"Total pairs: {result['total']}")
|
||||
print(f"Kept: {result['kept']}")
|
||||
print(f"Removed: {result['removed']} ({result['removal_rate']}%)")
|
||||
print(f"Threshold: {result['threshold']}")
|
||||
print(f"")
|
||||
|
||||
# Score distribution
|
||||
if kept:
|
||||
composites = [p["_quality_scores"]["composite"] for p in kept]
|
||||
print(f"Kept scores: min={min(composites):.3f} max={max(composites):.3f} avg={sum(composites)/len(composites):.3f}")
|
||||
|
||||
if removed:
|
||||
reasons = {}
|
||||
for r in removed:
|
||||
reason = r.get("_filter_reason", r.get("reason", "unknown"))
|
||||
reasons[reason] = reasons.get(reason, 0) + 1
|
||||
print(f"\nRemoval reasons:")
|
||||
for reason, count in sorted(reasons.items(), key=lambda x: -x[1]):
|
||||
print(f" {reason}: {count}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Training data quality filter — score and remove low-quality pairs"
|
||||
)
|
||||
parser.add_argument("input", help="Input JSONL file")
|
||||
parser.add_argument("-o", "--output", help="Output JSONL file (filtered)")
|
||||
parser.add_argument("-t", "--threshold", type=float, default=0.3,
|
||||
help="Quality threshold (0.0-1.0, default: 0.3)")
|
||||
parser.add_argument("--report", action="store_true",
|
||||
help="Print detailed report")
|
||||
parser.add_argument("--dry-run", action="store_true",
|
||||
help="Score only, don't filter")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not Path(args.input).exists():
|
||||
print(f"ERROR: Input file not found: {args.input}")
|
||||
sys.exit(1)
|
||||
|
||||
if args.dry_run and not args.output:
|
||||
args.report = True
|
||||
|
||||
output = args.output
|
||||
if args.dry_run:
|
||||
output = None
|
||||
|
||||
result = filter_pairs(args.input, output, args.threshold, args.report)
|
||||
|
||||
if not args.report:
|
||||
print(f"{result['kept']}/{result['total']} pairs kept (removed {result['removed']}, {result['removal_rate']}%)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
136
scripts/test_quality_filter.py
Normal file
136
scripts/test_quality_filter.py
Normal file
@@ -0,0 +1,136 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for training data quality filter.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from quality_filter import score_specificity, score_length_ratio, score_code_correctness, score_pair, filter_pairs
|
||||
|
||||
|
||||
class TestSpecificity(unittest.TestCase):
|
||||
|
||||
def test_generic_response_scores_low(self):
|
||||
text = "I don't know. It depends on many factors. There are many ways to approach this."
|
||||
score = score_specificity(text)
|
||||
self.assertLess(score, 0.4)
|
||||
|
||||
def test_specific_response_scores_high(self):
|
||||
text = 'Run: curl -s https://api.example.com/v1/repos | python3 -c "import sys,json; print(json.load(sys.stdin))"'
|
||||
score = score_specificity(text)
|
||||
self.assertGreater(score, 0.6)
|
||||
|
||||
def test_code_block_boosts_score(self):
|
||||
text = """Here's the fix:
|
||||
```python
|
||||
def hello():
|
||||
return "world"
|
||||
```"""
|
||||
score = score_specificity(text)
|
||||
self.assertGreater(score, 0.5)
|
||||
|
||||
def test_long_detailed_response(self):
|
||||
text = " ".join(["word"] * 150) + " GET /api/v1/repos"
|
||||
score = score_specificity(text)
|
||||
self.assertGreater(score, 0.5)
|
||||
|
||||
def test_short_response_penalized(self):
|
||||
score = score_specificity("yes")
|
||||
self.assertLess(score, 0.4)
|
||||
|
||||
|
||||
class TestLengthRatio(unittest.TestCase):
|
||||
|
||||
def test_balanced_ratio(self):
|
||||
score = score_length_ratio("short prompt", "This is a medium length response with some detail.")
|
||||
self.assertEqual(score, 1.0)
|
||||
|
||||
def test_too_short_response(self):
|
||||
score = score_length_ratio("A long prompt with many words here", "ok")
|
||||
self.assertLess(score, 1.0)
|
||||
|
||||
def test_empty_returns_zero(self):
|
||||
self.assertEqual(score_length_ratio("", "something"), 0.0)
|
||||
self.assertEqual(score_length_ratio("something", ""), 0.0)
|
||||
|
||||
|
||||
class TestCodeCorrectness(unittest.TestCase):
|
||||
|
||||
def test_no_code_returns_one(self):
|
||||
self.assertEqual(score_code_correctness("Just text, no code."), 1.0)
|
||||
|
||||
def test_valid_python(self):
|
||||
text = '```python\ndef foo():\n return 42\n```'
|
||||
self.assertEqual(score_code_correctness(text), 1.0)
|
||||
|
||||
def test_valid_json(self):
|
||||
text = '```json\n{"key": "value"}\n```'
|
||||
self.assertEqual(score_code_correctness(text), 1.0)
|
||||
|
||||
def test_invalid_python(self):
|
||||
text = '```python\ndef foo(\n return broken\n```'
|
||||
score = score_code_correctness(text)
|
||||
self.assertLess(score, 1.0)
|
||||
|
||||
|
||||
class TestScorePair(unittest.TestCase):
|
||||
|
||||
def test_good_pair(self):
|
||||
pair = {
|
||||
"prompt": "How do I list files in Python?",
|
||||
"response": 'Use `os.listdir()` or `pathlib.Path.iterdir()`. Example:\n```python\nfrom pathlib import Path\nfor f in Path(".").iterdir():\n print(f)\n```'
|
||||
}
|
||||
scores = score_pair(pair)
|
||||
self.assertGreater(scores["composite"], 0.4)
|
||||
|
||||
def test_bad_pair(self):
|
||||
pair = {
|
||||
"prompt": "How do I deploy?",
|
||||
"response": "It depends. There are many ways. I don't know your setup."
|
||||
}
|
||||
scores = score_pair(pair)
|
||||
self.assertLess(scores["composite"], 0.4)
|
||||
|
||||
def test_empty_pair_returns_zero(self):
|
||||
scores = score_pair({})
|
||||
self.assertEqual(scores["composite"], 0.0)
|
||||
|
||||
|
||||
class TestFilterPairs(unittest.TestCase):
|
||||
|
||||
def test_filter_removes_low_quality(self):
|
||||
pairs = [
|
||||
json.dumps({"prompt": "How?", "response": "Yes."}),
|
||||
json.dumps({"prompt": "List files?", "response": 'Use os.listdir():\n```python\nimport os\nos.listdir(".")\n```'}),
|
||||
json.dumps({"prompt": "Deploy?", "response": "It depends. I don't know."}),
|
||||
]
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
||||
f.write("\n".join(pairs) + "\n")
|
||||
input_path = f.name
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
||||
output_path = f.name
|
||||
|
||||
try:
|
||||
result = filter_pairs(input_path, output_path, threshold=0.3)
|
||||
self.assertEqual(result["total"], 3)
|
||||
self.assertGreater(result["kept"], 0)
|
||||
self.assertGreater(result["removed"], 0)
|
||||
|
||||
# Verify output is valid JSONL
|
||||
with open(output_path) as f:
|
||||
for line in f:
|
||||
json.loads(line.strip())
|
||||
finally:
|
||||
os.unlink(input_path)
|
||||
os.unlink(output_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
1
scripts/validate_scene_data.py
Symbolic link
1
scripts/validate_scene_data.py
Symbolic link
@@ -0,0 +1 @@
|
||||
validate-scene-data.py
|
||||
115
tests/test_generate_scenes_from_media.py
Normal file
115
tests/test_generate_scenes_from_media.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""
|
||||
Tests for scripts/generate_scenes_from_media.py — Media scene description generator.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
from generate_scenes_from_media import (
|
||||
find_media_files,
|
||||
file_hash,
|
||||
generate_description_prompt,
|
||||
parse_description,
|
||||
generate_training_pair,
|
||||
IMAGE_EXTENSIONS,
|
||||
VIDEO_EXTENSIONS,
|
||||
)
|
||||
|
||||
|
||||
class TestFindMediaFiles(unittest.TestCase):
|
||||
def test_finds_images(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
Path(tmpdir, "test.jpg").touch()
|
||||
Path(tmpdir, "test.png").touch()
|
||||
Path(tmpdir, "test.txt").touch() # not media
|
||||
|
||||
files = find_media_files(tmpdir)
|
||||
self.assertEqual(len(files), 2)
|
||||
|
||||
def test_finds_videos(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
Path(tmpdir, "video.mp4").touch()
|
||||
Path(tmpdir, "video.mov").touch()
|
||||
|
||||
files = find_media_files(tmpdir)
|
||||
self.assertEqual(len(files), 2)
|
||||
|
||||
def test_max_limits_results(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
for i in range(10):
|
||||
Path(tmpdir, f"img{i}.jpg").touch()
|
||||
|
||||
files = find_media_files(tmpdir, max_files=3)
|
||||
self.assertEqual(len(files), 3)
|
||||
|
||||
def test_missing_dir_returns_empty(self):
|
||||
files = find_media_files("/nonexistent/path")
|
||||
self.assertEqual(files, [])
|
||||
|
||||
|
||||
class TestFileHash(unittest.TestCase):
|
||||
def test_consistent_hash(self):
|
||||
path = Path("/test/file.jpg")
|
||||
h1 = file_hash(path)
|
||||
h2 = file_hash(path)
|
||||
self.assertEqual(h1, h2)
|
||||
|
||||
def test_different_files_different_hash(self):
|
||||
h1 = file_hash(Path("/test/a.jpg"))
|
||||
h2 = file_hash(Path("/test/b.jpg"))
|
||||
self.assertNotEqual(h1, h2)
|
||||
|
||||
|
||||
class TestGenerateDescriptionPrompt(unittest.TestCase):
|
||||
def test_image_prompt(self):
|
||||
prompt = generate_description_prompt(Path("test.jpg"))
|
||||
self.assertIn("image", prompt.lower())
|
||||
self.assertIn("mood", prompt.lower())
|
||||
self.assertIn("colors", prompt.lower())
|
||||
|
||||
def test_video_prompt(self):
|
||||
prompt = generate_description_prompt(Path("test.mp4"))
|
||||
self.assertIn("video", prompt.lower())
|
||||
self.assertIn("camera movement", prompt.lower())
|
||||
|
||||
|
||||
class TestParseDescription(unittest.TestCase):
|
||||
def test_parses_json(self):
|
||||
text = '{"mood": "calm", "colors": ["blue", "white"], "composition": "wide shot", "camera": "static", "description": "A serene lake"}'
|
||||
result = parse_description(text)
|
||||
self.assertEqual(result["mood"], "calm")
|
||||
self.assertEqual(result["colors"], ["blue", "white"])
|
||||
|
||||
def test_handles_plain_text(self):
|
||||
text = "This is a description of a scene with mood calm and colors blue, white."
|
||||
result = parse_description(text)
|
||||
self.assertIn("description", result)
|
||||
|
||||
|
||||
class TestGenerateTrainingPair(unittest.TestCase):
|
||||
def test_pair_structure(self):
|
||||
filepath = Path("/test/photo.jpg")
|
||||
description = {"mood": "happy", "colors": ["gold"], "composition": "close-up", "camera": "static", "description": "Smiling face"}
|
||||
pair = generate_training_pair(filepath, description, "llava")
|
||||
|
||||
self.assertEqual(pair["source_file"], str(filepath))
|
||||
self.assertEqual(pair["media_type"], "image")
|
||||
self.assertEqual(pair["model"], "llava")
|
||||
self.assertIn("source_session_id", pair)
|
||||
self.assertIn("timestamp", pair)
|
||||
self.assertEqual(pair["scene"]["mood"], "happy")
|
||||
|
||||
def test_video_pair(self):
|
||||
filepath = Path("/test/video.mp4")
|
||||
description = {"mood": "energetic"}
|
||||
pair = generate_training_pair(filepath, description, "gpt-4")
|
||||
self.assertEqual(pair["media_type"], "video")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -19,13 +19,14 @@ from glitch_patterns import (
|
||||
GlitchPattern,
|
||||
GlitchSeverity,
|
||||
MATRIX_GLITCH_PATTERNS,
|
||||
THREEJS_CATEGORIES,
|
||||
build_vision_prompt,
|
||||
get_pattern_by_category,
|
||||
get_patterns_by_severity,
|
||||
get_threejs_patterns,
|
||||
)
|
||||
|
||||
# THREEJS_CATEGORIES derived from GlitchCategory enum
|
||||
THREEJS_CATEGORIES = {cat.value for cat in GlitchCategory}
|
||||
|
||||
from matrix_glitch_detector import (
|
||||
DetectedGlitch,
|
||||
ScanResult,
|
||||
|
||||
95
tests/test_hermes_cleanup.py
Normal file
95
tests/test_hermes_cleanup.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""
|
||||
Tests for bin/hermes_cleanup.py — Stale process detection and cleanup.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "bin"))
|
||||
|
||||
from hermes_cleanup import (
|
||||
get_process_age_hours,
|
||||
get_child_pids,
|
||||
identify_stale_sessions,
|
||||
kill_session,
|
||||
generate_report,
|
||||
)
|
||||
|
||||
|
||||
class TestGetProcessAgeHours(unittest.TestCase):
|
||||
@patch("hermes_cleanup.subprocess.run")
|
||||
def test_returns_age(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="3600\n")
|
||||
age = get_process_age_hours(1234)
|
||||
self.assertAlmostEqual(age, 1.0, delta=0.01)
|
||||
|
||||
@patch("hermes_cleanup.subprocess.run")
|
||||
def test_returns_none_on_error(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=1, stdout="")
|
||||
age = get_process_age_hours(9999)
|
||||
self.assertIsNone(age)
|
||||
|
||||
|
||||
class TestGetChildPids(unittest.TestCase):
|
||||
@patch("hermes_cleanup.subprocess.run")
|
||||
def test_returns_child_pids(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="1001\n1002\n")
|
||||
pids = get_child_pids(1234)
|
||||
self.assertEqual(pids, [1001, 1002])
|
||||
|
||||
@patch("hermes_cleanup.subprocess.run")
|
||||
def test_returns_empty_on_no_children(self, mock_run):
|
||||
mock_run.return_value = MagicMock(returncode=1, stdout="")
|
||||
pids = get_child_pids(1234)
|
||||
self.assertEqual(pids, [])
|
||||
|
||||
|
||||
class TestKillSession(unittest.TestCase):
|
||||
def test_dry_run_does_not_kill(self):
|
||||
session = {
|
||||
"session_key": "test",
|
||||
"main_pid": 99999, # unlikely to exist
|
||||
"children": [],
|
||||
}
|
||||
result = kill_session(session, dry_run=True)
|
||||
self.assertTrue(result["dry_run"])
|
||||
self.assertIn(99999, result["killed"])
|
||||
|
||||
@patch("hermes_cleanup.os.kill")
|
||||
def test_kill_terminates_process(self, mock_kill):
|
||||
session = {
|
||||
"session_key": "test",
|
||||
"main_pid": 1234,
|
||||
"children": [1235],
|
||||
}
|
||||
result = kill_session(session, dry_run=False)
|
||||
self.assertFalse(result["dry_run"])
|
||||
self.assertEqual(mock_kill.call_count, 2)
|
||||
|
||||
|
||||
class TestGenerateReport(unittest.TestCase):
|
||||
def test_empty_report(self):
|
||||
report = generate_report([])
|
||||
self.assertIn("No stale sessions", report)
|
||||
|
||||
def test_report_with_stale(self):
|
||||
stale = [{
|
||||
"session_key": "test",
|
||||
"main_pid": 1234,
|
||||
"age_hours": 48.5,
|
||||
"cpu_percent": 0.1,
|
||||
"total_rss_kb": 20480,
|
||||
"total_rss_mb": 20.0,
|
||||
"process_count": 2,
|
||||
"command": "python3 -m hermes.cli chat",
|
||||
"children": [1235],
|
||||
}]
|
||||
report = generate_report(stale)
|
||||
self.assertIn("48.5h", report)
|
||||
self.assertIn("20.0 MB", report)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,139 +1,60 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for normalize-code-blocks.py — training data code block indentation fix (#750)."""
|
||||
"""
|
||||
Tests for scripts/normalize-code-blocks.py — Code block indentation normalization.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import textwrap
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
|
||||
from normalize_code_blocks import normalize_code_block, process_line, CODE_BLOCK_RE
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
from normalize_code_blocks import process_line
|
||||
|
||||
|
||||
class TestNormalizeCodeBlock:
|
||||
def test_basic_dedent(self):
|
||||
block = "```python\n from fastapi import FastAPI\n app = FastAPI()\n```"
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
|
||||
assert " from fastapi" not in result
|
||||
assert "from fastapi" in result
|
||||
|
||||
def test_preserves_language_tag(self):
|
||||
block = "```python\n x = 1\n```"
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
|
||||
assert result.startswith("```python")
|
||||
|
||||
def test_empty_block_unchanged(self):
|
||||
block = "```python\n \n \n```"
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
|
||||
assert result == block
|
||||
|
||||
def test_multiple_blocks(self):
|
||||
text = 'First: ```python\n x = 1\n``` and second: ```python\n y = 2\n```'
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, text)
|
||||
assert " x = 1" not in result
|
||||
assert " y = 2" not in result
|
||||
assert "x = 1" in result
|
||||
assert "y = 2" in result
|
||||
|
||||
def test_bash_block(self):
|
||||
block = "```bash\n echo hello\n ls -la\n```"
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
|
||||
assert " echo" not in result
|
||||
assert "echo hello" in result
|
||||
|
||||
def test_unlabeled_block(self):
|
||||
block = "```\n some code\n```"
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
|
||||
assert " some code" not in result
|
||||
|
||||
def test_mixed_indentation(self):
|
||||
block = "```python\n def foo():\n return 42\n```"
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
|
||||
lines = result.split("\n")
|
||||
# First code line should not have leading spaces from embedding
|
||||
code_lines = [l for l in lines if l.strip() and not l.startswith("```")]
|
||||
assert code_lines[0].startswith("def")
|
||||
|
||||
def test_strips_leading_trailing_blanks(self):
|
||||
block = "```python\n\n x = 1\n\n```"
|
||||
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
|
||||
assert "\n\n" not in result.split("```python")[1].split("```")[0]
|
||||
|
||||
|
||||
class TestProcessLine:
|
||||
def test_valid_jsonl_with_code(self):
|
||||
obj = {"prompt": "write code", "response": "```python\n x = 1\n```"}
|
||||
line = json.dumps(obj)
|
||||
fixed, n = process_line(line)
|
||||
parsed = json.loads(fixed)
|
||||
assert n == 1
|
||||
assert " x = 1" not in parsed["response"]
|
||||
|
||||
def test_no_code_blocks(self):
|
||||
obj = {"text": "hello world"}
|
||||
line = json.dumps(obj)
|
||||
fixed, n = process_line(line)
|
||||
assert n == 0
|
||||
assert json.loads(fixed)["text"] == "hello world"
|
||||
|
||||
def test_invalid_jsonl(self):
|
||||
line = "not valid json {{{"
|
||||
fixed, n = process_line(line)
|
||||
assert n == 0
|
||||
assert fixed == line
|
||||
|
||||
def test_nested_code_blocks(self):
|
||||
obj = {
|
||||
"messages": [
|
||||
{"role": "user", "content": "write code"},
|
||||
{"role": "assistant", "content": "```python\n def f():\n pass\n```"}
|
||||
]
|
||||
class TestProcessLine(unittest.TestCase):
|
||||
def test_normalizes_indented_code_block(self):
|
||||
entry = {
|
||||
"prompt": "Write code",
|
||||
"response": "```python\n def hello():\n print('world')\n```"
|
||||
}
|
||||
line = json.dumps(obj)
|
||||
fixed, n = process_line(line)
|
||||
assert n == 1
|
||||
parsed = json.loads(fixed)
|
||||
assert " def f" not in parsed["messages"][1]["content"]
|
||||
line = json.dumps(entry)
|
||||
result, count = process_line(line)
|
||||
parsed = json.loads(result.strip())
|
||||
# Code block indentation should be normalized
|
||||
self.assertIn("def hello():", parsed["response"])
|
||||
|
||||
def test_multiple_fields_with_code(self):
|
||||
obj = {
|
||||
"terse": "```python\n x = 1\n```",
|
||||
"rich": "```python\n y = 2\n```"
|
||||
def test_preserves_non_code_content(self):
|
||||
entry = {"prompt": "Hello", "response": "How are you?"}
|
||||
line = json.dumps(entry)
|
||||
result, count = process_line(line)
|
||||
parsed = json.loads(result.strip())
|
||||
self.assertEqual(parsed["response"], "How are you?")
|
||||
|
||||
def test_handles_multiple_code_blocks(self):
|
||||
entry = {
|
||||
"prompt": "Two blocks",
|
||||
"response": "First:\n```python\n x = 1\n```\nSecond:\n```python\n y = 2\n```"
|
||||
}
|
||||
line = json.dumps(obj)
|
||||
fixed, n = process_line(line)
|
||||
parsed = json.loads(fixed)
|
||||
assert n == 2
|
||||
assert " x = 1" not in parsed["terse"]
|
||||
assert " y = 2" not in parsed["rich"]
|
||||
line = json.dumps(entry)
|
||||
result, count = process_line(line)
|
||||
parsed = json.loads(result.strip())
|
||||
self.assertIn("x = 1", parsed["response"])
|
||||
self.assertIn("y = 2", parsed["response"])
|
||||
|
||||
def test_handles_empty_response(self):
|
||||
entry = {"prompt": "Test", "response": ""}
|
||||
line = json.dumps(entry)
|
||||
result, count = process_line(line)
|
||||
parsed = json.loads(result.strip())
|
||||
self.assertEqual(parsed["response"], "")
|
||||
|
||||
class TestEndToEnd:
|
||||
def test_file_processing(self):
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
|
||||
f.write(json.dumps({"r": "```python\n x = 1\n```"}) + "\n")
|
||||
f.write(json.dumps({"r": "no code here"}) + "\n")
|
||||
f.write(json.dumps({"r": "```python\n def g():\n return 99\n```"}) + "\n")
|
||||
f.flush()
|
||||
|
||||
# Process using the script logic
|
||||
lines = Path(f.name).read_text().splitlines(keepends=True)
|
||||
fixed = []
|
||||
total = 0
|
||||
for line in lines:
|
||||
fl, n = process_line(line)
|
||||
fixed.append(fl)
|
||||
total += n
|
||||
|
||||
os.unlink(f.name)
|
||||
assert total == 2
|
||||
# Verify first line is fixed
|
||||
first = json.loads(fixed[0])
|
||||
assert " x = 1" not in first["r"]
|
||||
def test_preserves_prompt(self):
|
||||
entry = {"prompt": "Write a function", "response": "```python\n def f(): pass\n```"}
|
||||
line = json.dumps(entry)
|
||||
result, count = process_line(line)
|
||||
parsed = json.loads(result.strip())
|
||||
self.assertEqual(parsed["prompt"], "Write a function")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import unittest
|
||||
unittest.main()
|
||||
|
||||
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from scripts.pr_triage import categorize, refs, find_duplicates, health, is_safe_to_merge
|
||||
from scripts.pr_triage import categorize_pr, find_duplicates, find_referenced_issues
|
||||
|
||||
|
||||
class TestCategorize:
|
||||
@@ -12,23 +12,23 @@ class TestCategorize:
|
||||
|
||||
def test_training_data(self):
|
||||
pr = {"title": "Add DPO training data", "body": "", "labels": []}
|
||||
assert categorize(pr) == "training-data"
|
||||
assert categorize_pr(pr) == "training-data"
|
||||
|
||||
def test_bug_fix(self):
|
||||
pr = {"title": "fix: resolve crash on startup", "body": "", "labels": []}
|
||||
assert categorize(pr) == "bug-fix"
|
||||
assert categorize_pr(pr) == "bug-fix"
|
||||
|
||||
def test_feature(self):
|
||||
pr = {"title": "feat: add dark mode", "body": "", "labels": []}
|
||||
assert categorize(pr) == "feature"
|
||||
assert categorize_pr(pr) == "feature"
|
||||
|
||||
def test_maintenance(self):
|
||||
pr = {"title": "refactor: simplify auth flow", "body": "", "labels": []}
|
||||
assert categorize(pr) == "maintenance"
|
||||
assert categorize_pr(pr) == "maintenance"
|
||||
|
||||
def test_other(self):
|
||||
pr = {"title": "Update readme", "body": "", "labels": []}
|
||||
assert categorize(pr) == "other"
|
||||
assert categorize_pr(pr) == "other"
|
||||
|
||||
|
||||
class TestRefs:
|
||||
@@ -36,19 +36,19 @@ class TestRefs:
|
||||
|
||||
def test_extracts_from_title(self):
|
||||
pr = {"title": "fix: resolve #123", "body": ""}
|
||||
assert refs(pr) == [123]
|
||||
assert find_referenced_issues(pr) == [123]
|
||||
|
||||
def test_extracts_from_body(self):
|
||||
pr = {"title": "Fix", "body": "Closes #456, refs #789"}
|
||||
assert refs(pr) == [456, 789]
|
||||
assert find_referenced_issues(pr) == [456, 789]
|
||||
|
||||
def test_no_refs(self):
|
||||
def test_no_find_referenced_issues(self):
|
||||
pr = {"title": "Fix", "body": "No issue refs"}
|
||||
assert refs(pr) == []
|
||||
assert find_referenced_issues(pr) == []
|
||||
|
||||
def test_multiple_refs(self):
|
||||
def test_multiple_find_referenced_issues(self):
|
||||
pr = {"title": "#1 and #2", "body": "Also #3"}
|
||||
assert refs(pr) == [1, 2, 3]
|
||||
assert find_referenced_issues(pr) == [1, 2, 3]
|
||||
|
||||
|
||||
class TestFindDuplicates:
|
||||
|
||||
@@ -341,6 +341,44 @@ def backfill_provenance(
|
||||
return stats
|
||||
|
||||
|
||||
|
||||
|
||||
class ProvenanceTracker:
|
||||
"""Track provenance metadata for training pairs."""
|
||||
|
||||
def __init__(self):
|
||||
self.stats = {
|
||||
"total_pairs": 0,
|
||||
"pairs_with_provenance": 0,
|
||||
"pairs_without_provenance": 0,
|
||||
}
|
||||
|
||||
def generate_pair_id(self, pair: dict) -> str:
|
||||
"""Generate a deterministic ID for a pair."""
|
||||
content = json.dumps(pair, sort_keys=True)
|
||||
return hashlib.sha256(content.encode()).hexdigest()[:16]
|
||||
|
||||
def process_pair(self, pair: dict) -> dict:
|
||||
"""Process a pair, adding provenance if missing."""
|
||||
self.stats["total_pairs"] += 1
|
||||
if "source_session_id" in pair and pair["source_session_id"]:
|
||||
self.stats["pairs_with_provenance"] += 1
|
||||
else:
|
||||
self.stats["pairs_without_provenance"] += 1
|
||||
pair = attach_provenance(pair, source="unknown", source_session_id="unknown", model="unknown")
|
||||
if "pair_id" not in pair:
|
||||
pair["pair_id"] = self.generate_pair_id(pair)
|
||||
return pair
|
||||
|
||||
def process_file(self, input_path: str, output_path: str = None) -> dict:
|
||||
"""Process a JSONL file, adding provenance to all pairs."""
|
||||
pairs = load_jsonl(input_path)
|
||||
processed = [self.process_pair(p) for p in pairs]
|
||||
if output_path:
|
||||
save_jsonl(processed, output_path)
|
||||
return self.stats
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
|
||||
Reference in New Issue
Block a user