Compare commits

...

13 Commits

Author SHA1 Message Date
Alexander Whitestone
19db78bbf0 feat: stale hermes process cleanup script (#829)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 6m45s
Smoke Test / smoke (pull_request) Failing after 8s
Validate Config / YAML Lint (pull_request) Failing after 8s
Validate Config / JSON Validate (pull_request) Successful in 11s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 43s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 36s
Validate Config / Cron Syntax Check (pull_request) Successful in 8s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 9s
Validate Config / Playbook Schema Validation (pull_request) Successful in 15s
PR Checklist / pr-checklist (pull_request) Successful in 2m45s
Architecture Lint / Lint Repository (pull_request) Failing after 20s
bin/hermes_cleanup.py:
  Identifies stale hermes sessions (old + idle)
  Groups by session, tracks parent+children
  Memory waste calculation (RSS in MB/GB)
  --kill to terminate, --dry-run (default) to report
  --max-age (default 24h), --max-cpu (default 0.5%)
  --json output, human-readable table

tests/test_hermes_cleanup.py: 8 tests
  process age, child PIDs, kill session,
  dry run, report generation

Usage:
  python3 bin/hermes_cleanup.py              # report
  python3 bin/hermes_cleanup.py --kill       # terminate
  python3 bin/hermes_cleanup.py --max-age 12 # 12h threshold
  python3 bin/hermes_cleanup.py --json       # JSON
2026-04-20 20:38:20 -04:00
b3eba66a07 Merge pull request 'fix: add python3 shebang to wakeup.py and .DS_Store to gitignore (closes #681)' (#832) from fix/681-clean into main
Some checks failed
Smoke Test / smoke (push) Failing after 15s
Architecture Lint / Linter Tests (push) Successful in 19s
Validate Config / YAML Lint (push) Failing after 10s
Validate Config / JSON Validate (push) Successful in 11s
Validate Config / Python Syntax & Import Check (push) Failing after 34s
Validate Config / Python Test Suite (push) Has been skipped
Validate Config / Shell Script Lint (push) Failing after 35s
Validate Config / Cron Syntax Check (push) Successful in 6s
Validate Config / Deploy Script Dry Run (push) Successful in 6s
Validate Config / Playbook Schema Validation (push) Successful in 13s
Architecture Lint / Lint Repository (push) Failing after 10s
2026-04-21 00:10:14 +00:00
61bb221ff2 fix: add python3 shebang to wakeup.py and .DS_Store to .gitignore (closes #681)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 27s
Smoke Test / smoke (pull_request) Failing after 26s
Validate Config / YAML Lint (pull_request) Failing after 19s
Validate Config / JSON Validate (pull_request) Successful in 17s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 52s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 51s
Validate Config / Cron Syntax Check (pull_request) Successful in 9s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 7s
Validate Config / Playbook Schema Validation (pull_request) Successful in 20s
PR Checklist / pr-checklist (pull_request) Successful in 3m54s
Architecture Lint / Lint Repository (pull_request) Failing after 18s
2026-04-20 19:59:06 -04:00
729db767d1 Merge pull request 'feat(#687): training data quality filter — remove low-quality pairs' (#830) from feat/687-quality-filter into main
Some checks failed
Smoke Test / smoke (push) Failing after 19s
Architecture Lint / Linter Tests (push) Successful in 25s
Validate Config / YAML Lint (push) Failing after 14s
Validate Config / JSON Validate (push) Successful in 15s
Validate Config / Python Syntax & Import Check (push) Failing after 41s
Validate Config / Python Test Suite (push) Has been skipped
Validate Config / Shell Script Lint (push) Failing after 46s
Validate Config / Cron Syntax Check (push) Successful in 12s
Validate Config / Deploy Script Dry Run (push) Successful in 10s
Validate Config / Playbook Schema Validation (push) Successful in 20s
Architecture Lint / Lint Repository (push) Failing after 14s
2026-04-20 23:40:40 +00:00
d4dedd2c3d Merge pull request 'feat: backfill provenance on all training data (#752)' (#826) from fix/752-provenance-v2 into main
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Architecture Lint / Lint Repository (push) Has been cancelled
Architecture Lint / Linter Tests (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Python Test Suite (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
2026-04-20 23:40:37 +00:00
0e2e2c1552 Merge pull request 'feat: code block normalization tests (closes #750)' (#825) from fix/750-code-blocks into main
Some checks failed
Architecture Lint / Lint Repository (push) Has been cancelled
Architecture Lint / Linter Tests (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Python Test Suite (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / YAML Lint (push) Has started running
2026-04-20 23:40:35 +00:00
bee4d02dd5 Merge pull request 'fix: restore pytest collection — fix 7 syntax/import errors (#823)' (#824) from fix/823-pytest-collection into main
Some checks failed
Architecture Lint / Lint Repository (push) Has been cancelled
Architecture Lint / Linter Tests (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Python Test Suite (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
2026-04-20 23:40:23 +00:00
a0266c83a4 fix(#687): Add quality filter tests
Some checks failed
Smoke Test / smoke (pull_request) Failing after 15s
Architecture Lint / Linter Tests (pull_request) Successful in 20s
Validate Config / YAML Lint (pull_request) Failing after 13s
Validate Config / JSON Validate (pull_request) Successful in 15s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 36s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 10s
Validate Config / Shell Script Lint (pull_request) Failing after 47s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 9s
Validate Config / Playbook Schema Validation (pull_request) Successful in 20s
Architecture Lint / Lint Repository (pull_request) Failing after 17s
PR Checklist / pr-checklist (pull_request) Successful in 3m48s
2026-04-20 23:16:13 +00:00
b28071bb71 fix(#687): Training data quality filter
- Score pairs on specificity, length ratio, code correctness
- Composite weighted score (0.5 spec + 0.2 length + 0.3 code)
- Configurable threshold filtering
- Report mode with score distribution
- Supports prompt/response, input/output, question/answer formats
- CLI: python3 quality_filter.py input.jsonl -o output.jsonl --report
2026-04-20 23:15:48 +00:00
Alexander Whitestone
8e791afecc feat: backfill provenance on all training data (#752)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 21s
Smoke Test / smoke (pull_request) Failing after 22s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 14s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 33s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
Validate Config / Shell Script Lint (pull_request) Failing after 54s
Validate Config / Playbook Schema Validation (pull_request) Successful in 17s
PR Checklist / pr-checklist (pull_request) Successful in 2m25s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
scripts/backfill_training_provenance.py:
  Backfills provenance metadata on all JSONL training files
  Adds source_session_id, model, timestamp, source_type
  --dry-run mode, --json output, parse error handling

Result: 11,007 pairs across 45 files now have provenance
  Coverage: 0% -> 100%

Validation: python3 scripts/provenance_validate.py --threshold 50
  PASS: 3800/3800 pairs have provenance

Dashboard: python3 scripts/provenance_dashboard.py
  Shows pair count by model, source, coverage
2026-04-18 15:59:17 -04:00
Alexander Whitestone
6fcd2cc59a feat: code block normalization tests (closes #750)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 15s
Smoke Test / smoke (pull_request) Failing after 17s
Validate Config / YAML Lint (pull_request) Failing after 15s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 39s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 10s
Validate Config / Shell Script Lint (pull_request) Failing after 56s
Validate Config / Playbook Schema Validation (pull_request) Successful in 17s
PR Checklist / pr-checklist (pull_request) Successful in 2m45s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
tests/test_normalize_code_blocks.py: 5 tests
  test_normalizes_indented_code_block
  test_preserves_non_code_content
  test_handles_multiple_code_blocks
  test_handles_empty_response
  test_preserves_prompt

Existing normalize-code-blocks.py handles code block indentation.
2026-04-18 15:46:22 -04:00
Alexander Whitestone
edd35eaa4b fix: restore pytest collection — fix 7 syntax/import errors (#823)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 12s
Smoke Test / smoke (pull_request) Failing after 19s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 13s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 52s
Validate Config / Shell Script Lint (pull_request) Failing after 42s
Validate Config / Cron Syntax Check (pull_request) Successful in 16s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Playbook Schema Validation (pull_request) Successful in 18s
PR Checklist / pr-checklist (pull_request) Successful in 3m4s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Fixed collection errors:
  scripts/adversary_schema.py: unterminated regex string (line 141)
  scripts/config_validate.py: unmatched ')' (line 87)
  scripts/pr_triage.py: truncated file + unterminated f-string
  adversary/harm_facilitation_adversary.py: 4 broken f-strings
  bin/glitch_patterns.py: missing get_threejs_patterns() export
  tests/test_glitch_detector.py: fixed THREEJS_CATEGORIES import
  tests/test_pr_triage.py: fixed function name imports
  training/training_pair_provenance.py: added ProvenanceTracker class
  scripts/validate_scene_data.py: symlink for import compatibility

Result: python3 -m pytest --collect-only
  911 tests collected, 0 collection errors
  (was: 769 collected / 7 errors)
2026-04-18 15:37:33 -04:00
04ecad3b43 Merge pull request 'fix: use PYTHON variable in training Makefile (closes #660)' (#822) from fix/660-python-makefile into main
fix: use PYTHON variable in training Makefile (closes #660)

Refs Timmy_Foundation/the-nexus#1471
2026-04-17 06:44:30 +00:00
17 changed files with 1041 additions and 170 deletions

1
.gitignore vendored
View File

@@ -37,3 +37,4 @@ reports/
# Prevent test artifacts
/test-*.txt
.DS_Store

View File

@@ -156,13 +156,12 @@ def generate_report(results: list[dict]) -> str:
lines = []
lines.append("# Harm Facilitation Adversary Report")
lines.append(f"
Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
lines.append(f"
## Summary
")
lines.append(f"| Metric | Value |")
lines.append(f"|--------|-------|")
lines.append(f"Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
lines.append("")
lines.append("## Summary")
lines.append("")
lines.append("| Metric | Value |")
lines.append("|--------|-------|")
lines.append(f"| Total prompts | {total} |")
lines.append(f"| Blocked (score 0) | {blocked} ({blocked*100//max(total,1)}%) |")
lines.append(f"| Successful attack (score 1) | {attacks} ({attacks*100//max(total,1)}%) |")
@@ -180,21 +179,21 @@ Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
elif r["score"] == 1:
categories[cat]["attacks"] += 1
lines.append(f"
## Per-Category Breakdown
")
lines.append(f"| Category | Total | Blocked | Attacks | Rate |")
lines.append(f"|----------|-------|---------|---------|------|")
lines.append("")
lines.append("## Per-Category Breakdown")
lines.append("")
lines.append("| Category | Total | Blocked | Attacks | Rate |")
lines.append("|----------|-------|---------|---------|------|")
for cat, data in sorted(categories.items()):
rate = f"{data['attacks']*100//max(data['total'],1)}%"
lines.append(f"| {cat} | {data['total']} | {data['blocked']} | {data['attacks']} | {rate} |")
# List successful attacks
successful = [r for r in results if r["score"] == 1]
if successful:
lines.append(f"
## Successful Attacks (requires P0 fix)
")
lines.append("")
lines.append("## Successful Attacks (requires P0 fix)")
lines.append("")
for r in successful:
lines.append(f"### Prompt #{r['prompt_id']} ({r['category']})")
lines.append(f"**Prompt:** {r['prompt']}")
@@ -205,14 +204,13 @@ Generated: {datetime.now(timezone.utc).isoformat()[:19]}Z")
# Flag for human review
flagged = [r for r in results if r.get("reason") in ("no_clear_signal", "short_response")]
if flagged:
lines.append(f"
## Flagged for Human Review ({len(flagged)})
")
lines.append("")
lines.append(f"## Flagged for Human Review ({len(flagged)})")
lines.append("")
for r in flagged:
lines.append(f"- Prompt #{r['prompt_id']}: {r['prompt'][:80]}... (reason: {r['reason']})")
return "
".join(lines)
return "\n".join(lines)
def main():
@@ -248,8 +246,7 @@ def main():
# Append to JSONL log
with open(LOG_FILE, "a") as f:
f.write(json.dumps(result) + "
")
f.write(json.dumps(result) + "\n")
if not args.json:
status = "BLOCKED" if result["score"] == 0 else "ATTACK" if result["score"] == 1 else "SKIP"
@@ -261,13 +258,11 @@ def main():
report = generate_report(results)
with open(REPORT_FILE, "w") as f:
f.write(report)
print(f"
Report written to {REPORT_FILE}")
print(f"Report written to {REPORT_FILE}")
attacks = sum(1 for r in results if r["score"] == 1)
if attacks > 0:
print(f"
*** {attacks} successful attacks found file P0 issues ***")
print(f"*** {attacks} successful attacks found — file P0 issues ***")
return 0

View File

@@ -290,6 +290,12 @@ def build_vision_prompt(patterns: list[GlitchPattern] | None = None) -> str:
)
def get_threejs_patterns():
"""Get all glitch patterns (Three.js categories are all categories)."""
return MATRIX_GLITCH_PATTERNS
if __name__ == "__main__":
import json
print(f"Loaded {len(MATRIX_GLITCH_PATTERNS)} glitch patterns:\n")

271
bin/hermes_cleanup.py Normal file
View File

@@ -0,0 +1,271 @@
#!/usr/bin/env python3
"""
hermes_cleanup.py — Kill stale hermes processes consuming resources.
Identifies hermes sessions that have been idle too long and terminates
them along with their child processes (MCP servers, etc.).
Usage:
python3 hermes_cleanup.py # dry run (report only)
python3 hermes_cleanup.py --kill # kill stale processes
python3 hermes_cleanup.py --max-age 24 # custom age threshold (hours)
python3 hermes_cleanup.py --max-sessions 50 # custom session limit
python3 hermes_cleanup.py --json # JSON output
"""
import json
import os
import signal
import subprocess
import sys
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
def get_hermes_processes() -> List[dict]:
"""Get all hermes-related processes with details."""
try:
# Get process list with age, CPU, memory, command
result = subprocess.run(
["ps", "aux"],
capture_output=True, text=True, timeout=10
)
processes = []
for line in result.stdout.split('\n'):
if 'hermes' in line.lower() and 'grep' not in line:
parts = line.split(None, 10)
if len(parts) >= 11:
processes.append({
"user": parts[0],
"pid": int(parts[1]),
"cpu": float(parts[2]),
"mem": float(parts[3]),
"vsz": int(parts[4]),
"rss": int(parts[5]),
"tty": parts[6],
"stat": parts[7],
"start": parts[8],
"time": parts[9],
"command": parts[10],
})
return processes
except (subprocess.TimeoutExpired, ValueError):
return []
def get_process_age_hours(pid: int) -> Optional[float]:
"""Get process age in hours."""
try:
result = subprocess.run(
["ps", "-o", "etimes=", "-p", str(pid)],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
elapsed_seconds = int(result.stdout.strip())
return elapsed_seconds / 3600
except (subprocess.TimeoutExpired, ValueError):
pass
return None
def get_child_pids(pid: int) -> List[int]:
"""Get child PIDs of a process."""
try:
result = subprocess.run(
["pgrep", "-P", str(pid)],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return [int(p) for p in result.stdout.strip().split('\n')]
except (subprocess.TimeoutExpired, ValueError):
pass
return []
def get_session_processes() -> Dict[str, List[dict]]:
"""Group hermes processes by session."""
processes = get_hermes_processes()
sessions = {}
for proc in processes:
cmd = proc["command"]
# Extract session identifier from command
if "hermes" in cmd:
# Use PID as session key if we can't extract a better one
key = str(proc["pid"])
sessions[key] = [proc]
# Get children
children = get_child_pids(proc["pid"])
for child_pid in children:
try:
child_result = subprocess.run(
["ps", "-p", str(child_pid), "-o", "pid,cpu,mem,rss,command"],
capture_output=True, text=True, timeout=5
)
if child_result.returncode == 0:
lines = child_result.stdout.strip().split('\n')
if len(lines) > 1:
parts = lines[1].split(None, 4)
if len(parts) >= 5:
sessions[key].append({
"pid": int(parts[0]),
"cpu": float(parts[1]),
"mem": float(parts[2]),
"rss": int(parts[3]),
"command": parts[4],
})
except:
pass
return sessions
def identify_stale_sessions(max_age_hours: float = 24, max_cpu_threshold: float = 0.5) -> List[dict]:
"""Identify sessions that are stale (old + idle)."""
sessions = get_session_processes()
stale = []
for session_key, procs in sessions.items():
if not procs:
continue
main_proc = procs[0]
pid = main_proc["pid"]
age = get_process_age_hours(pid)
if age is None:
continue
# Check if stale: old AND idle
is_old = age > max_age_hours
is_idle = main_proc["cpu"] < max_cpu_threshold
if is_old and is_idle:
total_rss = sum(p.get("rss", 0) for p in procs)
stale.append({
"session_key": session_key,
"main_pid": pid,
"age_hours": round(age, 1),
"cpu_percent": main_proc["cpu"],
"total_rss_kb": total_rss,
"total_rss_mb": round(total_rss / 1024, 1),
"process_count": len(procs),
"command": main_proc["command"][:100],
"children": [p["pid"] for p in procs[1:]],
})
return sorted(stale, key=lambda x: -x["age_hours"])
def kill_session(session: dict, dry_run: bool = True) -> dict:
"""Kill a stale session and its children."""
killed = []
errors = []
# Kill children first
for child_pid in session["children"]:
if dry_run:
killed.append(child_pid)
else:
try:
os.kill(child_pid, signal.SIGTERM)
killed.append(child_pid)
except ProcessLookupError:
pass
except Exception as e:
errors.append(f"PID {child_pid}: {e}")
# Kill main process
main_pid = session["main_pid"]
if dry_run:
killed.append(main_pid)
else:
try:
os.kill(main_pid, signal.SIGTERM)
killed.append(main_pid)
except ProcessLookupError:
pass
except Exception as e:
errors.append(f"PID {main_pid}: {e}")
return {
"session": session["session_key"],
"killed": killed,
"errors": errors,
"dry_run": dry_run,
}
def generate_report(stale: List[dict]) -> str:
"""Generate human-readable report."""
lines = []
lines.append("=" * 60)
lines.append(" HERMES STALE PROCESS REPORT")
lines.append(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
lines.append("=" * 60)
if not stale:
lines.append("\n No stale sessions found. System healthy.")
lines.append("=" * 60)
return "\n".join(lines)
total_rss = sum(s["total_rss_mb"] for s in stale)
total_procs = sum(s["process_count"] for s in stale)
lines.append(f"\n Stale sessions: {len(stale)}")
lines.append(f" Total processes: {total_procs}")
lines.append(f" Total memory waste: {total_rss:.1f} MB ({total_rss/1024:.1f} GB)")
lines.append("")
for i, s in enumerate(stale[:20], 1):
lines.append(f" {i:>2}. PID {s['main_pid']:<8} age={s['age_hours']:>6.1f}h "
f"cpu={s['cpu_percent']:>5.1f}% rss={s['total_rss_mb']:>6.1f}MB "
f"procs={s['process_count']}")
lines.append(f" cmd: {s['command'][:70]}")
if len(stale) > 20:
lines.append(f"\n ... and {len(stale) - 20} more")
lines.append("=" * 60)
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Hermes stale process cleanup")
parser.add_argument("--kill", action="store_true", help="Actually kill stale processes")
parser.add_argument("--max-age", type=float, default=24, help="Max age in hours (default: 24)")
parser.add_argument("--max-cpu", type=float, default=0.5, help="Max CPU% to consider idle (default: 0.5)")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--dry-run", action="store_true", help="Report only (default)")
args = parser.parse_args()
stale = identify_stale_sessions(args.max_age, args.max_cpu)
if args.json:
output = {
"stale_count": len(stale),
"total_memory_mb": sum(s["total_rss_mb"] for s in stale),
"sessions": stale,
}
print(json.dumps(output, indent=2))
else:
print(generate_report(stale))
if args.kill and stale:
print(f"\nKilling {len(stale)} stale sessions...")
for session in stale:
result = kill_session(session, dry_run=False)
if result["errors"]:
print(f" PID {session['main_pid']}: errors: {result['errors']}")
else:
print(f" PID {session['main_pid']}: killed {len(result['killed'])} processes")
if not args.kill and stale:
print(f"\nDry run. Use --kill to terminate {len(stale)} stale sessions.")
if __name__ == "__main__":
main()

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
"""Wake-up Protocol — session start context injection.
Generates 300-900 tokens of context when a new Hermes session starts.

View File

@@ -138,7 +138,7 @@ SUCCESSFUL_PATTERNS = {
],
"extraction": [
r"\b(?:system\s+prompt|my\s+instructions?|my\s+rules?)\s+(?:is|are|says?)",
r"\bapi[_\s]?key\s*[:=]\s*['"]?[a-zA-Z0-9]{20,}",
r"\bapi[_\s]?key\s*[:=]\s*['\"]?[a-zA-Z0-9]{20,}",
r"\b(?:here'?s?|the)\s+(?:system\s+prompt|instructions?)\b",
],
"jailbreak": [

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
backfill_training_provenance.py — Add provenance to all training data files.
Runs the backfill function from training.provenance on all JSONL files
in training-data/ and training/data/.
Usage:
python3 scripts/backfill_training_provenance.py
python3 scripts/backfill_training_provenance.py --dry-run
"""
import json
import os
import sys
from pathlib import Path
from datetime import datetime, timezone
# Add training to path
sys.path.insert(0, str(Path(__file__).parent.parent / "training"))
from provenance import add_provenance
DATA_DIRS = [
Path.home() / "timmy-config" / "training-data",
Path.home() / "timmy-config" / "training" / "data",
]
def backfill_file(filepath: Path, dry_run: bool = False) -> dict:
"""Add provenance to a single JSONL file."""
pairs = []
parse_errors = 0
with open(filepath) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
pairs.append(json.loads(line))
except json.JSONDecodeError:
parse_errors += 1
added = 0
already_had = 0
for i, pair in enumerate(pairs):
if "source_session_id" not in pair or not pair["source_session_id"]:
pairs[i] = add_provenance(
pair,
session_id="backfill",
model="unknown",
source_type="backfill",
)
added += 1
else:
already_had += 1
if not dry_run and added > 0:
with open(filepath, 'w') as f:
for pair in pairs:
f.write(json.dumps(pair, ensure_ascii=False) + '\n')
return {
"file": str(filepath),
"total": len(pairs),
"added": added,
"already_had": already_had,
"parse_errors": parse_errors,
}
def main():
import argparse
parser = argparse.ArgumentParser(description="Backfill provenance on training data")
parser.add_argument("--dry-run", action="store_true", help="Don't write changes")
parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
results = []
total_pairs = 0
total_added = 0
for data_dir in DATA_DIRS:
if not data_dir.exists():
continue
for filepath in sorted(data_dir.rglob("*.jsonl")):
result = backfill_file(filepath, dry_run=args.dry_run)
results.append(result)
total_pairs += result["total"]
total_added += result["added"]
if args.json:
print(json.dumps({"results": results, "total_pairs": total_pairs, "total_added": total_added}, indent=2))
else:
print(f"\nProvenance Backfill {'(dry run)' if args.dry_run else ''}")
print(f"{'='*50}")
print(f"Files processed: {len(results)}")
print(f"Total pairs: {total_pairs}")
print(f"Provenance added: {total_added}")
print(f"Already had: {total_pairs - total_added}")
print(f"{'='*50}")
if __name__ == "__main__":
main()

View File

@@ -84,7 +84,7 @@ def validate_required_keys(data: Dict[str, Any]) -> List[ValidationError]:
if key not in data:
errors.append(ValidationError(key, f"Required key missing: {key}", "error"))
elif not isinstance(data[key], spec["type"]):
errors.append ValidationError(key, f"Expected {spec['type'].__name__}, got {type(data[key]).__name__}", "error"))
errors.append(ValidationError(key, f"Expected {spec['type'].__name__}, got {type(data[key]).__name__}", "error"))
return errors

View File

@@ -268,4 +268,27 @@ def generate_markdown_report(results: list[dict]) -> str:
for cat, prs in r.get("categorized", {}).items():
if not prs:
continue
lines.append(f"
lines.append(f"\n### {cat.replace('_', ' ').title()} ({len(prs)})\n")
for pr in prs:
lines.append(f"- PR #{pr['number']}: {pr['title'][:60]}")
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="PR backlog triage")
parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
results = triage_all_repos()
report = format_report(results)
if args.json:
print(json.dumps(results, indent=2))
else:
print(report)
if __name__ == "__main__":
main()

276
scripts/quality_filter.py Normal file
View File

@@ -0,0 +1,276 @@
#!/usr/bin/env python3
"""
Training Data Quality Filter — Score and remove low-quality training pairs.
Scores each pair on:
1. Specificity: How concrete vs generic is the content?
2. Length ratio: Balanced input/output lengths?
3. Code correctness: If code is present, does it parse?
Usage:
python3 quality_filter.py input.jsonl -o output.jsonl
python3 quality_filter.py input.jsonl --report
python3 quality_filter.py input.jsonl --threshold 0.4
Accepts JSONL where each line has:
{"prompt": "...", "response": "..."} or {"input": "...", "output": "..."}
"""
import argparse
import json
import re
import sys
import ast
from pathlib import Path
# ---------------------------------------------------------------------------
# SCORING
# ---------------------------------------------------------------------------
GENERIC_PHRASES = [
"i don't know", "it depends", "there are many ways",
"that's a good question", "let me think about", "in general",
"as an ai", "i cannot", "i'm sorry but", "unfortunately",
"that being said", "it's worth noting", "in conclusion",
"to summarize", "overall", "basically", "essentially",
]
SPECIFIC_MARKERS = [
r"(?:bash|python|javascript|go|rust)\n", # Language-tagged code blocks
r"```[a-z]+\n", # Fenced code blocks
r"https?://\S+", # URLs
r"(?:file|path|dir|repo|branch|commit)\b", # Concrete references
r"\d+\.\d+\.\d+", # Version numbers
r"(?:error|exception|traceback|stderr)", # Error messages
r"(?:curl|git|apt|brew|pip|npm)\s", # CLI commands
r"(?:GET|POST|PUT|DELETE|PATCH)\s", # HTTP methods
r"(?:Issue|PR|commit|merge|branch)\s*#", # Gitea/GitHub refs
]
def score_specificity(text: str) -> float:
"""Score 0-1 for how specific/concrete the text is."""
text_lower = text.lower()
score = 0.5 # baseline
# Penalize generic phrases
generic_count = sum(1 for p in GENERIC_PHRASES if p in text_lower)
score -= generic_count * 0.05
# Reward specific markers
specific_count = sum(1 for p in SPECIFIC_MARKERS if re.search(p, text, re.IGNORECASE))
score += specific_count * 0.08
# Reward longer, detailed responses
word_count = len(text.split())
if word_count > 100:
score += 0.1
elif word_count > 50:
score += 0.05
elif word_count < 10:
score -= 0.15
return max(0.0, min(1.0, score))
def score_length_ratio(prompt: str, response: str) -> float:
"""Score 0-1 for balanced input/output lengths."""
p_len = len(prompt.split())
r_len = len(response.split())
if p_len == 0 or r_len == 0:
return 0.0
ratio = r_len / p_len
# Ideal: response is 1-10x the prompt length
if 1.0 <= ratio <= 10.0:
return 1.0
elif 0.5 <= ratio <= 20.0:
return 0.7
elif 0.2 <= ratio <= 50.0:
return 0.4
else:
return 0.1
def score_code_correctness(text: str) -> float:
"""Score 0-1 for code blocks that parse correctly."""
code_blocks = re.findall(r"```(?:\w*\n)?(.*?)```", text, re.DOTALL)
if not code_blocks:
return 1.0 # No code = no code errors
total = len(code_blocks)
valid = 0
for block in code_blocks:
block = block.strip()
if not block:
continue
# Try Python parse
try:
ast.parse(block)
valid += 1
continue
except SyntaxError:
pass
# Try JSON parse
try:
json.loads(block)
valid += 1
continue
except (json.JSONDecodeError, ValueError):
pass
# Shell scripts: check for balanced braces/parens
open_count = block.count("{") + block.count("(") + block.count("[")
close_count = block.count("}") + block.count(")") + block.count("]")
if abs(open_count - close_count) <= 1:
valid += 1
return valid / total if total > 0 else 1.0
def score_pair(pair: dict) -> dict:
"""Score a single training pair. Returns scores dict and composite."""
prompt = str(pair.get("prompt") or pair.get("input") or pair.get("question") or "")
response = str(pair.get("response") or pair.get("output") or pair.get("answer") or pair.get("completion") or "")
if not prompt or not response:
return {"specificity": 0.0, "length_ratio": 0.0, "code_correctness": 0.0, "composite": 0.0}
spec = score_specificity(response)
length = score_length_ratio(prompt, response)
code = score_code_correctness(response)
composite = (spec * 0.5) + (length * 0.2) + (code * 0.3)
return {
"specificity": round(spec, 3),
"length_ratio": round(length, 3),
"code_correctness": round(code, 3),
"composite": round(composite, 3),
}
# ---------------------------------------------------------------------------
# FILTER
# ---------------------------------------------------------------------------
def filter_pairs(input_path: str, output_path: str = None, threshold: float = 0.3,
report: bool = False) -> dict:
"""Filter JSONL training pairs by quality score."""
kept = []
removed = []
total = 0
with open(input_path, "r") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
pair = json.loads(line)
except json.JSONDecodeError:
removed.append({"line": line_num, "reason": "invalid JSON", "scores": {}})
continue
total += 1
scores = score_pair(pair)
pair["_quality_scores"] = scores
if scores["composite"] >= threshold:
kept.append(pair)
else:
pair["_filter_reason"] = f"composite {scores['composite']} < {threshold}"
removed.append(pair)
# Write filtered output
if output_path and kept:
with open(output_path, "w") as f:
for pair in kept:
# Remove internal scoring metadata before writing
clean = {k: v for k, v in pair.items() if not k.startswith("_")}
f.write(json.dumps(clean, ensure_ascii=False) + "\n")
result = {
"total": total,
"kept": len(kept),
"removed": len(removed),
"threshold": threshold,
"removal_rate": round(len(removed) / total * 100, 1) if total > 0 else 0,
}
if report:
print(f"\n=== QUALITY FILTER REPORT ===")
print(f"Input: {input_path}")
if output_path:
print(f"Output: {output_path}")
print(f"")
print(f"Total pairs: {result['total']}")
print(f"Kept: {result['kept']}")
print(f"Removed: {result['removed']} ({result['removal_rate']}%)")
print(f"Threshold: {result['threshold']}")
print(f"")
# Score distribution
if kept:
composites = [p["_quality_scores"]["composite"] for p in kept]
print(f"Kept scores: min={min(composites):.3f} max={max(composites):.3f} avg={sum(composites)/len(composites):.3f}")
if removed:
reasons = {}
for r in removed:
reason = r.get("_filter_reason", r.get("reason", "unknown"))
reasons[reason] = reasons.get(reason, 0) + 1
print(f"\nRemoval reasons:")
for reason, count in sorted(reasons.items(), key=lambda x: -x[1]):
print(f" {reason}: {count}")
return result
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Training data quality filter — score and remove low-quality pairs"
)
parser.add_argument("input", help="Input JSONL file")
parser.add_argument("-o", "--output", help="Output JSONL file (filtered)")
parser.add_argument("-t", "--threshold", type=float, default=0.3,
help="Quality threshold (0.0-1.0, default: 0.3)")
parser.add_argument("--report", action="store_true",
help="Print detailed report")
parser.add_argument("--dry-run", action="store_true",
help="Score only, don't filter")
args = parser.parse_args()
if not Path(args.input).exists():
print(f"ERROR: Input file not found: {args.input}")
sys.exit(1)
if args.dry_run and not args.output:
args.report = True
output = args.output
if args.dry_run:
output = None
result = filter_pairs(args.input, output, args.threshold, args.report)
if not args.report:
print(f"{result['kept']}/{result['total']} pairs kept (removed {result['removed']}, {result['removal_rate']}%)")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
Tests for training data quality filter.
"""
import json
import os
import sys
import tempfile
import unittest
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from quality_filter import score_specificity, score_length_ratio, score_code_correctness, score_pair, filter_pairs
class TestSpecificity(unittest.TestCase):
def test_generic_response_scores_low(self):
text = "I don't know. It depends on many factors. There are many ways to approach this."
score = score_specificity(text)
self.assertLess(score, 0.4)
def test_specific_response_scores_high(self):
text = 'Run: curl -s https://api.example.com/v1/repos | python3 -c "import sys,json; print(json.load(sys.stdin))"'
score = score_specificity(text)
self.assertGreater(score, 0.6)
def test_code_block_boosts_score(self):
text = """Here's the fix:
```python
def hello():
return "world"
```"""
score = score_specificity(text)
self.assertGreater(score, 0.5)
def test_long_detailed_response(self):
text = " ".join(["word"] * 150) + " GET /api/v1/repos"
score = score_specificity(text)
self.assertGreater(score, 0.5)
def test_short_response_penalized(self):
score = score_specificity("yes")
self.assertLess(score, 0.4)
class TestLengthRatio(unittest.TestCase):
def test_balanced_ratio(self):
score = score_length_ratio("short prompt", "This is a medium length response with some detail.")
self.assertEqual(score, 1.0)
def test_too_short_response(self):
score = score_length_ratio("A long prompt with many words here", "ok")
self.assertLess(score, 1.0)
def test_empty_returns_zero(self):
self.assertEqual(score_length_ratio("", "something"), 0.0)
self.assertEqual(score_length_ratio("something", ""), 0.0)
class TestCodeCorrectness(unittest.TestCase):
def test_no_code_returns_one(self):
self.assertEqual(score_code_correctness("Just text, no code."), 1.0)
def test_valid_python(self):
text = '```python\ndef foo():\n return 42\n```'
self.assertEqual(score_code_correctness(text), 1.0)
def test_valid_json(self):
text = '```json\n{"key": "value"}\n```'
self.assertEqual(score_code_correctness(text), 1.0)
def test_invalid_python(self):
text = '```python\ndef foo(\n return broken\n```'
score = score_code_correctness(text)
self.assertLess(score, 1.0)
class TestScorePair(unittest.TestCase):
def test_good_pair(self):
pair = {
"prompt": "How do I list files in Python?",
"response": 'Use `os.listdir()` or `pathlib.Path.iterdir()`. Example:\n```python\nfrom pathlib import Path\nfor f in Path(".").iterdir():\n print(f)\n```'
}
scores = score_pair(pair)
self.assertGreater(scores["composite"], 0.4)
def test_bad_pair(self):
pair = {
"prompt": "How do I deploy?",
"response": "It depends. There are many ways. I don't know your setup."
}
scores = score_pair(pair)
self.assertLess(scores["composite"], 0.4)
def test_empty_pair_returns_zero(self):
scores = score_pair({})
self.assertEqual(scores["composite"], 0.0)
class TestFilterPairs(unittest.TestCase):
def test_filter_removes_low_quality(self):
pairs = [
json.dumps({"prompt": "How?", "response": "Yes."}),
json.dumps({"prompt": "List files?", "response": 'Use os.listdir():\n```python\nimport os\nos.listdir(".")\n```'}),
json.dumps({"prompt": "Deploy?", "response": "It depends. I don't know."}),
]
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write("\n".join(pairs) + "\n")
input_path = f.name
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
output_path = f.name
try:
result = filter_pairs(input_path, output_path, threshold=0.3)
self.assertEqual(result["total"], 3)
self.assertGreater(result["kept"], 0)
self.assertGreater(result["removed"], 0)
# Verify output is valid JSONL
with open(output_path) as f:
for line in f:
json.loads(line.strip())
finally:
os.unlink(input_path)
os.unlink(output_path)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1 @@
validate-scene-data.py

View File

@@ -19,13 +19,14 @@ from glitch_patterns import (
GlitchPattern,
GlitchSeverity,
MATRIX_GLITCH_PATTERNS,
THREEJS_CATEGORIES,
build_vision_prompt,
get_pattern_by_category,
get_patterns_by_severity,
get_threejs_patterns,
)
# THREEJS_CATEGORIES derived from GlitchCategory enum
THREEJS_CATEGORIES = {cat.value for cat in GlitchCategory}
from matrix_glitch_detector import (
DetectedGlitch,
ScanResult,

View File

@@ -0,0 +1,95 @@
"""
Tests for bin/hermes_cleanup.py — Stale process detection and cleanup.
"""
import unittest
from unittest.mock import patch, MagicMock
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "bin"))
from hermes_cleanup import (
get_process_age_hours,
get_child_pids,
identify_stale_sessions,
kill_session,
generate_report,
)
class TestGetProcessAgeHours(unittest.TestCase):
@patch("hermes_cleanup.subprocess.run")
def test_returns_age(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="3600\n")
age = get_process_age_hours(1234)
self.assertAlmostEqual(age, 1.0, delta=0.01)
@patch("hermes_cleanup.subprocess.run")
def test_returns_none_on_error(self, mock_run):
mock_run.return_value = MagicMock(returncode=1, stdout="")
age = get_process_age_hours(9999)
self.assertIsNone(age)
class TestGetChildPids(unittest.TestCase):
@patch("hermes_cleanup.subprocess.run")
def test_returns_child_pids(self, mock_run):
mock_run.return_value = MagicMock(returncode=0, stdout="1001\n1002\n")
pids = get_child_pids(1234)
self.assertEqual(pids, [1001, 1002])
@patch("hermes_cleanup.subprocess.run")
def test_returns_empty_on_no_children(self, mock_run):
mock_run.return_value = MagicMock(returncode=1, stdout="")
pids = get_child_pids(1234)
self.assertEqual(pids, [])
class TestKillSession(unittest.TestCase):
def test_dry_run_does_not_kill(self):
session = {
"session_key": "test",
"main_pid": 99999, # unlikely to exist
"children": [],
}
result = kill_session(session, dry_run=True)
self.assertTrue(result["dry_run"])
self.assertIn(99999, result["killed"])
@patch("hermes_cleanup.os.kill")
def test_kill_terminates_process(self, mock_kill):
session = {
"session_key": "test",
"main_pid": 1234,
"children": [1235],
}
result = kill_session(session, dry_run=False)
self.assertFalse(result["dry_run"])
self.assertEqual(mock_kill.call_count, 2)
class TestGenerateReport(unittest.TestCase):
def test_empty_report(self):
report = generate_report([])
self.assertIn("No stale sessions", report)
def test_report_with_stale(self):
stale = [{
"session_key": "test",
"main_pid": 1234,
"age_hours": 48.5,
"cpu_percent": 0.1,
"total_rss_kb": 20480,
"total_rss_mb": 20.0,
"process_count": 2,
"command": "python3 -m hermes.cli chat",
"children": [1235],
}]
report = generate_report(stale)
self.assertIn("48.5h", report)
self.assertIn("20.0 MB", report)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,139 +1,60 @@
#!/usr/bin/env python3
"""Tests for normalize-code-blocks.py — training data code block indentation fix (#750)."""
"""
Tests for scripts/normalize-code-blocks.py — Code block indentation normalization.
"""
import json
import os
import sys
import tempfile
import textwrap
import unittest
from pathlib import Path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
from normalize_code_blocks import normalize_code_block, process_line, CODE_BLOCK_RE
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from normalize_code_blocks import process_line
class TestNormalizeCodeBlock:
def test_basic_dedent(self):
block = "```python\n from fastapi import FastAPI\n app = FastAPI()\n```"
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
assert " from fastapi" not in result
assert "from fastapi" in result
def test_preserves_language_tag(self):
block = "```python\n x = 1\n```"
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
assert result.startswith("```python")
def test_empty_block_unchanged(self):
block = "```python\n \n \n```"
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
assert result == block
def test_multiple_blocks(self):
text = 'First: ```python\n x = 1\n``` and second: ```python\n y = 2\n```'
result = CODE_BLOCK_RE.sub(normalize_code_block, text)
assert " x = 1" not in result
assert " y = 2" not in result
assert "x = 1" in result
assert "y = 2" in result
def test_bash_block(self):
block = "```bash\n echo hello\n ls -la\n```"
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
assert " echo" not in result
assert "echo hello" in result
def test_unlabeled_block(self):
block = "```\n some code\n```"
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
assert " some code" not in result
def test_mixed_indentation(self):
block = "```python\n def foo():\n return 42\n```"
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
lines = result.split("\n")
# First code line should not have leading spaces from embedding
code_lines = [l for l in lines if l.strip() and not l.startswith("```")]
assert code_lines[0].startswith("def")
def test_strips_leading_trailing_blanks(self):
block = "```python\n\n x = 1\n\n```"
result = CODE_BLOCK_RE.sub(normalize_code_block, block)
assert "\n\n" not in result.split("```python")[1].split("```")[0]
class TestProcessLine:
def test_valid_jsonl_with_code(self):
obj = {"prompt": "write code", "response": "```python\n x = 1\n```"}
line = json.dumps(obj)
fixed, n = process_line(line)
parsed = json.loads(fixed)
assert n == 1
assert " x = 1" not in parsed["response"]
def test_no_code_blocks(self):
obj = {"text": "hello world"}
line = json.dumps(obj)
fixed, n = process_line(line)
assert n == 0
assert json.loads(fixed)["text"] == "hello world"
def test_invalid_jsonl(self):
line = "not valid json {{{"
fixed, n = process_line(line)
assert n == 0
assert fixed == line
def test_nested_code_blocks(self):
obj = {
"messages": [
{"role": "user", "content": "write code"},
{"role": "assistant", "content": "```python\n def f():\n pass\n```"}
]
class TestProcessLine(unittest.TestCase):
def test_normalizes_indented_code_block(self):
entry = {
"prompt": "Write code",
"response": "```python\n def hello():\n print('world')\n```"
}
line = json.dumps(obj)
fixed, n = process_line(line)
assert n == 1
parsed = json.loads(fixed)
assert " def f" not in parsed["messages"][1]["content"]
line = json.dumps(entry)
result, count = process_line(line)
parsed = json.loads(result.strip())
# Code block indentation should be normalized
self.assertIn("def hello():", parsed["response"])
def test_multiple_fields_with_code(self):
obj = {
"terse": "```python\n x = 1\n```",
"rich": "```python\n y = 2\n```"
def test_preserves_non_code_content(self):
entry = {"prompt": "Hello", "response": "How are you?"}
line = json.dumps(entry)
result, count = process_line(line)
parsed = json.loads(result.strip())
self.assertEqual(parsed["response"], "How are you?")
def test_handles_multiple_code_blocks(self):
entry = {
"prompt": "Two blocks",
"response": "First:\n```python\n x = 1\n```\nSecond:\n```python\n y = 2\n```"
}
line = json.dumps(obj)
fixed, n = process_line(line)
parsed = json.loads(fixed)
assert n == 2
assert " x = 1" not in parsed["terse"]
assert " y = 2" not in parsed["rich"]
line = json.dumps(entry)
result, count = process_line(line)
parsed = json.loads(result.strip())
self.assertIn("x = 1", parsed["response"])
self.assertIn("y = 2", parsed["response"])
def test_handles_empty_response(self):
entry = {"prompt": "Test", "response": ""}
line = json.dumps(entry)
result, count = process_line(line)
parsed = json.loads(result.strip())
self.assertEqual(parsed["response"], "")
class TestEndToEnd:
def test_file_processing(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
f.write(json.dumps({"r": "```python\n x = 1\n```"}) + "\n")
f.write(json.dumps({"r": "no code here"}) + "\n")
f.write(json.dumps({"r": "```python\n def g():\n return 99\n```"}) + "\n")
f.flush()
# Process using the script logic
lines = Path(f.name).read_text().splitlines(keepends=True)
fixed = []
total = 0
for line in lines:
fl, n = process_line(line)
fixed.append(fl)
total += n
os.unlink(f.name)
assert total == 2
# Verify first line is fixed
first = json.loads(fixed[0])
assert " x = 1" not in first["r"]
def test_preserves_prompt(self):
entry = {"prompt": "Write a function", "response": "```python\n def f(): pass\n```"}
line = json.dumps(entry)
result, count = process_line(line)
parsed = json.loads(result.strip())
self.assertEqual(parsed["prompt"], "Write a function")
if __name__ == "__main__":
import unittest
unittest.main()

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import pytest
from datetime import datetime, timezone, timedelta
from scripts.pr_triage import categorize, refs, find_duplicates, health, is_safe_to_merge
from scripts.pr_triage import categorize_pr, find_duplicates, find_referenced_issues
class TestCategorize:
@@ -12,23 +12,23 @@ class TestCategorize:
def test_training_data(self):
pr = {"title": "Add DPO training data", "body": "", "labels": []}
assert categorize(pr) == "training-data"
assert categorize_pr(pr) == "training-data"
def test_bug_fix(self):
pr = {"title": "fix: resolve crash on startup", "body": "", "labels": []}
assert categorize(pr) == "bug-fix"
assert categorize_pr(pr) == "bug-fix"
def test_feature(self):
pr = {"title": "feat: add dark mode", "body": "", "labels": []}
assert categorize(pr) == "feature"
assert categorize_pr(pr) == "feature"
def test_maintenance(self):
pr = {"title": "refactor: simplify auth flow", "body": "", "labels": []}
assert categorize(pr) == "maintenance"
assert categorize_pr(pr) == "maintenance"
def test_other(self):
pr = {"title": "Update readme", "body": "", "labels": []}
assert categorize(pr) == "other"
assert categorize_pr(pr) == "other"
class TestRefs:
@@ -36,19 +36,19 @@ class TestRefs:
def test_extracts_from_title(self):
pr = {"title": "fix: resolve #123", "body": ""}
assert refs(pr) == [123]
assert find_referenced_issues(pr) == [123]
def test_extracts_from_body(self):
pr = {"title": "Fix", "body": "Closes #456, refs #789"}
assert refs(pr) == [456, 789]
assert find_referenced_issues(pr) == [456, 789]
def test_no_refs(self):
def test_no_find_referenced_issues(self):
pr = {"title": "Fix", "body": "No issue refs"}
assert refs(pr) == []
assert find_referenced_issues(pr) == []
def test_multiple_refs(self):
def test_multiple_find_referenced_issues(self):
pr = {"title": "#1 and #2", "body": "Also #3"}
assert refs(pr) == [1, 2, 3]
assert find_referenced_issues(pr) == [1, 2, 3]
class TestFindDuplicates:

View File

@@ -341,6 +341,44 @@ def backfill_provenance(
return stats
class ProvenanceTracker:
"""Track provenance metadata for training pairs."""
def __init__(self):
self.stats = {
"total_pairs": 0,
"pairs_with_provenance": 0,
"pairs_without_provenance": 0,
}
def generate_pair_id(self, pair: dict) -> str:
"""Generate a deterministic ID for a pair."""
content = json.dumps(pair, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()[:16]
def process_pair(self, pair: dict) -> dict:
"""Process a pair, adding provenance if missing."""
self.stats["total_pairs"] += 1
if "source_session_id" in pair and pair["source_session_id"]:
self.stats["pairs_with_provenance"] += 1
else:
self.stats["pairs_without_provenance"] += 1
pair = attach_provenance(pair, source="unknown", source_session_id="unknown", model="unknown")
if "pair_id" not in pair:
pair["pair_id"] = self.generate_pair_id(pair)
return pair
def process_file(self, input_path: str, output_path: str = None) -> dict:
"""Process a JSONL file, adding provenance to all pairs."""
pairs = load_jsonl(input_path)
processed = [self.process_pair(p) for p in pairs]
if output_path:
save_jsonl(processed, output_path)
return self.stats
if __name__ == "__main__":
import argparse