Add scripts/harvester.py
This commit is contained in:
350
scripts/harvester.py
Normal file
350
scripts/harvester.py
Normal file
@@ -0,0 +1,350 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Session Harvester for Compounding Intelligence.
|
||||
Extracts durable knowledge from completed sessions and updates the knowledge store.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from session_reader import SessionReader
|
||||
|
||||
# Configure logging: mirror messages to metrics/harvester.log and stderr.
# The log directory must exist before logging.FileHandler opens the file,
# otherwise module import fails with FileNotFoundError on a fresh checkout.
_LOG_DIR = Path(__file__).parent.parent / 'metrics'
_LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(_LOG_DIR / 'harvester.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KnowledgeHarvester:
    """Extracts knowledge from completed sessions.

    Sessions are read through ``SessionReader``, mined for simple
    error/success text patterns, and the resulting facts are appended to a
    JSON knowledge index under ``<repo_root>/knowledge/``.  A separate
    state file records which sessions were already harvested so repeated
    runs never reprocess the same session.
    """

    # Patterns compiled once at class level; the original re-imported `re`
    # and passed raw pattern strings inside the per-message loop.
    _FILE_PATH_RE = re.compile(r'[~/.]?[\w/]+\.\w+')
    _COMMAND_RE = re.compile(r'(?:git|npm|pip|python|curl|ssh)\s+[\w\s\-\.]+')

    def __init__(self, repo_root: Optional[str] = None):
        """Initialize the harvester.

        Args:
            repo_root: Repository root directory.  Defaults to the parent
                of the directory containing this script.
        """
        if repo_root is None:
            repo_root = str(Path(__file__).parent.parent)
        self.repo_root = Path(repo_root)
        self.knowledge_dir = self.repo_root / "knowledge"
        self.index_path = self.knowledge_dir / "index.json"
        self.prompt_path = self.repo_root / "templates" / "harvest-prompt.md"

        # Load or create knowledge index
        self.index = self._load_index()

        # Initialize session reader
        self.reader = SessionReader()

        # Harvest state file
        self.state_path = self.knowledge_dir / "harvest_state.json"
        self.state = self._load_state()

    def _load_index(self) -> Dict[str, Any]:
        """Load the knowledge index from disk, or create a fresh empty one."""
        if self.index_path.exists():
            with open(self.index_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {
            "version": 1,
            "last_updated": datetime.now(timezone.utc).isoformat(),
            "total_facts": 0,
            "facts": []
        }

    def _save_index(self) -> None:
        """Persist the knowledge index, refreshing its timestamp."""
        self.index["last_updated"] = datetime.now(timezone.utc).isoformat()
        with open(self.index_path, 'w', encoding='utf-8') as f:
            json.dump(self.index, f, indent=2)

    def _load_state(self) -> Dict[str, Any]:
        """Load harvest state from disk, or create a fresh empty one."""
        if self.state_path.exists():
            with open(self.state_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {
            "last_harvest": None,
            "harvested_sessions": [],
            "total_sessions_processed": 0,
            "total_facts_extracted": 0
        }

    def _save_state(self) -> None:
        """Persist harvest state."""
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, indent=2)

    def get_sessions_to_harvest(self, max_age_hours: float = 24) -> List[Dict[str, Any]]:
        """
        Get sessions that need harvesting.

        Args:
            max_age_hours: Only harvest sessions modified within this many hours
                (used when no previous harvest timestamp is recorded).

        Returns:
            List of session data dictionaries for complete, not-yet-harvested
            sessions.
        """
        # Resume from the last recorded harvest timestamp when available.
        since = None
        if self.state["last_harvest"]:
            try:
                since = datetime.fromisoformat(self.state["last_harvest"].replace('Z', '+00:00'))
            except (ValueError, AttributeError):
                # Corrupt or legacy timestamp: fall back to the age window below.
                pass

        # If no last harvest, use max_age_hours
        if since is None:
            since = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)

        # Get recent sessions
        sessions = self.reader.list_sessions(since=since)

        # Filter out already harvested sessions
        harvested = set(self.state["harvested_sessions"])
        to_harvest = []

        for path in sessions:
            session = self.reader.read_session(path)
            if "error" in session:
                logger.warning(f"Error reading session {path}: {session['error']}")
                continue

            # Skip if already harvested
            if session["session_id"] in harvested:
                continue

            # Skip if session is still active
            if not self.reader.is_session_complete(session):
                continue

            to_harvest.append(session)

        return to_harvest

    def extract_knowledge_from_session(self, session: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Extract knowledge from a single session.

        This is a simplified extraction that looks for patterns in the session.
        In a full implementation, this would use an LLM with the harvest prompt.

        Args:
            session: Session data dictionary.

        Returns:
            List of extracted knowledge items.
        """
        knowledge_items = []

        # Simple pattern-based extraction over the session's messages.
        for msg in session.get("messages", []):
            if not isinstance(msg, dict):
                continue

            content = msg.get("content", "")
            if not content or not isinstance(content, str):
                continue

            lowered = content.lower()
            context = content[:200]  # Only the first 200 chars are mined

            # Error messages mentioning a file path become "pitfall" facts.
            # (The lowercase test subsumes the original's extra cased check.)
            if "error" in lowered:
                file_paths = self._FILE_PATH_RE.findall(context)
                if file_paths:
                    knowledge_items.append({
                        "fact": f"Error encountered with file: {file_paths[0]}",
                        "category": "pitfall",
                        "repo": "global",
                        "confidence": 0.7,
                        "session_id": session["session_id"],
                        "extracted_at": datetime.now(timezone.utc).isoformat()
                    })

            # Success messages mentioning a known CLI become "pattern" facts.
            if "success" in lowered:
                commands = self._COMMAND_RE.findall(context)
                if commands:
                    knowledge_items.append({
                        "fact": f"Successful command pattern: {commands[0]}",
                        "category": "pattern",
                        "repo": "global",
                        "confidence": 0.6,
                        "session_id": session["session_id"],
                        "extracted_at": datetime.now(timezone.utc).isoformat()
                    })

        return knowledge_items

    def harvest_session(self, session: Dict[str, Any]) -> Dict[str, Any]:
        """
        Harvest knowledge from a single session.

        Args:
            session: Session data dictionary.

        Returns:
            Harvest result dictionary with ``success``, ``facts_extracted``
            and, on success, the extracted ``knowledge_items``.
        """
        session_id = session["session_id"]
        logger.info(f"Harvesting session: {session_id}")

        try:
            # Extract knowledge
            knowledge_items = self.extract_knowledge_from_session(session)

            # Record each fact in the in-memory index (persisted by the caller).
            for item in knowledge_items:
                item["harvested_at"] = datetime.now(timezone.utc).isoformat()
                item["session_path"] = session.get("path", "")
                self.index["facts"].append(item)

            # Update running totals so this session is never reprocessed.
            self.state["harvested_sessions"].append(session_id)
            self.state["total_sessions_processed"] += 1
            self.state["total_facts_extracted"] += len(knowledge_items)

            result = {
                "session_id": session_id,
                "success": True,
                "facts_extracted": len(knowledge_items),
                "knowledge_items": knowledge_items
            }

            logger.info(f"Extracted {len(knowledge_items)} facts from session {session_id}")

        except Exception as e:
            # Best-effort: one bad session must not abort the whole batch.
            logger.error(f"Error harvesting session {session_id}: {e}")
            result = {
                "session_id": session_id,
                "success": False,
                "error": str(e),
                "facts_extracted": 0
            }

        return result

    def harvest_batch(self, max_sessions: int = 10, max_age_hours: float = 24) -> Dict[str, Any]:
        """
        Harvest a batch of sessions.

        Args:
            max_sessions: Maximum number of sessions to harvest.
            max_age_hours: Only harvest sessions modified within this many hours.

        Returns:
            Batch harvest result with per-session results and totals.
        """
        logger.info(f"Starting harvest batch (max {max_sessions} sessions, max age {max_age_hours}h)")

        # Get sessions to harvest
        sessions = self.get_sessions_to_harvest(max_age_hours)

        if not sessions:
            logger.info("No sessions to harvest")
            return {
                "success": True,
                "sessions_processed": 0,
                "facts_extracted": 0,
                "results": []
            }

        # Limit to max_sessions
        sessions = sessions[:max_sessions]

        results = []
        total_facts = 0

        for session in sessions:
            result = self.harvest_session(session)
            results.append(result)

            if result["success"]:
                total_facts += result["facts_extracted"]

        # Persist the updated index and state after processing the batch.
        self.index["total_facts"] = len(self.index["facts"])
        self._save_index()

        self.state["last_harvest"] = datetime.now(timezone.utc).isoformat()
        self._save_state()

        batch_result = {
            "success": True,
            "sessions_processed": len(sessions),
            "facts_extracted": total_facts,
            "results": results,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

        logger.info(f"Harvest batch complete: {len(sessions)} sessions, {total_facts} facts")

        return batch_result
|
||||
|
||||
|
||||
def main():
    """Command-line entry point for the harvester."""
    import argparse

    parser = argparse.ArgumentParser(description="Harvest knowledge from completed sessions")
    parser.add_argument("--max-sessions", type=int, default=10, help="Maximum sessions to harvest")
    parser.add_argument("--max-age-hours", type=float, default=24, help="Max age in hours")
    parser.add_argument("--dry-run", action="store_true", help="Don't save, just report")
    args = parser.parse_args()

    harvester = KnowledgeHarvester()

    # Dry run: report what would be harvested without touching any state.
    if args.dry_run:
        sessions = harvester.get_sessions_to_harvest(args.max_age_hours)
        print(f"Would harvest {len(sessions)} sessions:")
        for session in sessions[:5]:  # Preview only the first five
            print(f"  - {session['session_id']} ({session['message_count']} messages)")
        if len(sessions) > 5:
            print(f"  ... and {len(sessions) - 5} more")
        return

    result = harvester.harvest_batch(
        max_sessions=args.max_sessions,
        max_age_hours=args.max_age_hours
    )

    # Guard clause on failure; the success summary falls through below.
    if not result["success"]:
        print(f"Harvest failed: {result.get('error', 'Unknown error')}")
        sys.exit(1)
    print(f"Harvest complete: {result['sessions_processed']} sessions, {result['facts_extracted']} facts")
|
||||
Reference in New Issue
Block a user