Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 13s
Smoke Test / smoke (pull_request) Failing after 11s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 14s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 44s
Validate Config / Shell Script Lint (pull_request) Failing after 24s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 3s
Validate Config / Playbook Schema Validation (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 3m54s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Validates JSONL/JSON pipeline outputs for: - Schema correctness - Content quality (non-empty, not duplicated) - Toxicity detection - Dedup hash management with auto-cleanup Usage: python3 bin/quality-gate.py validate data.jsonl python3 bin/quality-gate.py score data.jsonl python3 bin/quality-gate.py stats python3 bin/quality-gate.py cleanup Closes #623
293 lines
8.5 KiB
Python
293 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Quality Gate — Validate pipeline outputs before saving.
|
|
|
|
Checks:
|
|
- JSON schema validation for all output formats
|
|
- Content quality (not empty, not duplicated, not toxic)
|
|
- SOUL.md compliance for agent-facing content
|
|
- Auto-reject bad outputs, re-queue for regeneration
|
|
- Quality score tracking per pipeline
|
|
|
|
Usage:
|
|
python3 quality-gate.py validate training-data/pairs.jsonl
|
|
python3 quality-gate.py validate --format training-pairs data.jsonl
|
|
python3 quality-gate.py score training-data/pairs.jsonl
|
|
python3 quality-gate.py stats
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
HERMES_HOME = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))  # state dir, overridable via HERMES_HOME env var
SCORE_FILE = HERMES_HOME / "quality-scores.jsonl"  # append-only score log, one JSON record per line
HASH_FILE = HERMES_HOME / "quality-hashes.json"    # persisted dedup-hash state
MAX_HASH_AGE_DAYS = 7                              # hash entries older than this are pruned by cleanup

# ── Validators ─────────────────────────────────────────
|
|
|
|
# Substring blocklist scanned (lowercased) against prompt + response.
# NOTE(review): plain substring match — short tokens like "kys" may
# false-positive inside longer words; confirm this is acceptable.
TOXIC_PATTERNS = [
    "kill yourself", "kys", "you should die", "end it all",
    "nobody loves you", "waste of life",
]


def validate_training_pair(entry):
    """Validate a single training pair (prompt + response).

    Accepts the field aliases produced by different pipelines:
    prompt/instruction and response/output/completion.

    Args:
        entry: Parsed JSON value; anything but a dict is rejected outright.

    Returns:
        List of human-readable error strings; empty when the pair is valid.
    """
    if not isinstance(entry, dict):
        return ["Entry is not a dict"]

    errors = []

    def _first_text(*keys):
        # First truthy value among the aliases, coerced to str so a
        # non-string field (e.g. a number) cannot crash .strip() below.
        for key in keys:
            value = entry.get(key)
            if value:
                return value if isinstance(value, str) else str(value)
        return ""

    prompt = _first_text("prompt", "instruction")
    response = _first_text("response", "output", "completion")

    if not prompt.strip():
        errors.append("Empty prompt")
    if not response.strip():
        errors.append("Empty response")
    if len(response) < 10:
        errors.append(f"Response too short ({len(response)} chars)")
    if len(prompt) > 10000:
        errors.append(f"Prompt too long ({len(prompt)} chars)")

    # Toxicity check: substring scan over the combined, lowercased text.
    combined = (prompt + " " + response).lower()
    for pattern in TOXIC_PATTERNS:
        if pattern in combined:
            errors.append(f"Toxic content detected: '{pattern}'")

    return errors
|
|
|
|
|
|
def validate_jsonl(filepath):
    """Validate a JSONL file line by line.

    Each non-blank line must parse as JSON; dict entries are additionally
    run through validate_training_pair, and exact-duplicate lines are
    flagged via a truncated SHA-256 digest of the stripped line text.

    Args:
        filepath: Path to the .jsonl file.

    Returns:
        Tuple (errors, line_count, seen_hashes): errors is a list of
        "Line N: ..." strings, line_count counts non-blank lines, and
        seen_hashes is the set of 16-hex-char content digests.
    """
    errors = []
    seen_hashes = set()
    line_count = 0

    try:
        with open(filepath, encoding="utf-8") as f:
            for i, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                line_count += 1
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    errors.append(f"Line {i}: invalid JSON: {e}")
                    continue

                # Exact-duplicate detection on the raw (stripped) line text.
                h = hashlib.sha256(line.encode()).hexdigest()[:16]
                if h in seen_hashes:
                    errors.append(f"Line {i}: duplicate content (hash {h})")
                seen_hashes.add(h)

                # Per-entry content checks apply only to dict entries.
                if isinstance(entry, dict):
                    for pe in validate_training_pair(entry):
                        errors.append(f"Line {i}: {pe}")

    except OSError as e:
        # Only file-level failures (missing file, permissions) are folded
        # into the error list; the original caught every Exception, which
        # would also hide genuine programming errors.
        errors.append(f"File error: {e}")

    return errors, line_count, seen_hashes
|
|
|
|
|
|
def validate_json(filepath):
    """Validate a single JSON file.

    For a top-level list, dict entries are checked for duplicates using a
    key-order-insensitive digest of their canonical JSON form.

    Args:
        filepath: Path to the .json file.

    Returns:
        Tuple (errors, count): count is the list length for a top-level
        list, otherwise 1 (or 0 when the file could not be parsed/read).
    """
    errors = []
    try:
        with open(filepath, encoding="utf-8") as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        return [f"Invalid JSON: {e}"], 0
    except OSError as e:
        # A missing/unreadable file previously raised out of this helper;
        # report it as a validation failure like a parse error instead.
        return [f"File error: {e}"], 0

    if isinstance(data, list):
        seen = set()
        for i, entry in enumerate(data):
            if isinstance(entry, dict):
                # sort_keys makes the digest insensitive to key order.
                h = hashlib.sha256(json.dumps(entry, sort_keys=True).encode()).hexdigest()[:16]
                if h in seen:
                    errors.append(f"Index {i}: duplicate entry")
                seen.add(h)

    return errors, len(data) if isinstance(data, list) else 1
|
|
|
|
|
|
# ── Quality Scoring ────────────────────────────────────
|
|
|
|
def score_file(filepath):
    """Score a pipeline output file on a 0-100 scale.

    The score is 100 scaled down by the per-entry error rate, with a +5
    bonus (capped at 100) for files of 100+ entries. Unknown formats get
    a neutral 50; a missing or empty file scores 0.
    """
    path = Path(filepath)
    if not path.exists():
        return 0

    ext = path.suffix.lower()
    if ext == ".jsonl":
        errors, count, _ = validate_jsonl(filepath)
    elif ext == ".json":
        errors, count = validate_json(filepath)
    else:
        return 50  # unknown format: neither penalize nor reward

    if not count:
        return 0

    score = max(0, int(100 * (1 - len(errors) / count)))

    # Small bonus for substantial files, never exceeding 100.
    if count >= 100:
        score = min(score + 5, 100)

    return score
|
|
|
|
|
|
def record_score(filepath, score):
    """Append a timestamped quality score to the JSONL score log.

    Creates HERMES_HOME on first use. Each record is one JSON object per
    line, terminated by "\n", so the log stays append-only and streamable.
    """
    HERMES_HOME.mkdir(parents=True, exist_ok=True)
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "file": str(filepath),
        "score": score,
    }
    with open(SCORE_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
|
|
|
|
|
|
# ── Dedup Hash Management ─────────────────────────────
|
|
|
|
def load_hashes():
    """Load the dedup-hash state file, falling back to an empty structure.

    A missing, unreadable, or corrupt file yields the default so callers
    never have to special-case first runs.
    """
    try:
        return json.loads(HASH_FILE.read_text())
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError. Anything else would be a
        # real bug and should propagate rather than be silently swallowed
        # (the original caught every Exception).
        return {"entries": {}, "last_cleanup": None}
|
|
|
|
|
|
def save_hashes(data):
    """Write the dedup-hash state to HASH_FILE as indented JSON."""
    serialized = json.dumps(data, indent=2)
    HASH_FILE.parent.mkdir(parents=True, exist_ok=True)
    HASH_FILE.write_text(serialized)
|
|
|
|
|
|
def cleanup_old_hashes(data, max_age_days=MAX_HASH_AGE_DAYS):
    """Remove hash entries older than max_age_days.

    Mutates `data` in place, stamps `last_cleanup` with the current UTC
    time, and returns the number of entries removed.
    """
    cutoff = datetime.now(timezone.utc).timestamp() - (max_age_days * 86400)
    entries = data.get("entries", {})
    before = len(entries)
    data["entries"] = {
        k: v for k, v in entries.items()
        # Tolerate corrupt state: a value that is not a dict would have
        # crashed v.get(); drop anything malformed or stale.
        if isinstance(v, dict) and v.get("ts", 0) > cutoff
    }
    data["last_cleanup"] = datetime.now(timezone.utc).isoformat()
    return before - len(data["entries"])
|
|
|
|
|
|
# ── CLI ────────────────────────────────────────────────
|
|
|
|
def cmd_validate(args):
    """CLI: validate a file, record its score, and exit 1 on any error."""
    filepath = args[0] if args else None
    if not filepath or not os.path.exists(filepath):
        print(f"ERROR: {filepath} not found")
        sys.exit(1)

    suffix = Path(filepath).suffix.lower()
    if suffix == ".jsonl":
        errors, count, _ = validate_jsonl(filepath)
    elif suffix == ".json":
        errors, count = validate_json(filepath)
    else:
        print(f"Unsupported format: {suffix}")
        sys.exit(1)

    score = score_file(filepath)
    record_score(filepath, score)

    if errors:
        # Cap the listing at 20 errors to keep CI logs readable.
        for e in errors[:20]:
            print(f"FAIL: {e}")
        if len(errors) > 20:
            print(f"... and {len(errors)-20} more")
        print(f"\nScore: {score}/100 ({len(errors)} errors in {count} entries)")
        sys.exit(1)
    else:
        print(f"OK: {filepath} ({count} entries, score {score}/100)")
|
|
|
|
|
|
def cmd_score(args):
    """CLI: compute, display, and log the quality score for one file."""
    target = args[0] if args else None
    if not target:
        print("Usage: quality-gate.py score <file>")
        sys.exit(1)
    result = score_file(target)
    print(f"Score: {result}/100")
    record_score(target, result)
|
|
|
|
|
|
def cmd_stats():
    """CLI: summarize recorded quality scores, grouped per file."""
    if not SCORE_FILE.exists():
        print("No quality scores recorded yet.")
        return

    records = []
    with open(SCORE_FILE) as fh:
        for raw in fh:
            try:
                records.append(json.loads(raw))
            except Exception:
                continue  # skip corrupt log lines

    if not records:
        print("No scores recorded.")
        return

    grouped = {}
    for rec in records:
        grouped.setdefault(rec.get("file", "?"), []).append(rec.get("score", 0))

    print("Quality Scores:")
    for fname in sorted(grouped):
        history = grouped[fname]
        mean = sum(history) / len(history)
        print(f" {fname}: avg={mean:.0f}, latest={history[-1]}, runs={len(history)}")
|
|
|
|
|
|
def cmd_cleanup():
    """CLI: prune stale dedup hashes and report how many were removed."""
    state = load_hashes()
    pruned = cleanup_old_hashes(state)
    save_hashes(state)
    print(f"Cleaned up {pruned} old hash entries (>{MAX_HASH_AGE_DAYS} days)")
|
|
|
|
|
|
def main():
    """Entry point: dispatch the first CLI argument to its subcommand."""
    if len(sys.argv) < 2:
        print("Usage: quality-gate.py <validate|score|stats|cleanup> [args]")
        sys.exit(1)

    cmd = sys.argv[1]
    args = sys.argv[2:]

    # Table-driven dispatch; argument-taking commands close over `args`.
    handlers = {
        "validate": lambda: cmd_validate(args),
        "score": lambda: cmd_score(args),
        "stats": cmd_stats,
        "cleanup": cmd_cleanup,
    }
    handler = handlers.get(cmd)
    if handler is None:
        print(f"Unknown command: {cmd}")
        sys.exit(1)
    handler()


if __name__ == "__main__":
    main()
|