Assembles relevant knowledge from the store into a compact 2k-token context block for session injection. Features: - Filter by repo, agent type, and global scope - Sort by confidence (pitfalls first, patterns, facts) - Per-repo and per-agent markdown knowledge files - Graceful empty-store handling - JSON output mode for programmatic use - Token-count-aware truncation at line boundaries Closes #11
385 lines
15 KiB
Python
#!/usr/bin/env python3
"""
Test harness for knowledge extraction prompt.
Validates output structure, content quality, and hallucination resistance.

Usage:
    python3 scripts/test_harvest_prompt.py                    # Run all tests
    python3 scripts/test_harvest_prompt.py --transcript FILE  # Test against a real transcript
    python3 scripts/test_harvest_prompt.py --validate FILE    # Validate an existing extraction JSON
"""
|
|
|
|
import argparse
import json
import sys
from pathlib import Path
|
|
|
|
# Closed set of categories the extraction prompt may emit.
VALID_CATEGORIES = {"fact", "pitfall", "pattern", "tool-quirk", "question"}
# Fields every knowledge item must carry.
REQUIRED_FIELDS = {"fact", "category", "repo", "confidence", "evidence"}
# Fields expected (but only warned about) in the extraction's "meta" block.
REQUIRED_META = {"session_outcome", "tools_used", "repos_touched", "error_count", "knowledge_count"}


def _is_blank(value):
    """Return True when *value* is not a string or is empty/whitespace-only."""
    return not isinstance(value, str) or not value.strip()


def validate_knowledge_item(item, idx):
    """Validate a single knowledge item. Returns list of errors.

    Checks that *item* is a dict carrying every REQUIRED_FIELDS key, that
    its string fields are non-empty, that the category is one of
    VALID_CATEGORIES, and that confidence is a number in [0.0, 1.0].
    *idx* is only used to label error messages.
    """
    if not isinstance(item, dict):
        return [f"Item {idx}: not a dict"]

    errors = []
    for field in REQUIRED_FIELDS:
        if field not in item:
            errors.append(f"Item {idx}: missing field '{field}'")

    if _is_blank(item.get("fact", "")):
        errors.append(f"Item {idx}: fact must be a non-empty string")
    if item.get("category") not in VALID_CATEGORIES:
        errors.append(f"Item {idx}: invalid category '{item.get('category')}'")
    if _is_blank(item.get("repo", "")):
        errors.append(f"Item {idx}: repo must be a non-empty string")

    conf = item.get("confidence")
    # bool is a subclass of int, so JSON true/false would otherwise pass the
    # range check (True == 1) — reject it explicitly.
    if isinstance(conf, bool) or not isinstance(conf, (int, float)) or not (0.0 <= conf <= 1.0):
        errors.append(f"Item {idx}: confidence must be a number 0.0-1.0, got {conf}")

    if _is_blank(item.get("evidence", "")):
        errors.append(f"Item {idx}: evidence must be a non-empty string (hallucination check)")
    return errors
|
|
|
|
|
|
def validate_extraction(data):
    """Validate a full extraction result. Returns (is_valid, errors, warnings)."""
    # Structural gate: anything that prevents item-level validation is a
    # hard error and short-circuits immediately.
    if not isinstance(data, dict):
        return False, ["Root is not a JSON object"], []
    if "knowledge" not in data:
        return False, ["Missing 'knowledge' array"], []
    items = data["knowledge"]
    if not isinstance(items, list):
        return False, ["'knowledge' is not an array"], []

    problems = []
    notes = []

    # Per-item structural validation.
    for position, entry in enumerate(items):
        problems.extend(validate_knowledge_item(entry, position))

    # Meta block validation — absence or missing fields only warn.
    if "meta" in data:
        meta_block = data["meta"]
        notes.extend(
            f"Meta missing field '{key}'"
            for key in REQUIRED_META
            if key not in meta_block
        )
    else:
        notes.append("Missing 'meta' block (session_outcome, tools_used, etc.)")

    # Quality checks
    if not items:
        notes.append("No knowledge extracted (empty session or extraction failure)")

    # Check for near-duplicate facts (case-insensitive, first 80 chars).
    already_seen = set()
    for entry in items:
        fingerprint = entry.get("fact", "").lower().strip()[:80]
        if fingerprint in already_seen:
            notes.append(f"Duplicate fact detected: '{fingerprint[:50]}...'")
        already_seen.add(fingerprint)

    # Check confidence distribution — both extremes are suspicious.
    scores = [entry.get("confidence", 0) for entry in items]
    if scores:
        mean_conf = sum(scores) / len(scores)
        if mean_conf > 0.9:
            notes.append(f"Average confidence {mean_conf:.2f} is suspiciously high (possible hallucination)")
        if mean_conf < 0.4:
            notes.append(f"Average confidence {mean_conf:.2f} is very low (extraction may be too cautious)")

    return not problems, problems, notes
|
|
|
|
|
|
def validate_transcript_coverage(data, transcript):
    """
    Check that extracted facts are actually supported by the transcript.
    This is a heuristic — checks that key terms from facts appear in transcript.
    Returns list of potential hallucinations.
    """
    flagged = []
    haystack = transcript.lower()

    for entry in data.get("knowledge", []):
        claim = entry.get("fact", "")
        support = entry.get("evidence", "")

        # Empty evidence is handled by structural validation, not here;
        # an exact (case-insensitive) quote is clearly grounded.
        if not support or support.lower() in haystack:
            continue

        # Partial match — check if key terms are present
        support_terms = set(support.lower().split())
        shared = support_terms & set(haystack.split())
        if len(shared) < min(3, len(support_terms) * 0.3):
            flagged.append({
                "fact": claim[:80],
                "evidence": support[:80],
                "issue": "Evidence not found in transcript — possible hallucination"
            })

    return flagged
|
|
|
|
|
|
# --- Test transcripts ---

# Happy-path session: a 401 on clone is recovered via a local token file,
# then repo work completes.  Used by Test 6 as the grounding text for
# hallucination detection.
SAMPLE_TRANSCRIPT_SUCCESS = """
User: Clone the repo at forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence
Agent: Trying git clone... Error: 401 Unauthorized
Agent: Checking for token... Found in ~/.config/gitea/token
Agent: Token is gitea_token format, not OAuth
Agent: Clone successful with token authentication
Agent: Now examining the repo structure...
Agent: Found knowledge/index.json with 0 facts
Agent: Creating harvest-prompt.md in templates/
Agent: Done. Files created: templates/harvest-prompt.md, scripts/test_harvest_prompt.py
"""

# Failed deployment session: SSH never comes up, deployment blocked.
# NOTE(review): not referenced by run_tests — presumably sample data for
# --transcript experiments; confirm before removing.
SAMPLE_TRANSCRIPT_FAILURE = """
User: Deploy the cron jobs to production
Agent: Connecting to VPS... SSH timeout after 30s
Agent: Retrying with verbose output... Connection refused on port 22
Agent: Checking firewall rules... iptables shows port 22 is open
Agent: Checking sshd config... sshd is not running
Agent: Attempting to start sshd... systemctl start sshd failed: unit not found
Agent: ERROR: OpenSSH server not installed on this VPS
Agent: Cannot proceed without SSH access. Deployment blocked.
"""

# Session that uncovers a Gitea API quirk (PRs returned from the issues
# endpoint; state filter interacting with pagination).
# NOTE(review): also unreferenced by run_tests — see note above.
SAMPLE_TRANSCRIPT_API_QUIRK = """
User: List all open issues in hermes-agent
Agent: Querying Gitea API... endpoint: /repos/Timmy_Foundation/hermes-agent/issues?state=open&limit=50
Agent: Response: 0 items. That's odd.
Agent: Trying without state filter... endpoint: /repos/Timmy_Foundation/hermes-agent/issues?limit=50
Agent: Response: 50 items returned, but all have pull_request field. These are PRs, not issues.
Agent: Gitea issues API returns PRs alongside issues. Must filter by presence of pull_request key.
Agent: Also discovered: state=open parameter returns empty when used with limit=50, but works with limit=5.
Agent: This appears to be a Gitea API quirk — pagination interacts with state filter unexpectedly.
Agent: Workaround: fetch without state filter, then filter client-side for state=='open' and 'pull_request' not in item.
"""
|
|
|
|
|
|
def run_tests():
    """Run the built-in test suite.

    Exercises the validators against hand-built extraction payloads:
    well-formed input, missing fields, invalid category, duplicate facts,
    hallucinated evidence, failed-session shape, and the empty-extraction
    warning.  Prints a human-readable report and returns True when every
    test passed.
    """
    tests_passed = 0
    tests_failed = 0

    print("=" * 60)
    print("KNOWLEDGE EXTRACTION PROMPT — TEST SUITE")
    print("=" * 60)

    # Test 1: Prompt file exists and is under 2k tokens (~8k chars)
    print("\n[Test 1] Prompt file size constraint")
    prompt_path = Path("templates/harvest-prompt.md")
    if not prompt_path.exists():
        print(" FAIL: harvest-prompt.md not found")
        tests_failed += 1
    else:
        size = prompt_path.stat().st_size
        # Rough token estimate: ~4 chars per token
        est_tokens = size / 4
        print(f" Prompt size: {size} bytes (~{est_tokens:.0f} tokens)")
        if est_tokens > 2000:
            # Over budget is a soft warning, not a failure — the prompt still
            # works, it just consumes more context.  (Message now matches the
            # 2000-token threshold actually checked.)
            print(" WARN: Prompt exceeds ~2000 tokens (target: ~1000)")
        else:
            print(" PASS: Within token budget")
        # Existence is the pass criterion; the size check only warns.
        tests_passed += 1

    # Test 2: Validate a well-formed extraction
    print("\n[Test 2] Valid extraction passes validation")
    valid_extraction = {
        "knowledge": [
            {
                "fact": "Gitea auth token is at ~/.config/gitea/token",
                "category": "tool-quirk",
                "repo": "global",
                "confidence": 0.9,
                "evidence": "Found in ~/.config/gitea/token"
            },
            {
                "fact": "Clone fails with 401 when no token is provided",
                "category": "pitfall",
                "repo": "compounding-intelligence",
                "confidence": 0.9,
                "evidence": "Error: 401 Unauthorized"
            }
        ],
        "meta": {
            "session_outcome": "success",
            "tools_used": ["git"],
            "repos_touched": ["compounding-intelligence"],
            "error_count": 1,
            "knowledge_count": 2
        }
    }
    is_valid, errors, warnings = validate_extraction(valid_extraction)
    if is_valid:
        print(f" PASS: Valid extraction accepted ({len(warnings)} warnings)")
        tests_passed += 1
    else:
        print(f" FAIL: Valid extraction rejected: {errors}")
        tests_failed += 1

    # Test 3: Reject missing fields
    print("\n[Test 3] Missing fields are rejected")
    bad_extraction = {
        "knowledge": [
            {"fact": "Something learned", "category": "fact"}  # Missing repo, confidence, evidence
        ]
    }
    is_valid, errors, warnings = validate_extraction(bad_extraction)
    if not is_valid:
        print(f" PASS: Rejected with {len(errors)} errors")
        tests_passed += 1
    else:
        print(" FAIL: Should have rejected missing fields")
        tests_failed += 1

    # Test 4: Reject invalid category
    print("\n[Test 4] Invalid category is rejected")
    bad_cat = {
        "knowledge": [
            {"fact": "Test", "category": "discovery", "repo": "x", "confidence": 0.8, "evidence": "test"}
        ]
    }
    is_valid, errors, warnings = validate_extraction(bad_cat)
    if not is_valid and any("category" in e for e in errors):
        print(" PASS: Invalid category 'discovery' rejected")
        tests_passed += 1
    else:
        print(" FAIL: Should have rejected invalid category")
        tests_failed += 1

    # Test 5: Detect near-duplicates
    print("\n[Test 5] Duplicate detection")
    dup_extraction = {
        "knowledge": [
            {"fact": "Token is at ~/.config/gitea/token", "category": "fact", "repo": "x", "confidence": 0.9, "evidence": "a"},
            {"fact": "Token is at ~/.config/gitea/token", "category": "fact", "repo": "x", "confidence": 0.9, "evidence": "b"}
        ],
        "meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 2}
    }
    is_valid, errors, warnings = validate_extraction(dup_extraction)
    if any("Duplicate" in w for w in warnings):
        print(" PASS: Duplicate detected")
        tests_passed += 1
    else:
        print(" FAIL: Should have detected duplicate")
        tests_failed += 1

    # Test 6: Hallucination check against transcript
    print("\n[Test 6] Hallucination detection")
    hallucinated = {
        "knowledge": [
            {
                "fact": "Database port is 5433",
                "category": "fact",
                "repo": "x",
                "confidence": 0.9,
                "evidence": "PostgreSQL listening on port 5433"
            }
        ],
        "meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 1}
    }
    hallucinations = validate_transcript_coverage(hallucinated, SAMPLE_TRANSCRIPT_SUCCESS)
    if hallucinations:
        print(f" PASS: Hallucination detected ({len(hallucinations)} items)")
        tests_passed += 1
    else:
        print(" FAIL: Should have detected hallucinated evidence")
        tests_failed += 1

    # Test 7: Failed session should extract pitfalls
    print("\n[Test 7] Failed session extraction shape")
    failed_extraction = {
        "knowledge": [
            {
                "fact": "SSH server not installed on target VPS",
                "category": "pitfall",
                "repo": "global",
                "confidence": 0.9,
                "evidence": "ERROR: OpenSSH server not installed on this VPS"
            },
            {
                "fact": "VPS blocks deployment without SSH access",
                "category": "question",
                "repo": "global",
                "confidence": 0.7,
                "evidence": "Cannot proceed without SSH access. Deployment blocked."
            }
        ],
        "meta": {
            "session_outcome": "failed",
            "tools_used": ["ssh", "systemctl"],
            "repos_touched": [],
            "error_count": 3,
            "knowledge_count": 2
        }
    }
    is_valid, errors, warnings = validate_extraction(failed_extraction)
    if is_valid:
        categories = [item["category"] for item in failed_extraction["knowledge"]]
        if "pitfall" in categories:
            print(f" PASS: Failed session extracted {len(categories)} items including pitfalls")
            tests_passed += 1
        else:
            print(" FAIL: Failed session should extract pitfalls")
            tests_failed += 1
    else:
        print(f" FAIL: {errors}")
        tests_failed += 1

    # Test 8: Empty extraction is warned
    print("\n[Test 8] Empty extraction warning")
    empty = {"knowledge": [], "meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 0}}
    is_valid, errors, warnings = validate_extraction(empty)
    if any("No knowledge" in w for w in warnings):
        print(" PASS: Empty extraction warned")
        tests_passed += 1
    else:
        print(" FAIL: Should warn on empty extraction")
        tests_failed += 1

    # Summary
    print(f"\n{'=' * 60}")
    print(f"Results: {tests_passed} passed, {tests_failed} failed")
    print(f"{'=' * 60}")
    return tests_failed == 0
|
|
|
|
|
|
def validate_file(filepath):
    """Validate an existing extraction JSON file.

    Prints a summary plus every error and warning.  Returns True only when
    the file exists, parses as JSON, and passes validate_extraction.
    """
    path = Path(filepath)
    if not path.exists():
        print(f"ERROR: {filepath} not found")
        return False

    # Malformed or unreadable input must report cleanly instead of
    # crashing with a traceback.
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        print(f"ERROR: could not parse {filepath}: {exc}")
        return False

    is_valid, errors, warnings = validate_extraction(data)

    # A non-dict root (e.g. a top-level JSON array) has no 'knowledge' key;
    # guard the summary line so it doesn't raise AttributeError.
    items = data.get("knowledge", []) if isinstance(data, dict) else []

    print(f"Validation of {filepath}:")
    print(f" Knowledge items: {len(items)}")
    print(f" Errors: {len(errors)}")
    print(f" Warnings: {len(warnings)}")

    for e in errors:
        print(f" ERROR: {e}")
    for w in warnings:
        print(f" WARN: {w}")

    return is_valid
|
|
|
|
|
|
if __name__ == "__main__":
    # CLI entry: --validate checks an existing extraction file; otherwise
    # the built-in suite runs.  Exit status 0 on success, 1 on failure.
    parser = argparse.ArgumentParser(description="Test knowledge extraction prompt")
    parser.add_argument("--validate", help="Validate an existing extraction JSON file")
    parser.add_argument("--transcript", help="Test against a real transcript file (informational)")
    args = parser.parse_args()

    ok = validate_file(args.validate) if args.validate else run_tests()
    sys.exit(0 if ok else 1)
|