Compare commits
3 Commits
fix/680-py
...
feat/623-q
| Author | SHA1 | Date | |
|---|---|---|---|
| e06692b914 | |||
| 24906e15dd | |||
| e780ab0805 |
71
pipelines/README-quality-gate.md
Normal file
71
pipelines/README-quality-gate.md
Normal file
@@ -0,0 +1,71 @@
|
||||
# Quality Gate
|
||||
|
||||
Validates all pipeline outputs before saving.
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
# Validate a training pair
|
||||
python3 quality-gate.py validate --type training_pair --input pair.json --pipeline training
|
||||
|
||||
# Validate a knowledge file
|
||||
python3 quality-gate.py validate --type knowledge_file --input knowledge.json --pipeline knowledge
|
||||
|
||||
# Validate a generated asset
|
||||
python3 quality-gate.py validate --type generated_asset --input image.png --pipeline assets
|
||||
|
||||
# Validate adversary output
|
||||
python3 quality-gate.py validate --type adversary_output --input vuln.json --pipeline adversary
|
||||
|
||||
# View statistics
|
||||
python3 quality-gate.py stats
|
||||
|
||||
# Generate report
|
||||
python3 quality-gate.py report
|
||||
```
|
||||
|
||||
## Checks Performed
|
||||
|
||||
### Training Pairs
|
||||
- Prompt and response both non-empty
|
||||
- Not duplicate content
|
||||
- Not toxic/harmful
|
||||
- SOUL.md compliance
|
||||
- Response quality (length, formatting)
|
||||
|
||||
### Knowledge Files
|
||||
- Required fields present (title, content, source, category)
|
||||
- Not duplicate
|
||||
- Not toxic
|
||||
- Valid category
|
||||
|
||||
### Generated Assets
|
||||
- File exists and not empty
|
||||
- Valid file extension
|
||||
- Metadata complete (generator, prompt, timestamp)
|
||||
- SOUL.md compliance in prompt
|
||||
|
||||
### Adversary Outputs
|
||||
- Required fields (vulnerability, description, reproduction_steps, severity)
|
||||
- Reproduction steps as list
|
||||
- Valid severity level
|
||||
- Description not empty
|
||||
|
||||
## Integration
|
||||
|
||||
Add to pipeline orchestrator:
|
||||
|
||||
```python
|
||||
from pipelines.quality_gate import QualityGate
|
||||
|
||||
gate = QualityGate()
|
||||
|
||||
# After generating output
|
||||
result = gate.validate_training_pair(data, pipeline="training")
|
||||
|
||||
if result.passed:
|
||||
save_output(data)
|
||||
else:
|
||||
gate.reject_output(data, result, "training_pair", "training")
|
||||
requeue_for_regeneration()
|
||||
```
|
||||
691
pipelines/quality-gate.py
Normal file
691
pipelines/quality-gate.py
Normal file
@@ -0,0 +1,691 @@
|
||||
#!/usr/bin/env python3
"""
Quality Gate — Validate All Pipeline Outputs

Every pipeline output must pass quality checks before being saved.
Auto-rejects bad outputs, re-queues for regeneration.

Usage:
    python3 quality-gate.py validate --type training_pair --input file.json
    python3 quality-gate.py validate --type knowledge_file --input file.json
    python3 quality-gate.py validate --type generated_asset --input file.png
    python3 quality-gate.py validate --type adversary_output --input file.json
    python3 quality-gate.py stats --pipeline training
    python3 quality-gate.py report
"""

import argparse
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Configuration
# Root of the hermes workspace; overridable via the HERMES_HOME env var.
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
# Where quality state (stats, dedup hashes, rejects) is persisted.
QUALITY_DIR = HERMES_HOME / "pipelines" / "quality"
STATS_FILE = QUALITY_DIR / "quality_stats.json"
REJECT_DIR = QUALITY_DIR / "rejected"
# SOUL.md lives one level above this script's directory (repo root).
SOUL_FILE = Path(__file__).parent.parent / "SOUL.md"

# Ensure directories exist
# NOTE: these run at import time, so importing this module creates state dirs.
QUALITY_DIR.mkdir(parents=True, exist_ok=True)
REJECT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
class QualityResult:
    """Result of a quality check.

    Carries the pass/fail verdict, a 0.0–1.0 score, the names of checks
    that passed, hard failures, soft warnings, and a UTC timestamp.
    """

    def __init__(self, passed: bool, score: float = 0.0,
                 checks: Optional[List[str]] = None,
                 failures: Optional[List[str]] = None,
                 warnings: Optional[List[str]] = None):
        """Create a result.

        Args:
            passed: Overall verdict.
            score: Quality score, 0.0 to 1.0.
            checks: Names of checks that passed (defaults to empty list).
            failures: Hard-failure messages (defaults to empty list).
            warnings: Soft-warning messages (defaults to empty list).
        """
        self.passed = passed
        self.score = score  # 0.0 to 1.0
        # `or []` gives each instance its own fresh list (never a shared default).
        self.checks = checks or []
        self.failures = failures or []
        self.warnings = warnings or []
        self.timestamp = datetime.now(timezone.utc).isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable view of this result."""
        return {
            "passed": self.passed,
            "score": self.score,
            "checks": self.checks,
            "failures": self.failures,
            "warnings": self.warnings,
            "timestamp": self.timestamp
        }

    def __repr__(self):
        status = "PASS" if self.passed else "FAIL"
        return f"QualityResult({status}, score={self.score:.2f})"
|
||||
|
||||
|
||||
class QualityGate:
    """Main quality gate.

    Runs content checks on pipeline outputs (training pairs, knowledge
    files, generated assets, adversary outputs), tracks pass/fail
    statistics on disk, and archives rejected outputs for analysis.
    """

    def __init__(self):
        # SOUL.md text ("" when unavailable) and persisted statistics.
        self.soul_content = self._load_soul()
        self.stats = self._load_stats()

    def _load_soul(self) -> str:
        """Load SOUL.md content for compliance checks (best-effort)."""
        try:
            if SOUL_FILE.exists():
                return SOUL_FILE.read_text()
        except OSError:
            # An unreadable SOUL.md merely disables compliance checks.
            pass
        return ""

    def _load_stats(self) -> Dict[str, Any]:
        """Load quality statistics, falling back to a fresh structure."""
        try:
            if STATS_FILE.exists():
                return json.loads(STATS_FILE.read_text())
        except (OSError, json.JSONDecodeError):
            # Corrupt or unreadable stats start over rather than crash.
            pass
        return {
            "total_checks": 0,
            "passed": 0,
            "failed": 0,
            "by_type": {},
            "by_pipeline": {},
            "recent_failures": []
        }

    def _save_stats(self):
        """Persist quality statistics to STATS_FILE."""
        STATS_FILE.write_text(json.dumps(self.stats, indent=2))

    def _update_stats(self, result: QualityResult, check_type: str, pipeline: str = "unknown"):
        """Record a check result in the aggregate/type/pipeline counters and save."""
        self.stats["total_checks"] += 1

        if result.passed:
            self.stats["passed"] += 1
        else:
            self.stats["failed"] += 1
            self.stats["recent_failures"].append({
                "type": check_type,
                "pipeline": pipeline,
                "timestamp": result.timestamp,
                "failures": result.failures
            })
            # Keep only last 100 failures
            self.stats["recent_failures"] = self.stats["recent_failures"][-100:]

        # Update by type (setdefault creates the bucket on first sight).
        type_bucket = self.stats["by_type"].setdefault(check_type, {"passed": 0, "failed": 0})
        type_bucket["passed" if result.passed else "failed"] += 1

        # Update by pipeline
        pipe_bucket = self.stats["by_pipeline"].setdefault(pipeline, {"passed": 0, "failed": 0})
        pipe_bucket["passed" if result.passed else "failed"] += 1

        self._save_stats()

    def _result(self, checks: List[str], failures: List[str], warnings: List[str],
                score: float, check_type: str, pipeline: str) -> QualityResult:
        """Build a QualityResult, record it in the stats, and return it.

        FIX: early structural-failure returns previously bypassed
        `_update_stats`, so those failures never appeared in stats or
        reports. Routing every exit through this helper closes that gap.
        """
        score = max(0.0, score)
        passed = not failures and score >= 0.5
        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, check_type, pipeline)
        return result

    # =========================================================================
    # Content Quality Checks
    # =========================================================================

    def _check_not_empty(self, content: str, min_length: int = 1) -> Tuple[bool, str]:
        """Check content is not empty (after stripping whitespace)."""
        if not content or len(content.strip()) < min_length:
            return False, f"Content is empty or too short (min {min_length} chars)"
        return True, ""

    def _check_not_duplicate(self, content: str, content_type: str) -> Tuple[bool, str]:
        """Check content is not a duplicate of previously seen content.

        NOTE(review): the hash is registered even when the surrounding
        validation later fails, so a rejected output's content is still
        remembered — confirm this is intended.
        """
        content_hash = hashlib.sha256(content.encode()).hexdigest()

        # Check against known hashes (one hash file per content type).
        hash_file = QUALITY_DIR / f"{content_type}_hashes.json"
        known_hashes = set()

        if hash_file.exists():
            try:
                known_hashes = set(json.loads(hash_file.read_text()))
            except (OSError, json.JSONDecodeError):
                # Corrupt hash file: treat as empty rather than crash.
                pass

        if content_hash in known_hashes:
            return False, f"Duplicate content detected (hash: {content_hash[:16]})"

        # Add to known hashes
        known_hashes.add(content_hash)
        hash_file.write_text(json.dumps(list(known_hashes)))

        return True, ""

    def _check_not_toxic(self, content: str) -> Tuple[bool, str]:
        """Check content is not toxic or harmful (simple pattern screen)."""
        # FIX: first pattern had a duplicated alternative
        # (yourself|yourself|them); the dead duplicate is removed —
        # the matched set is unchanged.
        toxic_patterns = [
            r"(?i)kill\s+(yourself|them)",
            r"(?i)how\s+to\s+(make|build|create)\s+(bomb|weapon|poison)",
            r"(?i)hate\s+(speech|group|people)",
            r"(?i)illegal\s+(activity|drug|weapon)",
        ]

        for pattern in toxic_patterns:
            if re.search(pattern, content):
                return False, f"Content matches toxic pattern: {pattern[:50]}"

        return True, ""

    def _check_soul_compliance(self, content: str) -> Tuple[bool, str]:
        """Check content complies with SOUL.md principles."""
        if not self.soul_content:
            return True, ""  # Can't check if no SOUL loaded

        violations = []

        # Check for corporate dependency
        if re.search(r"(?i)requires?\s+(permission|approval)\s+from\s+(google|openai|anthropic|meta)", content):
            violations.append("Suggests corporate dependency")

        # Check for dishonesty patterns
        if re.search(r"(?i)i\s+(am|'m)\s+(100%|always|never)\s+(right|correct|certain)", content):
            violations.append("Claims false certainty")

        # Check for gatekeeping: a refusal is only a violation when the
        # content gives no harm-related justification.
        if re.search(r"(?i)i\s+(won't|cannot|refuse\s+to)\s+(help|answer|explain)", content):
            if not re.search(r"(?i)(harmful|dangerous|illegal)", content):
                violations.append("Unnecessary gatekeeping")

        if violations:
            return False, f"SOUL.md violations: {'; '.join(violations)}"

        return True, ""

    # =========================================================================
    # Training Pair Validation
    # =========================================================================

    def validate_training_pair(self, data: Dict[str, Any], pipeline: str = "training") -> QualityResult:
        """Validate a training pair (dict with 'prompt' and 'response')."""
        checks: List[str] = []
        failures: List[str] = []
        warnings: List[str] = []
        score = 1.0

        # Check structure
        if "prompt" not in data:
            failures.append("Missing 'prompt' field")
            score -= 0.5
        if "response" not in data:
            failures.append("Missing 'response' field")
            score -= 0.5

        if failures:
            # Structural failure: score collapses to 0 and is recorded.
            return self._result(checks, failures, warnings, 0.0, "training_pair", pipeline)

        prompt = data.get("prompt", "")
        response = data.get("response", "")

        # Check prompt not empty
        ok, msg = self._check_not_empty(prompt, min_length=10)
        if ok:
            checks.append("prompt_not_empty")
        else:
            failures.append(f"Prompt: {msg}")
            score -= 0.3

        # Check response not empty
        ok, msg = self._check_not_empty(response, min_length=20)
        if ok:
            checks.append("response_not_empty")
        else:
            failures.append(f"Response: {msg}")
            score -= 0.3

        # Check not duplicate (duplicates are a warning here, not a failure).
        combined = f"{prompt}\n{response}"
        ok, msg = self._check_not_duplicate(combined, "training_pair")
        if ok:
            checks.append("not_duplicate")
        else:
            warnings.append(msg)
            score -= 0.1

        # Check not toxic
        ok, msg = self._check_not_toxic(response)
        if ok:
            checks.append("not_toxic")
        else:
            failures.append(msg)
            score -= 0.5

        # Check SOUL compliance
        ok, msg = self._check_soul_compliance(response)
        if ok:
            checks.append("soul_compliant")
        else:
            failures.append(msg)
            score -= 0.3

        # Check response quality (length, formatting)
        if len(response) < 50:
            warnings.append("Response is very short")
            score -= 0.1

        if response.count("\n") < 2 and len(response) > 200:
            warnings.append("Response lacks formatting")
            score -= 0.05

        # Check voice consistency (if voice marker present): the voice tag
        # is expected somewhere in the first 100 chars of the response.
        voice = data.get("voice", "")
        if voice and voice.lower() not in response.lower()[:100]:
            warnings.append(f"Response may not match voice: {voice}")
            score -= 0.1

        return self._result(checks, failures, warnings, score, "training_pair", pipeline)

    # =========================================================================
    # Knowledge File Validation
    # =========================================================================

    def validate_knowledge_file(self, data: Dict[str, Any], pipeline: str = "knowledge") -> QualityResult:
        """Validate a knowledge file (title/content/source/category dict)."""
        checks: List[str] = []
        failures: List[str] = []
        warnings: List[str] = []
        score = 1.0

        required_fields = ["title", "content", "source", "category"]

        # Check required fields
        for field in required_fields:
            if field not in data:
                failures.append(f"Missing required field: {field}")
                score -= 0.2

        if failures:
            return self._result(checks, failures, warnings, 0.0, "knowledge_file", pipeline)

        title = data.get("title", "")
        content = data.get("content", "")

        # Check title not empty
        ok, msg = self._check_not_empty(title, min_length=5)
        if ok:
            checks.append("title_valid")
        else:
            failures.append(f"Title: {msg}")
            score -= 0.2

        # Check content not empty
        ok, msg = self._check_not_empty(content, min_length=50)
        if ok:
            checks.append("content_valid")
        else:
            failures.append(f"Content: {msg}")
            score -= 0.3

        # Check not duplicate (hard failure for knowledge files,
        # unlike training pairs where it is only a warning).
        ok, msg = self._check_not_duplicate(content, "knowledge_file")
        if ok:
            checks.append("not_duplicate")
        else:
            failures.append(msg)
            score -= 0.4

        # Check not toxic
        ok, msg = self._check_not_toxic(content)
        if ok:
            checks.append("not_toxic")
        else:
            failures.append(msg)
            score -= 0.5

        # Check category valid (unknown categories warn, not fail).
        valid_categories = [
            "technical", "conceptual", "procedural", "reference",
            "tutorial", "troubleshooting", "architecture", "security"
        ]
        category = data.get("category", "").lower()
        if category in valid_categories:
            checks.append("category_valid")
        else:
            warnings.append(f"Unknown category: {category}")
            score -= 0.1

        return self._result(checks, failures, warnings, score, "knowledge_file", pipeline)

    # =========================================================================
    # Generated Asset Validation
    # =========================================================================

    def validate_generated_asset(self, file_path: str, metadata: Optional[Dict[str, Any]] = None,
                                 pipeline: str = "assets") -> QualityResult:
        """Validate a generated asset (image, video, etc.) on disk.

        Args:
            file_path: Path to the asset file.
            metadata: Optional sidecar metadata (generator, prompt, timestamp).
            pipeline: Pipeline name for statistics bucketing.
        """
        checks: List[str] = []
        failures: List[str] = []
        warnings: List[str] = []
        score = 1.0

        path = Path(file_path)

        # Check file exists
        if not path.exists():
            failures.append(f"File does not exist: {file_path}")
            return self._result(checks, failures, warnings, 0.0, "generated_asset", pipeline)

        checks.append("file_exists")

        # Check file not empty
        file_size = path.stat().st_size
        if file_size == 0:
            failures.append("File is empty")
            score -= 0.5
        elif file_size < 100:
            warnings.append(f"File is very small: {file_size} bytes")
            score -= 0.1
        else:
            checks.append("file_not_empty")

        # Check file extension against the known asset categories.
        valid_extensions = {
            "image": [".png", ".jpg", ".jpeg", ".gif", ".webp"],
            "video": [".mp4", ".webm", ".mov"],
            "audio": [".mp3", ".wav", ".ogg"],
            "document": [".md", ".txt", ".pdf"]
        }

        ext = path.suffix.lower()
        is_valid_ext = any(ext in exts for exts in valid_extensions.values())

        if is_valid_ext:
            checks.append("valid_extension")
        else:
            warnings.append(f"Unknown extension: {ext}")
            score -= 0.1

        # Check metadata if provided (missing fields warn, not fail).
        if metadata:
            required_meta = ["generator", "prompt", "timestamp"]
            for field in required_meta:
                if field in metadata:
                    checks.append(f"metadata_{field}")
                else:
                    warnings.append(f"Missing metadata: {field}")
                    score -= 0.05

        # Check SOUL compliance in metadata prompt
        if metadata and "prompt" in metadata:
            ok, msg = self._check_soul_compliance(metadata["prompt"])
            if ok:
                checks.append("soul_compliant")
            else:
                failures.append(msg)
                score -= 0.3

        return self._result(checks, failures, warnings, score, "generated_asset", pipeline)

    # =========================================================================
    # Adversary Output Validation
    # =========================================================================

    def validate_adversary_output(self, data: Dict[str, Any], pipeline: str = "adversary") -> QualityResult:
        """Validate an adversary output (should include reproduction steps)."""
        checks: List[str] = []
        failures: List[str] = []
        warnings: List[str] = []
        score = 1.0

        required_fields = ["vulnerability", "description", "reproduction_steps", "severity"]

        # Check required fields
        for field in required_fields:
            if field not in data:
                failures.append(f"Missing required field: {field}")
                score -= 0.2

        if failures:
            return self._result(checks, failures, warnings, 0.0, "adversary_output", pipeline)

        # Check reproduction steps must be a non-empty list.
        steps = data.get("reproduction_steps", [])
        if not isinstance(steps, list) or len(steps) < 1:
            failures.append("reproduction_steps must be a non-empty list")
            score -= 0.3
        else:
            checks.append("reproduction_steps_valid")

        # Check severity against the accepted levels (case-insensitive).
        valid_severities = ["critical", "high", "medium", "low", "info"]
        severity = data.get("severity", "").lower()
        if severity in valid_severities:
            checks.append("severity_valid")
        else:
            failures.append(f"Invalid severity: {severity}")
            score -= 0.2

        # Check description not empty
        description = data.get("description", "")
        ok, msg = self._check_not_empty(description, min_length=50)
        if ok:
            checks.append("description_valid")
        else:
            failures.append(f"Description: {msg}")
            score -= 0.2

        return self._result(checks, failures, warnings, score, "adversary_output", pipeline)

    # =========================================================================
    # Rejection and Re-queue
    # =========================================================================

    def reject_output(self, data: Any, result: QualityResult, output_type: str,
                      pipeline: str = "unknown") -> Path:
        """Reject an output and save it (plus its quality result) for analysis.

        Returns the path of the written reject file.
        """
        # FIX: filename now includes microseconds (%f) so two rejections
        # within the same second no longer overwrite each other.
        reject_file = REJECT_DIR / f"{output_type}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.json"

        reject_data = {
            "type": output_type,
            "pipeline": pipeline,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "quality_result": result.to_dict(),
            # Non-JSON-serializable payloads are stringified so the dump
            # below cannot raise.
            "data": data if isinstance(data, (dict, list, str)) else str(data)
        }

        reject_file.write_text(json.dumps(reject_data, indent=2))

        print(f"Rejected output saved to: {reject_file}")
        print(f"  Failures: {', '.join(result.failures)}")

        return reject_file

    # =========================================================================
    # Reporting
    # =========================================================================

    def get_stats(self) -> Dict[str, Any]:
        """Get quality statistics."""
        return self.stats

    def generate_report(self) -> str:
        """Generate a markdown quality report from the collected stats."""
        lines = []

        lines.append("# Quality Gate Report")
        lines.append(f"**Generated:** {datetime.now(timezone.utc).isoformat()}")
        lines.append("")

        # Summary
        total = self.stats["total_checks"]
        passed = self.stats["passed"]
        failed = self.stats["failed"]
        pass_rate = (passed / total * 100) if total > 0 else 0

        lines.append("## Summary")
        lines.append(f"- Total Checks: {total}")
        lines.append(f"- Passed: {passed} ({pass_rate:.1f}%)")
        lines.append(f"- Failed: {failed}")
        lines.append("")

        # By Type
        lines.append("## By Type")
        for check_type, counts in self.stats.get("by_type", {}).items():
            type_total = counts["passed"] + counts["failed"]
            type_rate = (counts["passed"] / type_total * 100) if type_total > 0 else 0
            lines.append(f"- **{check_type}**: {counts['passed']}/{type_total} ({type_rate:.1f}%)")
        lines.append("")

        # By Pipeline
        lines.append("## By Pipeline")
        for pipeline, counts in self.stats.get("by_pipeline", {}).items():
            pipe_total = counts["passed"] + counts["failed"]
            pipe_rate = (counts["passed"] / pipe_total * 100) if pipe_total > 0 else 0
            lines.append(f"- **{pipeline}**: {counts['passed']}/{pipe_total} ({pipe_rate:.1f}%)")
        lines.append("")

        # Recent Failures (last five, two failure messages each)
        recent = self.stats.get("recent_failures", [])[-5:]
        if recent:
            lines.append("## Recent Failures")
            for failure in recent:
                lines.append(f"- [{failure['timestamp']}] {failure['type']} ({failure['pipeline']})")
                for f in failure.get("failures", [])[:2]:
                    lines.append(f"  - {f}")
            lines.append("")

        return "\n".join(lines)
|
||||
def main():
    """CLI entry point.

    Subcommands:
        validate  Validate one output file (exit 0 on pass, 1 on fail).
        stats     Print the raw quality statistics as JSON.
        report    Print a markdown quality report.

    Returns a process exit code (0 success, 1 failure).
    """
    parser = argparse.ArgumentParser(description="Quality Gate — Validate Pipeline Outputs")
    subparsers = parser.add_subparsers(dest="command")

    # Validate command
    validate_parser = subparsers.add_parser("validate", help="Validate a pipeline output")
    validate_parser.add_argument("--type", "-t", required=True,
                                 choices=["training_pair", "knowledge_file", "generated_asset", "adversary_output"],
                                 help="Type of output to validate")
    validate_parser.add_argument("--input", "-i", required=True, help="Input file path")
    validate_parser.add_argument("--pipeline", "-p", default="unknown", help="Pipeline name")
    validate_parser.add_argument("--reject", action="store_true", help="Reject failed outputs")

    # Stats command
    subparsers.add_parser("stats", help="Show quality statistics")

    # Report command
    subparsers.add_parser("report", help="Generate quality report")

    parsed = parser.parse_args()

    if not parsed.command:
        parser.print_help()
        return 1

    gate = QualityGate()

    if parsed.command == "validate":
        # Load input
        input_path = Path(parsed.input)
        if not input_path.exists():
            print(f"Error: Input file not found: {parsed.input}")
            return 1

        try:
            if parsed.type == "generated_asset":
                # For assets, check file exists and optionally load metadata
                # from a sidecar .json next to the asset file.
                metadata_file = input_path.with_suffix(".json")
                metadata = None
                if metadata_file.exists():
                    metadata = json.loads(metadata_file.read_text())
                result = gate.validate_generated_asset(str(input_path), metadata, parsed.pipeline)
            else:
                data = json.loads(input_path.read_text())

                if parsed.type == "training_pair":
                    result = gate.validate_training_pair(data, parsed.pipeline)
                elif parsed.type == "knowledge_file":
                    result = gate.validate_knowledge_file(data, parsed.pipeline)
                elif parsed.type == "adversary_output":
                    result = gate.validate_adversary_output(data, parsed.pipeline)
                else:
                    # Unreachable in practice: argparse `choices` already
                    # restricts --type; kept as a defensive guard.
                    print(f"Unknown type: {parsed.type}")
                    return 1
        except json.JSONDecodeError as e:
            print(f"Error: Invalid JSON in input file: {e}")
            return 1
        except Exception as e:
            # Top-level CLI boundary: report and exit non-zero.
            print(f"Error: {e}")
            return 1

        # Print result
        print(f"Validation: {parsed.type}")
        print(f"Result: {'PASS' if result.passed else 'FAIL'}")
        print(f"Score: {result.score:.2f}")

        if result.checks:
            print(f"Checks passed: {', '.join(result.checks)}")

        if result.failures:
            # FIX: was an f-string with no placeholders.
            print("Failures:")
            for f in result.failures:
                print(f"  - {f}")

        if result.warnings:
            # FIX: was an f-string with no placeholders.
            print("Warnings:")
            for w in result.warnings:
                print(f"  - {w}")

        # Reject if requested and failed. For assets, `data` was never
        # bound, so the file path string is archived instead.
        if not result.passed and parsed.reject:
            gate.reject_output(data if parsed.type != "generated_asset" else str(input_path),
                               result, parsed.type, parsed.pipeline)

        return 0 if result.passed else 1

    elif parsed.command == "stats":
        stats = gate.get_stats()
        print(json.dumps(stats, indent=2))
        return 0

    elif parsed.command == "report":
        report = gate.generate_report()
        print(report)
        return 0

    return 1


if __name__ == "__main__":
    sys.exit(main())
|
||||
38
pipelines/quality-gate.yaml
Normal file
38
pipelines/quality-gate.yaml
Normal file
@@ -0,0 +1,38 @@
|
||||
# Quality Gate Configuration
|
||||
# pipelines/quality-gate.yaml
|
||||
|
||||
quality_thresholds:
|
||||
training_pair:
|
||||
min_score: 0.5
|
||||
min_prompt_length: 10
|
||||
min_response_length: 20
|
||||
|
||||
knowledge_file:
|
||||
min_score: 0.5
|
||||
min_title_length: 5
|
||||
min_content_length: 50
|
||||
|
||||
generated_asset:
|
||||
min_score: 0.5
|
||||
min_file_size: 100 # bytes
|
||||
|
||||
adversary_output:
|
||||
min_score: 0.5
|
||||
min_description_length: 50
|
||||
required_severities: [critical, high, medium, low, info]
|
||||
|
||||
rejection:
|
||||
auto_reject: true
|
||||
reject_dir: ~/.hermes/pipelines/quality/rejected
|
||||
max_rejections_per_hour: 50
|
||||
|
||||
notifications:
|
||||
on_failure: true
|
||||
notify_pipeline: true
|
||||
notify_telegram: false
|
||||
|
||||
soul_compliance:
|
||||
enabled: true
|
||||
check_corporate_dependency: true
|
||||
check_false_certainty: true
|
||||
check_gatekeeping: true
|
||||
Reference in New Issue
Block a user