Files
timmy-config/pipelines/quality-gate.py
Alexander Whitestone e780ab0805 feat: Add quality gate for pipeline outputs
Closes #623. Validates all pipeline outputs with JSON schema, content quality, SOUL.md compliance checks.
2026-04-14 22:31:51 +00:00

692 lines
24 KiB
Python

#!/usr/bin/env python3
"""
Quality Gate — Validate All Pipeline Outputs
Every pipeline output must pass quality checks before being saved.
Auto-rejects bad outputs, re-queues for regeneration.
Usage:
python3 quality-gate.py validate --type training_pair --input file.json
python3 quality-gate.py validate --type knowledge_file --input file.json
python3 quality-gate.py validate --type generated_asset --input file.png
python3 quality-gate.py validate --type adversary_output --input file.json
python3 quality-gate.py stats --pipeline training
python3 quality-gate.py report
"""
import argparse
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Configuration
# Root data directory; overridable via the HERMES_HOME environment variable.
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
QUALITY_DIR = HERMES_HOME / "pipelines" / "quality"   # all quality-gate state lives here
STATS_FILE = QUALITY_DIR / "quality_stats.json"       # persisted aggregate pass/fail counters
REJECT_DIR = QUALITY_DIR / "rejected"                 # rejected outputs saved for later analysis
SOUL_FILE = Path(__file__).parent.parent / "SOUL.md"  # principles doc used by compliance checks
# Ensure directories exist
# NOTE: created at import time so every entry point can assume they exist.
QUALITY_DIR.mkdir(parents=True, exist_ok=True)
REJECT_DIR.mkdir(parents=True, exist_ok=True)
class QualityResult:
    """Outcome of one quality check: pass/fail flag, 0-1 score, and detail lists."""

    def __init__(self, passed: bool, score: float = 0.0,
                 checks: Optional[List[str]] = None,
                 failures: Optional[List[str]] = None,
                 warnings: Optional[List[str]] = None):
        """Create a result.

        Args:
            passed: Whether the output passed the gate overall.
            score: Quality score from 0.0 (worst) to 1.0 (best).
            checks: Names of individual checks that passed.
            failures: Hard-failure messages (any of these fails the output).
            warnings: Soft issues that reduce the score without failing.

        The list parameters were annotated ``List[str]`` while defaulting to
        ``None``; they are now correctly ``Optional[List[str]]``. ``None``
        defaults also avoid the shared-mutable-default pitfall.
        """
        self.passed = passed
        self.score = score  # 0.0 to 1.0
        self.checks = checks or []
        self.failures = failures or []
        self.warnings = warnings or []
        # UTC ISO-8601, matching the timestamps used elsewhere in this module.
        self.timestamp = datetime.now(timezone.utc).isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable snapshot of this result."""
        return {
            "passed": self.passed,
            "score": self.score,
            "checks": self.checks,
            "failures": self.failures,
            "warnings": self.warnings,
            "timestamp": self.timestamp
        }

    def __repr__(self):
        status = "PASS" if self.passed else "FAIL"
        return f"QualityResult({status}, score={self.score:.2f})"
class QualityGate:
    """Main quality gate class."""
    def __init__(self):
        # SOUL.md text used by _check_soul_compliance; "" when unavailable.
        self.soul_content = self._load_soul()
        # Aggregate pass/fail counters persisted across runs (see _load_stats).
        self.stats = self._load_stats()
def _load_soul(self) -> str:
    """Return the text of SOUL.md, or "" if the file is missing or unreadable."""
    try:
        return SOUL_FILE.read_text() if SOUL_FILE.exists() else ""
    except Exception:
        # Best-effort: compliance checks are simply skipped without SOUL.md.
        return ""
def _load_stats(self) -> Dict[str, Any]:
    """Load persisted quality statistics, or a zeroed structure on any error."""
    try:
        if STATS_FILE.exists():
            return json.loads(STATS_FILE.read_text())
    except Exception:
        # Corrupt/unreadable stats file: start over rather than crash.
        pass
    return {
        "total_checks": 0,
        "passed": 0,
        "failed": 0,
        "by_type": {},
        "by_pipeline": {},
        "recent_failures": []
    }
def _save_stats(self):
    """Persist the in-memory stats dict to STATS_FILE as pretty-printed JSON."""
    STATS_FILE.write_text(json.dumps(self.stats, indent=2))
def _update_stats(self, result: QualityResult, check_type: str, pipeline: str = "unknown"):
    """Fold one check result into the aggregate counters and persist them."""
    outcome = "passed" if result.passed else "failed"
    self.stats["total_checks"] += 1
    self.stats[outcome] += 1
    if not result.passed:
        self.stats["recent_failures"].append({
            "type": check_type,
            "pipeline": pipeline,
            "timestamp": result.timestamp,
            "failures": result.failures
        })
        # Keep only last 100 failures
        self.stats["recent_failures"] = self.stats["recent_failures"][-100:]
    # Per-type and per-pipeline buckets share the same counter shape.
    for bucket, key in (("by_type", check_type), ("by_pipeline", pipeline)):
        counts = self.stats[bucket].setdefault(key, {"passed": 0, "failed": 0})
        counts[outcome] += 1
    self._save_stats()
# =========================================================================
# Content Quality Checks
# =========================================================================
def _check_not_empty(self, content: str, min_length: int = 1) -> Tuple[bool, str]:
"""Check content is not empty."""
if not content or len(content.strip()) < min_length:
return False, f"Content is empty or too short (min {min_length} chars)"
return True, ""
def _check_not_duplicate(self, content: str, content_type: str) -> Tuple[bool, str]:
    """Reject content whose SHA-256 digest was already seen for this type.

    Side effect: unseen digests are appended to <content_type>_hashes.json.
    """
    digest = hashlib.sha256(content.encode()).hexdigest()
    hash_file = QUALITY_DIR / f"{content_type}_hashes.json"
    seen = set()
    if hash_file.exists():
        try:
            seen = set(json.loads(hash_file.read_text()))
        except Exception:
            # Corrupt hash registry — treat as empty rather than fail the check.
            pass
    if digest in seen:
        return False, f"Duplicate content detected (hash: {digest[:16]})"
    seen.add(digest)
    hash_file.write_text(json.dumps(list(seen)))
    return True, ""
def _check_not_toxic(self, content: str) -> Tuple[bool, str]:
"""Check content is not toxic or harmful."""
toxic_patterns = [
r"(?i)kill\s+(yourself|yourself|them)",
r"(?i)how\s+to\s+(make|build|create)\s+(bomb|weapon|poison)",
r"(?i)hate\s+(speech|group|people)",
r"(?i)illegal\s+(activity|drug|weapon)",
]
for pattern in toxic_patterns:
if re.search(pattern, content):
return False, f"Content matches toxic pattern: {pattern[:50]}"
return True, ""
def _check_soul_compliance(self, content: str) -> Tuple[bool, str]:
"""Check content complies with SOUL.md principles."""
if not self.soul_content:
return True, "" # Can't check if no SOUL loaded
violations = []
# Check for corporate dependency
if re.search(r"(?i)requires?\s+(permission|approval)\s+from\s+(google|openai|anthropic|meta)", content):
violations.append("Suggests corporate dependency")
# Check for dishonesty patterns
if re.search(r"(?i)i\s+(am|'m)\s+(100%|always|never)\s+(right|correct|certain)", content):
violations.append("Claims false certainty")
# Check for gatekeeping
if re.search(r"(?i)i\s+(won't|cannot|refuse\s+to)\s+(help|answer|explain)", content):
if not re.search(r"(?i)(harmful|dangerous|illegal)", content):
violations.append("Unnecessary gatekeeping")
if violations:
return False, f"SOUL.md violations: {'; '.join(violations)}"
return True, ""
# =========================================================================
# Training Pair Validation
# =========================================================================
def validate_training_pair(self, data: Dict[str, Any], pipeline: str = "training") -> QualityResult:
    """Validate a prompt/response training pair.

    Hard failures (missing fields, empty text, toxicity, SOUL violations)
    fail the pair; soft issues (duplicates, short or unformatted text,
    voice mismatch) only reduce the score. Passing requires zero failures
    and a final score >= 0.5.
    """
    checks: List[str] = []
    failures: List[str] = []
    warnings: List[str] = []
    score = 1.0

    def tally(ok: bool, label: str, message: str, penalty: float, soft: bool = False):
        # Record one check outcome: a pass collects its label, a miss
        # collects the message (as a warning when soft) and costs `penalty`.
        nonlocal score
        if ok:
            checks.append(label)
        else:
            (warnings if soft else failures).append(message)
            score -= penalty

    # Structural validation — both fields must exist before content checks.
    if "prompt" not in data:
        failures.append("Missing 'prompt' field")
        score -= 0.5
    if "response" not in data:
        failures.append("Missing 'response' field")
        score -= 0.5
    if failures:
        return QualityResult(False, 0.0, checks, failures, warnings)
    prompt = data.get("prompt", "")
    response = data.get("response", "")
    ok, msg = self._check_not_empty(prompt, min_length=10)
    tally(ok, "prompt_not_empty", f"Prompt: {msg}", 0.3)
    ok, msg = self._check_not_empty(response, min_length=20)
    tally(ok, "response_not_empty", f"Response: {msg}", 0.3)
    # Duplicates are soft for training pairs: warn rather than fail.
    ok, msg = self._check_not_duplicate(f"{prompt}\n{response}", "training_pair")
    tally(ok, "not_duplicate", msg, 0.1, soft=True)
    ok, msg = self._check_not_toxic(response)
    tally(ok, "not_toxic", msg, 0.5)
    ok, msg = self._check_soul_compliance(response)
    tally(ok, "soul_compliant", msg, 0.3)
    # Heuristic quality signals — warnings only.
    if len(response) < 50:
        warnings.append("Response is very short")
        score -= 0.1
    if response.count("\n") < 2 and len(response) > 200:
        warnings.append("Response lacks formatting")
        score -= 0.05
    # When a voice marker is supplied it should appear early in the response.
    voice = data.get("voice", "")
    if voice and voice.lower() not in response.lower()[:100]:
        warnings.append(f"Response may not match voice: {voice}")
        score -= 0.1
    score = max(0.0, score)
    passed = len(failures) == 0 and score >= 0.5
    result = QualityResult(passed, score, checks, failures, warnings)
    self._update_stats(result, "training_pair", pipeline)
    return result
# =========================================================================
# Knowledge File Validation
# =========================================================================
def validate_knowledge_file(self, data: Dict[str, Any], pipeline: str = "knowledge") -> QualityResult:
    """Validate a knowledge file record.

    Requires title/content/source/category fields; content must be unique,
    non-empty, and non-toxic. An unrecognized category is only a warning.
    Passing requires zero failures and a final score >= 0.5.
    """
    checks: List[str] = []
    failures: List[str] = []
    warnings: List[str] = []
    score = 1.0
    # Schema validation first: every missing field is a hard failure.
    for field in ("title", "content", "source", "category"):
        if field not in data:
            failures.append(f"Missing required field: {field}")
            score -= 0.2
    if failures:
        return QualityResult(False, 0.0, checks, failures, warnings)
    title = data.get("title", "")
    content = data.get("content", "")

    def record(ok: bool, label: str, message: str, penalty: float):
        # Collect the label on success, or the failure message plus penalty.
        nonlocal score
        if ok:
            checks.append(label)
        else:
            failures.append(message)
            score -= penalty

    ok, msg = self._check_not_empty(title, min_length=5)
    record(ok, "title_valid", f"Title: {msg}", 0.2)
    ok, msg = self._check_not_empty(content, min_length=50)
    record(ok, "content_valid", f"Content: {msg}", 0.3)
    # Unlike training pairs, a duplicate knowledge file is a hard failure.
    ok, msg = self._check_not_duplicate(content, "knowledge_file")
    record(ok, "not_duplicate", msg, 0.4)
    ok, msg = self._check_not_toxic(content)
    record(ok, "not_toxic", msg, 0.5)
    # Category is advisory: unknown values only warn.
    category = data.get("category", "").lower()
    if category in {"technical", "conceptual", "procedural", "reference",
                    "tutorial", "troubleshooting", "architecture", "security"}:
        checks.append("category_valid")
    else:
        warnings.append(f"Unknown category: {category}")
        score -= 0.1
    score = max(0.0, score)
    passed = len(failures) == 0 and score >= 0.5
    result = QualityResult(passed, score, checks, failures, warnings)
    self._update_stats(result, "knowledge_file", pipeline)
    return result
# =========================================================================
# Generated Asset Validation
# =========================================================================
def validate_generated_asset(self, file_path: str, metadata: Optional[Dict[str, Any]] = None,
                             pipeline: str = "assets") -> QualityResult:
    """Validate a generated asset (image, video, etc.).

    Args:
        file_path: Path to the asset file on disk.
        metadata: Optional generation metadata (generator, prompt, timestamp).
            Annotation fixed to Optional — the default was already None.
        pipeline: Pipeline name for statistics bucketing.

    Returns:
        QualityResult; passes when there are no hard failures and score >= 0.5.
    """
    checks = []
    failures = []
    warnings = []
    score = 1.0
    path = Path(file_path)
    # A missing file is an immediate hard failure — nothing else to check.
    if not path.exists():
        failures.append(f"File does not exist: {file_path}")
        return QualityResult(False, 0.0, checks, failures, warnings)
    checks.append("file_exists")
    # Size sanity: empty files fail hard, tiny files only warn.
    file_size = path.stat().st_size
    if file_size == 0:
        failures.append("File is empty")
        score -= 0.5
    elif file_size < 100:
        warnings.append(f"File is very small: {file_size} bytes")
        score -= 0.1
    else:
        checks.append("file_not_empty")
    # Extension must belong to one of the known asset families.
    valid_extensions = {
        "image": [".png", ".jpg", ".jpeg", ".gif", ".webp"],
        "video": [".mp4", ".webm", ".mov"],
        "audio": [".mp3", ".wav", ".ogg"],
        "document": [".md", ".txt", ".pdf"]
    }
    ext = path.suffix.lower()
    is_valid_ext = any(ext in exts for exts in valid_extensions.values())
    if is_valid_ext:
        checks.append("valid_extension")
    else:
        warnings.append(f"Unknown extension: {ext}")
        score -= 0.1
    # Metadata completeness is advisory: each missing key is a small penalty.
    if metadata:
        required_meta = ["generator", "prompt", "timestamp"]
        for field in required_meta:
            if field in metadata:
                checks.append(f"metadata_{field}")
            else:
                warnings.append(f"Missing metadata: {field}")
                score -= 0.05
    # The generation prompt itself must not violate SOUL.md principles.
    if metadata and "prompt" in metadata:
        ok, msg = self._check_soul_compliance(metadata["prompt"])
        if ok:
            checks.append("soul_compliant")
        else:
            failures.append(msg)
            score -= 0.3
    score = max(0.0, score)
    passed = len(failures) == 0 and score >= 0.5
    result = QualityResult(passed, score, checks, failures, warnings)
    self._update_stats(result, "generated_asset", pipeline)
    return result
# =========================================================================
# Adversary Output Validation
# =========================================================================
def validate_adversary_output(self, data: Dict[str, Any], pipeline: str = "adversary") -> QualityResult:
    """Validate an adversary finding (must include reproduction steps).

    Requires vulnerability/description/reproduction_steps/severity fields;
    steps must be a non-empty list and severity one of the known levels.
    Passing requires zero failures and a final score >= 0.5.
    """
    checks: List[str] = []
    failures: List[str] = []
    warnings: List[str] = []
    score = 1.0
    # Schema validation first: every missing field is a hard failure.
    for field in ("vulnerability", "description", "reproduction_steps", "severity"):
        if field not in data:
            failures.append(f"Missing required field: {field}")
            score -= 0.2
    if failures:
        return QualityResult(False, 0.0, checks, failures, warnings)
    # Reproduction steps: a finding without steps cannot be verified.
    steps = data.get("reproduction_steps", [])
    if isinstance(steps, list) and steps:
        checks.append("reproduction_steps_valid")
    else:
        failures.append("reproduction_steps must be a non-empty list")
        score -= 0.3
    # Severity must be one of the recognized levels.
    severity = data.get("severity", "").lower()
    if severity in ("critical", "high", "medium", "low", "info"):
        checks.append("severity_valid")
    else:
        failures.append(f"Invalid severity: {severity}")
        score -= 0.2
    # Description must carry enough detail to be actionable.
    ok, msg = self._check_not_empty(data.get("description", ""), min_length=50)
    if ok:
        checks.append("description_valid")
    else:
        failures.append(f"Description: {msg}")
        score -= 0.2
    score = max(0.0, score)
    passed = len(failures) == 0 and score >= 0.5
    result = QualityResult(passed, score, checks, failures, warnings)
    self._update_stats(result, "adversary_output", pipeline)
    return result
# =========================================================================
# Rejection and Re-queue
# =========================================================================
def reject_output(self, data: Any, result: QualityResult, output_type: str,
                  pipeline: str = "unknown") -> Path:
    """Persist a failed output (plus its QualityResult) under REJECT_DIR.

    Args:
        data: The rejected payload; non-JSON-serializable values are str()'d.
        result: The quality result explaining the rejection.
        output_type: One of the validated output types (used in the filename).
        pipeline: Originating pipeline name.

    Returns:
        Path of the JSON file written, for logging or re-queueing.
    """
    # Fixed: filename stamp now uses UTC, consistent with every other
    # timestamp in this module (previously naive local time, which could
    # sort out of order relative to the embedded UTC "timestamp" field).
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    reject_file = REJECT_DIR / f"{output_type}_{stamp}.json"
    reject_data = {
        "type": output_type,
        "pipeline": pipeline,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "quality_result": result.to_dict(),
        # Fall back to str() so arbitrary payloads still serialize.
        "data": data if isinstance(data, (dict, list, str)) else str(data)
    }
    reject_file.write_text(json.dumps(reject_data, indent=2))
    print(f"Rejected output saved to: {reject_file}")
    print(f"  Failures: {', '.join(result.failures)}")
    return reject_file
# =========================================================================
# Reporting
# =========================================================================
def get_stats(self) -> Dict[str, Any]:
    """Return the in-memory statistics dict (the live object, not a copy)."""
    return self.stats
def generate_report(self) -> str:
"""Generate a quality report."""
lines = []
lines.append("# Quality Gate Report")
lines.append(f"**Generated:** {datetime.now(timezone.utc).isoformat()}")
lines.append("")
# Summary
total = self.stats["total_checks"]
passed = self.stats["passed"]
failed = self.stats["failed"]
pass_rate = (passed / total * 100) if total > 0 else 0
lines.append("## Summary")
lines.append(f"- Total Checks: {total}")
lines.append(f"- Passed: {passed} ({pass_rate:.1f}%)")
lines.append(f"- Failed: {failed}")
lines.append("")
# By Type
lines.append("## By Type")
for check_type, counts in self.stats.get("by_type", {}).items():
type_total = counts["passed"] + counts["failed"]
type_rate = (counts["passed"] / type_total * 100) if type_total > 0 else 0
lines.append(f"- **{check_type}**: {counts['passed']}/{type_total} ({type_rate:.1f}%)")
lines.append("")
# By Pipeline
lines.append("## By Pipeline")
for pipeline, counts in self.stats.get("by_pipeline", {}).items():
pipe_total = counts["passed"] + counts["failed"]
pipe_rate = (counts["passed"] / pipe_total * 100) if pipe_total > 0 else 0
lines.append(f"- **{pipeline}**: {counts['passed']}/{pipe_total} ({pipe_rate:.1f}%)")
lines.append("")
# Recent Failures
recent = self.stats.get("recent_failures", [])[-5:]
if recent:
lines.append("## Recent Failures")
for failure in recent:
lines.append(f"- [{failure['timestamp']}] {failure['type']} ({failure['pipeline']})")
for f in failure.get("failures", [])[:2]:
lines.append(f" - {f}")
lines.append("")
return "\n".join(lines)
def main():
    """CLI entry point; returns the process exit code (0 on success/pass)."""
    parser = argparse.ArgumentParser(description="Quality Gate — Validate Pipeline Outputs")
    subparsers = parser.add_subparsers(dest="command")
    # validate: run one output through the appropriate validator.
    validate_parser = subparsers.add_parser("validate", help="Validate a pipeline output")
    validate_parser.add_argument("--type", "-t", required=True,
                                 choices=["training_pair", "knowledge_file", "generated_asset", "adversary_output"],
                                 help="Type of output to validate")
    validate_parser.add_argument("--input", "-i", required=True, help="Input file path")
    validate_parser.add_argument("--pipeline", "-p", default="unknown", help="Pipeline name")
    validate_parser.add_argument("--reject", action="store_true", help="Reject failed outputs")
    subparsers.add_parser("stats", help="Show quality statistics")
    subparsers.add_parser("report", help="Generate quality report")
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 1
    gate = QualityGate()
    if args.command == "stats":
        print(json.dumps(gate.get_stats(), indent=2))
        return 0
    if args.command == "report":
        print(gate.generate_report())
        return 0
    if args.command != "validate":
        return 1
    input_path = Path(args.input)
    if not input_path.exists():
        print(f"Error: Input file not found: {args.input}")
        return 1
    data = None
    try:
        if args.type == "generated_asset":
            # Assets are validated in place; a sibling .json file, when
            # present, supplies generation metadata.
            metadata_file = input_path.with_suffix(".json")
            metadata = json.loads(metadata_file.read_text()) if metadata_file.exists() else None
            result = gate.validate_generated_asset(str(input_path), metadata, args.pipeline)
        else:
            data = json.loads(input_path.read_text())
            validators = {
                "training_pair": gate.validate_training_pair,
                "knowledge_file": gate.validate_knowledge_file,
                "adversary_output": gate.validate_adversary_output,
            }
            validator = validators.get(args.type)
            if validator is None:
                print(f"Unknown type: {args.type}")
                return 1
            result = validator(data, args.pipeline)
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}")
        return 1
    except Exception as e:
        print(f"Error: {e}")
        return 1
    # Human-readable summary of the outcome.
    print(f"Validation: {args.type}")
    print(f"Result: {'PASS' if result.passed else 'FAIL'}")
    print(f"Score: {result.score:.2f}")
    if result.checks:
        print(f"Checks passed: {', '.join(result.checks)}")
    if result.failures:
        print(f"Failures:")
        for f in result.failures:
            print(f"  - {f}")
    if result.warnings:
        print(f"Warnings:")
        for w in result.warnings:
            print(f"  - {w}")
    # Optionally archive failed outputs for later analysis/regeneration.
    if not result.passed and args.reject:
        gate.reject_output(data if args.type != "generated_asset" else str(input_path),
                           result, args.type, args.pipeline)
    return 0 if result.passed else 1
# Script entry point: the process exit status mirrors main()'s return code.
if __name__ == "__main__":
    sys.exit(main())