#!/usr/bin/env python3
"""
quality_gate.py — Score and filter knowledge entries.

Scores each entry on 4 dimensions:
- Specificity: concrete examples vs vague generalities
- Actionability: can this be used to do something?
- Freshness: is this still accurate?
- Source quality: was the model/provider reliable?

Usage:
    from quality_gate import score_entry, filter_entries, quality_report

    score = score_entry(entry)
    filtered = filter_entries(entries, threshold=0.5)
    report = quality_report(entries)
"""

import json
import math
import re
from datetime import datetime, timezone
from typing import List, Optional

# Source quality scores (higher = more reliable)
SOURCE_QUALITY = {
    "claude-sonnet": 0.9,
    "claude-opus": 0.95,
    "gpt-4": 0.85,
    "gpt-4-turbo": 0.85,
    "gpt-5": 0.9,
    "mimo-v2-pro": 0.8,
    "gemini-pro": 0.8,
    "llama-3-70b": 0.75,
    "llama-3-8b": 0.7,
    "ollama": 0.6,
    "unknown": 0.5,
}
DEFAULT_SOURCE_QUALITY = 0.5

# Specificity indicators. Word-based patterns carry a scoped (?i:) flag so the
# proper-noun pattern stays case-sensitive: under a global IGNORECASE it would
# match any two consecutive words, inflating every score.
SPECIFIC_INDICATORS = [
    r"\b\d+\.\d+",                   # decimal numbers
    r"\b\d{4}-\d{2}-\d{2}",          # dates
    r"\b[A-Z][a-z]+\s[A-Z][a-z]+",   # proper nouns
    r"`[^`]+`",                      # code/commands
    r"https?://",                    # URLs
    r"(?i:\b(example|instance|specifically|concretely)\b)",
    r"(?i:\b(step \d|first|second|third)\b)",
    r"(?i:\b(exactly|precisely|measured|counted)\b)",
]

# Vagueness indicators (penalty)
VAGUE_INDICATORS = [
    r"\b(generally|usually|often|sometimes|might|could|perhaps)\b",
    r"\b(various|several|many|some|few)\b",
    r"\b(it depends|varies|differs)\b",
    r"\b(basically|essentially|fundamentally)\b",
    r"\b(everyone knows|it's obvious|clearly)\b",
]

# Actionability indicators
ACTIONABLE_INDICATORS = [
    r"\b(run|execute|install|deploy|configure|set up)\b",
    r"\b(use|apply|implement|create|build)\b",
    r"\b(check|verify|test|validate|confirm)\b",
    r"\b(fix|resolve|solve|debug|troubleshoot)\b",
    r"\b(if .+ then|when .+ do|to .+ use)\b",
    r"```[a-z]*\n",  # code blocks
    r"\$\s",         # shell commands
    r"\b\d+\.\s",    # numbered steps
]


def score_specificity(content: str) -> float:
    """Score specificity: 0=vague, 1=very specific."""
    content_lower = content.lower()
    score = 0.5  # baseline

    # Count specific indicators (case sensitivity is handled per-pattern)
    specific_count = sum(
        len(re.findall(p, content)) for p in SPECIFIC_INDICATORS
    )

    # Count vague indicators
    vague_count = sum(
        len(re.findall(p, content_lower)) for p in VAGUE_INDICATORS
    )

    # Adjust score
    score += min(specific_count * 0.05, 0.4)
    score -= min(vague_count * 0.08, 0.3)

    # Length bonus (longer = more detail, up to a point)
    word_count = len(content.split())
    if word_count > 50:
        score += min((word_count - 50) * 0.001, 0.1)

    return max(0.0, min(1.0, score))


def score_actionability(content: str) -> float:
    """Score actionability: 0=abstract, 1=highly actionable."""
    content_lower = content.lower()
    score = 0.3  # baseline (most knowledge is informational)

    # Count actionable indicators
    actionable_count = sum(
        len(re.findall(p, content_lower)) for p in ACTIONABLE_INDICATORS
    )
    score += min(actionable_count * 0.1, 0.6)

    # Code blocks are highly actionable
    if "```" in content:
        score += 0.2

    # Numbered steps are actionable
    if re.search(r"\d+\.\s+\w", content):
        score += 0.1

    return max(0.0, min(1.0, score))
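
# Sanity illustration for the two text heuristics above (a sketch, not a test
# fixture; exact values depend on the indicator weights and caps). A concrete,
# command-bearing entry should outscore a hedge-heavy one on both axes:
#
#   vague = "It generally depends on various factors and usually varies."
#   concrete = "Run `pytest -x tests/` first, then check the 3.2.1 changelog."
#   score_specificity(concrete) > score_specificity(vague)      # expected True
#   score_actionability(concrete) > score_actionability(vague)  # expected True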


def score_freshness(timestamp: Optional[str]) -> float:
    """Score freshness: 1=new, decays over time."""
    if not timestamp:
        return 0.5
    try:
        if isinstance(timestamp, str):
            ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        else:
            ts = timestamp
        now = datetime.now(timezone.utc)
        age_days = (now - ts).days
        # Exponential decay: 1.0 at day 0, ~0.37 at 180 days, ~0.13 at
        # 365 days, floored at 0.1 so old entries are discounted, not dropped
        score = math.exp(-age_days / 180)
        return max(0.1, min(1.0, score))
    except (ValueError, TypeError):
        return 0.5


def score_source_quality(model: Optional[str]) -> float:
    """Score source quality based on model/provider."""
    if not model:
        return DEFAULT_SOURCE_QUALITY

    # Normalize the model name, then match by substring
    model_lower = model.lower()
    for key, score in SOURCE_QUALITY.items():
        if key in model_lower:
            return score
    return DEFAULT_SOURCE_QUALITY


def score_entry(entry: dict) -> float:
    """
    Score a knowledge entry on quality (0.0-1.0).

    Weights:
    - specificity: 0.3
    - actionability: 0.3
    - freshness: 0.2
    - source_quality: 0.2
    """
    return score_entry_detailed(entry)["score"]


def score_entry_detailed(entry: dict) -> dict:
    """Score with per-dimension breakdown."""
    content = entry.get("content", entry.get("text", entry.get("response", "")))
    model = entry.get("model", entry.get("provenance", {}).get("model"))
    timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))

    specificity = score_specificity(content)
    actionability = score_actionability(content)
    freshness = score_freshness(timestamp)
    source = score_source_quality(model)

    return {
        "score": round(
            0.3 * specificity + 0.3 * actionability + 0.2 * freshness + 0.2 * source,
            4,
        ),
        "specificity": round(specificity, 4),
        "actionability": round(actionability, 4),
        "freshness": round(freshness, 4),
        "source_quality": round(source, 4),
    }


def filter_entries(entries: List[dict], threshold: float = 0.5) -> List[dict]:
    """Drop entries scoring below the quality threshold."""
    return [entry for entry in entries if score_entry(entry) >= threshold]
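
# Illustrative call (a sketch; the entry dict and its values are hypothetical,
# and the field fallbacks follow the .get() chains above). "claude-sonnet" is
# matched by substring, so a versioned name like "claude-sonnet-4" would score
# the same 0.9:
#
#   entry = {
#       "content": "Run `make test` before deploying. Step 1: check the config.",
#       "model": "claude-sonnet",
#       "timestamp": "2025-01-15T00:00:00Z",
#   }
#   score_entry_detailed(entry)
#   # -> {"score": ..., "specificity": ..., "actionability": ...,
#   #     "freshness": ..., "source_quality": 0.9}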


def quality_report(entries: List[dict]) -> str:
    """Generate quality distribution report."""
    if not entries:
        return "No entries to analyze."

    scores = [score_entry(e) for e in entries]
    avg = sum(scores) / len(scores)
    min_score = min(scores)
    max_score = max(scores)

    # Distribution buckets
    buckets = {"high": 0, "medium": 0, "low": 0, "rejected": 0}
    for s in scores:
        if s >= 0.7:
            buckets["high"] += 1
        elif s >= 0.5:
            buckets["medium"] += 1
        elif s >= 0.3:
            buckets["low"] += 1
        else:
            buckets["rejected"] += 1

    lines = [
        "=" * 50,
        " QUALITY GATE REPORT",
        "=" * 50,
        f" Total entries: {len(entries)}",
        f" Average score: {avg:.3f}",
        f" Min: {min_score:.3f}",
        f" Max: {max_score:.3f}",
        "",
        " Distribution:",
    ]
    for bucket, count in buckets.items():
        pct = count / len(entries) * 100
        bar = "█" * int(pct / 5)
        lines.append(f" {bucket:<12} {count:>5} ({pct:>5.1f}%) {bar}")

    passed = buckets["high"] + buckets["medium"]
    lines.append(
        f"\n Pass rate (>= 0.5): {passed}/{len(entries)} "
        f"({passed / len(entries) * 100:.1f}%)"
    )
    lines.append("=" * 50)
    return "\n".join(lines)


def main():
    import argparse

    parser = argparse.ArgumentParser(description="Knowledge quality gate")
    parser.add_argument("files", nargs="+", help="JSONL files to score")
    parser.add_argument("--threshold", type=float, default=0.5, help="Quality threshold")
    parser.add_argument("--json", action="store_true", help="JSON output")
    parser.add_argument(
        "--filter", action="store_true",
        help="Filter entries and report how many pass the threshold",
    )
    args = parser.parse_args()

    all_entries = []
    for filepath in args.files:
        with open(filepath) as f:
            for line in f:
                if line.strip():
                    all_entries.append(json.loads(line))

    if args.json:
        results = [{"entry": e, **score_entry_detailed(e)} for e in all_entries]
        print(json.dumps(results, indent=2))
    elif args.filter:
        filtered = filter_entries(all_entries, args.threshold)
        print(f"Kept {len(filtered)}/{len(all_entries)} entries (threshold: {args.threshold})")
    else:
        print(quality_report(all_entries))


if __name__ == "__main__":
    main()
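
# Typical invocations (entries.jsonl is a hypothetical input file with one
# JSON object per line):
#   python quality_gate.py entries.jsonl                    # distribution report
#   python quality_gate.py entries.jsonl --json             # per-entry breakdown
#   python quality_gate.py entries.jsonl --filter --threshold 0.6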