#!/usr/bin/env python3
"""Deep Dive Quality Evaluation Framework — Issue #830

Scores generated briefings against a multi-dimensional rubric.
Detects drift across consecutive runs.
Supports A/B prompt testing.

Usage:
    python3 quality_eval.py /path/to/briefing_20260405_124506.json
    python3 quality_eval.py /path/to/briefing.json --previous /path/to/briefing_yesterday.json
    python3 quality_eval.py /path/to/briefing.json --json
"""

import argparse
import json
import math
import sys
from dataclasses import dataclass, asdict, field
from pathlib import Path
from typing import List, Optional, Dict, Any
from urllib.parse import urlsplit

# ---------------------------------------------------------------------------
# Rubric configuration (tunable)
# ---------------------------------------------------------------------------

# NOTE(review): _estimate_audio_minutes defaults to 130 wpm, so the word
# target below (600-1200 words) corresponds to roughly 4.6-9.2 minutes.
# A briefing inside the word target therefore always triggers the
# "Audio estimate too short" warning. Confirm which target is intended.
TARGET_WORD_COUNT_MIN = 600
TARGET_WORD_COUNT_MAX = 1200
TARGET_AUDIO_MINUTES_MIN = 10
TARGET_AUDIO_MINUTES_MAX = 15
MAX_SOURCES_EXPECTED = 12

# Markers are matched as lowercase substrings of the briefing text.
RELEVANCE_KEYWORDS = [
    "llm", "agent", "architecture", "hermes", "tool use", "mcp",
    "reinforcement learning", "rlhf", "grpo", "transformer",
    "local model", "llama.cpp", "gemma", "inference", "alignment",
    "fleet", "timmy", "nexus", "openclaw", "sovereign",
]

ACTIONABILITY_MARKERS = [
    "implication", "recommend", "should", "next step", "action",
    "deploy", "integrate", "watch", "risk", "opportunity",
]

GROUNDING_MARKERS = [
    "fleet", "repo", "issue", "pr ", "commit", "milestone",
    "wizard", "hermes", "timmy", "nexus", "openclaw", "bezalel",
]


@dataclass
class QualityReport:
    """Result of evaluating one briefing against the rubric."""

    briefing_path: str
    overall_score: float  # 0.0 - 100.0
    relevance_score: float  # 0.0 - 100.0
    grounding_score: float  # 0.0 - 100.0
    conciseness_score: float  # 0.0 - 100.0
    actionability_score: float  # 0.0 - 100.0
    source_diversity_score: float  # 0.0 - 100.0
    drift_score: Optional[float] = None  # 0.0 - 100.0 (similarity to previous)
    warnings: List[str] = field(default_factory=list)
    recommendations: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Callers may still pass None explicitly; normalise to empty lists.
        if self.warnings is None:
            self.warnings = []
        if self.recommendations is None:
            self.recommendations = []


def load_briefing(path: Path) -> Dict[str, Any]:
    """Load a briefing payload from a UTF-8 JSON file.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the file is not valid JSON (json.JSONDecodeError is a
            ValueError subclass) or its root is not a JSON object.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        # Every scorer calls .get() on the payload; fail here with a clear
        # message instead of an AttributeError deep inside a scorer.
        raise ValueError(
            f"briefing root must be a JSON object, got {type(data).__name__}"
        )
    return data


def _word_count(text: str) -> int:
    """Return the number of whitespace-separated tokens in *text*."""
    return len(text.split())


def _estimate_audio_minutes(word_count: int, wpm: int = 130) -> float:
    """Estimate spoken duration in minutes at *wpm* words per minute."""
    return round(word_count / wpm, 1)


def _count_hits(text: str, markers: List[str]) -> int:
    """Count how many *markers* occur as substrings of *text*."""
    return sum(1 for marker in markers if marker in text)


def _saturating_score(hits: int, marker_total: int, saturation: float) -> float:
    """Map a hit count to 0-100, reaching 100 at marker_total * saturation hits."""
    return min(100.0, (hits / max(marker_total * saturation, 1)) * 100.0)


def _extract_full_text(briefing: Dict[str, Any]) -> str:
    """Best-effort extraction of briefing text from payload variants.

    Tries, in order: the first non-empty string among ``briefing_text``,
    ``text``, ``summary`` and ``content``; then ``sections``; then
    ``ranked_items`` / ``items``; finally the JSON dump of the whole payload.
    """
    for key in ("briefing_text", "text", "summary", "content"):
        candidate = briefing.get(key)
        if candidate and isinstance(candidate, str):
            return candidate

    # If briefing has sections (dicts with a "text" field, or bare values)
    sections = briefing.get("sections")
    if isinstance(sections, list) and sections:
        return "\n\n".join(
            str(s.get("text", s)) if isinstance(s, dict) else str(s)
            for s in sections
        )

    # If briefing has ranked items
    items = briefing.get("ranked_items") or briefing.get("items")
    if isinstance(items, list) and items:
        return "\n\n".join(
            f"{i.get('title') or ''}\n{i.get('summary') or i.get('text') or ''}"
            if isinstance(i, dict)
            else str(i)
            for i in items
        )

    return json.dumps(briefing, indent=2)


def score_relevance(briefing: Dict[str, Any]) -> tuple[float, List[str]]:
    """Score how well the briefing covers AI/ML topics relevant to Hermes work.

    Returns:
        (score, warnings) where score is 0-100 and saturates once 30% of
        RELEVANCE_KEYWORDS appear in the text.
    """
    text = _extract_full_text(briefing).lower()
    hits = _count_hits(text, RELEVANCE_KEYWORDS)
    score = _saturating_score(hits, len(RELEVANCE_KEYWORDS), 0.3)
    warnings = []
    if hits < 3:
        warnings.append("Briefing lacks AI/ML relevance keywords.")
    return round(score, 1), warnings


def score_grounding(briefing: Dict[str, Any]) -> tuple[float, List[str]]:
    """Score how well the briefing incorporates fleet context.

    Returns:
        (score, warnings). The score saturates once 20% of GROUNDING_MARKERS
        appear, and is halved when the payload carries ``fleet_context`` (or
        ``context``) but fewer than two markers show up in the text.
    """
    text = _extract_full_text(briefing).lower()
    fleet_ctx = briefing.get("fleet_context") or briefing.get("context") or {}
    has_fleet_context = bool(fleet_ctx)
    hits = _count_hits(text, GROUNDING_MARKERS)
    score = _saturating_score(hits, len(GROUNDING_MARKERS), 0.2)
    if has_fleet_context and hits < 2:
        score *= 0.5  # Penalty for ignoring injected context
    warnings = []
    if not has_fleet_context:
        warnings.append("No fleet_context found in briefing payload.")
    elif hits < 2:
        warnings.append("Fleet context was injected but not referenced in briefing text.")
    return round(score, 1), warnings


def score_conciseness(briefing: Dict[str, Any]) -> tuple[float, List[str]]:
    """Score whether briefing length lands in the target zone.

    Returns:
        (score, warnings). Warnings are raised independently for the word
        count and for the estimated audio duration.
    """
    text = _extract_full_text(briefing)
    wc = _word_count(text)
    audio_min = _estimate_audio_minutes(wc)
    warnings = []
    if wc < TARGET_WORD_COUNT_MIN:
        warnings.append(f"Briefing too short ({wc} words). Target: {TARGET_WORD_COUNT_MIN}-{TARGET_WORD_COUNT_MAX}.")
    elif wc > TARGET_WORD_COUNT_MAX:
        warnings.append(f"Briefing too long ({wc} words). Target: {TARGET_WORD_COUNT_MIN}-{TARGET_WORD_COUNT_MAX}.")
    if audio_min < TARGET_AUDIO_MINUTES_MIN:
        warnings.append(f"Audio estimate too short ({audio_min} min). Target: {TARGET_AUDIO_MINUTES_MIN}-{TARGET_AUDIO_MINUTES_MAX}.")
    elif audio_min > TARGET_AUDIO_MINUTES_MAX:
        warnings.append(f"Audio estimate too long ({audio_min} min). Target: {TARGET_AUDIO_MINUTES_MIN}-{TARGET_AUDIO_MINUTES_MAX}.")

    # Score peaks at target center, falls off linearly outside.
    # NOTE(review): max_dev is twice the distance from the center to zero
    # words, so an empty briefing still scores 50. Confirm this is intended.
    center_wc = (TARGET_WORD_COUNT_MIN + TARGET_WORD_COUNT_MAX) / 2
    deviation = abs(wc - center_wc)
    max_dev = max(center_wc - 0, TARGET_WORD_COUNT_MAX - center_wc) * 2
    score = max(0.0, 100.0 - (deviation / max_dev) * 100.0)
    return round(score, 1), warnings


def score_actionability(briefing: Dict[str, Any]) -> tuple[float, List[str]]:
    """Score whether the briefing contains explicit recommendations or next steps.

    Returns:
        (score, warnings) where score saturates once 30% of
        ACTIONABILITY_MARKERS appear in the text.
    """
    text = _extract_full_text(briefing).lower()
    hits = _count_hits(text, ACTIONABILITY_MARKERS)
    score = _saturating_score(hits, len(ACTIONABILITY_MARKERS), 0.3)
    warnings = []
    if hits < 2:
        warnings.append("Briefing lacks explicit actionability markers (recommendations, next steps, risks).")
    return round(score, 1), warnings


def _source_domain(src: Any) -> str:
    """Return the lowercase host of a source entry, or "" if none can be found.

    *src* may be a dict carrying a ``url`` field or a bare URL string.
    """
    url = src.get("url", "") if isinstance(src, dict) else src
    if not isinstance(url, str):
        return ""
    url = url.strip()
    if not url:
        return ""
    # urlsplit only recognises a host after "//"; scheme-less values such as
    # "example.com/path" would otherwise be parsed entirely as a path.
    if "://" not in url and not url.startswith("//"):
        url = "//" + url
    try:
        return urlsplit(url).hostname or ""
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. unbalanced IPv6 brackets).
        return ""


def _ranked_item_count(value: Any) -> float:
    """Interpret an ``items_ranked`` value as a count; unusable values give 0."""
    if isinstance(value, (list, tuple)):
        return len(value)
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return value
    return 0


def score_source_diversity(briefing: Dict[str, Any]) -> tuple[float, List[str]]:
    """Score whether the briefing draws from a healthy variety of sources.

    Counts unique hosts across ``sources``; five or more distinct hosts give
    100. When ``sources`` is empty but ``items_ranked`` is present, the item
    count is used as a proxy (eight or more give 100).
    """
    sources = briefing.get("sources") or []
    if not sources and "items_ranked" in briefing:
        # Fallback: use items_ranked count as proxy
        n = _ranked_item_count(briefing.get("items_ranked"))
        score = min(100.0, (n / 8) * 100.0)
        warnings = []
        if n < 5:
            warnings.append(f"Only {n} items ranked — source diversity may be low.")
        return round(score, 1), warnings

    if not isinstance(sources, (list, tuple)):
        sources = [sources]
    # Entries without a resolvable host are ignored rather than counted as "".
    domains = {domain for domain in map(_source_domain, sources) if domain}
    score = min(100.0, (len(domains) / 5) * 100.0)
    warnings = []
    if len(domains) < 3:
        warnings.append(f"Only {len(domains)} unique sources — diversity may be low.")
    return round(score, 1), warnings


def detect_drift(current: Dict[str, Any], previous: Dict[str, Any]) -> tuple[float, List[str]]:
    """Detect content drift between two briefings using simple overlap heuristics.

    Returns:
        (score, warnings) where score is the Jaccard similarity of the two
        lowercase word sets scaled to 0-100 (100 = identical vocabulary).
    """
    curr_words = set(_extract_full_text(current).lower().split())
    prev_words = set(_extract_full_text(previous).lower().split())
    if not curr_words or not prev_words:
        return 0.0, ["Cannot compute drift — empty briefing text."]
    jaccard = len(curr_words & prev_words) / len(curr_words | prev_words)
    # Scale to 0-100 where 100 = identical, 0 = completely different
    score = round(jaccard * 100, 1)
    warnings = []
    if score < 15:
        warnings.append(f"High drift detected (Jaccard={jaccard:.2f}). Briefings share very little vocabulary.")
    elif score > 85:
        warnings.append(f"Low drift (Jaccard={jaccard:.2f}). Briefings may be repetitive or stale.")
    return score, warnings


def evaluate(briefing_path: Path, previous_path: Optional[Path] = None) -> QualityReport:
    """Score the briefing at *briefing_path* and build a QualityReport.

    When *previous_path* is given, a drift score against that briefing is
    included. Propagates the exceptions raised by load_briefing.
    """
    briefing = load_briefing(briefing_path)

    rel_score, rel_warn = score_relevance(briefing)
    grd_score, grd_warn = score_grounding(briefing)
    con_score, con_warn = score_conciseness(briefing)
    act_score, act_warn = score_actionability(briefing)
    div_score, div_warn = score_source_diversity(briefing)

    warnings = rel_warn + grd_warn + con_warn + act_warn + div_warn

    # Weights sum to 1.0, so the overall score stays on the 0-100 scale.
    overall = round(
        (rel_score * 0.25 + grd_score * 0.25 + con_score * 0.20
         + act_score * 0.20 + div_score * 0.10),
        1,
    )

    recommendations = []
    if overall < 60:
        recommendations.append("CRITICAL: Briefing quality is below acceptable threshold. Review synthesis prompt and source configuration.")
    if rel_score < 50:
        recommendations.append("Relevance is low. Expand keyword list or tighten source aggregation.")
    if grd_score < 50:
        recommendations.append("Grounding is weak. Verify fleet_context injection is working and prompt references it explicitly.")
    if con_score < 50:
        recommendations.append("Length is off-target. Adjust synthesis prompt word-count guidance or ranking threshold.")
    if act_score < 50:
        recommendations.append("Actionability is low. Add explicit instructions to the synthesis prompt to include 'Implications' and 'Recommended Actions' sections.")

    drift_score = None
    if previous_path is not None:
        previous = load_briefing(previous_path)
        drift_score, drift_warn = detect_drift(briefing, previous)
        warnings.extend(drift_warn)

    return QualityReport(
        briefing_path=str(briefing_path),
        overall_score=overall,
        relevance_score=rel_score,
        grounding_score=grd_score,
        conciseness_score=con_score,
        actionability_score=act_score,
        source_diversity_score=div_score,
        drift_score=drift_score,
        warnings=warnings,
        recommendations=recommendations,
    )


def print_report(report: QualityReport, json_mode: bool = False):
    """Print *report* to stdout, as JSON when *json_mode* is set."""
    if json_mode:
        print(json.dumps(asdict(report), indent=2))
        return

    print("=" * 70)
    print(" DEEP DIVE QUALITY EVALUATION REPORT")
    print("=" * 70)
    print(f" Briefing : {report.briefing_path}")
    print(f" Overall : {report.overall_score}/100")
    print("-" * 70)
    print(f" Relevance : {report.relevance_score:>6}/100")
    print(f" Grounding : {report.grounding_score:>6}/100")
    print(f" Conciseness : {report.conciseness_score:>6}/100")
    print(f" Actionability : {report.actionability_score:>6}/100")
    print(f" Source Diversity : {report.source_diversity_score:>6}/100")
    if report.drift_score is not None:
        print(f" Drift vs Previous: {report.drift_score:>6}/100")
    print("-" * 70)

    if report.warnings:
        print("\n⚠️ WARNINGS:")
        for w in report.warnings:
            print(f" • {w}")

    if report.recommendations:
        print("\n💡 RECOMMENDATIONS:")
        for r in report.recommendations:
            print(f" • {r}")

    print("=" * 70)


def main():
    """CLI entry point.

    Exit codes: 0 on success, 1 when an input file is missing or unreadable,
    2 when the overall score is below 50.
    """
    parser = argparse.ArgumentParser(description="Evaluate Deep Dive briefing quality")
    parser.add_argument("briefing", type=Path, help="Path to briefing JSON")
    parser.add_argument("--previous", type=Path, help="Path to previous briefing JSON for drift detection")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    args = parser.parse_args()

    if not args.briefing.exists():
        print(f"Error: briefing not found: {args.briefing}", file=sys.stderr)
        sys.exit(1)
    if args.previous is not None and not args.previous.exists():
        print(f"Error: previous briefing not found: {args.previous}", file=sys.stderr)
        sys.exit(1)

    try:
        report = evaluate(args.briefing, args.previous)
    except (OSError, ValueError) as exc:
        # Unreadable file, malformed JSON, or a non-object JSON root.
        print(f"Error: could not load briefing: {exc}", file=sys.stderr)
        sys.exit(1)

    print_report(report, json_mode=args.json)

    # Exit non-zero if quality is critically low.
    # NOTE(review): this threshold (50) differs from the CRITICAL
    # recommendation threshold in evaluate() (60) — confirm that is intended.
    sys.exit(0 if report.overall_score >= 50 else 2)


if __name__ == "__main__":
    main()