"""Session logging for Timmy - captures interactions, errors, and decisions. Timmy requested: "I'd love to see a detailed log of all my interactions, including any mistakes or errors that occur during the session." """ import json import logging from datetime import date, datetime from pathlib import Path from typing import Any logger = logging.getLogger(__name__) class SessionLogger: """Logs Timmy's interactions to a session file.""" def __init__(self, logs_dir: str | Path | None = None): """Initialize session logger. Args: logs_dir: Directory for log files. Defaults to /logs in repo root. """ from config import settings if logs_dir is None: self.logs_dir = Path(settings.repo_root) / "logs" else: self.logs_dir = Path(logs_dir) # Create logs directory if it doesn't exist self.logs_dir.mkdir(parents=True, exist_ok=True) # Session file path self.session_file = self.logs_dir / f"session_{date.today().isoformat()}.jsonl" # In-memory buffer self._buffer: list[dict] = [] def record_message(self, role: str, content: str, confidence: float | None = None) -> None: """Record a user message. Args: role: "user" or "timmy" content: The message content confidence: Optional confidence score (0.0 to 1.0) """ entry = { "type": "message", "role": role, "content": content, "timestamp": datetime.now().isoformat(), } if confidence is not None: entry["confidence"] = confidence self._buffer.append(entry) def record_tool_call(self, tool_name: str, args: dict, result: str) -> None: """Record a tool call. Args: tool_name: Name of the tool called args: Arguments passed to the tool result: Result from the tool """ # Truncate long results result_preview = result[:500] if isinstance(result, str) else str(result)[:500] self._buffer.append( { "type": "tool_call", "tool": tool_name, "args": args, "result": result_preview, "timestamp": datetime.now().isoformat(), } ) def record_error(self, error: str, context: str | None = None) -> None: """Record an error. Args: error: Error message context: Optional context about what was happening """ self._buffer.append( { "type": "error", "error": error, "context": context, "timestamp": datetime.now().isoformat(), } ) def record_decision(self, decision: str, rationale: str | None = None) -> None: """Record a decision Timmy made. Args: decision: What was decided rationale: Why that decision was made """ self._buffer.append( { "type": "decision", "decision": decision, "rationale": rationale, "timestamp": datetime.now().isoformat(), } ) def flush(self) -> Path: """Flush buffer to disk. Returns: Path to the session file """ if not self._buffer: return self.session_file with open(self.session_file, "a") as f: for entry in self._buffer: f.write(json.dumps(entry) + "\n") logger.info("Flushed %d entries to %s", len(self._buffer), self.session_file) self._buffer.clear() return self.session_file def get_session_summary(self) -> dict[str, Any]: """Get a summary of the current session. Returns: Dict with session statistics """ if not self.session_file.exists(): return { "exists": False, "entries": 0, } entries = [] with open(self.session_file) as f: for line in f: if line.strip(): entries.append(json.loads(line)) return { "exists": True, "file": str(self.session_file), "entries": len(entries), "messages": sum(1 for e in entries if e.get("type") == "message"), "tool_calls": sum(1 for e in entries if e.get("type") == "tool_call"), "errors": sum(1 for e in entries if e.get("type") == "error"), "decisions": sum(1 for e in entries if e.get("type") == "decision"), } def search(self, query: str, role: str | None = None, limit: int = 10) -> list[dict]: """Search across all session logs for entries matching a query. Args: query: Case-insensitive substring to search for. role: Optional role filter ("user", "timmy", "system"). limit: Maximum number of results to return. Returns: List of matching entries (most recent first), each with type, timestamp, and relevant content fields. """ query_lower = query.lower() matches: list[dict] = [] # Collect all session files, sorted newest first log_files = sorted(self.logs_dir.glob("session_*.jsonl"), reverse=True) for log_file in log_files: if len(matches) >= limit: break try: with open(log_file) as f: # Read all lines, reverse so newest entries come first lines = [ln for ln in f if ln.strip()] for line in reversed(lines): if len(matches) >= limit: break try: entry = json.loads(line) except json.JSONDecodeError: continue # Role filter if role and entry.get("role") != role: continue # Search in text-bearing fields searchable = " ".join( str(entry.get(k, "")) for k in ("content", "error", "decision", "rationale", "result", "tool") ).lower() if query_lower in searchable: entry["_source_file"] = log_file.name matches.append(entry) except OSError: continue return matches # Global session logger instance _session_logger: SessionLogger | None = None def get_session_logger() -> SessionLogger: """Get or create the global session logger.""" global _session_logger if _session_logger is None: _session_logger = SessionLogger() return _session_logger def get_session_summary() -> dict[str, Any]: """Get summary of current session logs. Returns: Dict with session statistics (entries, messages, errors, etc.) """ logger = get_session_logger() return logger.get_session_summary() def flush_session_logs() -> str: """Flush current session logs to disk. Returns: Path to the log file """ logger = get_session_logger() path = logger.flush() return str(path) def session_history(query: str, role: str = "", limit: int = 10) -> str: """Search Timmy's past conversation history. Find messages, tool calls, errors, and decisions from past sessions that match the query. Results are returned most-recent first. Args: query: What to search for (case-insensitive substring match). role: Optional filter by role — "user", "timmy", or "" for all. limit: Maximum results to return (default 10). Returns: Formatted string of matching session entries. """ sl = get_session_logger() # Flush buffer first so current session is searchable sl.flush() results = sl.search(query, role=role or None, limit=limit) if not results: return f"No session history found matching '{query}'." lines = [f"Found {len(results)} result(s) for '{query}':\n"] for entry in results: ts = entry.get("timestamp", "?")[:19] etype = entry.get("type", "?") source = entry.get("_source_file", "") if etype == "message": who = entry.get("role", "?") text = entry.get("content", "")[:200] lines.append(f"[{ts}] {who}: {text}") elif etype == "tool_call": tool = entry.get("tool", "?") result = entry.get("result", "")[:100] lines.append(f"[{ts}] tool:{tool} → {result}") elif etype == "error": err = entry.get("error", "")[:200] lines.append(f"[{ts}] ERROR: {err}") elif etype == "decision": dec = entry.get("decision", "")[:200] lines.append(f"[{ts}] DECIDED: {dec}") else: lines.append(f"[{ts}] {etype}: {json.dumps(entry)[:200]}") if source: lines[-1] += f" ({source})" return "\n".join(lines)