Compare commits

..

2 Commits

6 changed files with 721 additions and 1050 deletions

View File

@@ -1,359 +0,0 @@
#!/usr/bin/env python3
"""
Bootstrapper — assemble pre-session context from knowledge store.
Reads the knowledge store and produces a compact context block (2k tokens max)
that can be injected into a new session so it starts with situational awareness.
Usage:
python3 bootstrapper.py --repo the-nexus --agent mimo-sprint
python3 bootstrapper.py --repo timmy-home --global
python3 bootstrapper.py --global
python3 bootstrapper.py --repo the-nexus --max-tokens 1000
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Optional
# Resolve knowledge root relative to this script's parent
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
INDEX_PATH = KNOWLEDGE_DIR / "index.json"
# Approximate token count: ~4 chars per token for English text
CHARS_PER_TOKEN = 4
# Category sort priority (lower = shown first)
CATEGORY_PRIORITY = {
"pitfall": 0,
"tool-quirk": 1,
"pattern": 2,
"fact": 3,
"question": 4,
}
def load_index(index_path: Path = INDEX_PATH) -> dict:
"""Load and validate the knowledge index."""
if not index_path.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(index_path) as f:
data = json.load(f)
if "facts" not in data:
print(f"WARNING: index.json missing 'facts' key", file=sys.stderr)
return {"version": 1, "total_facts": 0, "facts": []}
return data
def filter_facts(
facts: list[dict],
repo: Optional[str] = None,
agent: Optional[str] = None,
include_global: bool = True,
) -> list[dict]:
"""Filter facts by repo, agent, and global scope."""
filtered = []
for fact in facts:
fact_repo = fact.get("repo", "global")
fact_agent = fact.get("agent", "")
# Match by repo (regardless of agent)
if repo and fact_repo == repo:
filtered.append(fact)
continue
# Match by exact agent type
if agent and fact_agent == agent:
filtered.append(fact)
continue
# Include global facts without agent restriction (universal facts)
if include_global and fact_repo == "global" and not fact_agent:
filtered.append(fact)
return filtered
def sort_facts(facts: list[dict]) -> list[dict]:
"""
Sort facts by: confidence (desc), then category priority, then fact text.
Most reliable and most dangerous facts come first.
"""
def sort_key(f):
confidence = f.get("confidence", 0.5)
category = f.get("category", "fact")
cat_priority = CATEGORY_PRIORITY.get(category, 5)
return (-confidence, cat_priority, f.get("fact", ""))
return sorted(facts, key=sort_key)
def load_repo_knowledge(repo: str) -> Optional[str]:
"""Load per-repo knowledge markdown if it exists."""
repo_path = KNOWLEDGE_DIR / "repos" / f"{repo}.md"
if repo_path.exists():
return repo_path.read_text().strip()
return None
def load_agent_knowledge(agent: str) -> Optional[str]:
"""Load per-agent knowledge markdown if it exists."""
agent_path = KNOWLEDGE_DIR / "agents" / f"{agent}.md"
if agent_path.exists():
return agent_path.read_text().strip()
return None
def load_global_knowledge() -> list[str]:
"""Load all global knowledge markdown files."""
global_dir = KNOWLEDGE_DIR / "global"
if not global_dir.exists():
return []
chunks = []
for md_file in sorted(global_dir.glob("*.md")):
content = md_file.read_text().strip()
if content:
chunks.append(content)
return chunks
def render_facts_section(facts: list[dict], category: str, label: str) -> str:
"""Render a section of facts for a single category."""
cat_facts = [f for f in facts if f.get("category") == category]
if not cat_facts:
return ""
lines = [f"### {label}\n"]
for f in cat_facts:
conf = f.get("confidence", 0.5)
fact_text = f.get("fact", "")
repo_tag = f.get("repo", "")
if repo_tag and repo_tag != "global":
lines.append(f"- [{conf:.0%}] ({repo_tag}) {fact_text}")
else:
lines.append(f"- [{conf:.0%}] {fact_text}")
return "\n".join(lines) + "\n"
def estimate_tokens(text: str) -> int:
"""Rough token estimate."""
return len(text) // CHARS_PER_TOKEN
def truncate_to_tokens(text: str, max_tokens: int) -> str:
"""Truncate text to approximately max_tokens, cutting at line boundaries."""
max_chars = max_tokens * CHARS_PER_TOKEN
if len(text) <= max_chars:
return text
# Cut at last newline before the limit
truncated = text[:max_chars]
last_newline = truncated.rfind("\n")
if last_newline > 0:
truncated = truncated[:last_newline]
return truncated + "\n\n[... truncated to fit context window ...]"
def build_bootstrap_context(
repo: Optional[str] = None,
agent: Optional[str] = None,
include_global: bool = True,
max_tokens: int = 2000,
index_path: Path = INDEX_PATH,
) -> str:
"""
Build the full bootstrap context block.
Returns a markdown string suitable for injection into a session prompt.
"""
index = load_index(index_path)
facts = index.get("facts", [])
# Filter
filtered = filter_facts(facts, repo=repo, agent=agent, include_global=include_global)
# Sort
sorted_facts = sort_facts(filtered)
# Build sections
sections = ["## What You Know (bootstrapped)\n"]
# Per-repo markdown knowledge
if repo:
repo_md = load_repo_knowledge(repo)
if repo_md:
sections.append(f"### Repo Notes: {repo}\n")
sections.append(repo_md + "\n")
# Structured facts by category
if sorted_facts:
# Group by source
repo_facts = [f for f in sorted_facts if f.get("repo") == repo] if repo else []
global_facts = [f for f in sorted_facts if f.get("repo") == "global"]
agent_facts = [f for f in sorted_facts if f.get("agent") == agent] if agent else []
if repo_facts:
sections.append(f"### Repo: {repo}\n")
for cat, label in [
("pitfall", "PITFALLS"),
("tool-quirk", "QUIRKS"),
("pattern", "PATTERNS"),
("fact", "FACTS"),
("question", "OPEN QUESTIONS"),
]:
section = render_facts_section(repo_facts, cat, label)
if section:
sections.append(section)
if global_facts:
sections.append("### Global\n")
for cat, label in [
("pitfall", "PITFALLS"),
("tool-quirk", "QUIRKS"),
("pattern", "PATTERNS"),
("fact", "FACTS"),
]:
section = render_facts_section(global_facts, cat, label)
if section:
sections.append(section)
if agent_facts:
sections.append(f"### Agent Notes ({agent})\n")
for cat, label in [
("pitfall", "PITFALLS"),
("tool-quirk", "QUIRKS"),
("pattern", "PATTERNS"),
("fact", "FACTS"),
]:
section = render_facts_section(agent_facts, cat, label)
if section:
sections.append(section)
# Per-agent markdown knowledge
if agent:
agent_md = load_agent_knowledge(agent)
if agent_md:
sections.append(f"### Agent Profile: {agent}\n")
sections.append(agent_md + "\n")
# Global markdown knowledge
global_chunks = load_global_knowledge()
if global_chunks:
sections.append("### Global Notes\n")
sections.extend(chunk + "\n" for chunk in global_chunks)
# If nothing was found
if len(sections) == 1:
sections.append("_No relevant knowledge found. Starting fresh._\n")
if not facts:
sections.append(
"_Knowledge store is empty. Run the harvester to populate it._\n"
)
# Join and truncate
context = "\n".join(sections)
context = truncate_to_tokens(context, max_tokens)
return context
def main():
parser = argparse.ArgumentParser(
description="Assemble pre-session context from knowledge store"
)
parser.add_argument(
"--repo",
type=str,
default=None,
help="Repository name to filter facts by",
)
parser.add_argument(
"--agent",
type=str,
default=None,
help="Agent type to filter facts by (e.g., mimo-sprint, groq-fast)",
)
parser.add_argument(
"--global",
dest="include_global",
action="store_true",
default=True,
help="Include global facts (default: true)",
)
parser.add_argument(
"--no-global",
dest="include_global",
action="store_false",
help="Exclude global facts",
)
parser.add_argument(
"--max-tokens",
type=int,
default=2000,
help="Maximum token count for output (default: 2000)",
)
parser.add_argument(
"--index",
type=str,
default=None,
help="Path to index.json (default: knowledge/index.json)",
)
parser.add_argument(
"--json",
dest="output_json",
action="store_true",
help="Output raw JSON instead of markdown",
)
args = parser.parse_args()
index_path = Path(args.index) if args.index else INDEX_PATH
if args.output_json:
# JSON mode: return the filtered, sorted facts
index = load_index(index_path)
facts = index.get("facts", [])
filtered = filter_facts(
facts,
repo=args.repo,
agent=args.agent,
include_global=args.include_global,
)
sorted_facts = sort_facts(filtered)
output = {
"repo": args.repo,
"agent": args.agent,
"include_global": args.include_global,
"total_indexed": len(facts),
"matched": len(sorted_facts),
"facts": sorted_facts,
}
print(json.dumps(output, indent=2))
else:
# Markdown mode: full bootstrap context
context = build_bootstrap_context(
repo=args.repo,
agent=args.agent,
include_global=args.include_global,
max_tokens=args.max_tokens,
index_path=index_path,
)
print(context)
return 0
if __name__ == "__main__":
sys.exit(main())

447
scripts/harvester.py Normal file
View File

@@ -0,0 +1,447 @@
#!/usr/bin/env python3
"""
harvester.py — Extract durable knowledge from Hermes session transcripts.
Combines session_reader + extraction prompt + LLM inference to pull
facts, pitfalls, patterns, and tool quirks from finished sessions.
Usage:
python3 harvester.py --session ~/.hermes/sessions/session_xxx.jsonl --output knowledge/
python3 harvester.py --batch --since 2026-04-01 --limit 100
python3 harvester.py --session session.jsonl --dry-run # Preview without writing
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Add scripts dir to path for sibling imports
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
# --- Configuration ---
DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
DEFAULT_API_KEY = os.environ.get("HARVESTER_API_KEY", "")
DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
KNOWLEDGE_DIR = os.environ.get("HARVESTER_KNOWLEDGE_DIR", "knowledge")
PROMPT_PATH = os.environ.get("HARVESTER_PROMPT_PATH", str(SCRIPT_DIR.parent / "templates" / "harvest-prompt.md"))
# Where to look for API keys if not set via env
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
]
def find_api_key() -> str:
"""Find API key from common locations."""
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_extraction_prompt() -> str:
"""Load the extraction prompt template."""
path = Path(PROMPT_PATH)
if not path.exists():
print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr)
print("Expected templates/harvest-prompt.md from issue #7", file=sys.stderr)
sys.exit(1)
return path.read_text(encoding='utf-8')
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[list[dict]]:
"""Call the LLM API to extract knowledge from a transcript."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": f"Extract knowledge from this session transcript:\n\n{transcript}"}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.1, # Low temp for consistent extraction
"max_tokens": 4096
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_extraction_response(content)
except Exception as e:
print(f"ERROR: LLM API call failed: {e}", file=sys.stderr)
return None
def parse_extraction_response(content: str) -> Optional[list[dict]]:
"""Parse the LLM response to extract knowledge items.
Handles various response formats: raw JSON, markdown-wrapped JSON, etc.
"""
# Try direct JSON parse first
try:
data = json.loads(content)
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
# Try extracting JSON from markdown code blocks
import re
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
# Try finding any JSON object with knowledge array
json_match = re.search(r'({[^{}]*"knowledge"[^{}]*[[sS]*?][^{}]*})', content)
if json_match:
try:
data = json.loads(json_match.group(1))
return data.get('knowledge', [])
except json.JSONDecodeError:
pass
print(f"WARNING: Could not parse LLM response as JSON", file=sys.stderr)
print(f"Response preview: {content[:500]}", file=sys.stderr)
return None
def load_existing_knowledge(knowledge_dir: str) -> dict:
"""Load the existing knowledge index."""
index_path = Path(knowledge_dir) / "index.json"
if not index_path.exists():
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
try:
with open(index_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
def fact_fingerprint(fact: dict) -> str:
"""Generate a deduplication fingerprint for a fact.
Uses the fact text normalized (lowercase, stripped) as the key.
Similar facts will have similar fingerprints.
"""
text = fact.get('fact', '').lower().strip()
# Normalize whitespace
text = ' '.join(text.split())
return hashlib.md5(text.encode('utf-8')).hexdigest()
def deduplicate(new_facts: list[dict], existing: list[dict], similarity_threshold: float = 0.8) -> list[dict]:
"""Remove duplicate facts from new_facts that already exist in the knowledge store.
Uses fingerprint matching for exact dedup and simple overlap check for near-dupes.
"""
existing_fingerprints = set()
existing_texts = []
for f in existing:
fp = fact_fingerprint(f)
existing_fingerprints.add(fp)
existing_texts.append(f.get('fact', '').lower().strip())
unique = []
for fact in new_facts:
fp = fact_fingerprint(fact)
if fp in existing_fingerprints:
continue
# Check for near-duplicates using simple word overlap
fact_words = set(fact.get('fact', '').lower().split())
is_dup = False
for existing_text in existing_texts:
existing_words = set(existing_text.split())
if not fact_words or not existing_words:
continue
overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1)
if overlap >= similarity_threshold:
is_dup = True
break
if not is_dup:
unique.append(fact)
existing_fingerprints.add(fp)
existing_texts.append(fact.get('fact', '').lower().strip())
return unique
def validate_fact(fact: dict) -> bool:
"""Validate a single knowledge item has required fields."""
required = ['fact', 'category', 'repo', 'confidence']
for field in required:
if field not in fact:
return False
if not isinstance(fact['fact'], str) or not fact['fact'].strip():
return False
valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question']
if fact['category'] not in valid_categories:
return False
if not isinstance(fact.get('confidence', 0), (int, float)):
return False
if not (0.0 <= fact['confidence'] <= 1.0):
return False
return True
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: str, source_session: str = ""):
"""Write new facts to the knowledge store."""
kdir = Path(knowledge_dir)
kdir.mkdir(parents=True, exist_ok=True)
# Add source tracking to each fact
for fact in new_facts:
fact['source_session'] = source_session
fact['harvested_at'] = datetime.now(timezone.utc).isoformat()
# Update index
index['facts'].extend(new_facts)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
# Write index
index_path = kdir / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
# Also write per-repo markdown files for human reading
repos = {}
for fact in new_facts:
repo = fact.get('repo', 'global')
repos.setdefault(repo, []).append(fact)
for repo, facts in repos.items():
if repo == 'global':
md_path = kdir / "global" / "harvested.md"
else:
md_path = kdir / "repos" / f"{repo}.md"
md_path.parent.mkdir(parents=True, exist_ok=True)
# Append to existing or create new
mode = 'a' if md_path.exists() else 'w'
with open(md_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write(f"# Knowledge: {repo}\n\n")
f.write(f"## Harvested {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n")
for fact in facts:
icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': ''}.get(fact['category'], '')
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
f.write("\n")
def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, dry_run: bool = False, min_confidence: float = 0.3) -> dict:
"""Harvest knowledge from a single session.
Returns: dict with stats (facts_found, facts_new, facts_dup, elapsed_seconds, error)
"""
start_time = time.time()
stats = {
'session': session_path,
'facts_found': 0,
'facts_new': 0,
'facts_dup': 0,
'elapsed_seconds': 0,
'error': None
}
try:
# 1. Read session
messages = read_session(session_path)
if not messages:
stats['error'] = "Empty session file"
return stats
# 2. Extract conversation
conv = extract_conversation(messages)
if not conv:
stats['error'] = "No conversation turns found"
return stats
# 3. Truncate for context window
truncated = truncate_for_context(conv, head=50, tail=50)
transcript = messages_to_text(truncated)
# 4. Load extraction prompt
prompt = load_extraction_prompt()
# 5. Call LLM
raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
if raw_facts is None:
stats['error'] = "LLM extraction failed"
return stats
# 6. Validate
valid_facts = [f for f in raw_facts if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
stats['facts_found'] = len(valid_facts)
# 7. Deduplicate
existing_index = load_existing_knowledge(knowledge_dir)
existing_facts = existing_index.get('facts', [])
new_facts = deduplicate(valid_facts, existing_facts)
stats['facts_new'] = len(new_facts)
stats['facts_dup'] = len(valid_facts) - len(new_facts)
# 8. Write (unless dry run)
if new_facts and not dry_run:
write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
except Exception as e:
stats['error'] = str(e)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, since: str = "", limit: int = 0, dry_run: bool = False) -> list[dict]:
"""Harvest knowledge from multiple sessions in batch."""
sessions_path = Path(sessions_dir)
if not sessions_path.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
return []
# Find session files
session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True) # Newest first
# Filter by date if --since provided
if since:
since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
filtered = []
for sf in session_files:
# Try to parse timestamp from filename (common format: session_YYYYMMDD_HHMMSS_hash.jsonl)
try:
parts = sf.stem.split('_')
if len(parts) >= 3:
date_str = parts[1]
file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
if file_dt >= since_dt:
filtered.append(sf)
except (ValueError, IndexError):
# If we can't parse the date, include the file (be permissive)
filtered.append(sf)
session_files = filtered
# Apply limit
if limit > 0:
session_files = session_files[:limit]
print(f"Harvesting {len(session_files)} sessions...")
results = []
for i, sf in enumerate(session_files, 1):
print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
if stats['error']:
print(f"ERROR: {stats['error']}")
else:
print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
results.append(stats)
return results
def main():
parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
parser.add_argument('--session', help='Path to a single session JSONL file')
parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions')
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files (default: ~/.hermes/sessions)')
parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store')
parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)')
parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)')
parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL')
parser.add_argument('--api-key', default='', help='LLM API key (or set HARVESTER_API_KEY)')
parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction')
parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store')
parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold')
args = parser.parse_args()
# Resolve API key
api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
if not api_key:
print("ERROR: No API key found. Set HARVESTER_API_KEY or store in one of:", file=sys.stderr)
for p in API_KEY_PATHS:
print(f" {p}", file=sys.stderr)
sys.exit(1)
# Resolve knowledge directory
knowledge_dir = args.output
if not os.path.isabs(knowledge_dir):
knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir)
if args.session:
# Single session mode
stats = harvest_session(
args.session, knowledge_dir, args.api_base, api_key, args.model,
dry_run=args.dry_run, min_confidence=args.min_confidence
)
print(json.dumps(stats, indent=2))
if stats['error']:
sys.exit(1)
elif args.batch:
# Batch mode
results = batch_harvest(
args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model,
since=args.since, limit=args.limit, dry_run=args.dry_run
)
total_new = sum(r['facts_new'] for r in results)
total_dup = sum(r['facts_dup'] for r in results)
errors = sum(1 for r in results if r['error'])
print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors")
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

142
scripts/session_reader.py Normal file
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
session_reader.py — Parse Hermes session JSONL transcripts.
Each line in a session file is a JSON object representing a message.
Standard fields: role (user|assistant|system), content (str), timestamp (str).
Tool calls and tool results are also captured.
"""
import json
import sys
from pathlib import Path
from typing import Iterator, Optional
def read_session(path: str) -> list[dict]:
"""Read a session JSONL file and return all messages as a list."""
messages = []
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
messages.append(msg)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
return messages
def read_session_iter(path: str) -> Iterator[dict]:
"""Iterate over session messages without loading all into memory."""
with open(path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
def extract_conversation(messages: list[dict]) -> list[dict]:
"""Extract user/assistant conversation turns, skipping tool-only messages."""
conversation = []
for msg in messages:
role = msg.get('role', '')
content = msg.get('content', '')
# Skip empty messages and pure tool calls
if role in ('user', 'assistant', 'system'):
if isinstance(content, str) and content.strip():
conversation.append({
'role': role,
'content': content.strip(),
'timestamp': msg.get('timestamp', '')
})
elif isinstance(content, list):
# Multimodal content — extract text parts
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part.get('text', ''))
if text_parts:
conversation.append({
'role': role,
'content': '\n'.join(text_parts),
'timestamp': msg.get('timestamp', '')
})
return conversation
def truncate_for_context(messages: list[dict], head: int = 50, tail: int = 50) -> list[dict]:
"""Truncate long sessions: keep first N + last N messages.
This preserves session start (initial context) and end (final results),
skipping the messy middle of long debugging sessions.
"""
if len(messages) <= head + tail:
return messages
truncated = messages[:head]
truncated.append({
'role': 'system',
'content': f'[{len(messages) - head - tail} messages truncated]',
'timestamp': ''
})
truncated.extend(messages[-tail:])
return truncated
def messages_to_text(messages: list[dict]) -> str:
"""Convert message list to plain text for LLM consumption."""
lines = []
for msg in messages:
role = msg.get('role', 'unknown').upper()
content = msg.get('content', '')
if msg.get('role') == 'system' and 'truncated' in content:
lines.append(f'--- {content} ---')
else:
lines.append(f'{role}: {content}')
return '\n\n'.join(lines)
def get_session_metadata(path: str) -> dict:
"""Extract metadata from a session file (first message often has config info)."""
messages = read_session(path)
if not messages:
return {'path': path, 'message_count': 0}
first = messages[0]
last = messages[-1]
return {
'path': path,
'message_count': len(messages),
'first_timestamp': first.get('timestamp', ''),
'last_timestamp': last.get('timestamp', ''),
'first_role': first.get('role', ''),
'has_tool_calls': any(m.get('tool_calls') for m in messages),
}
if __name__ == '__main__':
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <session.jsonl>")
sys.exit(1)
path = sys.argv[1]
meta = get_session_metadata(path)
print(json.dumps(meta, indent=2))
messages = read_session(path)
conv = extract_conversation(messages)
print(f"\nConversation: {len(conv)} turns")
truncated = truncate_for_context(conv)
print(f"After truncation: {len(truncated)} turns")
print(f"\nPreview (first 500 chars):")
print(messages_to_text(truncated[:5])[:500])

View File

@@ -1,239 +0,0 @@
#!/usr/bin/env python3
"""
Tests for bootstrapper.py — context assembly from knowledge store.
"""
import json
import sys
import tempfile
from pathlib import Path
# Add scripts dir to path for import
sys.path.insert(0, str(Path(__file__).resolve().parent))
from bootstrapper import (
build_bootstrap_context,
estimate_tokens,
filter_facts,
load_index,
sort_facts,
truncate_to_tokens,
)
def make_index(facts: list[dict], tmp_dir: Path) -> Path:
"""Create a temporary index.json with given facts."""
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": len(facts),
"facts": facts,
}
path = tmp_dir / "index.json"
with open(path, "w") as f:
json.dump(index, f)
return path
def test_empty_index():
"""Empty knowledge store produces graceful output."""
with tempfile.TemporaryDirectory() as tmp:
tmp_dir = Path(tmp)
index_path = make_index([], tmp_dir)
# Create empty knowledge dirs
for sub in ["repos", "agents", "global"]:
(tmp_dir / sub).mkdir(exist_ok=True)
context = build_bootstrap_context(
repo="the-nexus", index_path=index_path
)
assert "No relevant knowledge found" in context
assert "Starting fresh" in context
print("PASS: empty_index")
def test_filter_by_repo():
"""Filter facts by repository."""
facts = [
{"fact": "A", "category": "fact", "repo": "the-nexus", "confidence": 0.9},
{"fact": "B", "category": "fact", "repo": "fleet-ops", "confidence": 0.8},
{"fact": "C", "category": "fact", "repo": "global", "confidence": 0.7},
]
filtered = filter_facts(facts, repo="the-nexus", include_global=True)
texts = [f["fact"] for f in filtered]
assert "A" in texts
assert "B" not in texts
assert "C" in texts
print("PASS: filter_by_repo")
def test_filter_by_agent():
"""Filter facts by agent type."""
facts = [
{"fact": "A", "category": "pattern", "repo": "global", "agent": "mimo-sprint", "confidence": 0.8},
{"fact": "B", "category": "pattern", "repo": "global", "agent": "groq-fast", "confidence": 0.7},
{"fact": "C", "category": "fact", "repo": "global", "confidence": 0.9},
]
filtered = filter_facts(facts, agent="mimo-sprint", include_global=True)
texts = [f["fact"] for f in filtered]
assert "A" in texts
assert "B" not in texts
assert "C" in texts # global, no agent restriction
print("PASS: filter_by_agent")
def test_no_global_flag():
"""Excluding global facts works."""
facts = [
{"fact": "A", "category": "fact", "repo": "the-nexus", "confidence": 0.9},
{"fact": "B", "category": "fact", "repo": "global", "confidence": 0.8},
]
filtered = filter_facts(facts, repo="the-nexus", include_global=False)
texts = [f["fact"] for f in filtered]
assert "A" in texts
assert "B" not in texts
print("PASS: no_global_flag")
def test_sort_by_confidence():
"""Facts sort by confidence descending."""
facts = [
{"fact": "low", "category": "fact", "repo": "global", "confidence": 0.3},
{"fact": "high", "category": "fact", "repo": "global", "confidence": 0.95},
{"fact": "mid", "category": "fact", "repo": "global", "confidence": 0.7},
]
sorted_f = sort_facts(facts)
assert sorted_f[0]["fact"] == "high"
assert sorted_f[1]["fact"] == "mid"
assert sorted_f[2]["fact"] == "low"
print("PASS: sort_by_confidence")
def test_sort_pitfalls_first():
"""Pitfalls sort before facts at same confidence."""
facts = [
{"fact": "regular fact", "category": "fact", "repo": "global", "confidence": 0.8},
{"fact": "danger pitfall", "category": "pitfall", "repo": "global", "confidence": 0.8},
]
sorted_f = sort_facts(facts)
assert sorted_f[0]["category"] == "pitfall"
print("PASS: sort_pitfalls_first")
def test_truncate_to_tokens():
"""Truncation cuts at line boundary."""
text = "line1\nline2\nline3\nline4\nline5\n"
truncated = truncate_to_tokens(text, max_tokens=2) # ~8 chars
assert "line1" in truncated
assert "truncated" in truncated.lower()
print("PASS: truncate_to_tokens")
def test_estimate_tokens():
"""Token estimation is reasonable."""
text = "a" * 400
tokens = estimate_tokens(text)
assert 90 <= tokens <= 110 # ~100 tokens
print("PASS: estimate_tokens")
def test_build_full_context():
"""Full context with facts renders correctly."""
facts = [
{"fact": "API merges fail with 405", "category": "pitfall", "repo": "the-nexus", "confidence": 0.95},
{"fact": "Has 50+ open PRs", "category": "fact", "repo": "the-nexus", "confidence": 0.9},
{"fact": "Token at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
{"fact": "Check git remote -v first", "category": "pattern", "repo": "global", "confidence": 0.8},
]
with tempfile.TemporaryDirectory() as tmp:
tmp_dir = Path(tmp)
index_path = make_index(facts, tmp_dir)
# Create knowledge dirs
for sub in ["repos", "agents", "global"]:
(tmp_dir / sub).mkdir(exist_ok=True)
context = build_bootstrap_context(
repo="the-nexus",
agent="mimo-sprint",
include_global=True,
index_path=index_path,
)
assert "What You Know" in context
assert "PITFALLS" in context
assert "API merges fail with 405" in context
assert "the-nexus" in context
assert "Token at" in context # global fact included
print("PASS: build_full_context")
def test_max_tokens_respected():
"""Output respects max_tokens limit."""
# Generate lots of facts
facts = [
{"fact": f"Fact number {i} with some detail about things", "category": "fact", "repo": "global", "confidence": 0.8}
for i in range(100)
]
with tempfile.TemporaryDirectory() as tmp:
tmp_dir = Path(tmp)
index_path = make_index(facts, tmp_dir)
for sub in ["repos", "agents", "global"]:
(tmp_dir / sub).mkdir(exist_ok=True)
context = build_bootstrap_context(
repo=None,
max_tokens=500,
index_path=index_path,
)
actual_tokens = estimate_tokens(context)
# Allow 10% overshoot since we cut at line boundaries
assert actual_tokens <= 550, f"Expected ~500 tokens, got {actual_tokens}"
print(f"PASS: max_tokens_respected (got {actual_tokens} tokens)")
def test_missing_index_graceful():
"""Missing index.json doesn't crash."""
with tempfile.TemporaryDirectory() as tmp:
tmp_dir = Path(tmp)
# Don't create index.json
for sub in ["repos", "agents", "global"]:
(tmp_dir / sub).mkdir(exist_ok=True)
fake_index = tmp_dir / "nonexistent.json"
context = build_bootstrap_context(repo="anything", index_path=fake_index)
assert "No relevant knowledge found" in context
print("PASS: missing_index_graceful")
if __name__ == "__main__":
tests = [
test_empty_index,
test_filter_by_repo,
test_filter_by_agent,
test_no_global_flag,
test_sort_by_confidence,
test_sort_pitfalls_first,
test_truncate_to_tokens,
test_estimate_tokens,
test_build_full_context,
test_max_tokens_respected,
test_missing_index_graceful,
]
passed = 0
failed = 0
for test in tests:
try:
test()
passed += 1
except Exception as e:
print(f"FAIL: {test.__name__}{e}")
failed += 1
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View File

@@ -1,129 +1,41 @@
#!/usr/bin/env python3
"""
Test harness for knowledge extraction prompt.
Validates output structure, content quality, and hallucination resistance.
Usage:
python3 scripts/test_harvest_prompt.py # Run all tests
python3 scripts/test_harvest_prompt.py --transcript FILE # Test against a real transcript
python3 scripts/test_harvest_prompt.py --validate FILE # Validate an existing extraction JSON
Test script for knowledge extraction prompt.
Validates that the prompt produces consistent, structured output.
"""
import json
import sys
import argparse
from pathlib import Path
VALID_CATEGORIES = {"fact", "pitfall", "pattern", "tool-quirk", "question"}
REQUIRED_FIELDS = {"fact", "category", "repo", "confidence", "evidence"}
REQUIRED_META = {"session_outcome", "tools_used", "repos_touched", "error_count", "knowledge_count"}
def validate_knowledge_item(item, idx):
"""Validate a single knowledge item. Returns list of errors."""
errors = []
if not isinstance(item, dict):
return [f"Item {idx}: not a dict"]
for field in REQUIRED_FIELDS:
def validate_knowledge_item(item):
"""Validate a single knowledge item."""
required_fields = ["fact", "category", "repo", "confidence"]
for field in required_fields:
if field not in item:
errors.append(f"Item {idx}: missing field '{field}'")
if not isinstance(item.get("fact", ""), str) or len(item.get("fact", "").strip()) == 0:
errors.append(f"Item {idx}: fact must be a non-empty string")
if item.get("category") not in VALID_CATEGORIES:
errors.append(f"Item {idx}: invalid category '{item.get('category')}'")
if not isinstance(item.get("repo", ""), str) or len(item.get("repo", "").strip()) == 0:
errors.append(f"Item {idx}: repo must be a non-empty string")
conf = item.get("confidence")
if not isinstance(conf, (int, float)) or not (0.0 <= conf <= 1.0):
errors.append(f"Item {idx}: confidence must be a number 0.0-1.0, got {conf}")
if not isinstance(item.get("evidence", ""), str) or len(item.get("evidence", "").strip()) == 0:
errors.append(f"Item {idx}: evidence must be a non-empty string (hallucination check)")
return errors
return False, f"Missing field: {field}"
if not isinstance(item["fact"], str) or len(item["fact"].strip()) == 0:
return False, "Fact must be a non-empty string"
valid_categories = ["fact", "pitfall", "pattern", "tool-quirk", "question"]
if item["category"] not in valid_categories:
return False, f"Invalid category: {item['category']}"
if not isinstance(item["repo"], str):
return False, "Repo must be a string"
if not isinstance(item["confidence"], (int, float)):
return False, "Confidence must be a number"
if not (0.0 <= item["confidence"] <= 1.0):
return False, "Confidence must be between 0.0 and 1.0"
return True, "Valid"
def validate_extraction(data):
"""Validate a full extraction result. Returns (is_valid, errors, warnings)."""
errors = []
warnings = []
if not isinstance(data, dict):
return False, ["Root is not a JSON object"], []
if "knowledge" not in data:
return False, ["Missing 'knowledge' array"], []
if not isinstance(data["knowledge"], list):
return False, ["'knowledge' is not an array"], []
for i, item in enumerate(data["knowledge"]):
errors.extend(validate_knowledge_item(item, i))
# Meta block validation
if "meta" not in data:
warnings.append("Missing 'meta' block (session_outcome, tools_used, etc.)")
else:
meta = data["meta"]
for field in REQUIRED_META:
if field not in meta:
warnings.append(f"Meta missing field '{field}'")
# Quality checks
facts = data["knowledge"]
if len(facts) == 0:
warnings.append("No knowledge extracted (empty session or extraction failure)")
# Check for near-duplicate facts
seen_facts = set()
for item in facts:
normalized = item.get("fact", "").lower().strip()[:80]
if normalized in seen_facts:
warnings.append(f"Duplicate fact detected: '{normalized[:50]}...'")
seen_facts.add(normalized)
# Check confidence distribution
confidences = [item.get("confidence", 0) for item in facts]
if confidences:
avg_conf = sum(confidences) / len(confidences)
if avg_conf > 0.9:
warnings.append(f"Average confidence {avg_conf:.2f} is suspiciously high (possible hallucination)")
if avg_conf < 0.4:
warnings.append(f"Average confidence {avg_conf:.2f} is very low (extraction may be too cautious)")
return len(errors) == 0, errors, warnings
def validate_transcript_coverage(data, transcript):
"""
Check that extracted facts are actually supported by the transcript.
This is a heuristic — checks that key terms from facts appear in transcript.
Returns list of potential hallucinations.
"""
hallucinations = []
transcript_lower = transcript.lower()
for item in data.get("knowledge", []):
fact = item.get("fact", "")
evidence = item.get("evidence", "")
# Check if evidence string appears in transcript
if evidence and evidence.lower() not in transcript_lower:
# Partial match — check if key terms are present
evidence_words = set(evidence.lower().split())
transcript_words = set(transcript_lower.split())
overlap = evidence_words & transcript_words
if len(overlap) < min(3, len(evidence_words) * 0.3):
hallucinations.append({
"fact": fact[:80],
"evidence": evidence[:80],
"issue": "Evidence not found in transcript — possible hallucination"
})
return hallucinations
# --- Test transcripts ---
SAMPLE_TRANSCRIPT_SUCCESS = """
def test_sample_transcript():
"""Test with a sample transcript."""
sample_transcript = """
User: Clone the repo at forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence
Agent: Trying git clone... Error: 401 Unauthorized
Agent: Checking for token... Found in ~/.config/gitea/token
@@ -132,253 +44,30 @@ Agent: Clone successful with token authentication
Agent: Now examining the repo structure...
Agent: Found knowledge/index.json with 0 facts
Agent: Creating harvest-prompt.md in templates/
Agent: Done. Files created: templates/harvest-prompt.md, scripts/test_harvest_prompt.py
"""
SAMPLE_TRANSCRIPT_FAILURE = """
User: Deploy the cron jobs to production
Agent: Connecting to VPS... SSH timeout after 30s
Agent: Retrying with verbose output... Connection refused on port 22
Agent: Checking firewall rules... iptables shows port 22 is open
Agent: Checking sshd config... sshd is not running
Agent: Attempting to start sshd... systemctl start sshd failed: unit not found
Agent: ERROR: OpenSSH server not installed on this VPS
Agent: Cannot proceed without SSH access. Deployment blocked.
"""
SAMPLE_TRANSCRIPT_API_QUIRK = """
User: List all open issues in hermes-agent
Agent: Querying Gitea API... endpoint: /repos/Timmy_Foundation/hermes-agent/issues?state=open&limit=50
Agent: Response: 0 items. That's odd.
Agent: Trying without state filter... endpoint: /repos/Timmy_Foundation/hermes-agent/issues?limit=50
Agent: Response: 50 items returned, but all have pull_request field. These are PRs, not issues.
Agent: Gitea issues API returns PRs alongside issues. Must filter by presence of pull_request key.
Agent: Also discovered: state=open parameter returns empty when used with limit=50, but works with limit=5.
Agent: This appears to be a Gitea API quirk — pagination interacts with state filter unexpectedly.
Agent: Workaround: fetch without state filter, then filter client-side for state=='open' and 'pull_request' not in item.
"""
def run_tests():
"""Run the built-in test suite."""
tests_passed = 0
tests_failed = 0
print("=" * 60)
print("KNOWLEDGE EXTRACTION PROMPT — TEST SUITE")
print("=" * 60)
# Test 1: Prompt file exists and is under 2k tokens (~8k chars)
print("\n[Test 1] Prompt file size constraint")
prompt_path = Path("templates/harvest-prompt.md")
if not prompt_path.exists():
print(" FAIL: harvest-prompt.md not found")
tests_failed += 1
else:
size = prompt_path.stat().st_size
# Rough token estimate: ~4 chars per token
est_tokens = size / 4
print(f" Prompt size: {size} bytes (~{est_tokens:.0f} tokens)")
if est_tokens > 2000:
print(f" WARN: Prompt exceeds ~1500 tokens (target: ~1000)")
else:
print(f" PASS: Within token budget")
tests_passed += 1
# Test 2: Validate a well-formed extraction
print("\n[Test 2] Valid extraction passes validation")
valid_extraction = {
"knowledge": [
{
"fact": "Gitea auth token is at ~/.config/gitea/token",
"category": "tool-quirk",
"repo": "global",
"confidence": 0.9,
"evidence": "Found in ~/.config/gitea/token"
},
{
"fact": "Clone fails with 401 when no token is provided",
"category": "pitfall",
"repo": "compounding-intelligence",
"confidence": 0.9,
"evidence": "Error: 401 Unauthorized"
}
],
"meta": {
"session_outcome": "success",
"tools_used": ["git"],
"repos_touched": ["compounding-intelligence"],
"error_count": 1,
"knowledge_count": 2
}
}
is_valid, errors, warnings = validate_extraction(valid_extraction)
if is_valid:
print(f" PASS: Valid extraction accepted ({len(warnings)} warnings)")
tests_passed += 1
else:
print(f" FAIL: Valid extraction rejected: {errors}")
tests_failed += 1
# Test 3: Reject missing fields
print("\n[Test 3] Missing fields are rejected")
bad_extraction = {
"knowledge": [
{"fact": "Something learned", "category": "fact"} # Missing repo, confidence, evidence
]
}
is_valid, errors, warnings = validate_extraction(bad_extraction)
if not is_valid:
print(f" PASS: Rejected with {len(errors)} errors")
tests_passed += 1
else:
print(f" FAIL: Should have rejected missing fields")
tests_failed += 1
# Test 4: Reject invalid category
print("\n[Test 4] Invalid category is rejected")
bad_cat = {
"knowledge": [
{"fact": "Test", "category": "discovery", "repo": "x", "confidence": 0.8, "evidence": "test"}
]
}
is_valid, errors, warnings = validate_extraction(bad_cat)
if not is_valid and any("category" in e for e in errors):
print(f" PASS: Invalid category 'discovery' rejected")
tests_passed += 1
else:
print(f" FAIL: Should have rejected invalid category")
tests_failed += 1
# Test 5: Detect near-duplicates
print("\n[Test 5] Duplicate detection")
dup_extraction = {
"knowledge": [
{"fact": "Token is at ~/.config/gitea/token", "category": "fact", "repo": "x", "confidence": 0.9, "evidence": "a"},
{"fact": "Token is at ~/.config/gitea/token", "category": "fact", "repo": "x", "confidence": 0.9, "evidence": "b"}
],
"meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 2}
}
is_valid, errors, warnings = validate_extraction(dup_extraction)
if any("Duplicate" in w for w in warnings):
print(f" PASS: Duplicate detected")
tests_passed += 1
else:
print(f" FAIL: Should have detected duplicate")
tests_failed += 1
# Test 6: Hallucination check against transcript
print("\n[Test 6] Hallucination detection")
hallucinated = {
"knowledge": [
{
"fact": "Database port is 5433",
"category": "fact",
"repo": "x",
"confidence": 0.9,
"evidence": "PostgreSQL listening on port 5433"
}
],
"meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 1}
}
hallucinations = validate_transcript_coverage(hallucinated, SAMPLE_TRANSCRIPT_SUCCESS)
if hallucinations:
print(f" PASS: Hallucination detected ({len(hallucinations)} items)")
tests_passed += 1
else:
print(f" FAIL: Should have detected hallucinated evidence")
tests_failed += 1
# Test 7: Failed session should extract pitfalls
print("\n[Test 7] Failed session extraction shape")
failed_extraction = {
"knowledge": [
{
"fact": "SSH server not installed on target VPS",
"category": "pitfall",
"repo": "global",
"confidence": 0.9,
"evidence": "ERROR: OpenSSH server not installed on this VPS"
},
{
"fact": "VPS blocks deployment without SSH access",
"category": "question",
"repo": "global",
"confidence": 0.7,
"evidence": "Cannot proceed without SSH access. Deployment blocked."
}
],
"meta": {
"session_outcome": "failed",
"tools_used": ["ssh", "systemctl"],
"repos_touched": [],
"error_count": 3,
"knowledge_count": 2
}
}
is_valid, errors, warnings = validate_extraction(failed_extraction)
if is_valid:
categories = [item["category"] for item in failed_extraction["knowledge"]]
if "pitfall" in categories:
print(f" PASS: Failed session extracted {len(categories)} items including pitfalls")
tests_passed += 1
else:
print(f" FAIL: Failed session should extract pitfalls")
tests_failed += 1
else:
print(f" FAIL: {errors}")
tests_failed += 1
# Test 8: Empty extraction is warned
print("\n[Test 8] Empty extraction warning")
empty = {"knowledge": [], "meta": {"session_outcome": "success", "tools_used": [], "repos_touched": [], "error_count": 0, "knowledge_count": 0}}
is_valid, errors, warnings = validate_extraction(empty)
if any("No knowledge" in w for w in warnings):
print(f" PASS: Empty extraction warned")
tests_passed += 1
else:
print(f" FAIL: Should warn on empty extraction")
tests_failed += 1
# Summary
print(f"\n{'=' * 60}")
print(f"Results: {tests_passed} passed, {tests_failed} failed")
print(f"{'=' * 60}")
return tests_failed == 0
def validate_file(filepath):
"""Validate an existing extraction JSON file."""
path = Path(filepath)
if not path.exists():
print(f"ERROR: {filepath} not found")
return False
data = json.loads(path.read_text())
is_valid, errors, warnings = validate_extraction(data)
print(f"Validation of {filepath}:")
print(f" Knowledge items: {len(data.get('knowledge', []))}")
print(f" Errors: {len(errors)}")
print(f" Warnings: {len(warnings)}")
for e in errors:
print(f" ERROR: {e}")
for w in warnings:
print(f" WARN: {w}")
return is_valid
# This would be replaced with actual prompt execution
print("Sample transcript processed")
print("Expected categories: fact, pitfall, pattern, tool-quirk, question")
return True
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Test knowledge extraction prompt")
parser.add_argument("--validate", help="Validate an existing extraction JSON file")
parser.add_argument("--transcript", help="Test against a real transcript file (informational)")
args = parser.parse_args()
if args.validate:
success = validate_file(args.validate)
sys.exit(0 if success else 1)
else:
success = run_tests()
sys.exit(0 if success else 1)
print("Testing knowledge extraction prompt...")
# Test 1: Validate prompt file exists
prompt_path = Path("templates/harvest-prompt.md")
if not prompt_path.exists():
print("ERROR: harvest-prompt.md not found")
sys.exit(1)
print(f"OK: Prompt file exists: {prompt_path}")
# Test 2: Check prompt size
prompt_size = prompt_path.stat().st_size
print(f"OK: Prompt size: {prompt_size} bytes")
# Test 3: Test sample transcript processing
if test_sample_transcript():
print("OK: Sample transcript test passed")
print("\nAll tests passed!")

View File

@@ -2,107 +2,98 @@
## System Prompt
You are a knowledge extraction engine. You read session transcripts and output ONLY structured JSON. You never infer. You never assume. You extract only what the transcript explicitly states.
You are a knowledge extraction engine. Your task is to analyze a session transcript and extract durable knowledge that will help future sessions be more efficient.
## Prompt
## Instructions
Read the session transcript carefully. Extract ONLY information that is explicitly stated in the transcript. Do NOT infer, assume, or hallucinate information.
### Categories
Extract knowledge into these categories:
1. **fact**: Concrete, verifiable information learned (e.g., "Repository X has 5 files", "API returns JSON with field Y")
2. **pitfall**: Errors encountered, wrong assumptions, things that wasted time (e.g., "Assumed API token was in env var GITEA_TOKEN, but it's in ~/.config/gitea/token")
3. **pattern**: Successful sequences of actions (e.g., "To deploy: 1. Run tests 2. Build 3. Push to Gitea 4. Trigger webhook")
4. **tool-quirk**: Environment-specific behaviors (e.g., "Token paths are different on macOS vs Linux", "URL format requires trailing slash")
5. **question**: Things identified but not answered (e.g., "Need to determine optimal batch size for harvesting")
### Output Format
Return a JSON object with an array of extracted knowledge items. Each item must have:
```json
{
"fact": "One sentence description of the knowledge",
"category": "fact|pitfall|pattern|tool-quirk|question",
"repo": "Repository name this applies to, or 'global' if general",
"confidence": 0.0-1.0
}
```
TASK: Extract durable knowledge from this session transcript.
RULES:
1. Extract ONLY information explicitly stated in the transcript.
2. Do NOT infer, assume, or hallucinate.
3. Every fact must be verifiable by pointing to a specific line in the transcript.
4. If the session failed or was partial, extract pitfalls and questions — these are the most valuable.
5. Be specific. "Gitea API is slow" is worthless. "Gitea issues endpoint with state=open returns empty when limit=50 but works with limit=5" is knowledge.
### Confidence Scoring
CATEGORIES (assign exactly one per item):
- fact: Concrete, verifiable thing learned (paths, formats, counts, configs)
- pitfall: Error hit, wrong assumption, time wasted, thing that didn't work
- pattern: Successful sequence that should be reused (deploy steps, debug flow)
- tool-quirk: Environment-specific behavior (token paths, URL formats, API gotchas)
- question: Something identified but not answered — the NEXT agent should investigate
- 0.9-1.0: Explicitly stated with verification (e.g., "Error message shows X")
- 0.7-0.8: Clearly implied by multiple data points
- 0.5-0.6: Suggested but not fully verified
- 0.3-0.4: Inferred from limited data
- 0.1-0.2: Speculative or uncertain
CONFIDENCE:
- 0.9: Directly observed with error output or explicit verification
- 0.7: Multiple data points confirm, but not explicitly verified
- 0.5: Suggested by context, not tested
- 0.3: Inferred from limited evidence
### Constraints
OUTPUT FORMAT (valid JSON only, no markdown, no explanation):
1. **No hallucination**: Only extract what's explicitly in the transcript
2. **Specificity**: Each fact must be specific and actionable
3. **Relevance**: Only extract knowledge that would help future sessions
4. **Brevity**: One sentence per fact
5. **Partial sessions**: Even failed or incomplete sessions may contain valuable pitfalls
### Example Input/Output
**Input Transcript (excerpt):**
```
User: Clone the repo at forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence
Agent: Trying git clone... Error: 401 Unauthorized
Agent: Checking for token... Found in ~/.config/gitea/token
Agent: Token is gitea_token format, not OAuth
Agent: Clone successful with token authentication
```
**Output:**
```json
{
"knowledge": [
{
"fact": "One specific sentence of knowledge",
"category": "fact|pitfall|pattern|tool-quirk|question",
"repo": "repo-name or global",
"confidence": 0.0-1.0,
"evidence": "Brief quote or reference from transcript that supports this"
"fact": "Gitea repo at forge.alexanderwhitestone.com requires authentication for cloning",
"category": "fact",
"repo": "compounding-intelligence",
"confidence": 0.9
},
{
"fact": "Gitea authentication token is stored at ~/.config/gitea/token",
"category": "tool-quirk",
"repo": "global",
"confidence": 0.9
},
{
"fact": "Gitea uses gitea_token format, not OAuth for API access",
"category": "tool-quirk",
"repo": "global",
"confidence": 0.8
},
{
"fact": "Clone fails with 401 when no token is provided",
"category": "pitfall",
"repo": "compounding-intelligence",
"confidence": 0.9
}
],
"meta": {
"session_outcome": "success|partial|failed",
"tools_used": ["tool1", "tool2"],
"repos_touched": ["repo1"],
"error_count": 0,
"knowledge_count": 0
}
]
}
TRANSCRIPT:
{{transcript}}
```
## Design Notes
## Final Notes
### Why this works with mimo-v2-pro
Mimo needs:
- Explicit format constraints ("valid JSON only, no markdown")
- Clear category definitions with concrete examples
- Hard rules before soft guidance
- The transcript at the END (so it reads all instructions first)
This prompt front-loads all rules, then gives the transcript last. Mimo follows the pattern.
### Handling partial/failed sessions
Failed sessions are the richest source of pitfalls. The prompt explicitly says:
> "If the session failed or was partial, extract pitfalls and questions — these are the most valuable."
This reframes failure as valuable output, not noise to discard.
### The `evidence` field
Added to the original spec. Every extracted item must cite where in the transcript it came from. This:
- Prevents hallucination (can't cite what isn't there)
- Enables verification (reviewer can check the source)
- Trains confidence calibration (the agent must find evidence, not just claim it)
### Token budget
Target: ~1,000 tokens for the prompt (excluding transcript).
```
System prompt: ~50 tokens
Rules: ~200 tokens
Categories: ~150 tokens
Confidence: ~100 tokens
Output format: ~200 tokens
Design notes: NOT included in prompt (documentation only)
─────────────────────────────
Total prompt: ~700 tokens
```
Leaves ~300 tokens headroom for variable content (transcript insertion, edge cases).
### What this replaces
The v1 prompt had:
- Verbose prose explanations (waste tokens for mimo)
- No `evidence` field (hallucination risk)
- No `meta` block (no session-level metadata)
- No explicit handling of failed sessions
- Example was too long (~150 tokens of example for a 1k prompt)
This v2 is tighter, more structured, and adds the evidence requirement that prevents the #1 failure mode of extraction prompts: generating plausible-sounding facts that aren't in the transcript.
- Process the entire transcript, not just the beginning
- Pay special attention to errors and corrections
- Note any environment-specific details
- Track tool-specific behaviors and quirks
- If the session failed, focus on pitfalls and questions