"""Session logging for Timmy - captures interactions, errors, and decisions. Timmy requested: "I'd love to see a detailed log of all my interactions, including any mistakes or errors that occur during the session." """ import json import logging from datetime import date, datetime from pathlib import Path from typing import Any logger = logging.getLogger(__name__) class SessionLogger: """Logs Timmy's interactions to a session file.""" def __init__(self, logs_dir: str | Path | None = None): """Initialize session logger. Args: logs_dir: Directory for log files. Defaults to /logs in repo root. """ from config import settings if logs_dir is None: self.logs_dir = Path(settings.repo_root) / "logs" else: self.logs_dir = Path(logs_dir) # Create logs directory if it doesn't exist self.logs_dir.mkdir(parents=True, exist_ok=True) # Session file path self.session_file = self.logs_dir / f"session_{date.today().isoformat()}.jsonl" # In-memory buffer self._buffer: list[dict] = [] def record_message(self, role: str, content: str, confidence: float | None = None) -> None: """Record a user message. Args: role: "user" or "timmy" content: The message content confidence: Optional confidence score (0.0 to 1.0) """ entry = { "type": "message", "role": role, "content": content, "timestamp": datetime.now().isoformat(), } if confidence is not None: entry["confidence"] = confidence self._buffer.append(entry) def record_tool_call(self, tool_name: str, args: dict, result: str) -> None: """Record a tool call. Args: tool_name: Name of the tool called args: Arguments passed to the tool result: Result from the tool """ # Truncate long results result_preview = result[:500] if isinstance(result, str) else str(result)[:500] self._buffer.append( { "type": "tool_call", "tool": tool_name, "args": args, "result": result_preview, "timestamp": datetime.now().isoformat(), } ) def record_error(self, error: str, context: str | None = None) -> None: """Record an error. Args: error: Error message context: Optional context about what was happening """ self._buffer.append( { "type": "error", "error": error, "context": context, "timestamp": datetime.now().isoformat(), } ) def record_decision(self, decision: str, rationale: str | None = None) -> None: """Record a decision Timmy made. Args: decision: What was decided rationale: Why that decision was made """ self._buffer.append( { "type": "decision", "decision": decision, "rationale": rationale, "timestamp": datetime.now().isoformat(), } ) def flush(self) -> Path: """Flush buffer to disk. Returns: Path to the session file """ if not self._buffer: return self.session_file with open(self.session_file, "a") as f: for entry in self._buffer: f.write(json.dumps(entry) + "\n") logger.info("Flushed %d entries to %s", len(self._buffer), self.session_file) self._buffer.clear() return self.session_file def get_session_summary(self) -> dict[str, Any]: """Get a summary of the current session. Returns: Dict with session statistics """ if not self.session_file.exists(): return { "exists": False, "entries": 0, } entries = [] with open(self.session_file) as f: for line in f: if line.strip(): entries.append(json.loads(line)) return { "exists": True, "file": str(self.session_file), "entries": len(entries), "messages": sum(1 for e in entries if e.get("type") == "message"), "tool_calls": sum(1 for e in entries if e.get("type") == "tool_call"), "errors": sum(1 for e in entries if e.get("type") == "error"), "decisions": sum(1 for e in entries if e.get("type") == "decision"), } def get_recent_entries(self, limit: int = 50) -> list[dict]: """Load recent entries across all session logs. Args: limit: Maximum number of entries to return. Returns: List of entries (most recent first). """ entries: list[dict] = [] log_files = sorted(self.logs_dir.glob("session_*.jsonl"), reverse=True) for log_file in log_files: if len(entries) >= limit: break try: with open(log_file) as f: lines = [ln for ln in f if ln.strip()] for line in reversed(lines): if len(entries) >= limit: break try: entries.append(json.loads(line)) except json.JSONDecodeError: continue except OSError: continue return entries def search(self, query: str, role: str | None = None, limit: int = 10) -> list[dict]: """Search across all session logs for entries matching a query. Args: query: Case-insensitive substring to search for. role: Optional role filter ("user", "timmy", "system"). limit: Maximum number of results to return. Returns: List of matching entries (most recent first), each with type, timestamp, and relevant content fields. """ query_lower = query.lower() matches: list[dict] = [] # Collect all session files, sorted newest first log_files = sorted(self.logs_dir.glob("session_*.jsonl"), reverse=True) for log_file in log_files: if len(matches) >= limit: break try: with open(log_file) as f: # Read all lines, reverse so newest entries come first lines = [ln for ln in f if ln.strip()] for line in reversed(lines): if len(matches) >= limit: break try: entry = json.loads(line) except json.JSONDecodeError: continue # Role filter if role and entry.get("role") != role: continue # Search in text-bearing fields searchable = " ".join( str(entry.get(k, "")) for k in ("content", "error", "decision", "rationale", "result", "tool") ).lower() if query_lower in searchable: entry["_source_file"] = log_file.name matches.append(entry) except OSError: continue return matches # Global session logger instance _session_logger: SessionLogger | None = None def get_session_logger() -> SessionLogger: """Get or create the global session logger.""" global _session_logger if _session_logger is None: _session_logger = SessionLogger() return _session_logger def get_session_summary() -> dict[str, Any]: """Get summary of current session logs. Returns: Dict with session statistics (entries, messages, errors, etc.) """ logger = get_session_logger() return logger.get_session_summary() def flush_session_logs() -> str: """Flush current session logs to disk. Returns: Path to the log file """ logger = get_session_logger() path = logger.flush() return str(path) def session_history(query: str, role: str = "", limit: int = 10) -> str: """Search Timmy's past conversation history. Find messages, tool calls, errors, and decisions from past sessions that match the query. Results are returned most-recent first. Args: query: What to search for (case-insensitive substring match). role: Optional filter by role — "user", "timmy", or "" for all. limit: Maximum results to return (default 10). Returns: Formatted string of matching session entries. """ sl = get_session_logger() # Flush buffer first so current session is searchable sl.flush() results = sl.search(query, role=role or None, limit=limit) if not results: return f"No session history found matching '{query}'." lines = [f"Found {len(results)} result(s) for '{query}':\n"] for entry in results: ts = entry.get("timestamp", "?")[:19] etype = entry.get("type", "?") source = entry.get("_source_file", "") if etype == "message": who = entry.get("role", "?") text = entry.get("content", "")[:200] lines.append(f"[{ts}] {who}: {text}") elif etype == "tool_call": tool = entry.get("tool", "?") result = entry.get("result", "")[:100] lines.append(f"[{ts}] tool:{tool} → {result}") elif etype == "error": err = entry.get("error", "")[:200] lines.append(f"[{ts}] ERROR: {err}") elif etype == "decision": dec = entry.get("decision", "")[:200] lines.append(f"[{ts}] DECIDED: {dec}") else: lines.append(f"[{ts}] {etype}: {json.dumps(entry)[:200]}") if source: lines[-1] += f" ({source})" return "\n".join(lines) # --------------------------------------------------------------------------- # Confidence threshold used for flagging low-confidence responses # --------------------------------------------------------------------------- _LOW_CONFIDENCE_THRESHOLD = 0.5 def self_reflect(limit: int = 30) -> str: """Review recent conversations and reflect on Timmy's own behavior. Scans past session entries for patterns: low-confidence responses, errors, repeated topics, and conversation quality signals. Returns a structured reflection that Timmy can use to improve. Args: limit: How many recent entries to review (default 30). Returns: A formatted self-reflection report. """ sl = get_session_logger() sl.flush() entries = sl.get_recent_entries(limit=limit) if not entries: return "No conversation history to reflect on yet." # Categorize entries messages = [e for e in entries if e.get("type") == "message"] errors = [e for e in entries if e.get("type") == "error"] timmy_msgs = [e for e in messages if e.get("role") == "timmy"] user_msgs = [e for e in messages if e.get("role") == "user"] # 1. Low-confidence responses low_conf = [ m for m in timmy_msgs if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD ] # 2. Identify repeated user topics (simple word frequency) topic_counts: dict[str, int] = {} for m in user_msgs: for word in (m.get("content") or "").lower().split(): cleaned = word.strip(".,!?\"'()[]") if len(cleaned) > 3: topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1 repeated = sorted( ((w, c) for w, c in topic_counts.items() if c >= 3), key=lambda x: x[1], reverse=True, )[:5] # Build reflection report sections: list[str] = ["## Self-Reflection Report\n"] sections.append( f"Reviewed {len(entries)} recent entries: " f"{len(user_msgs)} user messages, " f"{len(timmy_msgs)} responses, " f"{len(errors)} errors.\n" ) # Low confidence if low_conf: sections.append(f"### Low-Confidence Responses ({len(low_conf)})") for m in low_conf[:5]: ts = (m.get("timestamp") or "?")[:19] conf = m.get("confidence", 0) text = (m.get("content") or "")[:120] sections.append(f"- [{ts}] confidence={conf:.0%}: {text}") sections.append("") else: sections.append( "### Low-Confidence Responses\nNone found — all responses above threshold.\n" ) # Errors if errors: sections.append(f"### Errors ({len(errors)})") for e in errors[:5]: ts = (e.get("timestamp") or "?")[:19] err = (e.get("error") or "")[:120] sections.append(f"- [{ts}] {err}") sections.append("") else: sections.append("### Errors\nNo errors recorded.\n") # Repeated topics if repeated: sections.append("### Recurring Topics") for word, count in repeated: sections.append(f'- "{word}" ({count} mentions)') sections.append("") else: sections.append("### Recurring Topics\nNo strong patterns detected.\n") # Actionable summary insights: list[str] = [] if low_conf: insights.append("Consider studying topics where confidence was low.") if errors: insights.append("Review error patterns for recurring infrastructure issues.") if repeated: top_topic = repeated[0][0] insights.append( f'User frequently asks about "{top_topic}" — consider deepening knowledge here.' ) if not insights: insights.append("Conversations look healthy. Keep up the good work.") sections.append("### Insights") for insight in insights: sections.append(f"- {insight}") return "\n".join(sections)