#!/usr/bin/env python3
"""
Test harness for the knowledge extraction prompt.
Validates output structure, content quality, and hallucination resistance.

Usage:
    python3 scripts/test_harvest_prompt.py                    # Run all tests
    python3 scripts/test_harvest_prompt.py --transcript FILE  # Test against a real transcript
    python3 scripts/test_harvest_prompt.py --validate FILE    # Validate an existing extraction JSON
"""

import argparse
import json
import sys
from pathlib import Path

VALID_CATEGORIES = {"fact", "pitfall", "pattern", "tool-quirk", "question"}
REQUIRED_FIELDS = {"fact", "category", "repo", "confidence", "evidence"}
REQUIRED_META = {"session_outcome", "tools_used", "repos_touched", "error_count", "knowledge_count"}


def validate_knowledge_item(item, idx):
    """Validate a single knowledge item. Returns a list of errors."""
    errors = []
    if not isinstance(item, dict):
        return [f"Item {idx}: not a dict"]
    for field in REQUIRED_FIELDS:
        if field not in item:
            errors.append(f"Item {idx}: missing field '{field}'")
    if not isinstance(item.get("fact", ""), str) or len(item.get("fact", "").strip()) == 0:
        errors.append(f"Item {idx}: fact must be a non-empty string")
    if item.get("category") not in VALID_CATEGORIES:
        errors.append(f"Item {idx}: invalid category '{item.get('category')}'")
    if not isinstance(item.get("repo", ""), str) or len(item.get("repo", "").strip()) == 0:
        errors.append(f"Item {idx}: repo must be a non-empty string")
    conf = item.get("confidence")
    if not isinstance(conf, (int, float)) or not (0.0 <= conf <= 1.0):
        errors.append(f"Item {idx}: confidence must be a number 0.0-1.0, got {conf}")
    if not isinstance(item.get("evidence", ""), str) or len(item.get("evidence", "").strip()) == 0:
        errors.append(f"Item {idx}: evidence must be a non-empty string (hallucination check)")
    return errors


def validate_extraction(data):
    """Validate a full extraction result. Returns (is_valid, errors, warnings)."""
    errors = []
    warnings = []

    if not isinstance(data, dict):
        return False, ["Root is not a JSON object"], []
    if "knowledge" not in data:
        return False, ["Missing 'knowledge' array"], []
    if not isinstance(data["knowledge"], list):
        return False, ["'knowledge' is not an array"], []

    for i, item in enumerate(data["knowledge"]):
        errors.extend(validate_knowledge_item(item, i))

    # Meta block validation
    if "meta" not in data:
        warnings.append("Missing 'meta' block (session_outcome, tools_used, etc.)")
    else:
        meta = data["meta"]
        for field in REQUIRED_META:
            if field not in meta:
                warnings.append(f"Meta missing field '{field}'")

    # Quality checks (run only over well-formed dict items; malformed ones are already errors)
    facts = [item for item in data["knowledge"] if isinstance(item, dict)]
    if len(data["knowledge"]) == 0:
        warnings.append("No knowledge extracted (empty session or extraction failure)")

    # Check for near-duplicate facts
    seen_facts = set()
    for item in facts:
        normalized = str(item.get("fact", "")).lower().strip()[:80]
        if normalized in seen_facts:
            warnings.append(f"Duplicate fact detected: '{normalized[:50]}...'")
        seen_facts.add(normalized)

    # Check confidence distribution
    confidences = [item["confidence"] for item in facts if isinstance(item.get("confidence"), (int, float))]
    if confidences:
        avg_conf = sum(confidences) / len(confidences)
        if avg_conf > 0.9:
            warnings.append(f"Average confidence {avg_conf:.2f} is suspiciously high (possible hallucination)")
        if avg_conf < 0.4:
            warnings.append(f"Average confidence {avg_conf:.2f} is very low (extraction may be too cautious)")

    return len(errors) == 0, errors, warnings

def validate_transcript_coverage(data, transcript):
    """
    Check that extracted facts are actually supported by the transcript.
    This is a heuristic: it checks that evidence strings (or their key terms) appear in the transcript.
    Returns a list of potential hallucinations.
    """
    hallucinations = []
    transcript_lower = transcript.lower()

    for item in data.get("knowledge", []):
        fact = item.get("fact", "")
        evidence = item.get("evidence", "")

        # Check whether the evidence string appears verbatim in the transcript
        if evidence and evidence.lower() not in transcript_lower:
            # No exact match: fall back to counting how many evidence terms are present
            evidence_words = set(evidence.lower().split())
            transcript_words = set(transcript_lower.split())
            overlap = evidence_words & transcript_words
            if len(overlap) < min(3, len(evidence_words) * 0.3):
                hallucinations.append({
                    "fact": fact[:80],
                    "evidence": evidence[:80],
                    "issue": "Evidence not found in transcript — possible hallucination"
                })

    return hallucinations


# --- Test transcripts ---

SAMPLE_TRANSCRIPT_SUCCESS = """
User: Clone the repo at forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence
Agent: Trying git clone... Error: 401 Unauthorized
Agent: Checking for token... Found in ~/.config/gitea/token
Agent: Token is gitea_token format, not OAuth
Agent: Clone successful with token authentication
Agent: Now examining the repo structure...
Agent: Found knowledge/index.json with 0 facts
Agent: Creating harvest-prompt.md in templates/
Agent: Done. Files created: templates/harvest-prompt.md, scripts/test_harvest_prompt.py
"""

SAMPLE_TRANSCRIPT_FAILURE = """
User: Deploy the cron jobs to production
Agent: Connecting to VPS... SSH timeout after 30s
Agent: Retrying with verbose output... Connection refused on port 22
Agent: Checking firewall rules... iptables shows port 22 is open
Agent: Checking sshd config... sshd is not running
Agent: Attempting to start sshd... systemctl start sshd failed: unit not found
Agent: ERROR: OpenSSH server not installed on this VPS
Agent: Cannot proceed without SSH access. Deployment blocked.
"""

SAMPLE_TRANSCRIPT_API_QUIRK = """
User: List all open issues in hermes-agent
Agent: Querying Gitea API... endpoint: /repos/Timmy_Foundation/hermes-agent/issues?state=open&limit=50
Agent: Response: 0 items. That's odd.
Agent: Trying without state filter... endpoint: /repos/Timmy_Foundation/hermes-agent/issues?limit=50
Agent: Response: 50 items returned, but all have pull_request field. These are PRs, not issues.
Agent: Gitea issues API returns PRs alongside issues. Must filter by presence of pull_request key.
Agent: Also discovered: state=open parameter returns empty when used with limit=50, but works with limit=5.
Agent: This appears to be a Gitea API quirk — pagination interacts with state filter unexpectedly.
Agent: Workaround: fetch without state filter, then filter client-side for state=='open' and 'pull_request' not in item.
"""

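# For orientation, a plausible extraction from SAMPLE_TRANSCRIPT_API_QUIRK is sketched below.
# It is illustrative only (the test suite never asserts this exact output): fields follow
# REQUIRED_FIELDS / REQUIRED_META, categories come from VALID_CATEGORIES, and the "tools_used"
# label is an assumed placeholder.
#
# {
#   "knowledge": [
#     {"fact": "Gitea issues API returns PRs alongside issues; filter on the pull_request key",
#      "category": "tool-quirk", "repo": "hermes-agent", "confidence": 0.9,
#      "evidence": "Response: 50 items returned, but all have pull_request field."},
#     {"fact": "state=open with limit=50 returns an empty list; fetch unfiltered and filter client-side",
#      "category": "pitfall", "repo": "hermes-agent", "confidence": 0.8,
#      "evidence": "state=open parameter returns empty when used with limit=50, but works with limit=5."}
#   ],
#   "meta": {"session_outcome": "success", "tools_used": ["gitea-api"], "repos_touched": ["hermes-agent"],
#            "error_count": 0, "knowledge_count": 2}
# }
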
""" def run_tests(): """Run the built-in test suite.""" tests_passed = 0 tests_failed = 0 print("=" * 60) print("KNOWLEDGE EXTRACTION PROMPT — TEST SUITE") print("=" * 60) # Test 1: Prompt file exists and is under 2k tokens (~8k chars) print("\n[Test 1] Prompt file size constraint") prompt_path = Path("templates/harvest-prompt.md") if not prompt_path.exists(): print(" FAIL: harvest-prompt.md not found") tests_failed += 1 else: size = prompt_path.stat().st_size # Rough token estimate: ~4 chars per token est_tokens = size / 4 print(f" Prompt size: {size} bytes (~{est_tokens:.0f} tokens)") if est_tokens > 2000: print(f" WARN: Prompt exceeds ~1500 tokens (target: ~1000)") else: print(f" PASS: Within token budget") tests_passed += 1 # Test 2: Validate a well-formed extraction print("\n[Test 2] Valid extraction passes validation") valid_extraction = { "knowledge": [ { "fact": "Gitea auth token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9, "evidence": "Found in ~/.config/gitea/token" }, { "fact": "Clone fails with 401 when no token is provided", "category": "pitfall", "repo": "compounding-intelligence", "confidence": 0.9, "evidence": "Error: 401 Unauthorized" } ], "meta": { "session_outcome": "success", "tools_used": ["git"], "repos_touched": ["compounding-intelligence"], "error_count": 1, "knowledge_count": 2 } } is_valid, errors, warnings = validate_extraction(valid_extraction) if is_valid: print(f" PASS: Valid extraction accepted ({len(warnings)} warnings)") tests_passed += 1 else: print(f" FAIL: Valid extraction rejected: {errors}") tests_failed += 1 # Test 3: Reject missing fields print("\n[Test 3] Missing fields are rejected") bad_extraction = { "knowledge": [ {"fact": "Something learned", "category": "fact"} # Missing repo, confidence, evidence ] } is_valid, errors, warnings = validate_extraction(bad_extraction) if not is_valid: print(f" PASS: Rejected with {len(errors)} errors") tests_passed += 1 else: print(f" FAIL: Should have rejected missing fields") tests_failed += 1 # Test 4: Reject invalid category print("\n[Test 4] Invalid category is rejected") bad_cat = { "knowledge": [ {"fact": "Test", "category": "discovery", "repo": "x", "confidence": 0.8, "evidence": "test"} ] } is_valid, errors, warnings = validate_extraction(bad_cat) if not is_valid and any("category" in e for e in errors): print(f" PASS: Invalid category 'discovery' rejected") tests_passed += 1 else: print(f" FAIL: Should have rejected invalid category") tests_failed += 1 # Test 5: Detect near-duplicates print("\n[Test 5] Duplicate detection") dup_extraction = { "knowledge": [ {"fact": "Token is at ~/.config/gitea/token", "category": "fact", "repo": "x", "confidence": 0.9, "evidence": "a"}, {"fact": "Token is at ~/.config/gitea/token", "category": "fact", "repo": "x", "confidence": 0.9, "evidence": "b"} ], "meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 2} } is_valid, errors, warnings = validate_extraction(dup_extraction) if any("Duplicate" in w for w in warnings): print(f" PASS: Duplicate detected") tests_passed += 1 else: print(f" FAIL: Should have detected duplicate") tests_failed += 1 # Test 6: Hallucination check against transcript print("\n[Test 6] Hallucination detection") hallucinated = { "knowledge": [ { "fact": "Database port is 5433", "category": "fact", "repo": "x", "confidence": 0.9, "evidence": "PostgreSQL listening on port 5433" } ], "meta": {"session_outcome": "success", "tools_used": [], 
"repos_touched": [], "error_count": 0, "knowledge_count": 1} } hallucinations = validate_transcript_coverage(hallucinated, SAMPLE_TRANSCRIPT_SUCCESS) if hallucinations: print(f" PASS: Hallucination detected ({len(hallucinations)} items)") tests_passed += 1 else: print(f" FAIL: Should have detected hallucinated evidence") tests_failed += 1 # Test 7: Failed session should extract pitfalls print("\n[Test 7] Failed session extraction shape") failed_extraction = { "knowledge": [ { "fact": "SSH server not installed on target VPS", "category": "pitfall", "repo": "global", "confidence": 0.9, "evidence": "ERROR: OpenSSH server not installed on this VPS" }, { "fact": "VPS blocks deployment without SSH access", "category": "question", "repo": "global", "confidence": 0.7, "evidence": "Cannot proceed without SSH access. Deployment blocked." } ], "meta": { "session_outcome": "failed", "tools_used": ["ssh", "systemctl"], "repos_touched": [], "error_count": 3, "knowledge_count": 2 } } is_valid, errors, warnings = validate_extraction(failed_extraction) if is_valid: categories = [item["category"] for item in failed_extraction["knowledge"]] if "pitfall" in categories: print(f" PASS: Failed session extracted {len(categories)} items including pitfalls") tests_passed += 1 else: print(f" FAIL: Failed session should extract pitfalls") tests_failed += 1 else: print(f" FAIL: {errors}") tests_failed += 1 # Test 8: Empty extraction is warned print("\n[Test 8] Empty extraction warning") empty = {"knowledge": [], "meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 0}} is_valid, errors, warnings = validate_extraction(empty) if any("No knowledge" in w for w in warnings): print(f" PASS: Empty extraction warned") tests_passed += 1 else: print(f" FAIL: Should warn on empty extraction") tests_failed += 1 # Summary print(f"\n{'=' * 60}") print(f"Results: {tests_passed} passed, {tests_failed} failed") print(f"{'=' * 60}") return tests_failed == 0 def validate_file(filepath): """Validate an existing extraction JSON file.""" path = Path(filepath) if not path.exists(): print(f"ERROR: {filepath} not found") return False data = json.loads(path.read_text()) is_valid, errors, warnings = validate_extraction(data) print(f"Validation of {filepath}:") print(f" Knowledge items: {len(data.get('knowledge', []))}") print(f" Errors: {len(errors)}") print(f" Warnings: {len(warnings)}") for e in errors: print(f" ERROR: {e}") for w in warnings: print(f" WARN: {w}") return is_valid if __name__ == "__main__": parser = argparse.ArgumentParser(description="Test knowledge extraction prompt") parser.add_argument("--validate", help="Validate an existing extraction JSON file") parser.add_argument("--transcript", help="Test against a real transcript file (informational)") args = parser.parse_args() if args.validate: success = validate_file(args.validate) sys.exit(0 if success else 1) else: success = run_tests() sys.exit(0 if success else 1)