diff --git a/quality_gate.py b/quality_gate.py
new file mode 100644
index 0000000..24f220c
--- /dev/null
+++ b/quality_gate.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python3
+"""
+quality_gate.py — Score and filter knowledge entries.
+
+Scores each entry on 4 dimensions:
+- Specificity: concrete examples vs vague generalities
+- Actionability: can this be used to do something?
+- Freshness: is this still accurate?
+- Source quality: was the model/provider reliable?
+
+Usage:
+    from quality_gate import score_entry, filter_entries, quality_report
+
+    score = score_entry(entry)
+    filtered = filter_entries(entries, threshold=0.5)
+    report = quality_report(entries)
+"""
+
+import json
+import math
+import re
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Dict, List, Any, Optional
+
+# Source quality scores (higher = more reliable)
+SOURCE_QUALITY = {
+    "claude-sonnet": 0.9,
+    "claude-opus": 0.95,
+    "gpt-4": 0.85,
+    "gpt-4-turbo": 0.85,
+    "gpt-5": 0.9,
+    "mimo-v2-pro": 0.8,
+    "gemini-pro": 0.8,
+    "llama-3-70b": 0.75,
+    "llama-3-8b": 0.7,
+    "ollama": 0.6,
+    "unknown": 0.5,
+}
+
+DEFAULT_SOURCE_QUALITY = 0.5
+
+# Specificity indicators
+SPECIFIC_INDICATORS = [
+    r"\b\d+\.\d+",  # decimal numbers
+    r"\b\d{4}-\d{2}-\d{2}",  # dates
+    r"(?-i:\b[A-Z][a-z]+\s[A-Z][a-z]+)",  # proper nouns (kept case-sensitive even under IGNORECASE)
+    r"`[^`]+`",  # code/commands
+    r"https?://",  # URLs
+    r"\b(example|instance|specifically|concretely)\b",
+    r"\b(step \d|first|second|third)\b",
+    r"\b(exactly|precisely|measured|counted)\b",
+]
+
+# Vagueness indicators (penalty)
+VAGUE_INDICATORS = [
+    r"\b(generally|usually|often|sometimes|might|could|perhaps)\b",
+    r"\b(various|several|many|some|few)\b",
+    r"\b(it depends|varies|differs)\b",
+    r"\b(basically|essentially|fundamentally)\b",
+    r"\b(everyone knows|it's obvious|clearly)\b",
+]
+
+# Actionability indicators
+ACTIONABLE_INDICATORS = [
+    r"\b(run|execute|install|deploy|configure|set up)\b",
+    r"\b(use|apply|implement|create|build)\b",
+    r"\b(check|verify|test|validate|confirm)\b",
+    r"\b(fix|resolve|solve|debug|troubleshoot)\b",
+    r"\b(if .+ then|when .+ do|to .+ use)\b",
+    r"```[a-z]*\n",  # code blocks
+    r"\$\s",  # shell commands
+    r"\b\d+\.\s",  # numbered steps
+]
+
+
+def score_specificity(content: str) -> float:
+    """Score specificity: 0=vague, 1=very specific."""
+    content_lower = content.lower()
+    score = 0.5  # baseline
+
+    # Check for specific indicators
+    specific_count = sum(
+        len(re.findall(p, content, re.IGNORECASE))
+        for p in SPECIFIC_INDICATORS
+    )
+
+    # Check for vague indicators
+    vague_count = sum(
+        len(re.findall(p, content_lower))
+        for p in VAGUE_INDICATORS
+    )
+
+    # Adjust score
+    score += min(specific_count * 0.05, 0.4)
+    score -= min(vague_count * 0.08, 0.3)
+
+    # Length bonus (longer = more detail, up to a point)
+    word_count = len(content.split())
+    if word_count > 50:
+        score += min((word_count - 50) * 0.001, 0.1)
+
+    return max(0.0, min(1.0, score))
+
+
+def score_actionability(content: str) -> float:
+    """Score actionability: 0=abstract, 1=highly actionable."""
+    content_lower = content.lower()
+    score = 0.3  # baseline (most knowledge is informational)
+
+    # Check for actionable indicators
+    actionable_count = sum(
+        len(re.findall(p, content_lower))
+        for p in ACTIONABLE_INDICATORS
+    )
+
+    score += min(actionable_count * 0.1, 0.6)
+
+    # Code blocks are highly actionable
+    if "```" in content:
+        score += 0.2
+
+    # Numbered steps are actionable
+    if re.search(r"\d+\.\s+\w", content):
+        score += 0.1
+
+    return max(0.0, min(1.0, score))
+
+
+def score_freshness(timestamp: Optional[str]) -> float:
+    """Score freshness: 1=new, decays over time."""
+    if not timestamp:
+        return 0.5
+
+    try:
+        if isinstance(timestamp, str):
+            ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
+        else:
+            ts = timestamp
+
+        now = datetime.now(timezone.utc)
+        age_days = (now - ts).days
+
+        # Exponential decay: 1.0 at day 0, ~0.5 at ~125 days, ~0.37 at 180 days, ~0.13 at 365 days (floored at 0.1)
+        score = math.exp(-age_days / 180)
+        return max(0.1, min(1.0, score))
+    except (ValueError, TypeError):
+        return 0.5
+
+
+def score_source_quality(model: Optional[str]) -> float:
+    """Score source quality based on model/provider."""
+    if not model:
+        return DEFAULT_SOURCE_QUALITY
+
+    # Normalize model name
+    model_lower = model.lower()
+    for key, score in SOURCE_QUALITY.items():
+        if key in model_lower:
+            return score
+
+    return DEFAULT_SOURCE_QUALITY
+
+
+def score_entry(entry: dict) -> float:
+    """
+    Score a knowledge entry on quality (0.0-1.0).
+
+    Weights:
+    - specificity: 0.3
+    - actionability: 0.3
+    - freshness: 0.2
+    - source_quality: 0.2
+    """
+    content = entry.get("content", entry.get("text", entry.get("response", "")))
+    model = entry.get("model", entry.get("provenance", {}).get("model"))
+    timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
+
+    specificity = score_specificity(content)
+    actionability = score_actionability(content)
+    freshness = score_freshness(timestamp)
+    source = score_source_quality(model)
+
+    return round(
+        0.3 * specificity
+        + 0.3 * actionability
+        + 0.2 * freshness
+        + 0.2 * source,
+        4
+    )
+
+
+def score_entry_detailed(entry: dict) -> dict:
+    """Score with breakdown."""
+    content = entry.get("content", entry.get("text", entry.get("response", "")))
+    model = entry.get("model", entry.get("provenance", {}).get("model"))
+    timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
+
+    specificity = score_specificity(content)
+    actionability = score_actionability(content)
+    freshness = score_freshness(timestamp)
+    source = score_source_quality(model)
+
+    return {
+        "score": round(0.3 * specificity + 0.3 * actionability + 0.2 * freshness + 0.2 * source, 4),
+        "specificity": round(specificity, 4),
+        "actionability": round(actionability, 4),
+        "freshness": round(freshness, 4),
+        "source_quality": round(source, 4),
+    }
+
+
+def filter_entries(entries: List[dict], threshold: float = 0.5) -> List[dict]:
+    """Keep only entries at or above the quality threshold."""
+    filtered = []
+    for entry in entries:
+        if score_entry(entry) >= threshold:
+            filtered.append(entry)
+    return filtered
+
+
+def quality_report(entries: List[dict]) -> str:
+    """Generate quality distribution report."""
+    if not entries:
+        return "No entries to analyze."
+
+    scores = [score_entry(e) for e in entries]
+
+    avg = sum(scores) / len(scores)
+    min_score = min(scores)
+    max_score = max(scores)
+
+    # Distribution buckets
+    buckets = {"high": 0, "medium": 0, "low": 0, "rejected": 0}
+    for s in scores:
+        if s >= 0.7:
+            buckets["high"] += 1
+        elif s >= 0.5:
+            buckets["medium"] += 1
+        elif s >= 0.3:
+            buckets["low"] += 1
+        else:
+            buckets["rejected"] += 1
+
+    lines = [
+        "=" * 50,
+        "  QUALITY GATE REPORT",
+        "=" * 50,
+        f"  Total entries: {len(entries)}",
+        f"  Average score: {avg:.3f}",
+        f"  Min: {min_score:.3f}",
+        f"  Max: {max_score:.3f}",
+        "",
+        "  Distribution:",
+    ]
+
+    for bucket, count in buckets.items():
+        pct = count / len(entries) * 100
+        bar = "█" * int(pct / 5)
+        lines.append(f"    {bucket:<12} {count:>5} ({pct:>5.1f}%) {bar}")
+
+    passed = buckets["high"] + buckets["medium"]
+    lines.append(f"\n  Pass rate (>= 0.5): {passed}/{len(entries)} ({passed/len(entries)*100:.1f}%)")
+    lines.append("=" * 50)
+
+    return "\n".join(lines)
+
+
+def main():
+    import argparse
+    parser = argparse.ArgumentParser(description="Knowledge quality gate")
+    parser.add_argument("files", nargs="+", help="JSONL files to score")
+    parser.add_argument("--threshold", type=float, default=0.5, help="Quality threshold")
+    parser.add_argument("--json", action="store_true", help="JSON output")
+    parser.add_argument("--filter", action="store_true", help="Apply the threshold and report how many entries pass")
+    args = parser.parse_args()
+
+    all_entries = []
+    for filepath in args.files:
+        with open(filepath) as f:
+            for line in f:
+                if line.strip():
+                    all_entries.append(json.loads(line))
+
+    if args.json:
+        results = [{"entry": e, **score_entry_detailed(e)} for e in all_entries]
+        print(json.dumps(results, indent=2))
+    elif args.filter:
+        filtered = filter_entries(all_entries, args.threshold)
+        print(f"Kept {len(filtered)}/{len(all_entries)} entries (threshold: {args.threshold})")
+    else:
+        print(quality_report(all_entries))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_quality_gate.py b/tests/test_quality_gate.py
new file mode 100644
index 0000000..089fe68
--- /dev/null
+++ b/tests/test_quality_gate.py
@@ -0,0 +1,108 @@
+"""
+Tests for quality_gate.py — Knowledge entry quality scoring.
+"""
+
+import unittest
+from datetime import datetime, timezone, timedelta
+
+import sys
+from pathlib import Path
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from quality_gate import (
+    score_specificity,
+    score_actionability,
+    score_freshness,
+    score_source_quality,
+    score_entry,
+    filter_entries,
+)
+
+
+class TestScoreSpecificity(unittest.TestCase):
+    def test_specific_content_scores_high(self):
+        content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx."
+        score = score_specificity(content)
+        self.assertGreater(score, 0.6)
+
+    def test_vague_content_scores_low(self):
+        content = "It generally depends. Various factors might affect this. Basically, it varies."
+        score = score_specificity(content)
+        self.assertLess(score, 0.5)
+
+    def test_empty_scores_baseline(self):
+        score = score_specificity("")
+        self.assertAlmostEqual(score, 0.5, delta=0.1)
+
+
+class TestScoreActionability(unittest.TestCase):
+    def test_actionable_content_scores_high(self):
+        content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`"
+        score = score_actionability(content)
+        self.assertGreater(score, 0.6)
+
+    def test_abstract_content_scores_low(self):
+        content = "The concept of intelligence is fascinating and multifaceted."
+        score = score_actionability(content)
+        self.assertLess(score, 0.5)
+
+
+class TestScoreFreshness(unittest.TestCase):
+    def test_recent_timestamp_scores_high(self):
+        recent = datetime.now(timezone.utc).isoformat()
+        score = score_freshness(recent)
+        self.assertGreater(score, 0.9)
+
+    def test_old_timestamp_scores_low(self):
+        old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
+        score = score_freshness(old)
+        self.assertLess(score, 0.2)
+
+    def test_none_returns_baseline(self):
+        score = score_freshness(None)
+        self.assertEqual(score, 0.5)
+
+
+class TestScoreSourceQuality(unittest.TestCase):
+    def test_claude_scores_high(self):
+        self.assertGreater(score_source_quality("claude-sonnet"), 0.85)
+
+    def test_ollama_scores_lower(self):
+        self.assertLess(score_source_quality("ollama"), 0.7)
+
+    def test_unknown_returns_default(self):
+        self.assertEqual(score_source_quality("unknown"), 0.5)
+
+
+class TestScoreEntry(unittest.TestCase):
+    def test_good_entry_scores_high(self):
+        entry = {
+            "content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.",
+            "model": "claude-sonnet",
+            "timestamp": datetime.now(timezone.utc).isoformat(),
+        }
+        score = score_entry(entry)
+        self.assertGreater(score, 0.6)
+
+    def test_poor_entry_scores_low(self):
+        entry = {
+            "content": "It depends. Various things might happen.",
+            "model": "unknown",
+        }
+        score = score_entry(entry)
+        self.assertLess(score, 0.5)
+
+
+class TestFilterEntries(unittest.TestCase):
+    def test_filters_low_quality(self):
+        entries = [
+            {"content": "Run `deploy.py` to fix the issue.", "model": "claude"},
+            {"content": "It might work sometimes.", "model": "unknown"},
+            {"content": "Configure nginx: step 1 edit nginx.conf", "model": "gpt-4"},
+        ]
+        filtered = filter_entries(entries, threshold=0.5)
+        self.assertGreaterEqual(len(filtered), 2)
+
+
+if __name__ == "__main__":
+    unittest.main()
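
As a quick sanity check of how `score_entry` composes its four sub-scores, the interpreter sketch below recomputes the weighted sum from the breakdown returned by `score_entry_detailed`. The sample entry is invented purely for illustration; the exact sub-scores depend on which indicator regexes match and on the current date relative to the timestamp.

    >>> from quality_gate import score_entry_detailed
    >>> entry = {
    ...     "content": "Run `kubectl apply -f deployment.yaml`, then verify with `kubectl get pods`.",
    ...     "model": "gpt-4",
    ...     "timestamp": "2026-04-15T00:00:00+00:00",
    ... }
    >>> d = score_entry_detailed(entry)
    >>> recomposed = (0.3 * d["specificity"] + 0.3 * d["actionability"]
    ...               + 0.2 * d["freshness"] + 0.2 * d["source_quality"])
    >>> abs(d["score"] - recomposed) < 1e-3   # "score" is the rounded weighted sum
    True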