195 lines
6.6 KiB
Python
195 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Session reader for Compounding Intelligence.
|
|
Reads and parses Hermes session files from ~/.hermes/sessions/.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
|
|
class SessionReader:
    """Reads and parses Hermes session files.

    Session files live directly inside one directory (no recursion) and are
    either a single JSON document (``.json``) or JSON Lines (``.jsonl``).
    Both formats are normalised to a common dictionary shape by
    :meth:`read_session`.
    """

    # strptime patterns tried in order before falling back to
    # ``datetime.fromisoformat``. None of them carries an offset, so a match
    # is interpreted as UTC.
    _TIMESTAMP_FORMATS = (
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
    )

    def __init__(self, sessions_dir: Optional[str] = None):
        """Initialize with sessions directory path.

        Args:
            sessions_dir: Directory holding the session files. When omitted,
                ``~/.hermes/sessions`` (user-expanded) is used.
        """
        if sessions_dir is None:
            sessions_dir = os.path.expanduser("~/.hermes/sessions")
        self.sessions_dir = Path(sessions_dir)
        self.supported_extensions = {'.json', '.jsonl'}

    def list_sessions(self, since: Optional[datetime] = None, limit: Optional[int] = None) -> List[Path]:
        """
        List session files, optionally filtered by modification time.

        Args:
            since: Only return sessions modified strictly after this datetime.
                A naive datetime is interpreted as UTC, matching how this
                class treats naive timestamps elsewhere.
            limit: Maximum number of sessions to return. ``None`` or ``0``
                means no limit.

        Returns:
            List of Path objects to session files, newest first. Empty when
            the sessions directory does not exist or is not a directory.
        """
        if not self.sessions_dir.is_dir():
            return []

        # File mtimes are converted to aware UTC datetimes below; comparing
        # those with a naive ``since`` would raise TypeError.
        if since is not None and since.tzinfo is None:
            since = since.replace(tzinfo=timezone.utc)

        dated = []
        for entry in self.sessions_dir.iterdir():
            if entry.suffix not in self.supported_extensions:
                continue
            try:
                # Skip directories that merely carry a session-like suffix.
                if not entry.is_file():
                    continue
                # Stat once and reuse the value for both filter and sort.
                mtime = entry.stat().st_mtime
            except OSError:
                # The file disappeared or became unreadable mid-scan.
                continue
            if since is not None:
                if datetime.fromtimestamp(mtime, tz=timezone.utc) <= since:
                    continue
            dated.append((mtime, entry))

        # Sort by modification time (newest first).
        dated.sort(key=lambda pair: pair[0], reverse=True)
        sessions = [entry for _, entry in dated]

        if limit:
            sessions = sessions[:limit]

        return sessions

    def read_session(self, path: Path) -> Dict[str, Any]:
        """
        Read a session file and return structured data.

        Args:
            path: Path to session file (a plain string path is accepted too).

        Returns:
            Dictionary with session data. On failure no exception is raised;
            instead a dictionary containing an ``"error"`` key is returned
            (plus ``"path"`` when reading or parsing the file failed).
        """
        try:
            path = Path(path)
            if path.suffix == '.jsonl':
                return self._read_jsonl_session(path)
            elif path.suffix == '.json':
                return self._read_json_session(path)
            else:
                return {"error": f"Unsupported format: {path.suffix}"}
        except Exception as e:
            # Deliberate catch-all: callers inspect the "error" key instead
            # of handling exceptions.
            return {"error": str(e), "path": str(path)}

    def _read_json_session(self, path: Path) -> Dict[str, Any]:
        """Read a JSON format session file.

        Raises:
            ValueError: If the top-level JSON value is not an object.
            OSError, json.JSONDecodeError: If the file cannot be read/parsed.
        """
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if not isinstance(data, dict):
            raise ValueError(
                f"Expected a JSON object at top level, got {type(data).__name__}"
            )

        messages = data.get("messages", [])
        return {
            "session_id": data.get("session_id", path.stem),
            "model": data.get("model", "unknown"),
            "created_at": data.get("session_start"),
            "last_updated": data.get("last_updated"),
            "message_count": data.get("message_count", len(messages)),
            "messages": messages,
            "path": str(path),
            "format": "json"
        }

    def _read_jsonl_session(self, path: Path) -> Dict[str, Any]:
        """Read a JSONL format session file.

        Blank lines, lines that are not valid JSON, and lines whose JSON value
        is not an object are skipped. An entry whose ``role`` is
        ``"session_meta"`` supplies session metadata (the last such entry
        wins); every other entry is treated as a message.
        """
        messages = []
        session_meta = None

        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A bare number/string/list is valid JSON but not a session
                # entry; treat it like any other malformed line.
                if not isinstance(entry, dict):
                    continue
                if entry.get("role") == "session_meta":
                    session_meta = entry
                else:
                    messages.append(entry)

        meta = session_meta or {}
        # Use the most recent message that carries a timestamp, so a trailing
        # entry without one does not hide the session's last activity.
        last_updated = next(
            (m.get("timestamp") for m in reversed(messages) if m.get("timestamp")),
            None,
        )

        return {
            "session_id": meta.get("session_id", path.stem),
            "model": meta.get("model", "unknown"),
            "created_at": meta.get("timestamp"),
            "last_updated": last_updated,
            "message_count": len(messages),
            "messages": messages,
            "path": str(path),
            "format": "jsonl",
            "meta": session_meta
        }

    @classmethod
    def _parse_timestamp(cls, value: str) -> datetime:
        """Parse a timestamp string into a timezone-aware datetime.

        Timestamps without an explicit offset are interpreted as UTC.

        Raises:
            ValueError: If the string matches no supported format.
        """
        for fmt in cls._TIMESTAMP_FORMATS:
            try:
                parsed = datetime.strptime(value, fmt)
            except ValueError:
                continue
            return parsed.replace(tzinfo=timezone.utc)

        # Fallback for ISO strings with an explicit offset or a space
        # separator.
        parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def get_session_age_hours(self, session_data: Dict[str, Any]) -> float:
        """Get session age in hours.

        Args:
            session_data: Session dictionary; its ``"last_updated"`` value may
                be a timestamp string or a ``datetime``. Naive values are
                interpreted as UTC.

        Returns:
            Hours elapsed since ``last_updated``, or ``float('inf')`` when the
            value is missing, of an unsupported type, or unparseable.
        """
        last_updated = session_data.get("last_updated")
        if not last_updated:
            return float('inf')

        try:
            if isinstance(last_updated, str):
                dt = self._parse_timestamp(last_updated)
            elif isinstance(last_updated, datetime):
                dt = last_updated
                # Aware-minus-naive subtraction raises TypeError, so give
                # naive datetimes the same UTC interpretation as strings.
                if dt.tzinfo is None:
                    dt = dt.replace(tzinfo=timezone.utc)
            else:
                return float('inf')

            age = datetime.now(timezone.utc) - dt
            return age.total_seconds() / 3600
        except (ValueError, TypeError, OverflowError):
            return float('inf')

    def is_session_complete(self, session_data: Dict[str, Any], idle_minutes: float = 5.0) -> bool:
        """
        Check if a session appears to be complete (not actively running).

        Heuristic: If last update was more than ``idle_minutes`` (default 5)
        minutes ago, consider it complete. A session with no usable
        ``last_updated`` value has infinite age and therefore counts as
        complete.

        Args:
            session_data: Session dictionary as returned by ``read_session``.
            idle_minutes: Idle time, in minutes, after which the session is
                considered finished.
        """
        age_hours = self.get_session_age_hours(session_data)
        return age_hours > (idle_minutes / 60)
|
|
|
|
|
|
def main():
    """Smoke-test the reader by summarising up to five recent sessions.

    Prints the number of sessions found, then for each one either its read
    error or its id, model, message count, age in hours, and completeness.
    """
    reader = SessionReader()

    # Newest-first listing, capped at five entries.
    recent_paths = reader.list_sessions(limit=5)
    print(f"Found {len(recent_paths)} recent sessions")

    for session_path in recent_paths:
        info = reader.read_session(session_path)
        if "error" in info:
            print(f"Error reading {session_path}: {info['error']}")
            continue

        hours_old = reader.get_session_age_hours(info)
        finished = reader.is_session_complete(info)

        report_lines = (
            f"\nSession: {info['session_id']}",
            f" Model: {info['model']}",
            f" Messages: {info['message_count']}",
            f" Age: {hours_old:.1f} hours",
            f" Complete: {finished}",
        )
        for report_line in report_lines:
            print(report_line)
|
|
|
|
|
|
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|