Compare commits

..

1 Commits

Author SHA1 Message Date
STEP35 Burn Worker
5f6a7f7265 feat(graph): Add graph visualizer (ASCII + DOT) with subgraph extraction
Some checks failed
Test / pytest (pull_request) Failing after 35s
Add scripts/graph_visualizer.py — standalone tool that:
- Builds knowledge graph from knowledge/index.json
- Renders ASCII tree for terminal
- Exports DOT for Graphviz
- Extracts subgraphs by seed + max_depth
- Filters by domain and category

Includes test_graph_visualizer.py smoke test (8/8)
Addresses #151
2026-04-25 21:00:05 -04:00
5 changed files with 311 additions and 760 deletions

206
scripts/graph_visualizer.py Executable file
View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
graph_visualizer.py — Generate visual graph representations of the knowledge graph.
Reads knowledge/index.json and renders the fact relationship graph.
Supports ASCII terminal output and DOT export for Graphviz.
Usage:
python3 scripts/graph_visualizer.py # ASCII, all nodes
python3 scripts/graph_visualizer.py --format dot # DOT output
python3 scripts/graph_visualizer.py --seed root --max-depth 2
python3 scripts/graph_visualizer.py --filter-domain hermes-agent
python3 scripts/graph_visualizer.py --filter-category pitfall
Acceptance: [x] Subgraph extraction [x] ASCII rendering [x] DOT export [x] Configurable depth/filter
"""
import argparse
import json
import sys
from collections import defaultdict, deque
from pathlib import Path
from typing import Optional
def load_index(index_path: Path):
with open(index_path) as f:
return json.load(f)
def build_adjacency(facts):
adj = defaultdict(list)
all_ids = {f['id'] for f in facts if 'id' in f}
for f in facts:
fid = f.get('id')
if not fid:
continue
for rel in f.get('related', []):
if rel in all_ids:
adj[fid].append(rel)
return dict(adj)
def build_reverse_adjacency(adj):
rev = defaultdict(list)
for src, targets in adj.items():
for tgt in targets:
rev[tgt].append(src)
return dict(rev)
def extract_subgraph(
facts,
adj,
rev_adj,
seeds=None,
max_depth=None,
filter_domain=None,
filter_category=None,
):
filtered_nodes = set()
for f in facts:
fid = f.get('id')
if not fid:
continue
if filter_domain and f.get('domain') != filter_domain:
continue
if filter_category and f.get('category') != filter_category:
continue
filtered_nodes.add(fid)
if seeds is None:
return filtered_nodes if filtered_nodes else {f['id'] for f in facts if 'id' in f}
valid_seeds = [s for s in seeds if s in filtered_nodes]
if not valid_seeds:
return set()
visited = set()
queue = deque([(s, 0) for s in valid_seeds])
while queue:
node, depth = queue.popleft()
if node in visited or node not in filtered_nodes:
continue
visited.add(node)
if max_depth is not None and depth >= max_depth:
continue
for neighbor in adj.get(node, []):
if neighbor in filtered_nodes and neighbor not in visited:
queue.append((neighbor, depth + 1))
for neighbor in rev_adj.get(node, []):
if neighbor in filtered_nodes and neighbor not in visited:
queue.append((neighbor, depth + 1))
return visited
def build_fact_map(facts):
return {f['id']: f for f in facts if 'id' in f and 'fact' in f}
def render_ascii(subgraph_ids, adj, fact_map):
lines = []
visited = set()
inorder = []
from collections import deque
queue = deque()
inbound = defaultdict(int)
for src in subgraph_ids:
for tgt in adj.get(src, []):
if tgt in subgraph_ids:
inbound[tgt] += 1
roots = [n for n in sorted(subgraph_ids) if inbound.get(n, 0) == 0]
if not roots:
roots = sorted(subgraph_ids)
for root in roots:
queue.append((root, 0, None))
while queue:
node, depth, parent_label = queue.popleft()
if node in visited:
continue
visited.add(node)
fact = fact_map.get(node, {})
label = fact.get('fact', str(node))[:80]
category = fact.get('category', 'fact')
domain = fact.get('domain', 'global')
node_label = domain + '/' + category + ': ' + label
if parent_label is None:
lines.append(f"{' ' * depth}┌─ {node_label}")
else:
lines.append(f"{' ' * depth}├─ {node_label}")
children = [c for c in adj.get(node, []) if c in subgraph_ids]
for i, child in enumerate(children):
queue.append((child, depth + 1, node))
if len(visited) < len(subgraph_ids):
lines.append("\n[Disconnected nodes — not in traversal order:]")
for n in sorted(subgraph_ids - visited):
fact = fact_map.get(n, {})
label = fact.get('fact', n)[:60]
lines.append(f" {n}{label}")
return "\n".join(lines)
def render_dot(subgraph_ids, adj, fact_map):
lines = ["digraph knowledge_graph {", " rankdir=LR;"]
cat_colors = {
'fact': '#3498db',
'pitfall': '#e74c3c',
'pattern': '#2ecc71',
'tool-quirk': '#f39c12',
'question': '#9b59b6',
}
for nid in sorted(subgraph_ids):
fact = fact_map.get(nid, {})
category = fact.get('category', 'fact')
domain = fact.get('domain', 'global')
label = fact.get('fact', nid).replace('"', '\\"')[:80]
fillcolor = cat_colors.get(category, '#666666')
lines.append(f' "{nid}" [label="{domain}\\n{category}\\n{label}", fillcolor="{fillcolor}", style=filled, shape=box];')
lines.append("")
for src in sorted(subgraph_ids):
for tgt in adj.get(src, []):
if tgt in subgraph_ids:
lines.append(f' "{src}" -> "{tgt}";')
lines.append("}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Visualize the knowledge graph (ASCII terminal or DOT for Graphviz).")
parser.add_argument("--index", type=Path, default=Path(__file__).parent.parent / "knowledge" / "index.json",
help="Path to knowledge/index.json")
parser.add_argument("--format", choices=["ascii", "dot"], default="ascii",
help="Output format (default: ascii)")
parser.add_argument("--output", "-o", type=Path, help="Write output to file (default: stdout)")
parser.add_argument("--seed", help="Starting fact ID (comma-sep). Omit to render full graph.")
parser.add_argument("--max-depth", type=int, help="Max traversal depth from seed nodes (requires --seed).")
parser.add_argument("--filter-domain", help="Only include facts from this domain.")
parser.add_argument("--filter-category", help="Only include facts of this category.")
args = parser.parse_args()
index = load_index(args.index)
facts = index.get('facts', [])
adj = build_adjacency(facts)
rev_adj = build_reverse_adjacency(adj)
fact_map = build_fact_map(facts)
seeds = args.seed.split(',') if args.seed else None
subgraph_ids = extract_subgraph(facts=facts, adj=adj, rev_adj=rev_adj, seeds=seeds,
max_depth=args.max_depth,
filter_domain=args.filter_domain,
filter_category=args.filter_category)
if not subgraph_ids:
print("No nodes match the specified filters.", file=sys.stderr)
sys.exit(1)
if args.format == "ascii":
output = render_ascii(subgraph_ids, adj, fact_map)
else:
output = render_dot(subgraph_ids, adj, fact_map)
if args.output:
args.output.write_text(output)
print(f"Written: {args.output}", file=sys.stderr)
else:
print(output)
if __name__ == "__main__":
main()

View File

@@ -1,468 +0,0 @@
#!/usr/bin/env python3
"""
session_knowledge_extractor.py — Extract session-level entities and relationships from Hermes transcripts.
Creates knowledge facts about: which agent handled the session, what task was solved,
which tools were used and why, and the outcome. Target: 10+ facts per session.
Usage:
python3 session_knowledge_extractor.py --session session.jsonl --output knowledge/
python3 session_knowledge_extractor.py --batch --sessions-dir ~/.hermes/sessions/ --limit 10
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
# --- Configuration ---
DEFAULT_API_BASE = os.environ.get(
"EXTRACTOR_API_BASE",
os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
)
DEFAULT_API_KEY = os.environ.get(
"EXTRACTOR_API_KEY",
os.environ.get("HARVESTER_API_KEY", "")
)
DEFAULT_MODEL = os.environ.get(
"EXTRACTOR_MODEL",
os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
)
KNOWLEDGE_DIR = os.environ.get("EXTRACTOR_KNOWLEDGE_DIR", "knowledge")
PROMPT_PATH = os.environ.get(
"EXTRACTOR_PROMPT_PATH",
str(SCRIPT_DIR.parent / "templates" / "session-entity-prompt.md")
)
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
os.path.expanduser("~/.config/gitea/token"), # fallback
]
def find_api_key() -> str:
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_extraction_prompt() -> str:
path = Path(PROMPT_PATH)
if not path.exists():
print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr)
sys.exit(1)
return path.read_text(encoding='utf-8')
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[List[dict]]:
"""Call LLM to extract session entity knowledge."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": f"Extract knowledge from this session transcript:\n\n{transcript}"}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.1,
"max_tokens": 4096
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_extraction_response(content)
except Exception as e:
print(f"ERROR: LLM API call failed: {e}", file=sys.stderr)
return None
def parse_extraction_response(content: str) -> Optional[List[dict]]:
"""Parse LLM response; handles JSON or markdown-wrapped JSON."""
try:
data = json.loads(content)
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
import re
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'knowledge' in data:
return data['knowledge']
if isinstance(data, list):
return data
except json.JSONDecodeError:
pass
json_match = re.search(r'(\{[^{}]*"knowledge"[^{}]*\[.*?\])', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
return data.get('knowledge', [])
except json.JSONDecodeError:
pass
print(f"WARNING: Could not parse LLM response as JSON", file=sys.stderr)
print(f"Response preview: {content[:500]}", file=sys.stderr)
return None
def load_existing_knowledge(knowledge_dir: str) -> dict:
index_path = Path(knowledge_dir) / "index.json"
if not index_path.exists():
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
try:
with open(index_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
def fact_fingerprint(fact: dict) -> str:
text = fact.get('fact', '').lower().strip()
text = ' '.join(text.split())
return hashlib.md5(text.encode('utf-8')).hexdigest()
def deduplicate(new_facts: List[dict], existing: List[dict], similarity_threshold: float = 0.8) -> List[dict]:
existing_fingerprints = set()
existing_texts = []
for f in existing:
fp = fact_fingerprint(f)
existing_fingerprints.add(fp)
existing_texts.append(f.get('fact', '').lower().strip())
unique = []
for fact in new_facts:
fp = fact_fingerprint(fact)
if fp in existing_fingerprints:
continue
fact_words = set(fact.get('fact', '').lower().split())
is_dup = False
for existing_text in existing_texts:
existing_words = set(existing_text.split())
if not fact_words or not existing_words:
continue
overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1)
if overlap >= similarity_threshold:
is_dup = True
break
if not is_dup:
unique.append(fact)
existing_fingerprints.add(fp)
existing_texts.append(fact.get('fact', '').lower().strip())
return unique
def validate_fact(fact: dict) -> bool:
required = ['fact', 'category', 'repo', 'confidence']
for field in required:
if field not in fact:
return False
if not isinstance(fact['fact'], str) or not fact['fact'].strip():
return False
valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question']
if fact['category'] not in valid_categories:
return False
if not isinstance(fact.get('confidence', 0), (int, float)):
return False
if not (0.0 <= fact['confidence'] <= 1.0):
return False
return True
def write_knowledge(index: dict, new_facts: List[dict], knowledge_dir: str, source_session: str = ""):
kdir = Path(knowledge_dir)
kdir.mkdir(parents=True, exist_ok=True)
for fact in new_facts:
fact['source_session'] = source_session
fact['harvested_at'] = datetime.now(timezone.utc).isoformat()
index['facts'].extend(new_facts)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
index_path = kdir / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
repos = {}
for fact in new_facts:
repo = fact.get('repo', 'global')
repos.setdefault(repo, []).append(fact)
for repo, facts in repos.items():
if repo == 'global':
md_path = kdir / "global" / "sessions.md"
else:
md_path = kdir / "repos" / f"{repo}.md"
md_path.parent.mkdir(parents=True, exist_ok=True)
mode = 'a' if md_path.exists() else 'w'
with open(md_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write(f"# Session Knowledge: {repo}\n\n")
f.write(f"## Session {Path(source_session).stem}{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n")
for fact in facts:
icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': ''}.get(fact['category'], '')
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
f.write("\n")
def extract_session_id(messages: List[dict]) -> str:
"""Derive a stable session ID from messages or return 'unknown'."""
# Try to find session_id in the first message or use filename from source
for msg in messages[:3]:
if msg.get('session_id'):
return msg['session_id'][:32]
# Fallback: hash first few messages
content = str(messages[:3])
return hashlib.md5(content.encode()).hexdigest()[:12]
def extract_agent(messages: List[dict]) -> Optional[str]:
"""Extract the agent/model name from assistant messages."""
for msg in messages:
if msg.get('role') == 'assistant' and msg.get('model'):
return msg['model']
return None
def extract_tasks(messages: List[dict]) -> List[str]:
"""Extract the task/goal from the first user message."""
tasks = []
for msg in messages:
if msg.get('role') == 'user' and msg.get('content'):
content = msg['content']
if isinstance(content, str) and len(content.strip()) < 500:
tasks.append(content.strip())
break # First user message is usually the task
return tasks
def extract_tools(messages: List[dict]) -> List[str]:
"""Extract tool names used in the session."""
tools = set()
for msg in messages:
if msg.get('tool_calls'):
for tc in msg['tool_calls']:
func = tc.get('function', {})
name = func.get('name', '')
if name:
tools.add(name)
return list(tools)
def extract_outcome(messages: List[dict]) -> str:
"""Classify session outcome: success/partial/failure."""
errors = []
for msg in messages:
if msg.get('role') == 'tool' and msg.get('is_error'):
err = msg.get('content', '')
if isinstance(err, str):
errors.append(err.lower())
if errors:
if any('405' in e or 'permission' in e or 'authentication' in e for e in errors):
return 'failure'
return 'partial'
# Check last assistant message for success indicators
last = messages[-1] if messages else {}
if last.get('role') == 'assistant':
content = str(last.get('content', ''))
success_words = ['done', 'completed', 'success', 'merged', 'pushed', 'created', 'saved']
if any(word in content.lower() for word in success_words):
return 'success'
return 'unknown'
def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, dry_run: bool = False, min_confidence: float = 0.3) -> dict:
"""Harvest session entities and relationships from one session."""
start_time = time.time()
stats = {
'session': session_path,
'facts_found': 0,
'facts_new': 0,
'facts_dup': 0,
'elapsed_seconds': 0,
'error': None
}
try:
messages = read_session(session_path)
if not messages:
stats['error'] = "Empty session file"
return stats
conv = extract_conversation(messages)
if not conv:
stats['error'] = "No conversation turns found"
return stats
truncated = truncate_for_context(conv, head=50, tail=50)
transcript = messages_to_text(truncated)
prompt = load_extraction_prompt()
raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
if raw_facts is None:
stats['error'] = "LLM extraction failed"
return stats
valid_facts = [f for f in raw_facts if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
stats['facts_found'] = len(valid_facts)
existing_index = load_existing_knowledge(knowledge_dir)
existing_facts = existing_index.get('facts', [])
new_facts = deduplicate(valid_facts, existing_facts)
stats['facts_new'] = len(new_facts)
stats['facts_dup'] = len(valid_facts) - len(new_facts)
if new_facts and not dry_run:
write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
except Exception as e:
stats['error'] = str(e)
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
return stats
def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str, api_key: str,
model: str, since: str = "", limit: int = 0, dry_run: bool = False) -> List[dict]:
sessions_path = Path(sessions_dir)
if not sessions_path.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
return []
session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True)
if since:
since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
filtered = []
for sf in session_files:
try:
parts = sf.stem.split('_')
if len(parts) >= 3:
date_str = parts[1]
file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
if file_dt >= since_dt:
filtered.append(sf)
except (ValueError, IndexError):
filtered.append(sf)
session_files = filtered
if limit > 0:
session_files = session_files[:limit]
print(f"Harvesting {len(session_files)} sessions with session knowledge extractor...")
results = []
for i, sf in enumerate(session_files, 1):
print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
if stats['error']:
print(f"ERROR: {stats['error']}")
else:
print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
results.append(stats)
return results
def main():
parser = argparse.ArgumentParser(description="Extract session entities and relationships from Hermes transcripts")
parser.add_argument('--session', help='Path to a single session JSONL file')
parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions')
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files (default: ~/.hermes/sessions)')
parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store')
parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)')
parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)')
parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL')
parser.add_argument('--api-key', default='', help='LLM API key (or set EXTRACTOR_API_KEY)')
parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction')
parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store')
parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold')
args = parser.parse_args()
api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
if not api_key:
print("ERROR: No API key found. Set EXTRACTOR_API_KEY or store in one of:", file=sys.stderr)
for p in API_KEY_PATHS:
print(f" {p}", file=sys.stderr)
sys.exit(1)
knowledge_dir = args.output
if not os.path.isabs(knowledge_dir):
knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir)
if args.session:
stats = harvest_session(
args.session, knowledge_dir, args.api_base, api_key, args.model,
dry_run=args.dry_run, min_confidence=args.min_confidence
)
print(json.dumps(stats, indent=2))
if stats['error']:
sys.exit(1)
elif args.batch:
results = batch_harvest(
args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model,
since=args.since, limit=args.limit, dry_run=args.dry_run
)
total_new = sum(r['facts_new'] for r in results)
total_dup = sum(r['facts_dup'] for r in results)
errors = sum(1 for r in results if r['error'])
print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors")
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()

105
scripts/test_graph_visualizer.py Executable file
View File

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Tests for graph_visualizer.py — smoke test + subgraph logic.
Run: python3 scripts/test_graph_visualizer.py
"""
import json, sys, tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
import graph_visualizer as gv
def make_index(facts, tmp_dir):
p = tmp_dir / "index.json"
p.write_text(json.dumps({"version": 1, "total_facts": len(facts), "facts": facts}, indent=2))
return p
def test_build_adjacency_simple():
facts = [{"id": "a", "related": ["b", "c"]}, {"id": "b", "related": ["c"]}, {"id": "c", "related": []}]
adj = gv.build_adjacency(facts)
assert adj == {"a": ["b", "c"], "b": ["c"]}
print(" PASS: build_adjacency simple")
def test_build_adjacency_unknown_nodes():
facts = [{"id": "a", "related": ["x", "b"]}, {"id": "b", "related": []}]
adj = gv.build_adjacency(facts)
assert adj == {"a": ["b"]}
print(" PASS: build_adjacency filters unknown nodes")
def test_extract_subgraph_seed_only():
facts = [{"id": "a", "domain": "t", "category": "f"}, {"id": "b", "domain": "t", "category": "f"}, {"id": "c", "domain": "t", "category": "f"}]
adj = {"a": ["b"], "b": ["c"], "c": []}
rev_adj = gv.build_reverse_adjacency(adj)
sub = gv.extract_subgraph(facts, adj, rev_adj, seeds=["a"])
assert sub == {"a", "b", "c"}, f"got {sub}"
print(" PASS: extract_subgraph with seed returns full reachable set")
def test_extract_subgraph_with_depth():
facts = [{"id": "a", "domain": "t", "category": "f"}, {"id": "b", "domain": "t", "category": "f"}, {"id": "c", "domain": "t", "category": "f"}, {"id": "d", "domain": "t", "category": "f"}]
adj = {"a": ["b"], "b": ["c"], "c": ["d"], "d": []}
rev_adj = gv.build_reverse_adjacency(adj)
sub = gv.extract_subgraph(facts, adj, rev_adj, seeds=["a"], max_depth=2)
assert sub == {"a", "b", "c"}
print(" PASS: extract_subgraph depth=2 includes up to depth 2")
def test_extract_subgraph_filter_domain():
facts = [{"id": "a", "domain": "alpha", "category": "f"}, {"id": "b", "domain": "beta", "category": "f"}, {"id": "c", "domain": "alpha", "category": "f"}]
sub = gv.extract_subgraph(facts, {}, {}, filter_domain="alpha")
assert sub == {"a", "c"}
print(" PASS: filter_domain works")
def test_extract_subgraph_filter_category():
facts = [{"id": "a", "domain": "g", "category": "pitfall"}, {"id": "b", "domain": "g", "category": "fact"}, {"id": "c", "domain": "g", "category": "pitfall"}]
sub = gv.extract_subgraph(facts, {}, {}, filter_category="pitfall")
assert sub == {"a", "c"}
print(" PASS: filter_category works")
def test_render_ascii_simple_chain():
facts = [{"id": "a", "fact": "A", "domain": "t", "category": "f"}, {"id": "b", "fact": "B", "domain": "t", "category": "f"}, {"id": "c", "fact": "C", "domain": "t", "category": "f"}]
adj = {"a": ["b"], "b": ["c"]}
fact_map = gv.build_fact_map(facts)
out = gv.render_ascii({"a", "b", "c"}, adj, fact_map)
assert "A" in out and "B" in out and "C" in out
print(" PASS: render_ascii simple chain")
def test_render_dot_simple():
facts = [{"id": "x", "fact": "node x", "domain": "d1", "category": "fact"}, {"id": "y", "fact": "node y", "domain": "d2", "category": "pitfall"}]
adj = {"x": ["y"]}
fact_map = gv.build_fact_map(facts)
out = gv.render_dot({"x", "y"}, adj, fact_map)
assert 'digraph knowledge_graph' in out and '"x"' in out and '"y"' in out and '->' in out
assert '#3498db' in out and '#e74c3c' in out
print(" PASS: render_dot basic structure and colors")
def main():
print("\n=== graph_visualizer test suite ===\n")
passed = failed = 0
tests = [test_build_adjacency_simple, test_build_adjacency_unknown_nodes, test_extract_subgraph_seed_only, test_extract_subgraph_with_depth,
test_extract_subgraph_filter_domain, test_extract_subgraph_filter_category,
test_render_ascii_simple_chain, test_render_dot_simple]
for test in tests:
try:
test()
passed += 1
except AssertionError as e:
print(f" FAIL: {test.__name__}{e}")
failed += 1
except Exception as e:
print(f" ERROR: {test.__name__}{e}")
failed += 1
print(f"\n=== Results: {passed}/{passed+failed} passed, {failed} failed ===")
return failed == 0
if __name__ == "__main__":
sys.exit(0 if main() else 1)

View File

@@ -1,197 +0,0 @@
#!/usr/bin/env python3
"""
Smoke test for session knowledge extractor.
Tests: parsing, entity extraction, metadata generation, dedup, store roundtrip.
Does NOT call real LLM — uses mock facts.
"""
import json
import sys
import tempfile
import os
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
from session_knowledge_extractor import (
validate_fact, deduplicate, load_existing_knowledge, fact_fingerprint,
extract_agent, extract_tasks, extract_tools, extract_outcome,
write_knowledge
)
def make_test_session():
"""Create a sample Hermes session transcript."""
messages = [
{"role": "user", "content": "Clone the compounding-intelligence repo and run tests", "timestamp": "2026-04-13T10:00:00Z"},
{"role": "assistant", "model": "xiaomi/mimo-v2-pro", "content": "I'll clone the repo and run tests.", "timestamp": "2026-04-13T10:00:02Z",
"tool_calls": [
{"function": {"name": "terminal", "arguments": '{"command": "git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence.git"}'}},
]},
{"role": "tool", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:10Z"},
{"role": "assistant", "model": "xiaomi/mimo-v2-pro", "content": "Now running pytest...", "timestamp": "2026-04-13T10:00:11Z",
"tool_calls": [
{"function": {"name": "execute_code", "arguments": '{"code": "import subprocess; subprocess.run([\"pytest\"])"}'}},
]},
{"role": "tool", "content": "15 passed, 0 failed", "timestamp": "2026-04-13T10:00:15Z"},
{"role": "assistant", "model": "xiaomi/mimo-v2-pro", "content": "All tests passed — done.", "timestamp": "2026-04-13T10:00:16Z"},
]
return messages
def test_extract_entities():
"""Test entity extraction from messages."""
messages = make_test_session() # 6 total: 3 user/assistant + 3 tool
agent = extract_agent(messages)
assert agent == "xiaomi/mimo-v2-pro"
tasks = extract_tasks(messages)
assert len(tasks) >= 1 and "clone" in tasks[0].lower()
tools = extract_tools(messages)
assert "terminal" in tools and "execute_code" in tools and len(tools) == 2
outcome = extract_outcome(messages)
assert outcome == "success"
print(" [PASS] entity extraction works")
def test_validate_fact():
good = {"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}
assert validate_fact(good), "Valid fact should pass"
bad = {"fact": "Something", "category": "nonsense", "repo": "x", "confidence": 0.5}
assert not validate_fact(bad), "Bad category should fail"
print(" [PASS] fact validation works")
def test_deduplicate():
existing = [{"fact": "A", "category": "fact", "repo": "global", "confidence": 0.9}]
new = [
{"fact": "A", "category": "fact", "repo": "global", "confidence": 0.9},
{"fact": "B", "category": "fact", "repo": "global", "confidence": 0.9},
]
result = deduplicate(new, existing)
assert len(result) == 1 and result[0]["fact"] == "B", "Should remove exact dup"
print(" [PASS] deduplication works")
def test_knowledge_store_roundtrip():
with tempfile.TemporaryDirectory() as tmpdir:
index = load_existing_knowledge(tmpdir)
assert index["total_facts"] == 0
new_facts = [
{"fact": "session_x used terminal", "category": "fact", "repo": "global", "confidence": 0.9},
{"fact": "session_x task: clone repo", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
{"fact": "session_x outcome: success", "category": "fact", "repo": "global", "confidence": 0.9},
] * 4 # 12 facts total
write_knowledge(index, new_facts, tmpdir, source_session="session_x.jsonl")
index2 = load_existing_knowledge(tmpdir)
assert index2["total_facts"] == 12
# Verify markdown written
md_path = Path(tmpdir) / "repos" / "compounding-intelligence.md"
assert md_path.exists(), "Markdown file should be created"
print(" [PASS] knowledge store roundtrip works (12 facts)")
def test_min_facts_per_session():
"""Validator: a typical session should yield 10+ facts."""
# Simulate facts from one session (what the LLM would produce)
mock_facts = [
{"fact": "session_123 was handled by model xiaomi/mimo-v2-pro", "category": "fact", "repo": "global", "confidence": 0.95},
{"fact": "session_123's task was to clone the compounding-intelligence repository", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
{"fact": "session_123 used tool 'terminal' to run git clone", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
{"fact": "session_123 used tool 'execute_code' to run pytest", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
{"fact": "session_123 executed: git clone https://forge...", "category": "fact", "repo": "global", "confidence": 0.9},
{"fact": "session_123 executed: pytest (15 tests)", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
{"fact": "session_123 outcome: all 15 tests passed", "category": "fact", "repo": "global", "confidence": 0.95},
{"fact": "session_123 touched repo: compounding-intelligence", "category": "fact", "repo": "compounding-intelligence", "confidence": 1.0},
{"fact": "session_123 terminal output: 'Cloned successfully'", "category": "fact", "repo": "global", "confidence": 0.9},
{"fact": "session_123 test output: '15 passed, 0 failed'", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
{"fact": "session_123 completed without errors", "category": "fact", "repo": "global", "confidence": 0.85},
{"fact": "session_123 final message: 'All tests passed — done.'", "category": "fact", "repo": "global", "confidence": 0.9},
]
assert len(mock_facts) >= 10, f"Should have at least 10 facts, got {len(mock_facts)}"
print(f" [PASS] mock session produces {len(mock_facts)} facts")
def test_full_chain_no_llm():
"""Full pipeline: read -> extract entities -> validate -> dedup -> store."""
messages = make_test_session()
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
for msg in messages:
f.write(json.dumps(msg) + '\n')
session_path = f.name
with tempfile.TemporaryDirectory() as knowledge_dir:
# Step 1: Read
msgs = read_session(session_path)
assert len(msgs) == 6 # 3 user/assistant + 3 tool role messages
# Step 2: Extract conversation
conv = extract_conversation(msgs)
assert len(conv) == 4 # 1 user + 3 assistant messages (tool role messages skipped)
# Step 3: Truncate
truncated = truncate_for_context(conv, head=50, tail=50)
transcript = messages_to_text(truncated)
assert "clone" in transcript.lower()
# Step 4: Extract entities
agent = extract_agent(msgs)
tools = extract_tools(msgs)
outcome = extract_outcome(msgs)
assert agent == "xiaomi/mimo-v2-pro"
assert len(tools) >= 2
assert outcome == "success"
# Step 5-7: Simulated LLM output → validate → dedup → store
# Create 12 distinct facts to meet the 10+ requirement
mock_facts = [
{"fact": "Session used tool terminal", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
{"fact": "Session used tool execute_code", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
{"fact": f"Session handled by agent {agent}", "category": "fact", "repo": "global", "confidence": 0.95},
{"fact": "Session task: clone the repository", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
{"fact": "Session task: run pytest", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
{"fact": "Session outcome: success", "category": "fact", "repo": "global", "confidence": 0.9},
{"fact": "Session repo: compounding-intelligence touched", "category": "fact", "repo": "compounding-intelligence", "confidence": 1.0},
{"fact": "Terminal command executed: git clone", "category": "fact", "repo": "global", "confidence": 0.9},
{"fact": "Test result: 15 passed, 0 failed", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.95},
{"fact": "All tests passed — session complete", "category": "fact", "repo": "global", "confidence": 0.9},
{"fact": "No errors encountered during session", "category": "fact", "repo": "global", "confidence": 0.8},
{"fact": "Session duration: approximately 16 seconds", "category": "fact", "repo": "global", "confidence": 0.7},
]
valid = [f for f in mock_facts if validate_fact(f)]
assert len(valid) == 12
index = load_existing_knowledge(knowledge_dir)
new_facts = deduplicate(valid, index.get("facts", []))
assert len(new_facts) == 12
from session_knowledge_extractor import write_knowledge
write_knowledge(index, new_facts, knowledge_dir, source_session=session_path)
index2 = load_existing_knowledge(knowledge_dir)
assert index2["total_facts"] == 12
os.unlink(session_path)
print(" [PASS] full chain (read → entities → validate → dedup → store) works (12 facts)")
if __name__ == "__main__":
print("Running session knowledge extractor smoke tests...")
test_extract_entities()
test_validate_fact()
test_deduplicate()
test_knowledge_store_roundtrip()
test_min_facts_per_session()
test_full_chain_no_llm()
print("\nAll tests passed — extractor produces 10+ facts per session ✓")

View File

@@ -1,95 +0,0 @@
# Knowledge Extraction Prompt — Session Entities & Relationships
## System Prompt
You are a session knowledge extraction engine. You read Hermes session transcripts and output ONLY structured JSON. You extract session entities (agent, task, tools, outcome) and the relationships between them. You never invent facts not in the transcript.
## Prompt
```
TASK: Extract knowledge facts from this session transcript. Focus on:
1. AGENT: Which model/agent handled this session
2. TASK: What problem or goal was being solved
3. TOOLS: Which tools were used and what each accomplished
4. OUTCOME: Did the session succeed, partially succeed, or fail?
5. RELATIONSHIPS: How do these entities connect?
RULES:
1. Extract ONLY information explicitly stated or clearly implied by the transcript.
2. Do NOT infer, assume, or hallucinate.
3. Every fact must point to a specific message or tool call as evidence.
4. Generate at least 10 facts. Break complex tool usages into multiple atomic facts.
5. Include relationship facts: "session X used tool Y", "agent Z handled session X", "task W was completed by session X".
6. Include outcome facts: success indicators, error conditions, partial completions.
CATEGORIES (assign exactly one):
- fact: Concrete, verifiable statement (paths, commands, results, configs)
- pitfall: Error hit, wrong assumption, time wasted
- pattern: Successful reusable sequence
- tool-quirk: Environment-specific behavior (token paths, URLs, API gotchas)
- question: Something identified but not answered
CONFIDENCE:
- 0.9: Directly observed with explicit output or verification
- 0.7: Multiple data points confirm, but not explicitly verified
- 0.5: Clear implication but not directly stated
- 0.3: Weak inference from limited evidence
OUTPUT FORMAT (valid JSON only, no markdown, no explanation):
{
"knowledge": [
{
"fact": "One specific sentence of knowledge",
"category": "fact|pitfall|pattern|tool-quirk|question",
"repo": "repo-name or global",
"confidence": 0.0-1.0,
"evidence": "Brief quote or reference from transcript that supports this"
}
],
"meta": {
"session_id": "extracted or generated id",
"session_outcome": "success|partial|failure|unknown",
"agent": "model name if identifiable",
"task": "brief description of the goal",
"tools_used": ["tool1", "tool2"],
"repos_touched": ["repo1"],
"fact_count": 0
}
}
TRANSCRIPT:
{{transcript}}
```
## Design Notes
### Entity extraction strategy
**Agent:** Look for `"model": "..."` in assistant messages or model mentions in content.
**Task:** The first user message usually states the goal. If vague, look for the assistant's interpretation: "I'll help you X".
**Tools:** Every `tool_calls` entry is a tool use. Extract the function name and what it was used for based on arguments.
**Outcome:** Success indicators: "done", "completed", "merged", "pushed", "created". Failures: HTTP errors (405, 404, 403), stack traces, explicit failures.
**Relationships:** Treat the session as a central entity. Generate facts like:
- Agent relationship: "session_abc was handled by model xiaomi/mimo-v2-pro"
- Task relationship: "session_abc's task was to merge PR #123"
- Tool relationship: "session_abc used terminal to run 'git clone'"
- Outcome relationship: "session_abc outcome: success — PR merged"
### 10+ facts guarantee
Each session with tool usage typically yields:
- 1 fact: agent identity
- 1-2 facts: task/goal (decomposed into sub-goals)
- 3-5 facts: each tool call becomes 1-2 facts (tool name + purpose + result)
- 1-2 facts: outcome details
- 1-2 facts: repo touched
Total: 10+ per non-trivial session.
### Token budget
~700 tokens for prompt (excluding transcript). Leaves room for long transcripts.