#!/usr/bin/env python3
"""
Session Harvester for Compounding Intelligence.
Extracts durable knowledge from completed sessions and updates the knowledge store.
"""

import json
import os
import re
import sys
import logging
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from session_reader import SessionReader

# Ensure the log directory exists before attaching a FileHandler;
# logging.FileHandler raises FileNotFoundError if the parent dir is missing.
_LOG_DIR = Path(__file__).parent.parent / 'metrics'
_LOG_DIR.mkdir(parents=True, exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(_LOG_DIR / 'harvester.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Patterns hoisted out of the per-message loop in
# extract_knowledge_from_session (the original re-imported `re` and
# re-scanned with uncompiled patterns on every message).
_FILE_PATH_RE = re.compile(r'[~/.]?[\w/]+\.\w+')
_COMMAND_RE = re.compile(r'(?:git|npm|pip|python|curl|ssh)\s+[\w\s\-\.]+')


class KnowledgeHarvester:
    """Extracts knowledge from completed sessions.

    Maintains two JSON files under ``knowledge/``:
      * ``index.json``        — the accumulated fact store.
      * ``harvest_state.json``— which sessions were already processed.
    """

    def __init__(self, repo_root: Optional[str] = None):
        """Initialize the harvester.

        Args:
            repo_root: Repository root directory. Defaults to the parent
                of the directory containing this script.
        """
        if repo_root is None:
            repo_root = str(Path(__file__).parent.parent)
        self.repo_root = Path(repo_root)
        self.knowledge_dir = self.repo_root / "knowledge"
        self.index_path = self.knowledge_dir / "index.json"
        self.prompt_path = self.repo_root / "templates" / "harvest-prompt.md"

        # Create the knowledge directory up front so the first
        # _save_index()/_save_state() call cannot fail with FileNotFoundError.
        self.knowledge_dir.mkdir(parents=True, exist_ok=True)

        # Load or create knowledge index
        self.index = self._load_index()

        # Initialize session reader
        self.reader = SessionReader()

        # Harvest state file
        self.state_path = self.knowledge_dir / "harvest_state.json"
        self.state = self._load_state()

    def _load_index(self) -> Dict[str, Any]:
        """Load the knowledge index from disk, or create a fresh one."""
        if self.index_path.exists():
            with open(self.index_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        # First run: empty index skeleton.
        return {
            "version": 1,
            "last_updated": datetime.now(timezone.utc).isoformat(),
            "total_facts": 0,
            "facts": []
        }

    def _save_index(self):
        """Save the knowledge index, refreshing its last-updated stamp."""
        self.index["last_updated"] = datetime.now(timezone.utc).isoformat()
        with open(self.index_path, 'w', encoding='utf-8') as f:
            json.dump(self.index, f, indent=2)

    def _load_state(self) -> Dict[str, Any]:
        """Load harvest state from disk, or create a fresh one."""
        if self.state_path.exists():
            with open(self.state_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {
            "last_harvest": None,
            "harvested_sessions": [],
            "total_sessions_processed": 0,
            "total_facts_extracted": 0
        }

    def _save_state(self):
        """Save harvest state to disk."""
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, indent=2)

    def get_sessions_to_harvest(self, max_age_hours: float = 24) -> List[Dict[str, Any]]:
        """
        Get sessions that need harvesting.

        Args:
            max_age_hours: Only harvest sessions modified within this many hours
                (used only when there is no recorded last harvest time).

        Returns:
            List of session data dictionaries (unreadable, already-harvested,
            and still-active sessions are filtered out).
        """
        # Prefer "everything since the last harvest"; fall back to the
        # max_age_hours window when state is missing or unparseable.
        since = None
        if self.state["last_harvest"]:
            try:
                # Normalize a trailing 'Z' that fromisoformat (pre-3.11)
                # does not accept.
                since = datetime.fromisoformat(self.state["last_harvest"].replace('Z', '+00:00'))
            except (ValueError, AttributeError):
                pass

        if since is None:
            since = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)

        # Get recent sessions
        sessions = self.reader.list_sessions(since=since)

        # Set for O(1) membership tests while filtering.
        harvested = set(self.state["harvested_sessions"])
        to_harvest = []

        for path in sessions:
            session = self.reader.read_session(path)
            if "error" in session:
                logger.warning(f"Error reading session {path}: {session['error']}")
                continue

            # Skip if already harvested
            if session["session_id"] in harvested:
                continue

            # Skip if session is still active
            if not self.reader.is_session_complete(session):
                continue

            to_harvest.append(session)

        return to_harvest

    def extract_knowledge_from_session(self, session: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Extract knowledge from a single session.

        This is a simplified extraction that looks for patterns in the session.
        In a full implementation, this would use an LLM with the harvest prompt.

        Args:
            session: Session data dictionary

        Returns:
            List of extracted knowledge items
        """
        knowledge_items = []

        # Get messages from session
        messages = session.get("messages", [])

        # Simple pattern-based extraction
        for msg in messages:
            if not isinstance(msg, dict):
                continue

            content = msg.get("content", "")
            if not content or not isinstance(content, str):
                continue

            # Scan only a prefix of the message for context.
            context = content[:200]
            lowered = content.lower()

            # Look for error patterns. (lower() already matches "Error",
            # so a separate case-sensitive check is redundant.)
            if "error" in lowered:
                file_paths = _FILE_PATH_RE.findall(context)
                if file_paths:
                    knowledge_items.append({
                        "fact": f"Error encountered with file: {file_paths[0]}",
                        "category": "pitfall",
                        "repo": "global",
                        "confidence": 0.7,
                        "session_id": session["session_id"],
                        "extracted_at": datetime.now(timezone.utc).isoformat()
                    })

            # Look for successful command patterns.
            if "success" in lowered:
                commands = _COMMAND_RE.findall(context)
                if commands:
                    knowledge_items.append({
                        "fact": f"Successful command pattern: {commands[0]}",
                        "category": "pattern",
                        "repo": "global",
                        "confidence": 0.6,
                        "session_id": session["session_id"],
                        "extracted_at": datetime.now(timezone.utc).isoformat()
                    })

        return knowledge_items

    def harvest_session(self, session: Dict[str, Any]) -> Dict[str, Any]:
        """
        Harvest knowledge from a single session.

        On success, appends the extracted facts to the in-memory index and
        marks the session as harvested in the in-memory state (callers are
        responsible for persisting via _save_index/_save_state).

        Args:
            session: Session data dictionary

        Returns:
            Harvest result dictionary with keys: session_id, success,
            facts_extracted, and either knowledge_items or error.
        """
        session_id = session["session_id"]
        logger.info(f"Harvesting session: {session_id}")

        try:
            # Extract knowledge
            knowledge_items = self.extract_knowledge_from_session(session)

            # Add to index
            for item in knowledge_items:
                # Add metadata
                item["harvested_at"] = datetime.now(timezone.utc).isoformat()
                item["session_path"] = session.get("path", "")
                self.index["facts"].append(item)

            # Update state only after extraction succeeded, so a failed
            # session is retried on the next run.
            self.state["harvested_sessions"].append(session_id)
            self.state["total_sessions_processed"] += 1
            self.state["total_facts_extracted"] += len(knowledge_items)

            result = {
                "session_id": session_id,
                "success": True,
                "facts_extracted": len(knowledge_items),
                "knowledge_items": knowledge_items
            }

            logger.info(f"Extracted {len(knowledge_items)} facts from session {session_id}")

        except Exception as e:
            # Best-effort: report the failure but let the batch continue.
            logger.error(f"Error harvesting session {session_id}: {e}")
            result = {
                "session_id": session_id,
                "success": False,
                "error": str(e),
                "facts_extracted": 0
            }

        return result

    def harvest_batch(self, max_sessions: int = 10, max_age_hours: float = 24) -> Dict[str, Any]:
        """
        Harvest a batch of sessions.

        Args:
            max_sessions: Maximum number of sessions to harvest
            max_age_hours: Only harvest sessions modified within this many hours

        Returns:
            Batch harvest result with per-session results and totals.
        """
        logger.info(f"Starting harvest batch (max {max_sessions} sessions, max age {max_age_hours}h)")

        # Get sessions to harvest
        sessions = self.get_sessions_to_harvest(max_age_hours)

        if not sessions:
            logger.info("No sessions to harvest")
            return {
                "success": True,
                "sessions_processed": 0,
                "facts_extracted": 0,
                "results": []
            }

        # Limit to max_sessions
        sessions = sessions[:max_sessions]

        results = []
        total_facts = 0

        for session in sessions:
            result = self.harvest_session(session)
            results.append(result)

            if result["success"]:
                total_facts += result["facts_extracted"]

        # Persist index and state once per batch, not per session.
        self.index["total_facts"] = len(self.index["facts"])
        self._save_index()

        self.state["last_harvest"] = datetime.now(timezone.utc).isoformat()
        self._save_state()

        batch_result = {
            "success": True,
            "sessions_processed": len(sessions),
            "facts_extracted": total_facts,
            "results": results,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

        logger.info(f"Harvest batch complete: {len(sessions)} sessions, {total_facts} facts")

        return batch_result


def main():
    """Main entry point for the harvester."""
    import argparse

    parser = argparse.ArgumentParser(description="Harvest knowledge from completed sessions")
    parser.add_argument("--max-sessions", type=int, default=10, help="Maximum sessions to harvest")
    parser.add_argument("--max-age-hours", type=float, default=24, help="Max age in hours")
    parser.add_argument("--dry-run", action="store_true", help="Don't save, just report")

    args = parser.parse_args()

    harvester = KnowledgeHarvester()

    if args.dry_run:
        sessions = harvester.get_sessions_to_harvest(args.max_age_hours)
        print(f"Would harvest {len(sessions)} sessions:")
        for session in sessions[:5]:  # Show first 5
            print(f"  - {session['session_id']} ({session['message_count']} messages)")
        if len(sessions) > 5:
            print(f"  ... and {len(sessions) - 5} more")
        return

    result = harvester.harvest_batch(
        max_sessions=args.max_sessions,
        max_age_hours=args.max_age_hours
    )

    if result["success"]:
        print(f"Harvest complete: {result['sessions_processed']} sessions, {result['facts_extracted']} facts")
    else:
        print(f"Harvest failed: {result.get('error', 'Unknown error')}")
        sys.exit(1)


if __name__ == "__main__":
    main()