#!/usr/bin/env python3
"""
session_knowledge_extractor.py — Extract session-level entities and relationships from Hermes transcripts.

Creates knowledge facts about: which agent handled the session, what task was solved,
which tools were used and why, and the outcome. Target: 10+ facts per session.

Usage:
    python3 session_knowledge_extractor.py --session session.jsonl --output knowledge/
    python3 session_knowledge_extractor.py --batch --sessions-dir ~/.hermes/sessions/ --limit 10
"""

import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any

SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))

from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text

# --- Configuration ---
# Each setting falls back from EXTRACTOR_* to the older HARVESTER_* env var, then a default.
DEFAULT_API_BASE = os.environ.get(
    "EXTRACTOR_API_BASE",
    os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
)
DEFAULT_API_KEY = os.environ.get(
    "EXTRACTOR_API_KEY",
    os.environ.get("HARVESTER_API_KEY", "")
)
DEFAULT_MODEL = os.environ.get(
    "EXTRACTOR_MODEL",
    os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
)
KNOWLEDGE_DIR = os.environ.get("EXTRACTOR_KNOWLEDGE_DIR", "knowledge")
PROMPT_PATH = os.environ.get(
    "EXTRACTOR_PROMPT_PATH",
    str(SCRIPT_DIR.parent / "templates" / "session-entity-prompt.md")
)

# Candidate key files probed in order by find_api_key().
API_KEY_PATHS = [
    os.path.expanduser("~/.config/nous/key"),
    os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
    os.path.expanduser("~/.config/openrouter/key"),
    os.path.expanduser("~/.config/gitea/token"),  # fallback
]


def find_api_key() -> str:
    """Return the first non-empty API key found in API_KEY_PATHS, or "" if none."""
    for path in API_KEY_PATHS:
        if os.path.exists(path):
            with open(path) as f:
                key = f.read().strip()
            if key:
                return key
    return ""


def load_extraction_prompt() -> str:
    """Read the extraction system prompt from PROMPT_PATH; exit(1) if it is missing."""
    path = Path(PROMPT_PATH)
    if not path.exists():
        print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr)
        sys.exit(1)
    return path.read_text(encoding='utf-8')


def call_llm(prompt: str, transcript: str, api_base: str, api_key: str,
             model: str) -> Optional[List[dict]]:
    """Call LLM to extract session entity knowledge.

    Sends an OpenAI-style /chat/completions request and parses the reply into a
    list of fact dicts. Returns None on any API failure or unparseable response.
    """
    import urllib.request

    messages = [
        {"role": "system", "content": prompt},
        {"role": "user",
         "content": f"Extract knowledge from this session transcript:\n\n{transcript}"}
    ]
    payload = json.dumps({
        "model": model,
        "messages": messages,
        "temperature": 0.1,   # low temperature: extraction should be deterministic
        "max_tokens": 4096
    }).encode('utf-8')

    req = urllib.request.Request(
        f"{api_base}/chat/completions",
        data=payload,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        },
        method="POST"
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            result = json.loads(resp.read().decode('utf-8'))
        content = result["choices"][0]["message"]["content"]
        return parse_extraction_response(content)
    except Exception as e:
        # Best-effort: network/HTTP/shape errors are reported, not raised.
        print(f"ERROR: LLM API call failed: {e}", file=sys.stderr)
        return None


def parse_extraction_response(content: str) -> Optional[List[dict]]:
    """Parse LLM response; handles JSON or markdown-wrapped JSON.

    Tries, in order: raw JSON, a ```json fenced block, and finally any object
    containing a "knowledge" key. Returns the list of facts or None.
    """
    # 1) The whole response is JSON (either {"knowledge": [...]} or a bare list).
    try:
        data = json.loads(content)
        if isinstance(data, dict) and 'knowledge' in data:
            return data['knowledge']
        if isinstance(data, list):
            return data
    except json.JSONDecodeError:
        pass

    import re

    # 2) JSON inside a markdown code fence.
    json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
    if json_match:
        try:
            data = json.loads(json_match.group(1))
            if isinstance(data, dict) and 'knowledge' in data:
                return data['knowledge']
            if isinstance(data, list):
                return data
        except json.JSONDecodeError:
            pass

    # 3) Last resort: an embedded object with a "knowledge" array.
    # BUGFIX: the previous pattern captured a span ending at ']' (never valid
    # JSON, so json.loads always failed); capture through the closing brace.
    json_match = re.search(r'(\{.*?"knowledge".*?\]\s*\})', content, re.DOTALL)
    if json_match:
        try:
            data = json.loads(json_match.group(1))
            return data.get('knowledge', [])
        except json.JSONDecodeError:
            pass

    print("WARNING: Could not parse LLM response as JSON", file=sys.stderr)
    print(f"Response preview: {content[:500]}", file=sys.stderr)
    return None


def load_existing_knowledge(knowledge_dir: str) -> dict:
    """Load knowledge/index.json, returning a fresh empty index if absent or corrupt."""
    index_path = Path(knowledge_dir) / "index.json"
    if not index_path.exists():
        return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
    try:
        with open(index_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
        return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}


def fact_fingerprint(fact: dict) -> str:
    """Return an md5 hex digest of the fact text, lowercased and whitespace-normalized."""
    text = fact.get('fact', '').lower().strip()
    text = ' '.join(text.split())  # collapse runs of whitespace
    return hashlib.md5(text.encode('utf-8')).hexdigest()


def deduplicate(new_facts: List[dict], existing: List[dict],
                similarity_threshold: float = 0.8) -> List[dict]:
    """Filter new_facts against existing facts.

    Drops exact duplicates (same fingerprint) and near-duplicates whose
    word-level Jaccard similarity to any existing fact is >= similarity_threshold.
    Kept facts are added to the comparison pool so new facts also dedupe
    against each other.
    """
    existing_fingerprints = set()
    existing_texts = []
    for f in existing:
        fp = fact_fingerprint(f)
        existing_fingerprints.add(fp)
        existing_texts.append(f.get('fact', '').lower().strip())

    unique = []
    for fact in new_facts:
        fp = fact_fingerprint(fact)
        if fp in existing_fingerprints:
            continue
        fact_words = set(fact.get('fact', '').lower().split())
        is_dup = False
        for existing_text in existing_texts:
            existing_words = set(existing_text.split())
            if not fact_words or not existing_words:
                continue
            # Jaccard similarity: |intersection| / |union|
            overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1)
            if overlap >= similarity_threshold:
                is_dup = True
                break
        if not is_dup:
            unique.append(fact)
            existing_fingerprints.add(fp)
            existing_texts.append(fact.get('fact', '').lower().strip())
    return unique


def validate_fact(fact: dict) -> bool:
    """Return True if fact has all required fields with sane types and values.

    Requires: non-empty 'fact' string, known 'category', 'repo' present,
    and numeric 'confidence' in [0.0, 1.0].
    """
    required = ['fact', 'category', 'repo', 'confidence']
    for field in required:
        if field not in fact:
            return False
    if not isinstance(fact['fact'], str) or not fact['fact'].strip():
        return False
    valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question']
    if fact['category'] not in valid_categories:
        return False
    if not isinstance(fact.get('confidence', 0), (int, float)):
        return False
    if not (0.0 <= fact['confidence'] <= 1.0):
        return False
    return True


def write_knowledge(index: dict, new_facts: List[dict], knowledge_dir: str,
                    source_session: str = ""):
    """Append new_facts to the JSON index and to per-repo markdown digests.

    Mutates `index` in place (extends 'facts', updates counters/timestamps),
    rewrites index.json, then appends a per-session section to either
    global/sessions.md or repos/<repo>.md for each repo seen in new_facts.
    """
    kdir = Path(knowledge_dir)
    kdir.mkdir(parents=True, exist_ok=True)

    # Stamp provenance onto each fact before persisting.
    for fact in new_facts:
        fact['source_session'] = source_session
        fact['harvested_at'] = datetime.now(timezone.utc).isoformat()

    index['facts'].extend(new_facts)
    index['total_facts'] = len(index['facts'])
    index['last_updated'] = datetime.now(timezone.utc).isoformat()

    index_path = kdir / "index.json"
    with open(index_path, 'w', encoding='utf-8') as f:
        json.dump(index, f, indent=2, ensure_ascii=False)

    # Group facts by repo for the markdown digests.
    repos = {}
    for fact in new_facts:
        repo = fact.get('repo', 'global')
        repos.setdefault(repo, []).append(fact)

    for repo, facts in repos.items():
        if repo == 'global':
            md_path = kdir / "global" / "sessions.md"
        else:
            md_path = kdir / "repos" / f"{repo}.md"
        md_path.parent.mkdir(parents=True, exist_ok=True)
        mode = 'a' if md_path.exists() else 'w'
        with open(md_path, mode, encoding='utf-8') as f:
            if mode == 'w':
                f.write(f"# Session Knowledge: {repo}\n\n")
            f.write(f"## Session {Path(source_session).stem} — {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n")
            for fact in facts:
                icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': '❓'}.get(fact['category'], '•')
                f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
            f.write("\n")


def extract_session_id(messages: List[dict]) -> str:
    """Derive a stable session ID from messages or return 'unknown'."""
    # Try to find session_id in the first message or use filename from source
    for msg in messages[:3]:
        if msg.get('session_id'):
            return msg['session_id'][:32]
    # Fallback: hash first few messages
    content = str(messages[:3])
    return hashlib.md5(content.encode()).hexdigest()[:12]


def extract_agent(messages: List[dict]) -> Optional[str]:
    """Extract the agent/model name from assistant messages."""
    for msg in messages:
        if msg.get('role') == 'assistant' and msg.get('model'):
            return msg['model']
    return None


def extract_tasks(messages: List[dict]) -> List[str]:
    """Extract the task/goal from the first user message."""
    tasks = []
    for msg in messages:
        if msg.get('role') == 'user' and msg.get('content'):
            content = msg['content']
            if isinstance(content, str) and len(content.strip()) < 500:
                tasks.append(content.strip())
            break  # First user message is usually the task
    return tasks


def extract_tools(messages: List[dict]) -> List[str]:
    """Extract tool names used in the session."""
    tools = set()
    for msg in messages:
        if msg.get('tool_calls'):
            for tc in msg['tool_calls']:
                func = tc.get('function', {})
                name = func.get('name', '')
                if name:
                    tools.add(name)
    return list(tools)


def extract_outcome(messages: List[dict]) -> str:
    """Classify session outcome: success/partial/failure."""
    errors = []
    for msg in messages:
        if msg.get('role') == 'tool' and msg.get('is_error'):
            err = msg.get('content', '')
            if isinstance(err, str):
                errors.append(err.lower())
    if errors:
        # Auth/permission-style errors are treated as hard failure.
        if any('405' in e or 'permission' in e or 'authentication' in e for e in errors):
            return 'failure'
        return 'partial'
    # Check last assistant message for success indicators
    last = messages[-1] if messages else {}
    if last.get('role') == 'assistant':
        content = str(last.get('content', ''))
        success_words = ['done', 'completed', 'success', 'merged', 'pushed', 'created', 'saved']
        if any(word in content.lower() for word in success_words):
            return 'success'
    return 'unknown'


def harvest_session(session_path: str, knowledge_dir: str, api_base: str,
                    api_key: str, model: str, dry_run: bool = False,
                    min_confidence: float = 0.3) -> dict:
    """Harvest session entities and relationships from one session.

    Returns a stats dict (facts_found/facts_new/facts_dup/elapsed_seconds/error);
    never raises — all exceptions are captured into stats['error'].
    """
    start_time = time.time()
    stats = {
        'session': session_path,
        'facts_found': 0,
        'facts_new': 0,
        'facts_dup': 0,
        'elapsed_seconds': 0,
        'error': None
    }
    try:
        messages = read_session(session_path)
        if not messages:
            stats['error'] = "Empty session file"
            return stats

        conv = extract_conversation(messages)
        if not conv:
            stats['error'] = "No conversation turns found"
            return stats

        # Keep the first and last 50 turns so long sessions fit the context window.
        truncated = truncate_for_context(conv, head=50, tail=50)
        transcript = messages_to_text(truncated)

        prompt = load_extraction_prompt()
        raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
        if raw_facts is None:
            stats['error'] = "LLM extraction failed"
            return stats

        valid_facts = [f for f in raw_facts
                       if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
        stats['facts_found'] = len(valid_facts)

        existing_index = load_existing_knowledge(knowledge_dir)
        existing_facts = existing_index.get('facts', [])
        new_facts = deduplicate(valid_facts, existing_facts)
        stats['facts_new'] = len(new_facts)
        stats['facts_dup'] = len(valid_facts) - len(new_facts)

        if new_facts and not dry_run:
            write_knowledge(existing_index, new_facts, knowledge_dir,
                            source_session=session_path)

        stats['elapsed_seconds'] = round(time.time() - start_time, 2)
        return stats
    except Exception as e:
        stats['error'] = str(e)
        stats['elapsed_seconds'] = round(time.time() - start_time, 2)
        return stats


def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str,
                  api_key: str, model: str, since: str = "", limit: int = 0,
                  dry_run: bool = False) -> List[dict]:
    """Harvest every *.jsonl session in sessions_dir (newest first).

    `since` filters by the date embedded in the filename (name_YYYYMMDD_*);
    files whose names don't parse are kept. `limit` > 0 caps the run.
    Returns the per-session stats dicts from harvest_session.
    """
    sessions_path = Path(sessions_dir)
    if not sessions_path.is_dir():
        print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
        return []

    session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True)

    if since:
        since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
        # BUGFIX: a plain "YYYY-MM-DD" parses as a naive datetime; comparing it
        # with the tz-aware file_dt below raised TypeError. Coerce to UTC.
        if since_dt.tzinfo is None:
            since_dt = since_dt.replace(tzinfo=timezone.utc)
        filtered = []
        for sf in session_files:
            try:
                parts = sf.stem.split('_')
                if len(parts) >= 3:
                    date_str = parts[1]
                    file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
                    if file_dt >= since_dt:
                        filtered.append(sf)
            except (ValueError, IndexError):
                # Unparseable filename: keep the file rather than silently skip it.
                filtered.append(sf)
        session_files = filtered

    if limit > 0:
        session_files = session_files[:limit]

    print(f"Harvesting {len(session_files)} sessions with session knowledge extractor...")
    results = []
    for i, sf in enumerate(session_files, 1):
        print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
        stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
        if stats['error']:
            print(f"ERROR: {stats['error']}")
        else:
            print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
        results.append(stats)
    return results


def main():
    """CLI entry point: single-session or batch harvesting."""
    parser = argparse.ArgumentParser(description="Extract session entities and relationships from Hermes transcripts")
    parser.add_argument('--session', help='Path to a single session JSONL file')
    parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions')
    parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
                        help='Directory containing session files (default: ~/.hermes/sessions)')
    parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store')
    parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)')
    parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)')
    parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL')
    parser.add_argument('--api-key', default='', help='LLM API key (or set EXTRACTOR_API_KEY)')
    parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction')
    parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store')
    parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold')
    args = parser.parse_args()

    # Key resolution order: CLI flag > env var > key files on disk.
    api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
    if not api_key:
        print("ERROR: No API key found. Set EXTRACTOR_API_KEY or store in one of:", file=sys.stderr)
        for p in API_KEY_PATHS:
            print(f"  {p}", file=sys.stderr)
        sys.exit(1)

    knowledge_dir = args.output
    if not os.path.isabs(knowledge_dir):
        # Relative output dirs are anchored at the project root, not the CWD.
        knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir)

    if args.session:
        stats = harvest_session(
            args.session, knowledge_dir, args.api_base, api_key, args.model,
            dry_run=args.dry_run, min_confidence=args.min_confidence
        )
        print(json.dumps(stats, indent=2))
        if stats['error']:
            sys.exit(1)
    elif args.batch:
        results = batch_harvest(
            args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model,
            since=args.since, limit=args.limit, dry_run=args.dry_run
        )
        total_new = sum(r['facts_new'] for r in results)
        total_dup = sum(r['facts_dup'] for r in results)
        errors = sum(1 for r in results if r['error'])
        print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors")
    else:
        parser.print_help()
        sys.exit(1)


if __name__ == '__main__':
    main()