Compare commits

...

3 Commits

Author SHA1 Message Date
e06692b914 docs: Add quality gate README
Some checks failed
Smoke Test / smoke (pull_request) Failing after 16s
Architecture Lint / Linter Tests (pull_request) Successful in 20s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 14s
Validate Config / Shell Script Lint (pull_request) Failing after 34s
Validate Config / Cron Syntax Check (pull_request) Successful in 10s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 13s
Validate Config / Playbook Schema Validation (pull_request) Successful in 19s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m24s
Validate Config / Python Test Suite (pull_request) Has been skipped
PR Checklist / pr-checklist (pull_request) Failing after 3m0s
Architecture Lint / Lint Repository (pull_request) Failing after 18s
2026-04-14 22:32:41 +00:00
24906e15dd feat: Add quality gate configuration 2026-04-14 22:32:27 +00:00
e780ab0805 feat: Add quality gate for pipeline outputs
Closes #623. Validates all pipeline outputs with JSON schema, content quality, SOUL.md compliance checks.
2026-04-14 22:31:51 +00:00
3 changed files with 800 additions and 0 deletions

View File

@@ -0,0 +1,71 @@
# Quality Gate
Validates all pipeline outputs before saving.
## Usage
```bash
# Validate a training pair
python3 quality-gate.py validate --type training_pair --input pair.json --pipeline training
# Validate a knowledge file
python3 quality-gate.py validate --type knowledge_file --input knowledge.json --pipeline knowledge
# Validate a generated asset
python3 quality-gate.py validate --type generated_asset --input image.png --pipeline assets
# Validate adversary output
python3 quality-gate.py validate --type adversary_output --input vuln.json --pipeline adversary
# View statistics
python3 quality-gate.py stats
# Generate report
python3 quality-gate.py report
```
## Checks Performed
### Training Pairs
- Prompt and response both non-empty
- Not duplicate content
- Not toxic/harmful
- SOUL.md compliance
- Response quality (length, formatting)
### Knowledge Files
- Required fields present (title, content, source, category)
- Not duplicate
- Not toxic
- Valid category
### Generated Assets
- File exists and not empty
- Valid file extension
- Metadata complete (generator, prompt, timestamp)
- SOUL.md compliance in prompt
### Adversary Outputs
- Required fields (vulnerability, description, reproduction_steps, severity)
- Reproduction steps as list
- Valid severity level
- Description not empty
## Integration
Add to pipeline orchestrator:
```python
# NOTE: the script ships as `quality-gate.py`; a hyphenated filename cannot be
# imported directly — rename or alias it to `quality_gate.py` for this import.
from pipelines.quality_gate import QualityGate
gate = QualityGate()
# After generating output
result = gate.validate_training_pair(data, pipeline="training")
if result.passed:
save_output(data)
else:
gate.reject_output(data, result, "training_pair", "training")
requeue_for_regeneration()
```

691
pipelines/quality-gate.py Normal file
View File

@@ -0,0 +1,691 @@
#!/usr/bin/env python3
"""
Quality Gate — Validate All Pipeline Outputs
Every pipeline output must pass quality checks before being saved.
Auto-rejects bad outputs, re-queues for regeneration.
Usage:
python3 quality-gate.py validate --type training_pair --input file.json
python3 quality-gate.py validate --type knowledge_file --input file.json
python3 quality-gate.py validate --type generated_asset --input file.png
python3 quality-gate.py validate --type adversary_output --input file.json
python3 quality-gate.py stats --pipeline training
python3 quality-gate.py report
"""
import argparse
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Configuration
# Root directory for all hermes state; overridable via the HERMES_HOME env var.
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
# Quality-gate working directory: stats, content hashes, and rejected outputs.
QUALITY_DIR = HERMES_HOME / "pipelines" / "quality"
STATS_FILE = QUALITY_DIR / "quality_stats.json"
REJECT_DIR = QUALITY_DIR / "rejected"
# SOUL.md is expected one level above this script's package directory.
SOUL_FILE = Path(__file__).parent.parent / "SOUL.md"
# Ensure directories exist
# NOTE: import-time side effect — directories are created as soon as this
# module is loaded, not lazily on first use.
QUALITY_DIR.mkdir(parents=True, exist_ok=True)
REJECT_DIR.mkdir(parents=True, exist_ok=True)
class QualityResult:
    """Result of a quality check.

    Attributes:
        passed: True when the output cleared all hard checks.
        score: Quality score in [0.0, 1.0].
        checks: Names of checks that passed.
        failures: Hard-failure messages (any entry means rejection).
        warnings: Soft issues that reduce the score but do not reject.
        timestamp: ISO-8601 UTC time the result was created.
    """

    def __init__(self, passed: bool, score: float = 0.0,
                 checks: Optional[List[str]] = None,
                 failures: Optional[List[str]] = None,
                 warnings: Optional[List[str]] = None):
        self.passed = passed
        self.score = score  # 0.0 to 1.0
        # `or []` gives each instance its own fresh list (never share defaults).
        self.checks = checks or []
        self.failures = failures or []
        self.warnings = warnings or []
        self.timestamp = datetime.now(timezone.utc).isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable representation of this result."""
        return {
            "passed": self.passed,
            "score": self.score,
            "checks": self.checks,
            "failures": self.failures,
            "warnings": self.warnings,
            "timestamp": self.timestamp
        }

    def __repr__(self):
        status = "PASS" if self.passed else "FAIL"
        return f"QualityResult({status}, score={self.score:.2f})"
class QualityGate:
    """Main quality gate class.

    Validates pipeline outputs (training pairs, knowledge files, generated
    assets, adversary outputs) and persists pass/fail statistics. Every
    ``validate_*`` method returns a :class:`QualityResult` and updates the
    on-disk statistics as a side effect.
    """

    def __init__(self):
        self.soul_content = self._load_soul()
        self.stats = self._load_stats()

    def _load_soul(self) -> str:
        """Load SOUL.md content for compliance checks.

        Returns "" when the file is missing or unreadable; compliance checks
        are then skipped rather than crashing the gate (deliberate best-effort).
        """
        try:
            if SOUL_FILE.exists():
                return SOUL_FILE.read_text()
        except Exception:
            pass
        return ""

    def _load_stats(self) -> Dict[str, Any]:
        """Load quality statistics, resetting to empty on a corrupt file."""
        try:
            if STATS_FILE.exists():
                return json.loads(STATS_FILE.read_text())
        except Exception:
            # Corrupt/unreadable stats fall through to a fresh structure.
            pass
        return {
            "total_checks": 0,
            "passed": 0,
            "failed": 0,
            "by_type": {},
            "by_pipeline": {},
            "recent_failures": []
        }

    def _save_stats(self):
        """Persist quality statistics to STATS_FILE."""
        STATS_FILE.write_text(json.dumps(self.stats, indent=2))

    def _update_stats(self, result: QualityResult, check_type: str, pipeline: str = "unknown"):
        """Update in-memory counters with one check result and save to disk."""
        self.stats["total_checks"] += 1
        if result.passed:
            self.stats["passed"] += 1
        else:
            self.stats["failed"] += 1
            self.stats["recent_failures"].append({
                "type": check_type,
                "pipeline": pipeline,
                "timestamp": result.timestamp,
                "failures": result.failures
            })
            # Keep only last 100 failures
            self.stats["recent_failures"] = self.stats["recent_failures"][-100:]
        # Update by type
        if check_type not in self.stats["by_type"]:
            self.stats["by_type"][check_type] = {"passed": 0, "failed": 0}
        if result.passed:
            self.stats["by_type"][check_type]["passed"] += 1
        else:
            self.stats["by_type"][check_type]["failed"] += 1
        # Update by pipeline
        if pipeline not in self.stats["by_pipeline"]:
            self.stats["by_pipeline"][pipeline] = {"passed": 0, "failed": 0}
        if result.passed:
            self.stats["by_pipeline"][pipeline]["passed"] += 1
        else:
            self.stats["by_pipeline"][pipeline]["failed"] += 1
        self._save_stats()

    # =========================================================================
    # Content Quality Checks
    # =========================================================================
    def _check_not_empty(self, content: str, min_length: int = 1) -> Tuple[bool, str]:
        """Check content is not empty. Returns (ok, failure_message)."""
        if not content or len(content.strip()) < min_length:
            return False, f"Content is empty or too short (min {min_length} chars)"
        return True, ""

    def _check_not_duplicate(self, content: str, content_type: str) -> Tuple[bool, str]:
        """Check content is not a duplicate of previously seen content.

        NOTE: side effect — a new hash is registered at *check* time, before
        the overall validation verdict is known. An output that fails for
        other reasons and is regenerated identically will therefore be
        flagged as a duplicate on the retry.
        """
        content_hash = hashlib.sha256(content.encode()).hexdigest()
        # Check against known hashes
        hash_file = QUALITY_DIR / f"{content_type}_hashes.json"
        known_hashes = set()
        if hash_file.exists():
            try:
                known_hashes = set(json.loads(hash_file.read_text()))
            except Exception:
                # Corrupt hash file: treat as empty rather than fail the check.
                pass
        if content_hash in known_hashes:
            return False, f"Duplicate content detected (hash: {content_hash[:16]})"
        # Add to known hashes
        known_hashes.add(content_hash)
        hash_file.write_text(json.dumps(list(known_hashes)))
        return True, ""

    def _check_not_toxic(self, content: str) -> Tuple[bool, str]:
        """Check content against a small blocklist of toxic/harmful patterns."""
        # FIX: the first pattern previously had a duplicated alternative
        # ("yourself|yourself|them"); the dead duplicate is replaced with
        # the evidently intended plural form.
        toxic_patterns = [
            r"(?i)kill\s+(yourself|yourselves|them)",
            r"(?i)how\s+to\s+(make|build|create)\s+(bomb|weapon|poison)",
            r"(?i)hate\s+(speech|group|people)",
            r"(?i)illegal\s+(activity|drug|weapon)",
        ]
        for pattern in toxic_patterns:
            if re.search(pattern, content):
                return False, f"Content matches toxic pattern: {pattern[:50]}"
        return True, ""

    def _check_soul_compliance(self, content: str) -> Tuple[bool, str]:
        """Check content complies with SOUL.md principles (heuristic regexes)."""
        if not self.soul_content:
            return True, ""  # Can't check if no SOUL loaded
        violations = []
        # Check for corporate dependency
        if re.search(r"(?i)requires?\s+(permission|approval)\s+from\s+(google|openai|anthropic|meta)", content):
            violations.append("Suggests corporate dependency")
        # Check for dishonesty patterns
        if re.search(r"(?i)i\s+(am|'m)\s+(100%|always|never)\s+(right|correct|certain)", content):
            violations.append("Claims false certainty")
        # Check for gatekeeping
        if re.search(r"(?i)i\s+(won't|cannot|refuse\s+to)\s+(help|answer|explain)", content):
            # A refusal is acceptable when the content itself explains a
            # safety reason; otherwise it is flagged as gatekeeping.
            if not re.search(r"(?i)(harmful|dangerous|illegal)", content):
                violations.append("Unnecessary gatekeeping")
        if violations:
            return False, f"SOUL.md violations: {'; '.join(violations)}"
        return True, ""

    # =========================================================================
    # Training Pair Validation
    # =========================================================================
    def validate_training_pair(self, data: Dict[str, Any], pipeline: str = "training") -> QualityResult:
        """Validate a training pair (dict with 'prompt' and 'response')."""
        checks = []
        failures = []
        warnings = []
        score = 1.0
        # Check structure
        if "prompt" not in data:
            failures.append("Missing 'prompt' field")
            score -= 0.5
        if "response" not in data:
            failures.append("Missing 'response' field")
            score -= 0.5
        if failures:
            return QualityResult(False, 0.0, checks, failures, warnings)
        prompt = data.get("prompt", "")
        response = data.get("response", "")
        # Check prompt not empty
        ok, msg = self._check_not_empty(prompt, min_length=10)
        if ok:
            checks.append("prompt_not_empty")
        else:
            failures.append(f"Prompt: {msg}")
            score -= 0.3
        # Check response not empty
        ok, msg = self._check_not_empty(response, min_length=20)
        if ok:
            checks.append("response_not_empty")
        else:
            failures.append(f"Response: {msg}")
            score -= 0.3
        # Check not duplicate — deliberately a warning (not a failure) for
        # training pairs, unlike knowledge files where duplicates hard-fail.
        combined = f"{prompt}\n{response}"
        ok, msg = self._check_not_duplicate(combined, "training_pair")
        if ok:
            checks.append("not_duplicate")
        else:
            warnings.append(msg)
            score -= 0.1
        # Check not toxic
        ok, msg = self._check_not_toxic(response)
        if ok:
            checks.append("not_toxic")
        else:
            failures.append(msg)
            score -= 0.5
        # Check SOUL compliance
        ok, msg = self._check_soul_compliance(response)
        if ok:
            checks.append("soul_compliant")
        else:
            failures.append(msg)
            score -= 0.3
        # Check response quality
        if len(response) < 50:
            warnings.append("Response is very short")
            score -= 0.1
        if response.count("\n") < 2 and len(response) > 200:
            warnings.append("Response lacks formatting")
            score -= 0.05
        # Check voice consistency (if voice marker present) — heuristic:
        # looks for the literal voice string in the first 100 chars.
        voice = data.get("voice", "")
        if voice and voice.lower() not in response.lower()[:100]:
            warnings.append(f"Response may not match voice: {voice}")
            score -= 0.1
        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5
        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "training_pair", pipeline)
        return result

    # =========================================================================
    # Knowledge File Validation
    # =========================================================================
    def validate_knowledge_file(self, data: Dict[str, Any], pipeline: str = "knowledge") -> QualityResult:
        """Validate a knowledge file (title/content/source/category dict)."""
        checks = []
        failures = []
        warnings = []
        score = 1.0
        required_fields = ["title", "content", "source", "category"]
        # Check required fields
        for field in required_fields:
            if field not in data:
                failures.append(f"Missing required field: {field}")
                score -= 0.2
        if failures:
            return QualityResult(False, 0.0, checks, failures, warnings)
        title = data.get("title", "")
        content = data.get("content", "")
        # Check title not empty
        ok, msg = self._check_not_empty(title, min_length=5)
        if ok:
            checks.append("title_valid")
        else:
            failures.append(f"Title: {msg}")
            score -= 0.2
        # Check content not empty
        ok, msg = self._check_not_empty(content, min_length=50)
        if ok:
            checks.append("content_valid")
        else:
            failures.append(f"Content: {msg}")
            score -= 0.3
        # Check not duplicate (hard failure here, unlike training pairs)
        ok, msg = self._check_not_duplicate(content, "knowledge_file")
        if ok:
            checks.append("not_duplicate")
        else:
            failures.append(msg)
            score -= 0.4
        # Check not toxic
        ok, msg = self._check_not_toxic(content)
        if ok:
            checks.append("not_toxic")
        else:
            failures.append(msg)
            score -= 0.5
        # Check category valid (unknown categories only warn)
        valid_categories = [
            "technical", "conceptual", "procedural", "reference",
            "tutorial", "troubleshooting", "architecture", "security"
        ]
        category = data.get("category", "").lower()
        if category in valid_categories:
            checks.append("category_valid")
        else:
            warnings.append(f"Unknown category: {category}")
            score -= 0.1
        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5
        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "knowledge_file", pipeline)
        return result

    # =========================================================================
    # Generated Asset Validation
    # =========================================================================
    def validate_generated_asset(self, file_path: str, metadata: Optional[Dict[str, Any]] = None,
                                 pipeline: str = "assets") -> QualityResult:
        """Validate a generated asset (image, video, etc.)."""
        checks = []
        failures = []
        warnings = []
        score = 1.0
        path = Path(file_path)
        # Check file exists
        if not path.exists():
            failures.append(f"File does not exist: {file_path}")
            return QualityResult(False, 0.0, checks, failures, warnings)
        checks.append("file_exists")
        # Check file not empty
        file_size = path.stat().st_size
        if file_size == 0:
            failures.append("File is empty")
            score -= 0.5
        elif file_size < 100:
            warnings.append(f"File is very small: {file_size} bytes")
            score -= 0.1
        else:
            checks.append("file_not_empty")
        # Check file extension (unknown extensions only warn)
        valid_extensions = {
            "image": [".png", ".jpg", ".jpeg", ".gif", ".webp"],
            "video": [".mp4", ".webm", ".mov"],
            "audio": [".mp3", ".wav", ".ogg"],
            "document": [".md", ".txt", ".pdf"]
        }
        ext = path.suffix.lower()
        is_valid_ext = any(ext in exts for exts in valid_extensions.values())
        if is_valid_ext:
            checks.append("valid_extension")
        else:
            warnings.append(f"Unknown extension: {ext}")
            score -= 0.1
        # Check metadata if provided
        if metadata:
            required_meta = ["generator", "prompt", "timestamp"]
            for field in required_meta:
                if field in metadata:
                    checks.append(f"metadata_{field}")
                else:
                    warnings.append(f"Missing metadata: {field}")
                    score -= 0.05
        # Check SOUL compliance in metadata prompt
        if metadata and "prompt" in metadata:
            ok, msg = self._check_soul_compliance(metadata["prompt"])
            if ok:
                checks.append("soul_compliant")
            else:
                failures.append(msg)
                score -= 0.3
        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5
        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "generated_asset", pipeline)
        return result

    # =========================================================================
    # Adversary Output Validation
    # =========================================================================
    def validate_adversary_output(self, data: Dict[str, Any], pipeline: str = "adversary") -> QualityResult:
        """Validate an adversary output (should include reproduction steps)."""
        checks = []
        failures = []
        warnings = []
        score = 1.0
        required_fields = ["vulnerability", "description", "reproduction_steps", "severity"]
        # Check required fields
        for field in required_fields:
            if field not in data:
                failures.append(f"Missing required field: {field}")
                score -= 0.2
        if failures:
            return QualityResult(False, 0.0, checks, failures, warnings)
        # Check reproduction steps
        steps = data.get("reproduction_steps", [])
        if not isinstance(steps, list) or len(steps) < 1:
            failures.append("reproduction_steps must be a non-empty list")
            score -= 0.3
        else:
            checks.append("reproduction_steps_valid")
        # Check severity
        valid_severities = ["critical", "high", "medium", "low", "info"]
        severity = data.get("severity", "").lower()
        if severity in valid_severities:
            checks.append("severity_valid")
        else:
            failures.append(f"Invalid severity: {severity}")
            score -= 0.2
        # Check description not empty
        description = data.get("description", "")
        ok, msg = self._check_not_empty(description, min_length=50)
        if ok:
            checks.append("description_valid")
        else:
            failures.append(f"Description: {msg}")
            score -= 0.2
        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5
        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "adversary_output", pipeline)
        return result

    # =========================================================================
    # Rejection and Re-queue
    # =========================================================================
    def reject_output(self, data: Any, result: QualityResult, output_type: str,
                      pipeline: str = "unknown") -> Path:
        """Reject an output: save it plus its quality result under REJECT_DIR.

        Returns the path of the written rejection file.
        """
        # FIX: use timezone-aware UTC for the filename timestamp, consistent
        # with every other timestamp this module records (was naive local time).
        reject_file = REJECT_DIR / f"{output_type}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json"
        reject_data = {
            "type": output_type,
            "pipeline": pipeline,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "quality_result": result.to_dict(),
            # Non-JSON-serializable payloads are stringified for the record.
            "data": data if isinstance(data, (dict, list, str)) else str(data)
        }
        reject_file.write_text(json.dumps(reject_data, indent=2))
        print(f"Rejected output saved to: {reject_file}")
        print(f"  Failures: {', '.join(result.failures)}")
        return reject_file

    # =========================================================================
    # Reporting
    # =========================================================================
    def get_stats(self) -> Dict[str, Any]:
        """Get quality statistics."""
        return self.stats

    def generate_report(self) -> str:
        """Generate a Markdown quality report from the accumulated stats."""
        lines = []
        lines.append("# Quality Gate Report")
        lines.append(f"**Generated:** {datetime.now(timezone.utc).isoformat()}")
        lines.append("")
        # Summary
        total = self.stats["total_checks"]
        passed = self.stats["passed"]
        failed = self.stats["failed"]
        pass_rate = (passed / total * 100) if total > 0 else 0
        lines.append("## Summary")
        lines.append(f"- Total Checks: {total}")
        lines.append(f"- Passed: {passed} ({pass_rate:.1f}%)")
        lines.append(f"- Failed: {failed}")
        lines.append("")
        # By Type
        lines.append("## By Type")
        for check_type, counts in self.stats.get("by_type", {}).items():
            type_total = counts["passed"] + counts["failed"]
            type_rate = (counts["passed"] / type_total * 100) if type_total > 0 else 0
            lines.append(f"- **{check_type}**: {counts['passed']}/{type_total} ({type_rate:.1f}%)")
        lines.append("")
        # By Pipeline
        lines.append("## By Pipeline")
        for pipeline, counts in self.stats.get("by_pipeline", {}).items():
            pipe_total = counts["passed"] + counts["failed"]
            pipe_rate = (counts["passed"] / pipe_total * 100) if pipe_total > 0 else 0
            lines.append(f"- **{pipeline}**: {counts['passed']}/{pipe_total} ({pipe_rate:.1f}%)")
        lines.append("")
        # Recent Failures (last 5 only)
        recent = self.stats.get("recent_failures", [])[-5:]
        if recent:
            lines.append("## Recent Failures")
            for failure in recent:
                lines.append(f"- [{failure['timestamp']}] {failure['type']} ({failure['pipeline']})")
                for f in failure.get("failures", [])[:2]:
                    lines.append(f"  - {f}")
            lines.append("")
        return "\n".join(lines)
def main():
    """CLI entry point for the quality gate.

    Returns a process exit code: 0 when validation passes (or stats/report
    succeed), 1 on validation failure, bad input, or missing command.
    """
    parser = argparse.ArgumentParser(description="Quality Gate — Validate Pipeline Outputs")
    subparsers = parser.add_subparsers(dest="command")
    # Validate command
    validate_parser = subparsers.add_parser("validate", help="Validate a pipeline output")
    validate_parser.add_argument("--type", "-t", required=True,
                                 choices=["training_pair", "knowledge_file", "generated_asset", "adversary_output"],
                                 help="Type of output to validate")
    validate_parser.add_argument("--input", "-i", required=True, help="Input file path")
    validate_parser.add_argument("--pipeline", "-p", default="unknown", help="Pipeline name")
    validate_parser.add_argument("--reject", action="store_true", help="Reject failed outputs")
    # Stats command
    subparsers.add_parser("stats", help="Show quality statistics")
    # Report command
    subparsers.add_parser("report", help="Generate quality report")
    parsed = parser.parse_args()
    if not parsed.command:
        parser.print_help()
        return 1
    gate = QualityGate()
    if parsed.command == "validate":
        # Load input
        input_path = Path(parsed.input)
        if not input_path.exists():
            print(f"Error: Input file not found: {parsed.input}")
            return 1
        try:
            if parsed.type == "generated_asset":
                # For assets, check file exists and optionally load metadata
                # from a sibling .json file (same stem as the asset).
                metadata_file = input_path.with_suffix(".json")
                metadata = None
                if metadata_file.exists():
                    metadata = json.loads(metadata_file.read_text())
                result = gate.validate_generated_asset(str(input_path), metadata, parsed.pipeline)
            else:
                data = json.loads(input_path.read_text())
                if parsed.type == "training_pair":
                    result = gate.validate_training_pair(data, parsed.pipeline)
                elif parsed.type == "knowledge_file":
                    result = gate.validate_knowledge_file(data, parsed.pipeline)
                elif parsed.type == "adversary_output":
                    result = gate.validate_adversary_output(data, parsed.pipeline)
                else:
                    # Unreachable via argparse choices; kept as a safety net.
                    print(f"Unknown type: {parsed.type}")
                    return 1
        except json.JSONDecodeError as e:
            print(f"Error: Invalid JSON in input file: {e}")
            return 1
        except Exception as e:
            print(f"Error: {e}")
            return 1
        # Print result
        print(f"Validation: {parsed.type}")
        print(f"Result: {'PASS' if result.passed else 'FAIL'}")
        print(f"Score: {result.score:.2f}")
        if result.checks:
            print(f"Checks passed: {', '.join(result.checks)}")
        if result.failures:
            # FIX: plain strings instead of placeholder-less f-strings.
            print("Failures:")
            for f in result.failures:
                print(f"  - {f}")
        if result.warnings:
            print("Warnings:")
            for w in result.warnings:
                print(f"  - {w}")
        # Reject if requested and failed. For assets there is no parsed
        # `data`, so the file path is recorded instead.
        if not result.passed and parsed.reject:
            gate.reject_output(data if parsed.type != "generated_asset" else str(input_path),
                               result, parsed.type, parsed.pipeline)
        return 0 if result.passed else 1
    elif parsed.command == "stats":
        stats = gate.get_stats()
        print(json.dumps(stats, indent=2))
        return 0
    elif parsed.command == "report":
        report = gate.generate_report()
        print(report)
        return 0
    return 1


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,38 @@
# Quality Gate Configuration
# pipelines/quality-gate.yaml
quality_thresholds:
training_pair:
min_score: 0.5
min_prompt_length: 10
min_response_length: 20
knowledge_file:
min_score: 0.5
min_title_length: 5
min_content_length: 50
generated_asset:
min_score: 0.5
min_file_size: 100 # bytes
adversary_output:
min_score: 0.5
min_description_length: 50
required_severities: [critical, high, medium, low, info]
rejection:
auto_reject: true
reject_dir: ~/.hermes/pipelines/quality/rejected
max_rejections_per_hour: 50
notifications:
on_failure: true
notify_pipeline: true
notify_telegram: false
soul_compliance:
enabled: true
check_corporate_dependency: true
check_false_certainty: true
check_gatekeeping: true