#!/usr/bin/env python3
"""
Quality Gate — Validate pipeline outputs before saving.

Checks:
- JSON schema validation for all output formats
- Content quality (not empty, not duplicated, not toxic)
- SOUL.md compliance for agent-facing content
- Auto-reject bad outputs, re-queue for regeneration
- Quality score tracking per pipeline

Usage:
    python3 quality-gate.py validate training-data/pairs.jsonl
    python3 quality-gate.py validate --format training-pairs data.jsonl
    python3 quality-gate.py score training-data/pairs.jsonl
    python3 quality-gate.py stats
"""

import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path

# All tracking state lives under $HERMES_HOME (default: ~/.hermes).
HERMES_HOME = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))
SCORE_FILE = HERMES_HOME / "quality-scores.jsonl"   # append-only score history (JSONL)
HASH_FILE = HERMES_HOME / "quality-hashes.json"     # cross-run dedup hash store
MAX_HASH_AGE_DAYS = 7                               # dedup hashes older than this are purged

# ── Validators ─────────────────────────────────────────

# Naive substring blocklist; matched case-insensitively against prompt+response.
TOXIC_PATTERNS = [
    "kill yourself",
    "kys",
    "you should die",
    "end it all",
    "nobody loves you",
    "waste of life",
]


def validate_training_pair(entry):
    """Validate a training pair (prompt + response).

    Accepts several field spellings (prompt/instruction, and
    response/output/completion) and returns a list of human-readable
    error strings — empty list means the pair passed.
    """
    errors = []
    if not isinstance(entry, dict):
        return ["Entry is not a dict"]

    prompt = entry.get("prompt", "") or entry.get("instruction", "") or ""
    response = (
        entry.get("response", "")
        or entry.get("output", "")
        or entry.get("completion", "")
        or ""
    )

    if not prompt.strip():
        errors.append("Empty prompt")
    if not response.strip():
        errors.append("Empty response")
    if len(response) < 10:
        errors.append(f"Response too short ({len(response)} chars)")
    if len(prompt) > 10000:
        errors.append(f"Prompt too long ({len(prompt)} chars)")

    # Toxicity check — substring match over the lowercased combined text.
    combined = (prompt + " " + response).lower()
    for pattern in TOXIC_PATTERNS:
        if pattern in combined:
            errors.append(f"Toxic content detected: '{pattern}'")

    return errors


def validate_jsonl(filepath):
    """Validate a JSONL file — each line must be valid JSON.

    Returns (errors, line_count, seen_hashes) where seen_hashes is the
    set of truncated SHA-256 digests used for duplicate detection.
    Blank lines are skipped and do not count toward line_count.
    """
    errors = []
    seen_hashes = set()
    line_count = 0
    try:
        with open(filepath) as f:
            for i, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                line_count += 1
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    errors.append(f"Line {i}: invalid JSON: {e}")
                    continue
                # Duplicate detection on the raw line text (16 hex chars of SHA-256).
                h = hashlib.sha256(line.encode()).hexdigest()[:16]
                if h in seen_hashes:
                    errors.append(f"Line {i}: duplicate content (hash {h})")
                seen_hashes.add(h)
                # Content validation (only dict entries can be training pairs).
                if isinstance(entry, dict):
                    pair_errors = validate_training_pair(entry)
                    for pe in pair_errors:
                        errors.append(f"Line {i}: {pe}")
    except Exception as e:
        # File-level problems (missing file, permissions) become a single error
        # rather than an exception — a validator should report, not crash.
        errors.append(f"File error: {e}")
    return errors, line_count, seen_hashes


def validate_json(filepath):
    """Validate a single JSON file.

    Returns (errors, count): count is the list length for a top-level
    array, otherwise 1. Duplicate array entries are detected via a
    canonical (sort_keys) hash of each dict.
    """
    errors = []
    try:
        with open(filepath) as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        return [f"Invalid JSON: {e}"], 0
    except OSError as e:
        # Fix: mirror validate_jsonl — report unreadable files instead of raising.
        return [f"File error: {e}"], 0

    if isinstance(data, list):
        seen = set()
        for i, entry in enumerate(data):
            if isinstance(entry, dict):
                h = hashlib.sha256(
                    json.dumps(entry, sort_keys=True).encode()
                ).hexdigest()[:16]
                if h in seen:
                    errors.append(f"Index {i}: duplicate entry")
                seen.add(h)
    return errors, len(data) if isinstance(data, list) else 1


# ── Quality Scoring ────────────────────────────────────


def score_file(filepath):
    """Score a pipeline output file. Returns 0-100.

    Score is 100 * (1 - error_rate), clamped to [0, 100], with a +5
    bonus for files with >= 100 entries. Unknown formats score 50;
    missing or empty files score 0.
    """
    path = Path(filepath)
    if not path.exists():
        return 0
    suffix = path.suffix.lower()
    if suffix == ".jsonl":
        errors, count, _ = validate_jsonl(filepath)
    elif suffix == ".json":
        errors, count = validate_json(filepath)
    else:
        return 50  # unknown format — neutral score
    if count == 0:
        return 0
    # error_rate can exceed 1.0 (multiple errors per entry); max() clamps.
    error_rate = len(errors) / count
    score = max(0, int(100 * (1 - error_rate)))
    # Bonus for having substantial content.
    if count >= 100:
        score = min(100, score + 5)
    return score


def record_score(filepath, score):
    """Record quality score for tracking (appends one JSONL record)."""
    HERMES_HOME.mkdir(parents=True, exist_ok=True)
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "file": str(filepath),
        "score": score,
    }
    with open(SCORE_FILE, "a") as f:
        # Fix: records must be newline-terminated — a space separator made the
        # file a single line, which broke cmd_stats's per-line json.loads.
        f.write(json.dumps(entry) + "\n")


# ── Dedup Hash Management ──────────────────────────────


def load_hashes():
    """Load the dedup hash store; return an empty structure on any failure."""
    try:
        return json.loads(HASH_FILE.read_text())
    except Exception:
        return {"entries": {}, "last_cleanup": None}


def save_hashes(data):
    """Persist the dedup hash store, creating parent dirs as needed."""
    HASH_FILE.parent.mkdir(parents=True, exist_ok=True)
    HASH_FILE.write_text(json.dumps(data, indent=2))


def cleanup_old_hashes(data, max_age_days=MAX_HASH_AGE_DAYS):
    """Remove hash entries older than max_age_days.

    Mutates `data` in place (filters "entries", stamps "last_cleanup")
    and returns the number of entries removed. Entries without a "ts"
    field are treated as age 0 and dropped.
    """
    cutoff = datetime.now(timezone.utc).timestamp() - (max_age_days * 86400)
    before = len(data.get("entries", {}))
    data["entries"] = {
        k: v for k, v in data.get("entries", {}).items() if v.get("ts", 0) > cutoff
    }
    data["last_cleanup"] = datetime.now(timezone.utc).isoformat()
    after = len(data["entries"])
    return before - after


# ── CLI ────────────────────────────────────────────────


def cmd_validate(args):
    """Validate a file; record its score; exit 1 on any error."""
    filepath = args[0] if args else None
    if not filepath or not os.path.exists(filepath):
        print(f"ERROR: {filepath} not found")
        sys.exit(1)
    suffix = Path(filepath).suffix.lower()
    if suffix == ".jsonl":
        errors, count, _ = validate_jsonl(filepath)
    elif suffix == ".json":
        errors, count = validate_json(filepath)
    else:
        print(f"Unsupported format: {suffix}")
        sys.exit(1)
    score = score_file(filepath)
    record_score(filepath, score)
    if errors:
        for e in errors[:20]:  # cap output; summarize the rest
            print(f"FAIL: {e}")
        if len(errors) > 20:
            print(f"... and {len(errors)-20} more")
        print(f" Score: {score}/100 ({len(errors)} errors in {count} entries)")
        sys.exit(1)
    else:
        print(f"OK: {filepath} ({count} entries, score {score}/100)")


def cmd_score(args):
    """Print and record the quality score of a file."""
    filepath = args[0] if args else None
    if not filepath:
        # Fix: restore the stripped <file> placeholder in the usage string.
        print("Usage: quality-gate.py score <file>")
        sys.exit(1)
    score = score_file(filepath)
    print(f"Score: {score}/100")
    record_score(filepath, score)


def cmd_stats():
    """Print per-file average/latest scores from the score history."""
    if not SCORE_FILE.exists():
        print("No quality scores recorded yet.")
        return
    scores = []
    with open(SCORE_FILE) as f:
        for line in f:
            try:
                scores.append(json.loads(line))
            except Exception:
                continue  # tolerate corrupt lines in the history file
    if not scores:
        print("No scores recorded.")
        return
    by_file = {}
    for s in scores:
        fname = s.get("file", "?")
        by_file.setdefault(fname, []).append(s.get("score", 0))
    print("Quality Scores:")
    for fname, scs in sorted(by_file.items()):
        avg = sum(scs) / len(scs)
        latest = scs[-1]
        print(f"  {fname}: avg={avg:.0f}, latest={latest}, runs={len(scs)}")


def cmd_cleanup():
    """Purge dedup hashes older than MAX_HASH_AGE_DAYS and save the store."""
    data = load_hashes()
    removed = cleanup_old_hashes(data)
    save_hashes(data)
    print(f"Cleaned up {removed} old hash entries (>{MAX_HASH_AGE_DAYS} days)")


def main():
    """Dispatch to the subcommand named in argv[1]."""
    if len(sys.argv) < 2:
        # Fix: restore the stripped <command> placeholder in the usage string.
        print("Usage: quality-gate.py <command> [args]")
        sys.exit(1)
    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == "validate":
        cmd_validate(args)
    elif cmd == "score":
        cmd_score(args)
    elif cmd == "stats":
        cmd_stats()
    elif cmd == "cleanup":
        cmd_cleanup()
    else:
        print(f"Unknown command: {cmd}")
        sys.exit(1)


if __name__ == "__main__":
    main()