diff --git a/scripts/session_reader.py b/scripts/session_reader.py new file mode 100644 index 0000000..fa5b580 --- /dev/null +++ b/scripts/session_reader.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 +""" +Session reader for Compounding Intelligence. +Reads and parses Hermes session files from ~/.hermes/sessions/. +""" + +import json +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import List, Dict, Any, Optional + + +class SessionReader: + """Reads and parses Hermes session files.""" + + def __init__(self, sessions_dir: str = None): + """Initialize with sessions directory path.""" + if sessions_dir is None: + sessions_dir = os.path.expanduser("~/.hermes/sessions") + self.sessions_dir = Path(sessions_dir) + self.supported_extensions = {'.json', '.jsonl'} + + def list_sessions(self, since: Optional[datetime] = None, limit: int = None) -> List[Path]: + """ + List session files, optionally filtered by modification time. + + Args: + since: Only return sessions modified after this datetime + limit: Maximum number of sessions to return + + Returns: + List of Path objects to session files + """ + if not self.sessions_dir.exists(): + return [] + + sessions = [] + for f in self.sessions_dir.iterdir(): + if f.suffix in self.supported_extensions: + if since is not None: + mtime = datetime.fromtimestamp(f.stat().st_mtime, tz=timezone.utc) + if mtime <= since: + continue + sessions.append(f) + + # Sort by modification time (newest first) + sessions.sort(key=lambda p: p.stat().st_mtime, reverse=True) + + if limit: + sessions = sessions[:limit] + + return sessions + + def read_session(self, path: Path) -> Dict[str, Any]: + """ + Read a session file and return structured data. + + Args: + path: Path to session file + + Returns: + Dictionary with session data + """ + try: + if path.suffix == '.jsonl': + return self._read_jsonl_session(path) + elif path.suffix == '.json': + return self._read_json_session(path) + else: + return {"error": f"Unsupported format: {path.suffix}"} + except Exception as e: + return {"error": str(e), "path": str(path)} + + def _read_json_session(self, path: Path) -> Dict[str, Any]: + """Read a JSON format session file.""" + with open(path, 'r') as f: + data = json.load(f) + + return { + "session_id": data.get("session_id", path.stem), + "model": data.get("model", "unknown"), + "created_at": data.get("session_start"), + "last_updated": data.get("last_updated"), + "message_count": data.get("message_count", len(data.get("messages", []))), + "messages": data.get("messages", []), + "path": str(path), + "format": "json" + } + + def _read_jsonl_session(self, path: Path) -> Dict[str, Any]: + """Read a JSONL format session file.""" + messages = [] + session_meta = None + + with open(path, 'r') as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + if entry.get("role") == "session_meta": + session_meta = entry + else: + messages.append(entry) + except json.JSONDecodeError: + continue + + session_id = path.stem + if session_meta: + session_id = session_meta.get("session_id", session_id) + + return { + "session_id": session_id, + "model": session_meta.get("model", "unknown") if session_meta else "unknown", + "created_at": session_meta.get("timestamp") if session_meta else None, + "last_updated": messages[-1].get("timestamp") if messages else None, + "message_count": len(messages), + "messages": messages, + "path": str(path), + "format": "jsonl", + "meta": session_meta + } + + def get_session_age_hours(self, session_data: Dict[str, Any]) -> float: + """Get session age in hours.""" + last_updated = session_data.get("last_updated") + if not last_updated: + return float('inf') + + try: + if isinstance(last_updated, str): + # Handle various timestamp formats + for fmt in [ + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%f", + "%Y-%m-%dT%H:%M:%S" + ]: + try: + dt = datetime.strptime(last_updated, fmt) + dt = dt.replace(tzinfo=timezone.utc) + break + except ValueError: + continue + else: + # Try parsing with fromisoformat + dt = datetime.fromisoformat(last_updated.replace('Z', '+00:00')) + else: + dt = last_updated + + now = datetime.now(timezone.utc) + age = now - dt + return age.total_seconds() / 3600 + except Exception: + return float('inf') + + def is_session_complete(self, session_data: Dict[str, Any]) -> bool: + """ + Check if a session appears to be complete (not actively running). + + Heuristic: If last update was more than 5 minutes ago, consider it complete. + """ + age_hours = self.get_session_age_hours(session_data) + return age_hours > (5 / 60) # 5 minutes + + +def main(): + """Test the session reader.""" + reader = SessionReader() + + # List recent sessions + sessions = reader.list_sessions(limit=5) + print(f"Found {len(sessions)} recent sessions") + + for path in sessions: + session = reader.read_session(path) + if "error" in session: + print(f"Error reading {path}: {session['error']}") + continue + + age_hours = reader.get_session_age_hours(session) + complete = reader.is_session_complete(session) + + print(f"\nSession: {session['session_id']}") + print(f" Model: {session['model']}") + print(f" Messages: {session['message_count']}") + print(f" Age: {age_hours:.1f} hours") + print(f" Complete: {complete}") + + +if __name__ == "__main__": + main()