From 9a2135b1dfcea14b646acba2b876f4a9ec2972fb Mon Sep 17 00:00:00 2001
From: Alexander Whitestone
Date: Tue, 14 Apr 2026 17:27:20 +0000
Subject: [PATCH] =?UTF-8?q?feat:=20add=20harvester.py=20=E2=80=94=20sessio?=
 =?UTF-8?q?n=20knowledge=20extractor=20(#8)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
 scripts/harvester.py | 447 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 447 insertions(+)
 create mode 100644 scripts/harvester.py

diff --git a/scripts/harvester.py b/scripts/harvester.py
new file mode 100644
index 0000000..9b85b64
--- /dev/null
+++ b/scripts/harvester.py
@@ -0,0 +1,447 @@
+#!/usr/bin/env python3
+"""
+harvester.py — Extract durable knowledge from Hermes session transcripts.
+
+Combines session_reader + extraction prompt + LLM inference to pull
+facts, pitfalls, patterns, and tool quirks from finished sessions.
+
+Usage:
+    python3 harvester.py --session ~/.hermes/sessions/session_xxx.jsonl --output knowledge/
+    python3 harvester.py --batch --since 2026-04-01 --limit 100
+    python3 harvester.py --session session.jsonl --dry-run  # Preview without writing
+"""
+
+import argparse
+import json
+import os
+import sys
+import time
+import hashlib
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional
+
+# Add scripts dir to path for sibling imports
+SCRIPT_DIR = Path(__file__).parent.absolute()
+sys.path.insert(0, str(SCRIPT_DIR))
+
+from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
+
+# --- Configuration ---
+
+DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
+DEFAULT_API_KEY = os.environ.get("HARVESTER_API_KEY", "")
+DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
+KNOWLEDGE_DIR = os.environ.get("HARVESTER_KNOWLEDGE_DIR", "knowledge")
+PROMPT_PATH = os.environ.get("HARVESTER_PROMPT_PATH", str(SCRIPT_DIR.parent / "templates" / "harvest-prompt.md"))
+
+# Where to look for API keys if not set via env
+API_KEY_PATHS = [
+    os.path.expanduser("~/.config/nous/key"),
+    os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
+    os.path.expanduser("~/.config/openrouter/key"),
+]
+
+
+def find_api_key() -> str:
+    """Find API key from common locations."""
+    for path in API_KEY_PATHS:
+        if os.path.exists(path):
+            with open(path) as f:
+                key = f.read().strip()
+            if key:
+                return key
+    return ""
+
+
+def load_extraction_prompt() -> str:
+    """Load the extraction prompt template."""
+    path = Path(PROMPT_PATH)
+    if not path.exists():
+        print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr)
+        print("Expected templates/harvest-prompt.md from issue #7", file=sys.stderr)
+        sys.exit(1)
+    return path.read_text(encoding='utf-8')
+
+
+def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[list[dict]]:
+    """Call the LLM API to extract knowledge from a transcript."""
+    import urllib.request
+
+    messages = [
+        {"role": "system", "content": prompt},
+        {"role": "user", "content": f"Extract knowledge from this session transcript:\n\n{transcript}"}
+    ]
+
+    payload = json.dumps({
+        "model": model,
+        "messages": messages,
+        "temperature": 0.1,  # Low temp for consistent extraction
+        "max_tokens": 4096
+    }).encode('utf-8')
+
+    req = urllib.request.Request(
+        f"{api_base}/chat/completions",
+        data=payload,
+        headers={
+            "Authorization": f"Bearer {api_key}",
+            "Content-Type": "application/json"
+        },
+        method="POST"
+    )
+
+    try:
+        with urllib.request.urlopen(req, timeout=60) as resp:
+            result = json.loads(resp.read().decode('utf-8'))
+        content = result["choices"][0]["message"]["content"]
+        return parse_extraction_response(content)
+    except Exception as e:
+        print(f"ERROR: LLM API call failed: {e}", file=sys.stderr)
+        return None
+
+
+def parse_extraction_response(content: str) -> Optional[list[dict]]:
+    """Parse the LLM response to extract knowledge items.
+
+    Handles various response formats: raw JSON, markdown-wrapped JSON, etc.
+    """
+    # Try direct JSON parse first
+    try:
+        data = json.loads(content)
+        if isinstance(data, dict) and 'knowledge' in data:
+            return data['knowledge']
+        if isinstance(data, list):
+            return data
+    except json.JSONDecodeError:
+        pass
+
+    # Try extracting JSON from markdown code blocks
+    import re
+    json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
+    if json_match:
+        try:
+            data = json.loads(json_match.group(1))
+            if isinstance(data, dict) and 'knowledge' in data:
+                return data['knowledge']
+            if isinstance(data, list):
+                return data
+        except json.JSONDecodeError:
+            pass
+
+    # Try finding any JSON object with knowledge array
+    json_match = re.search(r'({[^{}]*"knowledge"[^{}]*\[[\s\S]*?\][^{}]*})', content)
+    if json_match:
+        try:
+            data = json.loads(json_match.group(1))
+            return data.get('knowledge', [])
+        except json.JSONDecodeError:
+            pass
+
+    print("WARNING: Could not parse LLM response as JSON", file=sys.stderr)
+    print(f"Response preview: {content[:500]}", file=sys.stderr)
+    return None
+
+
+def load_existing_knowledge(knowledge_dir: str) -> dict:
+    """Load the existing knowledge index."""
+    index_path = Path(knowledge_dir) / "index.json"
+    if not index_path.exists():
+        return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
+
+    try:
+        with open(index_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    except (json.JSONDecodeError, IOError) as e:
+        print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
+        return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
+
+
+def fact_fingerprint(fact: dict) -> str:
+    """Generate a deduplication fingerprint for a fact.
+
+    Uses the fact text normalized (lowercase, stripped) as the key.
+    Similar facts will have similar fingerprints.
+    """
+    text = fact.get('fact', '').lower().strip()
+    # Normalize whitespace
+    text = ' '.join(text.split())
+    return hashlib.md5(text.encode('utf-8')).hexdigest()
+
+
+def deduplicate(new_facts: list[dict], existing: list[dict], similarity_threshold: float = 0.8) -> list[dict]:
+    """Remove duplicate facts from new_facts that already exist in the knowledge store.
+
+    Uses fingerprint matching for exact dedup and simple overlap check for near-dupes.
+    """
+    existing_fingerprints = set()
+    existing_texts = []
+    for f in existing:
+        fp = fact_fingerprint(f)
+        existing_fingerprints.add(fp)
+        existing_texts.append(f.get('fact', '').lower().strip())
+
+    unique = []
+    for fact in new_facts:
+        fp = fact_fingerprint(fact)
+        if fp in existing_fingerprints:
+            continue
+
+        # Check for near-duplicates using simple word overlap
+        fact_words = set(fact.get('fact', '').lower().split())
+        is_dup = False
+        for existing_text in existing_texts:
+            existing_words = set(existing_text.split())
+            if not fact_words or not existing_words:
+                continue
+            overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1)
+            if overlap >= similarity_threshold:
+                is_dup = True
+                break
+
+        if not is_dup:
+            unique.append(fact)
+            existing_fingerprints.add(fp)
+            existing_texts.append(fact.get('fact', '').lower().strip())
+
+    return unique
+
+
+def validate_fact(fact: dict) -> bool:
+    """Validate a single knowledge item has required fields."""
+    required = ['fact', 'category', 'repo', 'confidence']
+    for field in required:
+        if field not in fact:
+            return False
+
+    if not isinstance(fact['fact'], str) or not fact['fact'].strip():
+        return False
+
+    valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question']
+    if fact['category'] not in valid_categories:
+        return False
+
+    if not isinstance(fact.get('confidence', 0), (int, float)):
+        return False
+
+    if not (0.0 <= fact['confidence'] <= 1.0):
+        return False
+
+    return True
+
+
+def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: str, source_session: str = ""):
+    """Write new facts to the knowledge store."""
+    kdir = Path(knowledge_dir)
+    kdir.mkdir(parents=True, exist_ok=True)
+
+    # Add source tracking to each fact
+    for fact in new_facts:
+        fact['source_session'] = source_session
+        fact['harvested_at'] = datetime.now(timezone.utc).isoformat()
+
+    # Update index
+    index['facts'].extend(new_facts)
+    index['total_facts'] = len(index['facts'])
+    index['last_updated'] = datetime.now(timezone.utc).isoformat()
+
+    # Write index
+    index_path = kdir / "index.json"
+    with open(index_path, 'w', encoding='utf-8') as f:
+        json.dump(index, f, indent=2, ensure_ascii=False)
+
+    # Also write per-repo markdown files for human reading
+    repos = {}
+    for fact in new_facts:
+        repo = fact.get('repo', 'global')
+        repos.setdefault(repo, []).append(fact)
+
+    for repo, facts in repos.items():
+        if repo == 'global':
+            md_path = kdir / "global" / "harvested.md"
+        else:
+            md_path = kdir / "repos" / f"{repo.replace('/', '_')}.md"  # sanitize: owner/name repos must not escape repos/
+
+        md_path.parent.mkdir(parents=True, exist_ok=True)
+
+        # Append to existing or create new
+        mode = 'a' if md_path.exists() else 'w'
+        with open(md_path, mode, encoding='utf-8') as f:
+            if mode == 'w':
+                f.write(f"# Knowledge: {repo}\n\n")
+            f.write(f"## Harvested {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n")
+            for fact in facts:
+                icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': '❓'}.get(fact['category'], '•')
+                f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
+            f.write("\n")
+
+
+def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_key: str,
+                    model: str, dry_run: bool = False, min_confidence: float = 0.3) -> dict:
+    """Harvest knowledge from a single session.
+
+    Returns: dict with stats (facts_found, facts_new, facts_dup, elapsed_seconds, error)
+    """
+    start_time = time.time()
+    stats = {
+        'session': session_path,
+        'facts_found': 0,
+        'facts_new': 0,
+        'facts_dup': 0,
+        'elapsed_seconds': 0,
+        'error': None
+    }
+
+    try:
+        # 1. Read session
+        messages = read_session(session_path)
+        if not messages:
+            stats['error'] = "Empty session file"
+            return stats
+
+        # 2. Extract conversation
+        conv = extract_conversation(messages)
+        if not conv:
+            stats['error'] = "No conversation turns found"
+            return stats
+
+        # 3. Truncate for context window
+        truncated = truncate_for_context(conv, head=50, tail=50)
+        transcript = messages_to_text(truncated)
+
+        # 4. Load extraction prompt
+        prompt = load_extraction_prompt()
+
+        # 5. Call LLM
+        raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
+        if raw_facts is None:
+            stats['error'] = "LLM extraction failed"
+            return stats
+
+        # 6. Validate
+        valid_facts = [f for f in raw_facts if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
+        stats['facts_found'] = len(valid_facts)
+
+        # 7. Deduplicate
+        existing_index = load_existing_knowledge(knowledge_dir)
+        existing_facts = existing_index.get('facts', [])
+        new_facts = deduplicate(valid_facts, existing_facts)
+        stats['facts_new'] = len(new_facts)
+        stats['facts_dup'] = len(valid_facts) - len(new_facts)
+
+        # 8. Write (unless dry run)
+        if new_facts and not dry_run:
+            write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
+
+        stats['elapsed_seconds'] = round(time.time() - start_time, 2)
+        return stats
+
+    except Exception as e:
+        stats['error'] = str(e)
+        stats['elapsed_seconds'] = round(time.time() - start_time, 2)
+        return stats
+
+
+def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str, api_key: str,
+                  model: str, since: str = "", limit: int = 0, dry_run: bool = False) -> list[dict]:
+    """Harvest knowledge from multiple sessions in batch."""
+    sessions_path = Path(sessions_dir)
+    if not sessions_path.is_dir():
+        print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
+        return []
+
+    # Find session files
+    session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True)  # Newest first
+
+    # Filter by date if --since provided
+    if since:
+        since_dt = datetime.fromisoformat(since.replace('Z', '+00:00')); since_dt = since_dt if since_dt.tzinfo else since_dt.replace(tzinfo=timezone.utc)  # naive CLI dates are UTC; aware vs naive compare raises TypeError
+        filtered = []
+        for sf in session_files:
+            # Try to parse timestamp from filename (common format: session_YYYYMMDD_HHMMSS_hash.jsonl)
+            try:
+                parts = sf.stem.split('_')
+                if len(parts) >= 3:
+                    date_str = parts[1]
+                    file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
+                    if file_dt >= since_dt:
+                        filtered.append(sf)
+            except (ValueError, IndexError):
+                # If we can't parse the date, include the file (be permissive)
+                filtered.append(sf)
+        session_files = filtered
+
+    # Apply limit
+    if limit > 0:
+        session_files = session_files[:limit]
+
+    print(f"Harvesting {len(session_files)} sessions...")
+
+    results = []
+    for i, sf in enumerate(session_files, 1):
+        print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
+        stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
+        if stats['error']:
+            print(f"ERROR: {stats['error']}")
+        else:
+            print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
+        results.append(stats)
+
+    return results
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
+    parser.add_argument('--session', help='Path to a single session JSONL file')
+    parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions')
+    parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
+                        help='Directory containing session files (default: ~/.hermes/sessions)')
+    parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store')
+    parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)')
+    parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)')
+    parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL')
+    parser.add_argument('--api-key', default='', help='LLM API key (or set HARVESTER_API_KEY)')
+    parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction')
+    parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store')
+    parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold')
+
+    args = parser.parse_args()
+
+    # Resolve API key
+    api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
+    if not api_key:
+        print("ERROR: No API key found. Set HARVESTER_API_KEY or store in one of:", file=sys.stderr)
+        for p in API_KEY_PATHS:
+            print(f"  {p}", file=sys.stderr)
+        sys.exit(1)
+
+    # Resolve knowledge directory
+    knowledge_dir = args.output
+    if not os.path.isabs(knowledge_dir):
+        knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir)
+
+    if args.session:
+        # Single session mode
+        stats = harvest_session(
+            args.session, knowledge_dir, args.api_base, api_key, args.model,
+            dry_run=args.dry_run, min_confidence=args.min_confidence
+        )
+        print(json.dumps(stats, indent=2))
+        if stats['error']:
+            sys.exit(1)
+    elif args.batch:
+        # Batch mode
+        results = batch_harvest(
+            args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model,
+            since=args.since, limit=args.limit, dry_run=args.dry_run
+        )
+        total_new = sum(r['facts_new'] for r in results)
+        total_dup = sum(r['facts_dup'] for r in results)
+        errors = sum(1 for r in results if r['error'])
+        print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors")
+    else:
+        parser.print_help()
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()