From da073ad7cf73550f2f47f9ee33fb98fe9af41122 Mon Sep 17 00:00:00 2001
From: Alexander Whitestone
Date: Tue, 14 Apr 2026 14:03:30 -0400
Subject: [PATCH] =?UTF-8?q?feat:=20add=20harvester.py=20=E2=80=94=20sessio?=
 =?UTF-8?q?n=20knowledge=20extractor=20(#8)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Main harvester module that chains:
session_reader → extraction prompt → LLM → validate → deduplicate → store

Includes:
- scripts/harvester.py — main module (reader + prompt + storage pipeline)
- scripts/session_reader.py — JSONL transcript parser
- scripts/test_harvester_pipeline.py — smoke tests (all passing)

Pipeline:
1. Read session JSONL via session_reader
2. Truncate long sessions (first 50 + last 50 messages)
3. Send transcript + extraction prompt to LLM (mimo-v2-pro)
4. Parse structured JSON response (facts/pitfalls/patterns/quirks/questions)
5. Validate fields + confidence threshold
6. Deduplicate against knowledge/index.json (fingerprint + word overlap)
7. Write to knowledge store (index.json + per-repo markdown)

CLI:
Single:  python3 harvester.py --session <session.jsonl> --output knowledge/
Batch:   python3 harvester.py --batch --since 2026-04-01 --limit 100
Dry-run: python3 harvester.py --session <session.jsonl> --dry-run
---
 scripts/harvester.py               | 447 +++++++++++++++++++++++++++++
 scripts/session_reader.py          | 142 ++++++++
 scripts/test_harvester_pipeline.py | 162 +++++++++++
 3 files changed, 751 insertions(+)
 create mode 100644 scripts/harvester.py
 create mode 100644 scripts/session_reader.py
 create mode 100644 scripts/test_harvester_pipeline.py

diff --git a/scripts/harvester.py b/scripts/harvester.py
new file mode 100644
index 0000000..9b85b64
--- /dev/null
+++ b/scripts/harvester.py
@@ -0,0 +1,447 @@
+#!/usr/bin/env python3
+"""
+harvester.py — Extract durable knowledge from Hermes session transcripts.
+
+Combines session_reader + extraction prompt + LLM inference to pull
+facts, pitfalls, patterns, and tool quirks from finished sessions.
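+
+Harvested knowledge lands in a JSON index plus human-readable markdown
+(illustrative layout; directories are created on demand by write_knowledge):
+    knowledge/index.json           machine-readable list of every stored fact
+    knowledge/repos/<repo>.md      per-repo digest for humans
+    knowledge/global/harvested.md  facts not tied to a single repo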
+ +Usage: + python3 harvester.py --session ~/.hermes/sessions/session_xxx.jsonl --output knowledge/ + python3 harvester.py --batch --since 2026-04-01 --limit 100 + python3 harvester.py --session session.jsonl --dry-run # Preview without writing +""" + +import argparse +import json +import os +import sys +import time +import hashlib +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +# Add scripts dir to path for sibling imports +SCRIPT_DIR = Path(__file__).parent.absolute() +sys.path.insert(0, str(SCRIPT_DIR)) + +from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text + +# --- Configuration --- + +DEFAULT_API_BASE = os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1") +DEFAULT_API_KEY = os.environ.get("HARVESTER_API_KEY", "") +DEFAULT_MODEL = os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro") +KNOWLEDGE_DIR = os.environ.get("HARVESTER_KNOWLEDGE_DIR", "knowledge") +PROMPT_PATH = os.environ.get("HARVESTER_PROMPT_PATH", str(SCRIPT_DIR.parent / "templates" / "harvest-prompt.md")) + +# Where to look for API keys if not set via env +API_KEY_PATHS = [ + os.path.expanduser("~/.config/nous/key"), + os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"), + os.path.expanduser("~/.config/openrouter/key"), +] + + +def find_api_key() -> str: + """Find API key from common locations.""" + for path in API_KEY_PATHS: + if os.path.exists(path): + with open(path) as f: + key = f.read().strip() + if key: + return key + return "" + + +def load_extraction_prompt() -> str: + """Load the extraction prompt template.""" + path = Path(PROMPT_PATH) + if not path.exists(): + print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr) + print("Expected templates/harvest-prompt.md from issue #7", file=sys.stderr) + sys.exit(1) + return path.read_text(encoding='utf-8') + + +def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[list[dict]]: + """Call the LLM API to extract knowledge from a transcript.""" + import urllib.request + + messages = [ + {"role": "system", "content": prompt}, + {"role": "user", "content": f"Extract knowledge from this session transcript:\n\n{transcript}"} + ] + + payload = json.dumps({ + "model": model, + "messages": messages, + "temperature": 0.1, # Low temp for consistent extraction + "max_tokens": 4096 + }).encode('utf-8') + + req = urllib.request.Request( + f"{api_base}/chat/completions", + data=payload, + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + }, + method="POST" + ) + + try: + with urllib.request.urlopen(req, timeout=60) as resp: + result = json.loads(resp.read().decode('utf-8')) + content = result["choices"][0]["message"]["content"] + return parse_extraction_response(content) + except Exception as e: + print(f"ERROR: LLM API call failed: {e}", file=sys.stderr) + return None + + +def parse_extraction_response(content: str) -> Optional[list[dict]]: + """Parse the LLM response to extract knowledge items. + + Handles various response formats: raw JSON, markdown-wrapped JSON, etc. 
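+
+    Illustrative expected shape (the authoritative schema comes from the
+    extraction prompt in templates/harvest-prompt.md, issue #7):
+
+        {"knowledge": [{"fact": "Gitea token is at ~/.config/gitea/token",
+                        "category": "tool-quirk", "repo": "global",
+                        "confidence": 0.9}]}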
+    """
+    # Try direct JSON parse first
+    try:
+        data = json.loads(content)
+        if isinstance(data, dict) and 'knowledge' in data:
+            return data['knowledge']
+        if isinstance(data, list):
+            return data
+    except json.JSONDecodeError:
+        pass
+
+    # Try extracting JSON from markdown code blocks
+    import re
+    json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
+    if json_match:
+        try:
+            data = json.loads(json_match.group(1))
+            if isinstance(data, dict) and 'knowledge' in data:
+                return data['knowledge']
+            if isinstance(data, list):
+                return data
+        except json.JSONDecodeError:
+            pass
+
+    # Try finding any JSON object with a "knowledge" array
+    json_match = re.search(r'({[^{}]*"knowledge"[^{}]*\[[\s\S]*?\][^{}]*})', content)
+    if json_match:
+        try:
+            data = json.loads(json_match.group(1))
+            return data.get('knowledge', [])
+        except json.JSONDecodeError:
+            pass
+
+    print("WARNING: Could not parse LLM response as JSON", file=sys.stderr)
+    print(f"Response preview: {content[:500]}", file=sys.stderr)
+    return None
+
+
+def load_existing_knowledge(knowledge_dir: str) -> dict:
+    """Load the existing knowledge index."""
+    index_path = Path(knowledge_dir) / "index.json"
+    if not index_path.exists():
+        return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
+
+    try:
+        with open(index_path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    except (json.JSONDecodeError, IOError) as e:
+        print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
+        return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
+
+
+def fact_fingerprint(fact: dict) -> str:
+    """Generate a deduplication fingerprint for a fact.
+
+    Uses the fact text normalized (lowercase, collapsed whitespace) as the key,
+    so only facts with identical normalized text share a fingerprint; near-duplicate
+    wording is caught separately by the word-overlap check in deduplicate().
+    """
+    text = fact.get('fact', '').lower().strip()
+    # Normalize whitespace
+    text = ' '.join(text.split())
+    return hashlib.md5(text.encode('utf-8')).hexdigest()
+
+
+def deduplicate(new_facts: list[dict], existing: list[dict], similarity_threshold: float = 0.8) -> list[dict]:
+    """Remove duplicate facts from new_facts that already exist in the knowledge store.
+
+    Uses fingerprint matching for exact dedup and a simple word-overlap check for near-dupes.
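+
+    The near-dupe check is plain Jaccard overlap on word sets (intersection over
+    union). Worked example: "Deploy uses Ansible on port 22" vs the hypothetical
+    variant "Deploy uses Ansible over port 22" share 5 of 7 distinct words,
+    overlap ≈ 0.71, below the default 0.8 threshold, so both would be kept.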
+ """ + existing_fingerprints = set() + existing_texts = [] + for f in existing: + fp = fact_fingerprint(f) + existing_fingerprints.add(fp) + existing_texts.append(f.get('fact', '').lower().strip()) + + unique = [] + for fact in new_facts: + fp = fact_fingerprint(fact) + if fp in existing_fingerprints: + continue + + # Check for near-duplicates using simple word overlap + fact_words = set(fact.get('fact', '').lower().split()) + is_dup = False + for existing_text in existing_texts: + existing_words = set(existing_text.split()) + if not fact_words or not existing_words: + continue + overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1) + if overlap >= similarity_threshold: + is_dup = True + break + + if not is_dup: + unique.append(fact) + existing_fingerprints.add(fp) + existing_texts.append(fact.get('fact', '').lower().strip()) + + return unique + + +def validate_fact(fact: dict) -> bool: + """Validate a single knowledge item has required fields.""" + required = ['fact', 'category', 'repo', 'confidence'] + for field in required: + if field not in fact: + return False + + if not isinstance(fact['fact'], str) or not fact['fact'].strip(): + return False + + valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question'] + if fact['category'] not in valid_categories: + return False + + if not isinstance(fact.get('confidence', 0), (int, float)): + return False + + if not (0.0 <= fact['confidence'] <= 1.0): + return False + + return True + + +def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: str, source_session: str = ""): + """Write new facts to the knowledge store.""" + kdir = Path(knowledge_dir) + kdir.mkdir(parents=True, exist_ok=True) + + # Add source tracking to each fact + for fact in new_facts: + fact['source_session'] = source_session + fact['harvested_at'] = datetime.now(timezone.utc).isoformat() + + # Update index + index['facts'].extend(new_facts) + index['total_facts'] = len(index['facts']) + index['last_updated'] = datetime.now(timezone.utc).isoformat() + + # Write index + index_path = kdir / "index.json" + with open(index_path, 'w', encoding='utf-8') as f: + json.dump(index, f, indent=2, ensure_ascii=False) + + # Also write per-repo markdown files for human reading + repos = {} + for fact in new_facts: + repo = fact.get('repo', 'global') + repos.setdefault(repo, []).append(fact) + + for repo, facts in repos.items(): + if repo == 'global': + md_path = kdir / "global" / "harvested.md" + else: + md_path = kdir / "repos" / f"{repo}.md" + + md_path.parent.mkdir(parents=True, exist_ok=True) + + # Append to existing or create new + mode = 'a' if md_path.exists() else 'w' + with open(md_path, mode, encoding='utf-8') as f: + if mode == 'w': + f.write(f"# Knowledge: {repo}\n\n") + f.write(f"## Harvested {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n") + for fact in facts: + icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': '❓'}.get(fact['category'], '•') + f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n") + f.write("\n") + + +def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_key: str, + model: str, dry_run: bool = False, min_confidence: float = 0.3) -> dict: + """Harvest knowledge from a single session. 
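+
+    Typical programmatic call (mirrors the CLI flags; dry_run previews without
+    touching the knowledge store):
+        harvest_session("session.jsonl", "knowledge/", api_base, api_key, model, dry_run=True)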
+
+    Returns: dict with stats (facts_found, facts_new, facts_dup, elapsed_seconds, error)
+    """
+    start_time = time.time()
+    stats = {
+        'session': session_path,
+        'facts_found': 0,
+        'facts_new': 0,
+        'facts_dup': 0,
+        'elapsed_seconds': 0,
+        'error': None
+    }
+
+    try:
+        # 1. Read session
+        messages = read_session(session_path)
+        if not messages:
+            stats['error'] = "Empty session file"
+            return stats
+
+        # 2. Extract conversation
+        conv = extract_conversation(messages)
+        if not conv:
+            stats['error'] = "No conversation turns found"
+            return stats
+
+        # 3. Truncate for context window
+        truncated = truncate_for_context(conv, head=50, tail=50)
+        transcript = messages_to_text(truncated)
+
+        # 4. Load extraction prompt
+        prompt = load_extraction_prompt()
+
+        # 5. Call LLM
+        raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
+        if raw_facts is None:
+            stats['error'] = "LLM extraction failed"
+            return stats
+
+        # 6. Validate
+        valid_facts = [f for f in raw_facts if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
+        stats['facts_found'] = len(valid_facts)
+
+        # 7. Deduplicate
+        existing_index = load_existing_knowledge(knowledge_dir)
+        existing_facts = existing_index.get('facts', [])
+        new_facts = deduplicate(valid_facts, existing_facts)
+        stats['facts_new'] = len(new_facts)
+        stats['facts_dup'] = len(valid_facts) - len(new_facts)
+
+        # 8. Write (unless dry run)
+        if new_facts and not dry_run:
+            write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
+
+        stats['elapsed_seconds'] = round(time.time() - start_time, 2)
+        return stats
+
+    except Exception as e:
+        stats['error'] = str(e)
+        stats['elapsed_seconds'] = round(time.time() - start_time, 2)
+        return stats
+
+
+def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str, api_key: str,
+                  model: str, since: str = "", limit: int = 0, dry_run: bool = False) -> list[dict]:
+    """Harvest knowledge from multiple sessions in batch."""
+    sessions_path = Path(sessions_dir)
+    if not sessions_path.is_dir():
+        print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
+        return []
+
+    # Find session files
+    session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True)  # Newest first
+
+    # Filter by date if --since provided
+    if since:
+        since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
+        if since_dt.tzinfo is None:
+            # Date-only input parses as naive; make it UTC-aware so it can be
+            # compared against the timezone-aware file timestamps below
+            since_dt = since_dt.replace(tzinfo=timezone.utc)
+        filtered = []
+        for sf in session_files:
+            # Try to parse timestamp from filename (common format: session_YYYYMMDD_HHMMSS_hash.jsonl)
+            try:
+                parts = sf.stem.split('_')
+                if len(parts) >= 3:
+                    date_str = parts[1]
+                    file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
+                    if file_dt >= since_dt:
+                        filtered.append(sf)
+                else:
+                    # Unrecognized filename pattern: include the file (be permissive)
+                    filtered.append(sf)
+            except (ValueError, IndexError):
+                # If we can't parse the date, include the file (be permissive)
+                filtered.append(sf)
+        session_files = filtered
+
+    # Apply limit
+    if limit > 0:
+        session_files = session_files[:limit]
+
+    print(f"Harvesting {len(session_files)} sessions...")
+
+    results = []
+    for i, sf in enumerate(session_files, 1):
+        print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
+        stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
+        if stats['error']:
+            print(f"ERROR: {stats['error']}")
+        else:
+            print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
+        results.append(stats)
+
+    return results
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
+    parser.add_argument('--session',
help='Path to a single session JSONL file') + parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions') + parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'), + help='Directory containing session files (default: ~/.hermes/sessions)') + parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store') + parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)') + parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)') + parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL') + parser.add_argument('--api-key', default='', help='LLM API key (or set HARVESTER_API_KEY)') + parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction') + parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store') + parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold') + + args = parser.parse_args() + + # Resolve API key + api_key = args.api_key or DEFAULT_API_KEY or find_api_key() + if not api_key: + print("ERROR: No API key found. Set HARVESTER_API_KEY or store in one of:", file=sys.stderr) + for p in API_KEY_PATHS: + print(f" {p}", file=sys.stderr) + sys.exit(1) + + # Resolve knowledge directory + knowledge_dir = args.output + if not os.path.isabs(knowledge_dir): + knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir) + + if args.session: + # Single session mode + stats = harvest_session( + args.session, knowledge_dir, args.api_base, api_key, args.model, + dry_run=args.dry_run, min_confidence=args.min_confidence + ) + print(json.dumps(stats, indent=2)) + if stats['error']: + sys.exit(1) + elif args.batch: + # Batch mode + results = batch_harvest( + args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model, + since=args.since, limit=args.limit, dry_run=args.dry_run + ) + total_new = sum(r['facts_new'] for r in results) + total_dup = sum(r['facts_dup'] for r in results) + errors = sum(1 for r in results if r['error']) + print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors") + else: + parser.print_help() + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/scripts/session_reader.py b/scripts/session_reader.py new file mode 100644 index 0000000..2ed1242 --- /dev/null +++ b/scripts/session_reader.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +session_reader.py — Parse Hermes session JSONL transcripts. + +Each line in a session file is a JSON object representing a message. +Standard fields: role (user|assistant|system), content (str), timestamp (str). +Tool calls and tool results are also captured. 
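+
+Illustrative line (field names as above; real sessions may carry extra keys
+such as tool_calls):
+    {"role": "assistant", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:05Z"}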
+""" + +import json +import sys +from pathlib import Path +from typing import Iterator, Optional + + +def read_session(path: str) -> list[dict]: + """Read a session JSONL file and return all messages as a list.""" + messages = [] + with open(path, 'r', encoding='utf-8') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + try: + msg = json.loads(line) + messages.append(msg) + except json.JSONDecodeError as e: + print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr) + return messages + + +def read_session_iter(path: str) -> Iterator[dict]: + """Iterate over session messages without loading all into memory.""" + with open(path, 'r', encoding='utf-8') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + try: + yield json.loads(line) + except json.JSONDecodeError as e: + print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr) + + +def extract_conversation(messages: list[dict]) -> list[dict]: + """Extract user/assistant conversation turns, skipping tool-only messages.""" + conversation = [] + for msg in messages: + role = msg.get('role', '') + content = msg.get('content', '') + + # Skip empty messages and pure tool calls + if role in ('user', 'assistant', 'system'): + if isinstance(content, str) and content.strip(): + conversation.append({ + 'role': role, + 'content': content.strip(), + 'timestamp': msg.get('timestamp', '') + }) + elif isinstance(content, list): + # Multimodal content — extract text parts + text_parts = [] + for part in content: + if isinstance(part, dict) and part.get('type') == 'text': + text_parts.append(part.get('text', '')) + if text_parts: + conversation.append({ + 'role': role, + 'content': '\n'.join(text_parts), + 'timestamp': msg.get('timestamp', '') + }) + return conversation + + +def truncate_for_context(messages: list[dict], head: int = 50, tail: int = 50) -> list[dict]: + """Truncate long sessions: keep first N + last N messages. + + This preserves session start (initial context) and end (final results), + skipping the messy middle of long debugging sessions. 
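+
+    For example, a 240-message conversation with head=50, tail=50 comes back as
+    101 entries: the first 50, a single '[140 messages truncated]' marker, then
+    the last 50.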
+    """
+    if len(messages) <= head + tail:
+        return messages
+
+    truncated = messages[:head]
+    truncated.append({
+        'role': 'system',
+        'content': f'[{len(messages) - head - tail} messages truncated]',
+        'timestamp': ''
+    })
+    truncated.extend(messages[-tail:])
+    return truncated
+
+
+def messages_to_text(messages: list[dict]) -> str:
+    """Convert message list to plain text for LLM consumption."""
+    lines = []
+    for msg in messages:
+        role = msg.get('role', 'unknown').upper()
+        content = msg.get('content', '')
+        if msg.get('role') == 'system' and 'truncated' in content:
+            lines.append(f'--- {content} ---')
+        else:
+            lines.append(f'{role}: {content}')
+    return '\n\n'.join(lines)
+
+
+def get_session_metadata(path: str) -> dict:
+    """Extract metadata from a session file (first message often has config info)."""
+    messages = read_session(path)
+    if not messages:
+        return {'path': path, 'message_count': 0}
+
+    first = messages[0]
+    last = messages[-1]
+
+    return {
+        'path': path,
+        'message_count': len(messages),
+        'first_timestamp': first.get('timestamp', ''),
+        'last_timestamp': last.get('timestamp', ''),
+        'first_role': first.get('role', ''),
+        'has_tool_calls': any(m.get('tool_calls') for m in messages),
+    }
+
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print(f"Usage: {sys.argv[0]} <session.jsonl>")
+        sys.exit(1)
+
+    path = sys.argv[1]
+    meta = get_session_metadata(path)
+    print(json.dumps(meta, indent=2))
+
+    messages = read_session(path)
+    conv = extract_conversation(messages)
+    print(f"\nConversation: {len(conv)} turns")
+
+    truncated = truncate_for_context(conv)
+    print(f"After truncation: {len(truncated)} turns")
+    print("\nPreview (first 500 chars):")
+    print(messages_to_text(truncated[:5])[:500])
diff --git a/scripts/test_harvester_pipeline.py b/scripts/test_harvester_pipeline.py
new file mode 100644
index 0000000..b7764a1
--- /dev/null
+++ b/scripts/test_harvester_pipeline.py
@@ -0,0 +1,162 @@
+#!/usr/bin/env python3
+"""
+Smoke test for harvester pipeline — verifies the full chain:
+session_reader -> prompt -> LLM (mocked) -> validate -> deduplicate -> store
+
+Does NOT call the real LLM. Tests plumbing only.
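+
+Run directly (plain asserts, no test framework required):
+    python3 scripts/test_harvester_pipeline.py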
+""" + +import json +import sys +import tempfile +import os +from pathlib import Path + +# Setup path +SCRIPT_DIR = Path(__file__).parent.absolute() +sys.path.insert(0, str(SCRIPT_DIR)) + +from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text +from harvester import validate_fact, deduplicate, load_existing_knowledge, fact_fingerprint + + +def test_session_reader(): + """Test that session_reader parses JSONL correctly.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f: + f.write('{"role": "user", "content": "Hello", "timestamp": "2026-04-13T10:00:00Z"}\n') + f.write('{"role": "assistant", "content": "Hi there", "timestamp": "2026-04-13T10:00:01Z"}\n') + f.write('{"role": "user", "content": "Clone the repo", "timestamp": "2026-04-13T10:00:02Z"}\n') + f.write('{"role": "assistant", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:05Z"}\n') + path = f.name + + messages = read_session(path) + assert len(messages) == 4, f"Expected 4 messages, got {len(messages)}" + + conv = extract_conversation(messages) + assert len(conv) == 4, f"Expected 4 conversation turns, got {len(conv)}" + + text = messages_to_text(conv) + assert "USER: Hello" in text + assert "ASSISTANT: Hi there" in text + + truncated = truncate_for_context(conv, head=2, tail=2) + assert len(truncated) == 4 # 4 <= head+tail, so no truncation + + os.unlink(path) + print(" [PASS] session_reader pipeline works") + + +def test_validate_fact(): + """Test fact validation.""" + good = {"fact": "Gitea token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9} + assert validate_fact(good), "Valid fact should pass" + + bad_missing = {"fact": "Something", "category": "fact"} + assert not validate_fact(bad_missing), "Missing fields should fail" + + bad_category = {"fact": "Something", "category": "nonsense", "repo": "x", "confidence": 0.5} + assert not validate_fact(bad_category), "Bad category should fail" + + bad_conf = {"fact": "Something", "category": "fact", "repo": "x", "confidence": 1.5} + assert not validate_fact(bad_conf), "Confidence > 1.0 should fail" + + print(" [PASS] fact validation works") + + +def test_deduplicate(): + """Test deduplication.""" + existing = [ + {"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9} + ] + new = [ + {"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}, # exact dup + {"fact": "Deploy uses Ansible on port 22", "category": "pattern", "repo": "fleet", "confidence": 0.8}, # unique + ] + result = deduplicate(new, existing) + assert len(result) == 1, f"Expected 1 unique, got {len(result)}" + assert result[0]["fact"] == "Deploy uses Ansible on port 22" + print(" [PASS] deduplication works") + + +def test_knowledge_store_roundtrip(): + """Test loading and writing knowledge index.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Load empty index + index = load_existing_knowledge(tmpdir) + assert index["total_facts"] == 0 + + # Write a fact + new_facts = [{"fact": "Test fact", "category": "fact", "repo": "test", "confidence": 0.9}] + + # Use harvester's write function + from harvester import write_knowledge + write_knowledge(index, new_facts, tmpdir, source_session="test.jsonl") + + # Reload and verify + index2 = load_existing_knowledge(tmpdir) + assert index2["total_facts"] == 1 + assert index2["facts"][0]["fact"] == "Test fact" + assert 
index2["facts"][0]["source_session"] == "test.jsonl" + + # Check markdown was written + md_path = Path(tmpdir) / "repos" / "test.md" + assert md_path.exists(), "Markdown file should be created" + + print(" [PASS] knowledge store roundtrip works") + + +def test_full_chain_no_llm(): + """Test the full pipeline minus the LLM call.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f: + f.write('{"role": "user", "content": "Clone compounding-intelligence", "timestamp": "2026-04-13T10:00:00Z"}\n') + f.write('{"role": "assistant", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:05Z"}\n') + session_path = f.name + + with tempfile.TemporaryDirectory() as knowledge_dir: + # Step 1: Read + messages = read_session(session_path) + assert len(messages) == 2 + + # Step 2: Extract conversation + conv = extract_conversation(messages) + assert len(conv) == 2 + + # Step 3: Truncate + truncated = truncate_for_context(conv, head=50, tail=50) + + # Step 4: Convert to text (this goes to the LLM) + transcript = messages_to_text(truncated) + assert "Clone compounding-intelligence" in transcript + + # Step 5-7: Would be LLM call, validate, deduplicate + # We simulate LLM output here + mock_facts = [ + {"fact": "compounding-intelligence repo was cloned", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9} + ] + valid = [f for f in mock_facts if validate_fact(f)] + + # Step 6: Deduplicate + index = load_existing_knowledge(knowledge_dir) + new_facts = deduplicate(valid, index.get("facts", [])) + assert len(new_facts) == 1 + + # Step 7: Store + from harvester import write_knowledge + write_knowledge(index, new_facts, knowledge_dir, source_session=session_path) + + # Verify + index2 = load_existing_knowledge(knowledge_dir) + assert index2["total_facts"] == 1 + + os.unlink(session_path) + print(" [PASS] full chain (reader -> validate -> dedup -> store) works") + + +if __name__ == "__main__": + print("Running harvester pipeline smoke tests...") + test_session_reader() + test_validate_fact() + test_deduplicate() + test_knowledge_store_roundtrip() + test_full_chain_no_llm() + print("\nAll tests passed.")