"""Conversation context management for Timmy. Tracks conversation state, intent, and context to improve: - Contextual understanding across multi-turn conversations - Smarter tool usage decisions - Natural reference to prior exchanges """ import logging from dataclasses import dataclass, field from datetime import datetime logger = logging.getLogger(__name__) @dataclass class ConversationContext: """Tracks the current conversation state.""" user_name: str | None = None current_topic: str | None = None last_intent: str | None = None turn_count: int = 0 started_at: datetime = field(default_factory=datetime.now) def update_topic(self, topic: str) -> None: """Update the current conversation topic.""" self.current_topic = topic self.turn_count += 1 def set_user_name(self, name: str) -> None: """Remember the user's name.""" self.user_name = name logger.info("User name set to: %s", name) def get_context_summary(self) -> str: """Generate a context summary for the prompt.""" parts = [] if self.user_name: parts.append(f"User's name is {self.user_name}") if self.current_topic: parts.append(f"Current topic: {self.current_topic}") if self.turn_count > 0: parts.append(f"Conversation turn: {self.turn_count}") return " | ".join(parts) if parts else "" class ConversationManager: """Manages conversation context across sessions.""" def __init__(self) -> None: self._contexts: dict[str, ConversationContext] = {} def get_context(self, session_id: str) -> ConversationContext: """Get or create context for a session.""" if session_id not in self._contexts: self._contexts[session_id] = ConversationContext() return self._contexts[session_id] def clear_context(self, session_id: str) -> None: """Clear context for a session.""" if session_id in self._contexts: del self._contexts[session_id] # Words that look like names but are actually verbs/UI states _NAME_BLOCKLIST = frozenset( { "sending", "loading", "pending", "processing", "typing", "working", "going", "trying", "looking", "getting", "doing", "waiting", "running", "checking", "coming", "leaving", "thinking", "reading", "writing", "watching", "listening", "playing", "eating", "sleeping", "sitting", "standing", "walking", "talking", "asking", "telling", "feeling", "hoping", "wondering", "glad", "happy", "sorry", "sure", "fine", "good", "great", "okay", "here", "there", "back", "done", "ready", "busy", "free", "available", "interested", "confused", "lost", "stuck", "curious", "excited", "tired", "not", "also", "just", "still", "already", "currently", } ) # Verb/adjective suffixes that never appear on real names _NON_NAME_SUFFIXES = ("ing", "tion", "sion", "ness", "ment", "ful", "less", "ous", "ive", "ble") def extract_user_name(self, message: str) -> str | None: """Try to extract user's name from message. Requires the candidate word to be capitalized in the original message (real names are written with a capital letter). Also rejects words in the blocklist and common verb/adjective suffixes. """ message_lower = message.lower() # Common patterns patterns = [ "my name is ", "i'm ", "i am ", "call me ", ] for pattern in patterns: if pattern in message_lower: idx = message_lower.find(pattern) + len(pattern) remainder = message[idx:].strip() if not remainder: continue # Take first word as name (from original message for case info) raw_name = remainder.split()[0].strip(".,!?;:") if not raw_name or len(raw_name) < 2: continue # Require first letter to be uppercase in the original text # (names are capitalized; "I am serving..." is not a name) if not raw_name[0].isupper(): continue # Reject common verbs, adjectives, and UI-state words if raw_name.lower() in self._NAME_BLOCKLIST: continue # Reject words with verb/adjective suffixes if any(raw_name.lower().endswith(s) for s in self._NON_NAME_SUFFIXES): continue return raw_name.capitalize() return None _TOOL_KEYWORDS = frozenset( { "search", "look up", "find", "google", "current price", "latest", "today's", "news", "weather", "stock price", "read file", "write file", "save", "calculate", "compute", "run ", "execute", "shell", "command", "install", } ) _CHAT_ONLY_KEYWORDS = frozenset( { "hello", "hi ", "hey", "how are you", "what's up", "your name", "who are you", "what are you", "thanks", "thank you", "bye", "goodbye", "tell me about yourself", "what can you do", } ) _SIMPLE_QUESTION_PREFIXES = ("what is", "who is", "how does", "why is", "when did", "where is") _TIME_WORDS = ("today", "now", "current", "latest", "this week", "this month") def _is_chat_only(self, message_lower: str) -> bool: """Return True if the message matches a chat-only pattern.""" return any(kw in message_lower for kw in self._CHAT_ONLY_KEYWORDS) def _has_tool_keyword(self, message_lower: str) -> bool: """Return True if the message contains a tool-related keyword.""" return any(kw in message_lower for kw in self._TOOL_KEYWORDS) def _is_simple_question(self, message_lower: str) -> bool | None: """Check if message is a simple question. Returns True if it needs tools (real-time info), False if it doesn't, or None if the message isn't a simple question. """ for prefix in self._SIMPLE_QUESTION_PREFIXES: if message_lower.startswith(prefix): return any(t in message_lower for t in self._TIME_WORDS) return None def should_use_tools(self, message: str, context: ConversationContext) -> bool: """Determine if this message likely requires tools. Returns True if tools are likely needed, False for simple chat. """ message_lower = message.lower().strip() if self._is_chat_only(message_lower): return False if self._has_tool_keyword(message_lower): return True simple = self._is_simple_question(message_lower) if simple is not None: return simple return False # Module-level singleton conversation_manager = ConversationManager()