Compare commits
3 Commits
fix/680-py
...
feat/623-q
| Author | SHA1 | Date | |
|---|---|---|---|
| e06692b914 | |||
| 24906e15dd | |||
| e780ab0805 |
71
pipelines/README-quality-gate.md
Normal file
71
pipelines/README-quality-gate.md
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
# Quality Gate
|
||||||
|
|
||||||
|
Validates all pipeline outputs before saving.
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Validate a training pair
|
||||||
|
python3 quality-gate.py validate --type training_pair --input pair.json --pipeline training
|
||||||
|
|
||||||
|
# Validate a knowledge file
|
||||||
|
python3 quality-gate.py validate --type knowledge_file --input knowledge.json --pipeline knowledge
|
||||||
|
|
||||||
|
# Validate a generated asset
|
||||||
|
python3 quality-gate.py validate --type generated_asset --input image.png --pipeline assets
|
||||||
|
|
||||||
|
# Validate adversary output
|
||||||
|
python3 quality-gate.py validate --type adversary_output --input vuln.json --pipeline adversary
|
||||||
|
|
||||||
|
# View statistics
|
||||||
|
python3 quality-gate.py stats
|
||||||
|
|
||||||
|
# Generate report
|
||||||
|
python3 quality-gate.py report
|
||||||
|
```
|
||||||
|
|
||||||
|
## Checks Performed
|
||||||
|
|
||||||
|
### Training Pairs
|
||||||
|
- Prompt and response both non-empty
|
||||||
|
- Not duplicate content
|
||||||
|
- Not toxic/harmful
|
||||||
|
- SOUL.md compliance
|
||||||
|
- Response quality (length, formatting)
|
||||||
|
|
||||||
|
### Knowledge Files
|
||||||
|
- Required fields present (title, content, source, category)
|
||||||
|
- Not duplicate
|
||||||
|
- Not toxic
|
||||||
|
- Valid category
|
||||||
|
|
||||||
|
### Generated Assets
|
||||||
|
- File exists and not empty
|
||||||
|
- Valid file extension
|
||||||
|
- Metadata complete (generator, prompt, timestamp)
|
||||||
|
- SOUL.md compliance in prompt
|
||||||
|
|
||||||
|
### Adversary Outputs
|
||||||
|
- Required fields (vulnerability, description, reproduction_steps, severity)
|
||||||
|
- Reproduction steps as list
|
||||||
|
- Valid severity level
|
||||||
|
- Description not empty
|
||||||
|
|
||||||
|
## Integration
|
||||||
|
|
||||||
|
Add to pipeline orchestrator:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from pipelines.quality_gate import QualityGate
|
||||||
|
|
||||||
|
gate = QualityGate()
|
||||||
|
|
||||||
|
# After generating output
|
||||||
|
result = gate.validate_training_pair(data, pipeline="training")
|
||||||
|
|
||||||
|
if result.passed:
|
||||||
|
save_output(data)
|
||||||
|
else:
|
||||||
|
gate.reject_output(data, result, "training_pair", "training")
|
||||||
|
requeue_for_regeneration()
|
||||||
|
```
|
||||||
691
pipelines/quality-gate.py
Normal file
691
pipelines/quality-gate.py
Normal file
@@ -0,0 +1,691 @@
|
|||||||
|
#!/usr/bin/env python3
"""
Quality Gate — Validate All Pipeline Outputs

Every pipeline output must pass quality checks before being saved.
Auto-rejects bad outputs, re-queues for regeneration.

Usage:
    python3 quality-gate.py validate --type training_pair --input file.json
    python3 quality-gate.py validate --type knowledge_file --input file.json
    python3 quality-gate.py validate --type generated_asset --input file.png
    python3 quality-gate.py validate --type adversary_output --input file.json
    python3 quality-gate.py stats --pipeline training
    python3 quality-gate.py report
"""

import argparse
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Configuration
# All quality-gate state (stats, duplicate-hash registries, rejected outputs)
# lives under $HERMES_HOME (defaults to ~/.hermes).
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
QUALITY_DIR = HERMES_HOME / "pipelines" / "quality"
STATS_FILE = QUALITY_DIR / "quality_stats.json"
REJECT_DIR = QUALITY_DIR / "rejected"
# SOUL.md is expected one directory above this script's parent (repo root).
SOUL_FILE = Path(__file__).parent.parent / "SOUL.md"

# Ensure directories exist
# NOTE: these run at import time, so importing this module touches the filesystem.
QUALITY_DIR.mkdir(parents=True, exist_ok=True)
REJECT_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
class QualityResult:
    """Result of a quality check.

    Attributes:
        passed: Overall pass/fail verdict.
        score: Quality score in [0.0, 1.0].
        checks: Names of the individual checks that passed.
        failures: Hard-error reasons; any failure normally fails the result.
        warnings: Non-fatal issues that only reduce the score.
        timestamp: UTC ISO-8601 time the result was created.
    """

    def __init__(self, passed: bool, score: float = 0.0,
                 checks: Optional[List[str]] = None,
                 failures: Optional[List[str]] = None,
                 warnings: Optional[List[str]] = None):
        # Fixed: the list parameters were annotated `List[str] = None`,
        # relying on deprecated implicit-Optional; now explicitly Optional.
        self.passed = passed
        self.score = score  # 0.0 to 1.0
        self.checks = checks or []
        self.failures = failures or []
        self.warnings = warnings or []
        self.timestamp = datetime.now(timezone.utc).isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the result to a JSON-compatible dict."""
        return {
            "passed": self.passed,
            "score": self.score,
            "checks": self.checks,
            "failures": self.failures,
            "warnings": self.warnings,
            "timestamp": self.timestamp
        }

    def __repr__(self):
        status = "PASS" if self.passed else "FAIL"
        return f"QualityResult({status}, score={self.score:.2f})"
|
||||||
|
|
||||||
|
|
||||||
|
class QualityGate:
    """Validate pipeline outputs before they are saved.

    Each ``validate_*`` method runs a series of content checks, returns a
    :class:`QualityResult`, and records the outcome in persistent statistics
    (``STATS_FILE``). Rejected outputs can be archived via ``reject_output``.
    """

    def __init__(self):
        # SOUL.md text used by _check_soul_compliance; empty string means
        # the compliance checks are effectively disabled.
        self.soul_content = self._load_soul()
        # Running statistics, persisted to disk after every validation.
        self.stats = self._load_stats()

    def _load_soul(self) -> str:
        """Load SOUL.md content for compliance checks ('' if unavailable)."""
        try:
            if SOUL_FILE.exists():
                return SOUL_FILE.read_text()
        except Exception:
            pass  # best-effort: without SOUL.md, compliance checks are skipped
        return ""

    def _load_stats(self) -> Dict[str, Any]:
        """Load quality statistics, falling back to a fresh structure."""
        try:
            if STATS_FILE.exists():
                return json.loads(STATS_FILE.read_text())
        except Exception:
            pass  # unreadable/corrupt stats file: start fresh rather than crash
        return {
            "total_checks": 0,
            "passed": 0,
            "failed": 0,
            "by_type": {},
            "by_pipeline": {},
            "recent_failures": []
        }

    def _save_stats(self):
        """Persist quality statistics to STATS_FILE."""
        STATS_FILE.write_text(json.dumps(self.stats, indent=2))

    def _update_stats(self, result: QualityResult, check_type: str,
                      pipeline: str = "unknown"):
        """Record a check result in the statistics and save them to disk."""
        self.stats["total_checks"] += 1

        if result.passed:
            self.stats["passed"] += 1
        else:
            self.stats["failed"] += 1
            self.stats["recent_failures"].append({
                "type": check_type,
                "pipeline": pipeline,
                "timestamp": result.timestamp,
                "failures": result.failures
            })
            # Keep only last 100 failures
            self.stats["recent_failures"] = self.stats["recent_failures"][-100:]

        # Per-type and per-pipeline pass/fail counters.
        outcome = "passed" if result.passed else "failed"
        self.stats["by_type"].setdefault(check_type, {"passed": 0, "failed": 0})[outcome] += 1
        self.stats["by_pipeline"].setdefault(pipeline, {"passed": 0, "failed": 0})[outcome] += 1

        self._save_stats()

    # =========================================================================
    # Content Quality Checks
    # =========================================================================

    def _check_not_empty(self, content: str, min_length: int = 1) -> Tuple[bool, str]:
        """Check content is not empty after stripping whitespace."""
        if not content or len(content.strip()) < min_length:
            return False, f"Content is empty or too short (min {min_length} chars)"
        return True, ""

    def _check_not_duplicate(self, content: str, content_type: str) -> Tuple[bool, str]:
        """Check content is not a duplicate within its content_type.

        NOTE: on a miss the hash is recorded immediately, so this check has a
        side effect even if the caller later rejects the output for another
        reason — the same content will then read as duplicate next time.
        """
        content_hash = hashlib.sha256(content.encode()).hexdigest()

        # Per-type registry of previously seen content hashes.
        hash_file = QUALITY_DIR / f"{content_type}_hashes.json"
        known_hashes = set()

        if hash_file.exists():
            try:
                known_hashes = set(json.loads(hash_file.read_text()))
            except Exception:
                pass  # unreadable registry: treat as empty

        if content_hash in known_hashes:
            return False, f"Duplicate content detected (hash: {content_hash[:16]})"

        # Remember this content for future duplicate checks.
        known_hashes.add(content_hash)
        hash_file.write_text(json.dumps(list(known_hashes)))

        return True, ""

    def _check_not_toxic(self, content: str) -> Tuple[bool, str]:
        """Check content does not match any known toxic/harmful pattern."""
        # Fixed: the first pattern originally read (yourself|yourself|them) —
        # the redundant duplicate alternative has been removed.
        toxic_patterns = [
            r"(?i)kill\s+(yourself|them)",
            r"(?i)how\s+to\s+(make|build|create)\s+(bomb|weapon|poison)",
            r"(?i)hate\s+(speech|group|people)",
            r"(?i)illegal\s+(activity|drug|weapon)",
        ]

        for pattern in toxic_patterns:
            if re.search(pattern, content):
                return False, f"Content matches toxic pattern: {pattern[:50]}"

        return True, ""

    def _check_soul_compliance(self, content: str) -> Tuple[bool, str]:
        """Check content complies with SOUL.md principles (heuristic regexes)."""
        if not self.soul_content:
            return True, ""  # Can't check if no SOUL loaded

        violations = []

        # Check for corporate dependency
        if re.search(r"(?i)requires?\s+(permission|approval)\s+from\s+(google|openai|anthropic|meta)", content):
            violations.append("Suggests corporate dependency")

        # Check for dishonesty patterns
        if re.search(r"(?i)i\s+(am|'m)\s+(100%|always|never)\s+(right|correct|certain)", content):
            violations.append("Claims false certainty")

        # Check for gatekeeping: a refusal with no stated harm justification.
        if re.search(r"(?i)i\s+(won't|cannot|refuse\s+to)\s+(help|answer|explain)", content):
            if not re.search(r"(?i)(harmful|dangerous|illegal)", content):
                violations.append("Unnecessary gatekeeping")

        if violations:
            return False, f"SOUL.md violations: {'; '.join(violations)}"

        return True, ""

    # =========================================================================
    # Training Pair Validation
    # =========================================================================

    def validate_training_pair(self, data: Dict[str, Any], pipeline: str = "training") -> QualityResult:
        """Validate a training pair (dict with 'prompt' and 'response')."""
        checks = []
        failures = []
        warnings = []
        score = 1.0

        # Check structure.
        # NOTE(review): structural failures return early WITHOUT updating
        # stats — they are not counted in pass/fail totals; confirm intended.
        if "prompt" not in data:
            failures.append("Missing 'prompt' field")
            score -= 0.5
        if "response" not in data:
            failures.append("Missing 'response' field")
            score -= 0.5

        if failures:
            return QualityResult(False, 0.0, checks, failures, warnings)

        prompt = data.get("prompt", "")
        response = data.get("response", "")

        # Check prompt not empty
        ok, msg = self._check_not_empty(prompt, min_length=10)
        if ok:
            checks.append("prompt_not_empty")
        else:
            failures.append(f"Prompt: {msg}")
            score -= 0.3

        # Check response not empty
        ok, msg = self._check_not_empty(response, min_length=20)
        if ok:
            checks.append("response_not_empty")
        else:
            failures.append(f"Response: {msg}")
            score -= 0.3

        # Check not duplicate (warning only for training pairs).
        combined = f"{prompt}\n{response}"
        ok, msg = self._check_not_duplicate(combined, "training_pair")
        if ok:
            checks.append("not_duplicate")
        else:
            warnings.append(msg)
            score -= 0.1

        # Check not toxic
        ok, msg = self._check_not_toxic(response)
        if ok:
            checks.append("not_toxic")
        else:
            failures.append(msg)
            score -= 0.5

        # Check SOUL compliance
        ok, msg = self._check_soul_compliance(response)
        if ok:
            checks.append("soul_compliant")
        else:
            failures.append(msg)
            score -= 0.3

        # Check response quality (length, formatting).
        if len(response) < 50:
            warnings.append("Response is very short")
            score -= 0.1

        if response.count("\n") < 2 and len(response) > 200:
            warnings.append("Response lacks formatting")
            score -= 0.05

        # Check voice consistency (if voice marker present)
        voice = data.get("voice", "")
        if voice and voice.lower() not in response.lower()[:100]:
            warnings.append(f"Response may not match voice: {voice}")
            score -= 0.1

        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5

        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "training_pair", pipeline)

        return result

    # =========================================================================
    # Knowledge File Validation
    # =========================================================================

    def validate_knowledge_file(self, data: Dict[str, Any], pipeline: str = "knowledge") -> QualityResult:
        """Validate a knowledge file (title/content/source/category dict)."""
        checks = []
        failures = []
        warnings = []
        score = 1.0

        required_fields = ["title", "content", "source", "category"]

        # Check required fields (early return skips stats, as above).
        for field in required_fields:
            if field not in data:
                failures.append(f"Missing required field: {field}")
                score -= 0.2

        if failures:
            return QualityResult(False, 0.0, checks, failures, warnings)

        title = data.get("title", "")
        content = data.get("content", "")

        # Check title not empty
        ok, msg = self._check_not_empty(title, min_length=5)
        if ok:
            checks.append("title_valid")
        else:
            failures.append(f"Title: {msg}")
            score -= 0.2

        # Check content not empty
        ok, msg = self._check_not_empty(content, min_length=50)
        if ok:
            checks.append("content_valid")
        else:
            failures.append(f"Content: {msg}")
            score -= 0.3

        # Check not duplicate (hard failure for knowledge files).
        ok, msg = self._check_not_duplicate(content, "knowledge_file")
        if ok:
            checks.append("not_duplicate")
        else:
            failures.append(msg)
            score -= 0.4

        # Check not toxic
        ok, msg = self._check_not_toxic(content)
        if ok:
            checks.append("not_toxic")
        else:
            failures.append(msg)
            score -= 0.5

        # Check category valid (unknown category is only a warning).
        valid_categories = [
            "technical", "conceptual", "procedural", "reference",
            "tutorial", "troubleshooting", "architecture", "security"
        ]
        category = data.get("category", "").lower()
        if category in valid_categories:
            checks.append("category_valid")
        else:
            warnings.append(f"Unknown category: {category}")
            score -= 0.1

        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5

        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "knowledge_file", pipeline)

        return result

    # =========================================================================
    # Generated Asset Validation
    # =========================================================================

    def validate_generated_asset(self, file_path: str,
                                 metadata: Optional[Dict[str, Any]] = None,
                                 pipeline: str = "assets") -> QualityResult:
        """Validate a generated asset (image, video, etc.) on disk.

        Args:
            file_path: Path to the asset file.
            metadata: Optional sidecar metadata (generator/prompt/timestamp).
            pipeline: Pipeline name used for stats bucketing.
        """
        checks = []
        failures = []
        warnings = []
        score = 1.0

        path = Path(file_path)

        # Check file exists (early return skips stats, as above).
        if not path.exists():
            failures.append(f"File does not exist: {file_path}")
            return QualityResult(False, 0.0, checks, failures, warnings)

        checks.append("file_exists")

        # Check file not empty / suspiciously small.
        file_size = path.stat().st_size
        if file_size == 0:
            failures.append("File is empty")
            score -= 0.5
        elif file_size < 100:
            warnings.append(f"File is very small: {file_size} bytes")
            score -= 0.1
        else:
            checks.append("file_not_empty")

        # Check file extension against the known media/document types.
        valid_extensions = {
            "image": [".png", ".jpg", ".jpeg", ".gif", ".webp"],
            "video": [".mp4", ".webm", ".mov"],
            "audio": [".mp3", ".wav", ".ogg"],
            "document": [".md", ".txt", ".pdf"]
        }

        ext = path.suffix.lower()
        is_valid_ext = any(ext in exts for exts in valid_extensions.values())

        if is_valid_ext:
            checks.append("valid_extension")
        else:
            warnings.append(f"Unknown extension: {ext}")
            score -= 0.1

        # Check metadata if provided
        if metadata:
            required_meta = ["generator", "prompt", "timestamp"]
            for field in required_meta:
                if field in metadata:
                    checks.append(f"metadata_{field}")
                else:
                    warnings.append(f"Missing metadata: {field}")
                    score -= 0.05

        # Check SOUL compliance in metadata prompt
        if metadata and "prompt" in metadata:
            ok, msg = self._check_soul_compliance(metadata["prompt"])
            if ok:
                checks.append("soul_compliant")
            else:
                failures.append(msg)
                score -= 0.3

        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5

        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "generated_asset", pipeline)

        return result

    # =========================================================================
    # Adversary Output Validation
    # =========================================================================

    def validate_adversary_output(self, data: Dict[str, Any], pipeline: str = "adversary") -> QualityResult:
        """Validate an adversary output (should include reproduction steps)."""
        checks = []
        failures = []
        warnings = []
        score = 1.0

        required_fields = ["vulnerability", "description", "reproduction_steps", "severity"]

        # Check required fields (early return skips stats, as above).
        for field in required_fields:
            if field not in data:
                failures.append(f"Missing required field: {field}")
                score -= 0.2

        if failures:
            return QualityResult(False, 0.0, checks, failures, warnings)

        # Check reproduction steps is a non-empty list.
        steps = data.get("reproduction_steps", [])
        if not isinstance(steps, list) or len(steps) < 1:
            failures.append("reproduction_steps must be a non-empty list")
            score -= 0.3
        else:
            checks.append("reproduction_steps_valid")

        # Check severity against the accepted levels (case-insensitive).
        valid_severities = ["critical", "high", "medium", "low", "info"]
        severity = data.get("severity", "").lower()
        if severity in valid_severities:
            checks.append("severity_valid")
        else:
            failures.append(f"Invalid severity: {severity}")
            score -= 0.2

        # Check description not empty
        description = data.get("description", "")
        ok, msg = self._check_not_empty(description, min_length=50)
        if ok:
            checks.append("description_valid")
        else:
            failures.append(f"Description: {msg}")
            score -= 0.2

        score = max(0.0, score)
        passed = len(failures) == 0 and score >= 0.5

        result = QualityResult(passed, score, checks, failures, warnings)
        self._update_stats(result, "adversary_output", pipeline)

        return result

    # =========================================================================
    # Rejection and Re-queue
    # =========================================================================

    def reject_output(self, data: Any, result: QualityResult, output_type: str,
                      pipeline: str = "unknown") -> Path:
        """Archive a rejected output under REJECT_DIR and return the path.

        The filename uses local time; the payload timestamp is UTC.
        """
        reject_file = REJECT_DIR / f"{output_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        reject_data = {
            "type": output_type,
            "pipeline": pipeline,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "quality_result": result.to_dict(),
            # Non-serializable payloads are stringified so the dump never fails.
            "data": data if isinstance(data, (dict, list, str)) else str(data)
        }

        reject_file.write_text(json.dumps(reject_data, indent=2))

        print(f"Rejected output saved to: {reject_file}")
        print(f"  Failures: {', '.join(result.failures)}")

        return reject_file

    # =========================================================================
    # Reporting
    # =========================================================================

    def get_stats(self) -> Dict[str, Any]:
        """Return the live statistics dict (mutations affect this instance)."""
        return self.stats

    def generate_report(self) -> str:
        """Generate a markdown quality report from the statistics."""
        lines = []

        lines.append("# Quality Gate Report")
        lines.append(f"**Generated:** {datetime.now(timezone.utc).isoformat()}")
        lines.append("")

        # Summary
        total = self.stats["total_checks"]
        passed = self.stats["passed"]
        failed = self.stats["failed"]
        pass_rate = (passed / total * 100) if total > 0 else 0

        lines.append("## Summary")
        lines.append(f"- Total Checks: {total}")
        lines.append(f"- Passed: {passed} ({pass_rate:.1f}%)")
        lines.append(f"- Failed: {failed}")
        lines.append("")

        # By Type
        lines.append("## By Type")
        for check_type, counts in self.stats.get("by_type", {}).items():
            type_total = counts["passed"] + counts["failed"]
            type_rate = (counts["passed"] / type_total * 100) if type_total > 0 else 0
            lines.append(f"- **{check_type}**: {counts['passed']}/{type_total} ({type_rate:.1f}%)")
        lines.append("")

        # By Pipeline
        lines.append("## By Pipeline")
        for pipeline, counts in self.stats.get("by_pipeline", {}).items():
            pipe_total = counts["passed"] + counts["failed"]
            pipe_rate = (counts["passed"] / pipe_total * 100) if pipe_total > 0 else 0
            lines.append(f"- **{pipeline}**: {counts['passed']}/{pipe_total} ({pipe_rate:.1f}%)")
        lines.append("")

        # Recent Failures (last five, two failure reasons each).
        recent = self.stats.get("recent_failures", [])[-5:]
        if recent:
            lines.append("## Recent Failures")
            for failure in recent:
                lines.append(f"- [{failure['timestamp']}] {failure['type']} ({failure['pipeline']})")
                for f in failure.get("failures", [])[:2]:
                    lines.append(f"  - {f}")
            lines.append("")

        return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: validate / stats / report subcommands.

    Returns:
        Process exit code — 0 on pass/success, 1 on failure or error.
    """
    parser = argparse.ArgumentParser(description="Quality Gate — Validate Pipeline Outputs")
    subparsers = parser.add_subparsers(dest="command")

    # Validate command
    validate_parser = subparsers.add_parser("validate", help="Validate a pipeline output")
    validate_parser.add_argument("--type", "-t", required=True,
                                 choices=["training_pair", "knowledge_file", "generated_asset", "adversary_output"],
                                 help="Type of output to validate")
    validate_parser.add_argument("--input", "-i", required=True, help="Input file path")
    validate_parser.add_argument("--pipeline", "-p", default="unknown", help="Pipeline name")
    validate_parser.add_argument("--reject", action="store_true", help="Reject failed outputs")

    # Stats command
    subparsers.add_parser("stats", help="Show quality statistics")

    # Report command
    subparsers.add_parser("report", help="Generate quality report")

    parsed = parser.parse_args()

    if not parsed.command:
        parser.print_help()
        return 1

    gate = QualityGate()

    if parsed.command == "validate":
        # Load input
        input_path = Path(parsed.input)
        if not input_path.exists():
            print(f"Error: Input file not found: {parsed.input}")
            return 1

        data = None  # JSON payload; stays None for generated assets
        try:
            if parsed.type == "generated_asset":
                # For assets, check the file itself; a sidecar <name>.json
                # (if present) supplies the metadata.
                metadata_file = input_path.with_suffix(".json")
                metadata = None
                if metadata_file.exists():
                    metadata = json.loads(metadata_file.read_text())
                result = gate.validate_generated_asset(str(input_path), metadata, parsed.pipeline)
            else:
                data = json.loads(input_path.read_text())

                if parsed.type == "training_pair":
                    result = gate.validate_training_pair(data, parsed.pipeline)
                elif parsed.type == "knowledge_file":
                    result = gate.validate_knowledge_file(data, parsed.pipeline)
                elif parsed.type == "adversary_output":
                    result = gate.validate_adversary_output(data, parsed.pipeline)
                else:
                    # Unreachable in practice: argparse `choices` already
                    # restricts --type, but kept as a defensive guard.
                    print(f"Unknown type: {parsed.type}")
                    return 1
        except json.JSONDecodeError as e:
            print(f"Error: Invalid JSON in input file: {e}")
            return 1
        except Exception as e:
            print(f"Error: {e}")
            return 1

        # Print result
        print(f"Validation: {parsed.type}")
        print(f"Result: {'PASS' if result.passed else 'FAIL'}")
        print(f"Score: {result.score:.2f}")

        if result.checks:
            print("Checks passed: " + ", ".join(result.checks))

        if result.failures:
            print("Failures:")  # fixed: was an f-string with no placeholders
            for f in result.failures:
                print(f"  - {f}")

        if result.warnings:
            print("Warnings:")  # fixed: was an f-string with no placeholders
            for w in result.warnings:
                print(f"  - {w}")

        # Reject if requested and failed
        if not result.passed and parsed.reject:
            gate.reject_output(data if parsed.type != "generated_asset" else str(input_path),
                               result, parsed.type, parsed.pipeline)

        return 0 if result.passed else 1

    elif parsed.command == "stats":
        stats = gate.get_stats()
        print(json.dumps(stats, indent=2))
        return 0

    elif parsed.command == "report":
        report = gate.generate_report()
        print(report)
        return 0

    return 1


if __name__ == "__main__":
    sys.exit(main())
|
||||||
38
pipelines/quality-gate.yaml
Normal file
38
pipelines/quality-gate.yaml
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
# Quality Gate Configuration
|
||||||
|
# pipelines/quality-gate.yaml
|
||||||
|
|
||||||
|
quality_thresholds:
|
||||||
|
training_pair:
|
||||||
|
min_score: 0.5
|
||||||
|
min_prompt_length: 10
|
||||||
|
min_response_length: 20
|
||||||
|
|
||||||
|
knowledge_file:
|
||||||
|
min_score: 0.5
|
||||||
|
min_title_length: 5
|
||||||
|
min_content_length: 50
|
||||||
|
|
||||||
|
generated_asset:
|
||||||
|
min_score: 0.5
|
||||||
|
min_file_size: 100 # bytes
|
||||||
|
|
||||||
|
adversary_output:
|
||||||
|
min_score: 0.5
|
||||||
|
min_description_length: 50
|
||||||
|
required_severities: [critical, high, medium, low, info]
|
||||||
|
|
||||||
|
rejection:
|
||||||
|
auto_reject: true
|
||||||
|
reject_dir: ~/.hermes/pipelines/quality/rejected
|
||||||
|
max_rejections_per_hour: 50
|
||||||
|
|
||||||
|
notifications:
|
||||||
|
on_failure: true
|
||||||
|
notify_pipeline: true
|
||||||
|
notify_telegram: false
|
||||||
|
|
||||||
|
soul_compliance:
|
||||||
|
enabled: true
|
||||||
|
check_corporate_dependency: true
|
||||||
|
check_false_certainty: true
|
||||||
|
check_gatekeeping: true
|
||||||
Reference in New Issue
Block a user