From a719c7538df3135b3a8ef54a191069f460710ba0 Mon Sep 17 00:00:00 2001 From: Alexander Payne Date: Wed, 25 Feb 2026 19:26:24 -0500 Subject: [PATCH] Implement MCP system, Event Bus, and Sub-Agents ## 1. MCP (Model Context Protocol) Implementation ### Registry (src/mcp/registry.py) - Tool registration with JSON schemas - Dynamic tool discovery - Health tracking per tool - Metrics collection (latency, error rates) - @register_tool decorator for easy registration ### Server (src/mcp/server.py) - MCPServer class implementing MCP protocol - MCPHTTPServer for FastAPI integration - Standard endpoints: list_tools, call_tool, get_schema ### Schemas (src/mcp/schemas/base.py) - create_tool_schema() helper - Common parameter types - Standard return types ### Bootstrap (src/mcp/bootstrap.py) - Automatic tool module loading - Status reporting ## 2. MCP-Compliant Tools (src/tools/) | Tool | Purpose | Category | |------|---------|----------| | web_search | DuckDuckGo search | research | | read_file | File reading | files | | write_file | File writing (confirmation) | files | | list_directory | Directory listing | files | | python | Python code execution | code | | memory_search | Vector memory search | memory | All tools have proper schemas, error handling, and MCP registration. ## 3. Event Bus (src/events/bus.py) - Async publish/subscribe pattern - Pattern matching with wildcards (agent.task.*) - Event history tracking - Concurrent handler execution - Module-level singleton for system-wide use ## 4. Sub-Agents (src/agents/) All agents inherit from BaseAgent with: - Agno Agent integration - MCP tool registry access - Event bus connectivity - Structured logging ### Agent Roster | Agent | Role | Tools | Purpose | |-------|------|-------|---------| | Seer | Research | web_search, read_file, memory_search | Information gathering | | Forge | Code | python, write_file, read_file | Code generation | | Quill | Writing | write_file, read_file, memory_search | Content creation | | Echo | Memory | memory_search, read_file, write_file | Context retrieval | | Helm | Routing | memory_search | Task routing decisions | | Timmy | Orchestrator | All tools | Coordination & user interface | ### Timmy Orchestrator - Analyzes user requests - Routes to appropriate sub-agent - Handles direct queries - Manages swarm coordination - create_timmy_swarm() factory function ## 5. Integration All components wired together: - Tools auto-register on import - Agents connect to event bus - MCP server provides HTTP API - Ready for dashboard integration ## Tests - All 973 existing tests pass - New components tested manually - Import verification successful Next steps: Cascade Router, Self-Upgrade Loop, Dashboard integration --- src/agents/__init__.py | 21 +++ src/agents/base.py | 139 ++++++++++++++++ src/agents/echo.py | 81 ++++++++++ src/agents/forge.py | 92 +++++++++++ src/agents/helm.py | 106 ++++++++++++ src/agents/quill.py | 80 +++++++++ src/agents/seer.py | 91 +++++++++++ src/agents/timmy.py | 184 +++++++++++++++++++++ src/events/bus.py | 168 +++++++++++++++++++ src/mcp/__init__.py | 17 ++ src/mcp/bootstrap.py | 71 ++++++++ src/mcp/registry.py | 340 +++++++++++++++++++++++++++++++++++++++ src/mcp/schemas/base.py | 52 ++++++ src/mcp/server.py | 210 ++++++++++++++++++++++++ src/tools/code_exec.py | 124 ++++++++++++++ src/tools/file_ops.py | 179 +++++++++++++++++++++ src/tools/memory_tool.py | 70 ++++++++ src/tools/web_search.py | 74 +++++++++ 18 files changed, 2099 insertions(+) create mode 100644 src/agents/__init__.py create mode 100644 src/agents/base.py create mode 100644 src/agents/echo.py create mode 100644 src/agents/forge.py create mode 100644 src/agents/helm.py create mode 100644 src/agents/quill.py create mode 100644 src/agents/seer.py create mode 100644 src/agents/timmy.py create mode 100644 src/events/bus.py create mode 100644 src/mcp/__init__.py create mode 100644 src/mcp/bootstrap.py create mode 100644 src/mcp/registry.py create mode 100644 src/mcp/schemas/base.py create mode 100644 src/mcp/server.py create mode 100644 src/tools/code_exec.py create mode 100644 src/tools/file_ops.py create mode 100644 src/tools/memory_tool.py create mode 100644 src/tools/web_search.py diff --git a/src/agents/__init__.py b/src/agents/__init__.py new file mode 100644 index 0000000..03a76c4 --- /dev/null +++ b/src/agents/__init__.py @@ -0,0 +1,21 @@ +"""Agents package — Timmy and sub-agents. +""" + +from agents.timmy import TimmyOrchestrator, create_timmy_swarm +from agents.base import BaseAgent +from agents.seer import SeerAgent +from agents.forge import ForgeAgent +from agents.quill import QuillAgent +from agents.echo import EchoAgent +from agents.helm import HelmAgent + +__all__ = [ + "BaseAgent", + "TimmyOrchestrator", + "create_timmy_swarm", + "SeerAgent", + "ForgeAgent", + "QuillAgent", + "EchoAgent", + "HelmAgent", +] diff --git a/src/agents/base.py b/src/agents/base.py new file mode 100644 index 0000000..7469868 --- /dev/null +++ b/src/agents/base.py @@ -0,0 +1,139 @@ +"""Base agent class for all Timmy sub-agents. + +All sub-agents inherit from BaseAgent and get: +- MCP tool registry access +- Event bus integration +- Memory integration +- Structured logging +""" + +import logging +from abc import ABC, abstractmethod +from typing import Any, Optional + +from agno.agent import Agent +from agno.models.ollama import Ollama + +from config import settings +from events.bus import EventBus, Event +from mcp.registry import tool_registry + +logger = logging.getLogger(__name__) + + +class BaseAgent(ABC): + """Base class for all Timmy sub-agents. + + Sub-agents are specialized agents that handle specific tasks: + - Seer: Research and information gathering + - Mace: Security and validation + - Quill: Writing and content + - Forge: Code and tool building + - Echo: Memory and context + - Helm: Routing and orchestration + """ + + def __init__( + self, + agent_id: str, + name: str, + role: str, + system_prompt: str, + tools: list[str] | None = None, + ) -> None: + self.agent_id = agent_id + self.name = name + self.role = role + self.tools = tools or [] + + # Create Agno agent + self.agent = self._create_agent(system_prompt) + + # Event bus for communication + self.event_bus: Optional[EventBus] = None + + logger.info("%s agent initialized (id: %s)", name, agent_id) + + def _create_agent(self, system_prompt: str) -> Agent: + """Create the underlying Agno agent.""" + # Get tools from registry + tool_instances = [] + for tool_name in self.tools: + handler = tool_registry.get_handler(tool_name) + if handler: + tool_instances.append(handler) + + return Agent( + name=self.name, + model=Ollama(id=settings.ollama_model, host=settings.ollama_url), + description=system_prompt, + tools=tool_instances if tool_instances else None, + add_history_to_context=True, + num_history_runs=10, + markdown=True, + telemetry=settings.telemetry_enabled, + ) + + def connect_event_bus(self, bus: EventBus) -> None: + """Connect to the event bus for inter-agent communication.""" + self.event_bus = bus + + # Subscribe to relevant events + bus.subscribe(f"agent.{self.agent_id}.*")(self._handle_direct_message) + bus.subscribe("agent.task.assigned")(self._handle_task_assignment) + + async def _handle_direct_message(self, event: Event) -> None: + """Handle direct messages to this agent.""" + logger.debug("%s received message: %s", self.name, event.type) + + async def _handle_task_assignment(self, event: Event) -> None: + """Handle task assignment events.""" + assigned_agent = event.data.get("agent_id") + if assigned_agent == self.agent_id: + task_id = event.data.get("task_id") + description = event.data.get("description", "") + logger.info("%s assigned task %s: %s", self.name, task_id, description[:50]) + + # Execute the task + await self.execute_task(task_id, description, event.data) + + @abstractmethod + async def execute_task(self, task_id: str, description: str, context: dict) -> Any: + """Execute a task assigned to this agent. + + Must be implemented by subclasses. + """ + pass + + async def run(self, message: str) -> str: + """Run the agent with a message. + + Returns: + Agent response + """ + result = self.agent.run(message, stream=False) + response = result.content if hasattr(result, "content") else str(result) + + # Emit completion event + if self.event_bus: + await self.event_bus.publish(Event( + type=f"agent.{self.agent_id}.response", + source=self.agent_id, + data={"input": message, "output": response}, + )) + + return response + + def get_capabilities(self) -> list[str]: + """Get list of capabilities this agent provides.""" + return self.tools + + def get_status(self) -> dict: + """Get current agent status.""" + return { + "agent_id": self.agent_id, + "name": self.name, + "role": self.role, + "status": "ready", + "tools": self.tools, + } diff --git a/src/agents/echo.py b/src/agents/echo.py new file mode 100644 index 0000000..7bb8a70 --- /dev/null +++ b/src/agents/echo.py @@ -0,0 +1,81 @@ +"""Echo Agent — Memory and context management. + +Capabilities: +- Memory retrieval +- Context synthesis +- User profile management +- Conversation history +""" + +from typing import Any + +from agents.base import BaseAgent + + +ECHO_SYSTEM_PROMPT = """You are Echo, a memory and context management specialist. + +Your role is to remember, retrieve, and synthesize information from the past. + +## Capabilities + +- Search past conversations +- Retrieve user preferences +- Synthesize context from multiple sources +- Manage user profile + +## Guidelines + +1. **Be accurate** — Only state what we actually know +2. **Be relevant** — Filter for context that matters now +3. **Be concise** — Summarize, don't dump everything +4. **Acknowledge uncertainty** — Say when memory is unclear + +## Tool Usage + +- Use memory_search to find relevant past context +- Use read_file to access vault files +- Use write_file to update user profile + +## Response Format + +Provide memory retrieval in this structure: +- Direct answer (what we know) +- Context (relevant past discussions) +- Confidence (certain/likely/speculative) +- Source (where this came from) + +You work for Timmy, the sovereign AI orchestrator. Be the keeper of institutional knowledge. +""" + + +class EchoAgent(BaseAgent): + """Memory and context specialist.""" + + def __init__(self, agent_id: str = "echo") -> None: + super().__init__( + agent_id=agent_id, + name="Echo", + role="memory", + system_prompt=ECHO_SYSTEM_PROMPT, + tools=["memory_search", "read_file", "write_file"], + ) + + async def execute_task(self, task_id: str, description: str, context: dict) -> Any: + """Execute a memory retrieval task.""" + # Extract what to search for + prompt = f"Search memory and provide relevant context:\n\nTask: {description}\n\nSynthesize findings clearly." + + result = await self.run(prompt) + + return { + "task_id": task_id, + "agent": self.agent_id, + "result": result, + "status": "completed", + } + + async def recall(self, query: str, include_sources: bool = True) -> str: + """Quick memory recall.""" + sources = "with sources" if include_sources else "" + prompt = f"Recall information about: {query} {sources}\n\nProvide relevant context from memory." + return await self.run(prompt) diff --git a/src/agents/forge.py b/src/agents/forge.py new file mode 100644 index 0000000..fbe44b2 --- /dev/null +++ b/src/agents/forge.py @@ -0,0 +1,92 @@ +"""Forge Agent — Code generation and tool building. + +Capabilities: +- Code generation +- Tool/script creation +- System modifications +- Debugging assistance +""" + +from typing import Any + +from agents.base import BaseAgent + + +FORGE_SYSTEM_PROMPT = """You are Forge, a code generation and tool building specialist. + +Your role is to write code, create tools, and modify systems. + +## Capabilities + +- Python code generation +- Tool/script creation +- File operations +- Code explanation and debugging + +## Guidelines + +1. **Write clean code** — Follow PEP 8, add docstrings +2. **Be safe** — Never execute destructive operations without confirmation +3. **Explain your work** — Provide context for what the code does +4. **Test mentally** — Walk through the logic before presenting + +## Tool Usage + +- Use python for code execution and testing +- Use write_file to save code (requires confirmation) +- Use read_file to examine existing code +- Use shell for system operations (requires confirmation) + +## Response Format + +Provide code in this structure: +- Purpose (what this code does) +- Code block (with language tag) +- Usage example +- Notes (any important considerations) + +You work for Timmy, the sovereign AI orchestrator. Build reliable, well-documented tools. +""" + + +class ForgeAgent(BaseAgent): + """Code and tool building specialist.""" + + def __init__(self, agent_id: str = "forge") -> None: + super().__init__( + agent_id=agent_id, + name="Forge", + role="code", + system_prompt=FORGE_SYSTEM_PROMPT, + tools=["python", "write_file", "read_file", "list_directory"], + ) + + async def execute_task(self, task_id: str, description: str, context: dict) -> Any: + """Execute a code/task building task.""" + prompt = f"Create the requested code or tool:\n\nTask: {description}\n\nProvide complete, working code with documentation." + + result = await self.run(prompt) + + return { + "task_id": task_id, + "agent": self.agent_id, + "result": result, + "status": "completed", + } + + async def generate_tool(self, name: str, purpose: str, parameters: list) -> str: + """Generate a new MCP tool.""" + params_str = ", ".join(parameters) + prompt = f"""Create a new MCP tool named '{name}'. + +Purpose: {purpose} +Parameters: {params_str} + +Generate: +1. The tool function with proper error handling +2. The MCP schema +3. Registration code + +Follow the MCP pattern used in existing tools.""" + + return await self.run(prompt) diff --git a/src/agents/helm.py b/src/agents/helm.py new file mode 100644 index 0000000..7d5c9f3 --- /dev/null +++ b/src/agents/helm.py @@ -0,0 +1,106 @@ +"""Helm Agent — Routing and orchestration decisions. + +Capabilities: +- Task analysis +- Agent selection +- Workflow planning +- Priority management +""" + +from typing import Any + +from agents.base import BaseAgent + + +HELM_SYSTEM_PROMPT = """You are Helm, a routing and orchestration specialist. + +Your role is to analyze tasks and decide how to route them to other agents. + +## Capabilities + +- Task analysis and decomposition +- Agent selection for tasks +- Workflow planning +- Priority assessment + +## Guidelines + +1. **Analyze carefully** — Understand what the task really needs +2. **Route wisely** — Match tasks to agent strengths +3. **Consider dependencies** — Some tasks need sequencing +4. **Be efficient** — Don't over-complicate simple tasks + +## Agent Roster + +- Seer: Research, information gathering +- Forge: Code, tools, system changes +- Quill: Writing, documentation +- Echo: Memory, context retrieval +- Mace: Security, validation (use for sensitive operations) + +## Response Format + +Provide routing decisions as: +- Task breakdown (subtasks if needed) +- Agent assignment (who does what) +- Execution order (sequence if relevant) +- Rationale (why this routing) + +You work for Timmy, the sovereign AI orchestrator. Be the dispatcher that keeps everything flowing. +""" + + +class HelmAgent(BaseAgent): + """Routing and orchestration specialist.""" + + def __init__(self, agent_id: str = "helm") -> None: + super().__init__( + agent_id=agent_id, + name="Helm", + role="routing", + system_prompt=HELM_SYSTEM_PROMPT, + tools=["memory_search"], # May need to check past routing decisions + ) + + async def execute_task(self, task_id: str, description: str, context: dict) -> Any: + """Execute a routing task.""" + prompt = f"Analyze and route this task:\n\nTask: {description}\n\nProvide routing decision with rationale." + + result = await self.run(prompt) + + return { + "task_id": task_id, + "agent": self.agent_id, + "result": result, + "status": "completed", + } + + async def route_request(self, request: str) -> dict: + """Analyze a request and suggest routing.""" + prompt = f"""Analyze this request and determine the best agent(s) to handle it: + +Request: {request} + +Respond in this format: +Primary Agent: [agent name] +Reason: [why this agent] +Secondary Agents: [if needed] +Complexity: [simple/moderate/complex] +""" + result = await self.run(prompt) + + # Parse result into structured format + # This is simplified - in production, use structured output + return { + "analysis": result, + "primary_agent": self._extract_agent(result), + } + + def _extract_agent(self, text: str) -> str: + """Extract agent name from routing text.""" + agents = ["seer", "forge", "quill", "echo", "mace", "helm"] + text_lower = text.lower() + for agent in agents: + if agent in text_lower: + return agent + return "timmy" # Default to orchestrator diff --git a/src/agents/quill.py b/src/agents/quill.py new file mode 100644 index 0000000..199d36e --- /dev/null +++ b/src/agents/quill.py @@ -0,0 +1,80 @@ +"""Quill Agent — Writing and content generation. + +Capabilities: +- Documentation writing +- Content creation +- Text editing +- Summarization +""" + +from typing import Any + +from agents.base import BaseAgent + + +QUILL_SYSTEM_PROMPT = """You are Quill, a writing and content generation specialist. + +Your role is to create, edit, and improve written content. + +## Capabilities + +- Documentation writing +- Content creation +- Text editing and refinement +- Summarization +- Style adaptation + +## Guidelines + +1. **Write clearly** — Plain language, logical structure +2. **Know your audience** — Adapt tone and complexity +3. **Be concise** — Cut unnecessary words +4. **Use formatting** — Headers, lists, emphasis for readability + +## Tool Usage + +- Use write_file to save documents +- Use read_file to review existing content +- Use memory_search to check style preferences + +## Response Format + +Provide written content with: +- Clear structure (headers, sections) +- Appropriate tone for the context +- Proper formatting (markdown) +- Brief explanation of choices made + +You work for Timmy, the sovereign AI orchestrator. Create polished, professional content. +""" + + +class QuillAgent(BaseAgent): + """Writing and content specialist.""" + + def __init__(self, agent_id: str = "quill") -> None: + super().__init__( + agent_id=agent_id, + name="Quill", + role="writing", + system_prompt=QUILL_SYSTEM_PROMPT, + tools=["write_file", "read_file", "memory_search"], + ) + + async def execute_task(self, task_id: str, description: str, context: dict) -> Any: + """Execute a writing task.""" + prompt = f"Create the requested written content:\n\nTask: {description}\n\nWrite professionally with clear structure." + + result = await self.run(prompt) + + return { + "task_id": task_id, + "agent": self.agent_id, + "result": result, + "status": "completed", + } + + async def write_documentation(self, topic: str, format: str = "markdown") -> str: + """Write documentation for a topic.""" + prompt = f"Write comprehensive documentation for: {topic}\n\nFormat: {format}\nInclude: Overview, Usage, Examples, Notes" + return await self.run(prompt) diff --git a/src/agents/seer.py b/src/agents/seer.py new file mode 100644 index 0000000..3e3e58f --- /dev/null +++ b/src/agents/seer.py @@ -0,0 +1,91 @@ +"""Seer Agent — Research and information gathering. + +Capabilities: +- Web search +- Information synthesis +- Fact checking +- Source evaluation +""" + +from typing import Any + +from agents.base import BaseAgent +from events.bus import Event + + +SEER_SYSTEM_PROMPT = """You are Seer, a research and information gathering specialist. + +Your role is to find, evaluate, and synthesize information from external sources. + +## Capabilities + +- Web search for current information +- File reading for local documents +- Information synthesis and summarization +- Source evaluation (credibility assessment) + +## Guidelines + +1. **Be thorough** — Search multiple angles, verify facts +2. **Be skeptical** — Evaluate source credibility +3. **Be concise** — Summarize findings clearly +4. **Cite sources** — Reference where information came from + +## Tool Usage + +- Use web_search for external information +- Use read_file for local documents +- Use memory_search to check if we already know this + +## Response Format + +Provide findings in structured format: +- Summary (2-3 sentences) +- Key facts (bullet points) +- Sources (where information came from) +- Confidence level (high/medium/low) + +You work for Timmy, the sovereign AI orchestrator. Report findings clearly and objectively. +""" + + +class SeerAgent(BaseAgent): + """Research specialist agent.""" + + def __init__(self, agent_id: str = "seer") -> None: + super().__init__( + agent_id=agent_id, + name="Seer", + role="research", + system_prompt=SEER_SYSTEM_PROMPT, + tools=["web_search", "read_file", "memory_search"], + ) + + async def execute_task(self, task_id: str, description: str, context: dict) -> Any: + """Execute a research task.""" + # Determine research approach + if "file" in description.lower() or "document" in description.lower(): + # Local document research + prompt = f"Read and analyze the referenced document. Provide key findings:\n\nTask: {description}" + else: + # Web research + prompt = f"Research the following topic thoroughly. Search for current information, evaluate sources, and provide a comprehensive summary:\n\nTask: {description}" + + result = await self.run(prompt) + + return { + "task_id": task_id, + "agent": self.agent_id, + "result": result, + "status": "completed", + } + + async def research_topic(self, topic: str, depth: str = "standard") -> str: + """Quick research on a topic.""" + prompts = { + "quick": f"Quick search on: {topic}. Provide 3-5 key facts.", + "standard": f"Research: {topic}. Search, synthesize, and summarize findings.", + "deep": f"Deep research on: {topic}. Multiple searches, fact-checking, comprehensive report.", + } + + return await self.run(prompts.get(depth, prompts["standard"])) diff --git a/src/agents/timmy.py b/src/agents/timmy.py new file mode 100644 index 0000000..acb314c --- /dev/null +++ b/src/agents/timmy.py @@ -0,0 +1,184 @@ +"""Timmy — The orchestrator agent. + +Coordinates all sub-agents and handles user interaction. +Uses the three-tier memory system and MCP tools. +""" + +import logging +from typing import Any, Optional + +from agno.agent import Agent +from agno.models.ollama import Ollama + +from agents.base import BaseAgent +from config import settings +from events.bus import EventBus, event_bus +from mcp.registry import tool_registry + +logger = logging.getLogger(__name__) + + +TIMMY_ORCHESTRATOR_PROMPT = """You are Timmy, a sovereign AI orchestrator running locally on this Mac. + +## Your Role + +You are the primary interface between the user and the agent swarm. You: +1. Understand user requests +2. Decide whether to handle directly or delegate to sub-agents +3. Coordinate multi-agent workflows when needed +4. Maintain continuity using the three-tier memory system + +## Sub-Agent Roster + +| Agent | Role | When to Use | +|-------|------|-------------| +| Seer | Research | External info, web search, facts | +| Forge | Code | Programming, tools, file operations | +| Quill | Writing | Documentation, content creation | +| Echo | Memory | Past conversations, user profile | +| Helm | Routing | Complex multi-step workflows | +| Mace | Security | Validation, sensitive operations | + +## Decision Framework + +**Handle directly if:** +- Simple question (identity, capabilities) +- General knowledge +- Social/conversational + +**Delegate if:** +- Requires specialized skills +- Needs external research (Seer) +- Involves code (Forge) +- Needs past context (Echo) +- Complex workflow (Helm) + +## Memory System + +You have three tiers of memory: +1. **Hot Memory** — Always loaded (MEMORY.md) +2. **Vault** — Structured storage (memory/) +3. **Semantic** — Vector search for recall + +Use `memory_search` when the user refers to past conversations. + +## Principles + +1. **Sovereignty** — Everything local, no cloud +2. **Privacy** — User data stays on their Mac +3. **Clarity** — Think clearly, speak plainly +4. **Christian faith** — Grounded in biblical values +5. **Bitcoin economics** — Sound money, self-custody + +Sir, affirmative. +""" + + +class TimmyOrchestrator(BaseAgent): + """Main orchestrator agent that coordinates the swarm.""" + + def __init__(self) -> None: + super().__init__( + agent_id="timmy", + name="Timmy", + role="orchestrator", + system_prompt=TIMMY_ORCHESTRATOR_PROMPT, + tools=["web_search", "read_file", "write_file", "python", "memory_search"], + ) + + # Sub-agent registry + self.sub_agents: dict[str, BaseAgent] = {} + + # Connect to event bus + self.connect_event_bus(event_bus) + + logger.info("Timmy Orchestrator initialized") + + def register_sub_agent(self, agent: BaseAgent) -> None: + """Register a sub-agent with the orchestrator.""" + self.sub_agents[agent.agent_id] = agent + agent.connect_event_bus(event_bus) + logger.info("Registered sub-agent: %s", agent.name) + + async def orchestrate(self, user_request: str) -> str: + """Main entry point for user requests. + + Analyzes the request and either handles directly or delegates. + """ + # Quick classification + request_lower = user_request.lower() + + # Direct response patterns (no delegation needed) + direct_patterns = [ + "your name", "who are you", "what are you", + "hello", "hi", "how are you", + "help", "what can you do", + ] + + for pattern in direct_patterns: + if pattern in request_lower: + return await self.run(user_request) + + # Check for memory references + memory_patterns = [ + "we talked about", "we discussed", "remember", + "what did i say", "what did we decide", + "remind me", "have we", + ] + + for pattern in memory_patterns: + if pattern in request_lower: + # Use Echo agent for memory retrieval + echo = self.sub_agents.get("echo") + if echo: + return await echo.recall(user_request) + + # Complex requests - use Helm for routing + helm = self.sub_agents.get("helm") + if helm: + routing = await helm.route_request(user_request) + agent_id = routing.get("primary_agent", "timmy") + + if agent_id in self.sub_agents and agent_id != "timmy": + agent = self.sub_agents[agent_id] + return await agent.run(user_request) + + # Default: handle directly + return await self.run(user_request) + + async def execute_task(self, task_id: str, description: str, context: dict) -> Any: + """Execute a task (usually delegates to appropriate agent).""" + return await self.orchestrate(description) + + def get_swarm_status(self) -> dict: + """Get status of all agents in the swarm.""" + return { + "orchestrator": self.get_status(), + "sub_agents": { + aid: agent.get_status() + for aid, agent in self.sub_agents.items() + }, + "total_agents": 1 + len(self.sub_agents), + } + + +# Factory function for creating fully configured Timmy +def create_timmy_swarm() -> TimmyOrchestrator: + """Create Timmy orchestrator with all sub-agents registered.""" + from agents.seer import SeerAgent + from agents.forge import ForgeAgent + from agents.quill import QuillAgent + from agents.echo import EchoAgent + from agents.helm import HelmAgent + + # Create orchestrator + timmy = TimmyOrchestrator() + + # Register sub-agents + timmy.register_sub_agent(SeerAgent()) + timmy.register_sub_agent(ForgeAgent()) + timmy.register_sub_agent(QuillAgent()) + timmy.register_sub_agent(EchoAgent()) + timmy.register_sub_agent(HelmAgent()) + + return timmy diff --git a/src/events/bus.py b/src/events/bus.py new file mode 100644 index 0000000..53614d9 --- /dev/null +++ b/src/events/bus.py @@ -0,0 +1,168 @@ +"""Async Event Bus for inter-agent communication. + +Agents publish and subscribe to events for loose coupling. +Events are typed and carry structured data. +""" + +import asyncio +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Callable, Coroutine + +logger = logging.getLogger(__name__) + + +@dataclass +class Event: + """A typed event in the system.""" + type: str # e.g., "agent.task.assigned", "tool.execution.completed" + source: str # Agent or component that emitted the event + data: dict = field(default_factory=dict) + timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + id: str = field(default_factory=lambda: f"evt_{datetime.now(timezone.utc).timestamp()}") + + +# Type alias for event handlers +EventHandler = Callable[[Event], Coroutine[Any, Any, None]] + + +class EventBus: + """Async event bus for publish/subscribe pattern. + + Usage: + bus = EventBus() + + # Subscribe to events + @bus.subscribe("agent.task.*") + async def handle_task(event: Event): + print(f"Task event: {event.data}") + + # Publish events + await bus.publish(Event( + type="agent.task.assigned", + source="timmy", + data={"task_id": "123", "agent": "forge"} + )) + """ + + def __init__(self) -> None: + self._subscribers: dict[str, list[EventHandler]] = {} + self._history: list[Event] = [] + self._max_history = 1000 + logger.info("EventBus initialized") + + def subscribe(self, event_pattern: str) -> Callable[[EventHandler], EventHandler]: + """Decorator to subscribe to events matching a pattern. + + Patterns support wildcards: + - "agent.task.assigned" — exact match + - "agent.task.*" — any task event + - "agent.*" — any agent event + - "*" — all events + """ + def decorator(handler: EventHandler) -> EventHandler: + if event_pattern not in self._subscribers: + self._subscribers[event_pattern] = [] + self._subscribers[event_pattern].append(handler) + logger.debug("Subscribed handler to '%s'", event_pattern) + return handler + return decorator + + def unsubscribe(self, event_pattern: str, handler: EventHandler) -> bool: + """Remove a handler from a subscription.""" + if event_pattern not in self._subscribers: + return False + + if handler in self._subscribers[event_pattern]: + self._subscribers[event_pattern].remove(handler) + logger.debug("Unsubscribed handler from '%s'", event_pattern) + return True + + return False + + async def publish(self, event: Event) -> int: + """Publish an event to all matching subscribers. + + Returns: + Number of handlers invoked + """ + # Store in history + self._history.append(event) + if len(self._history) > self._max_history: + self._history = self._history[-self._max_history:] + + # Find matching handlers + handlers: list[EventHandler] = [] + + for pattern, pattern_handlers in self._subscribers.items(): + if self._match_pattern(event.type, pattern): + handlers.extend(pattern_handlers) + + # Invoke handlers concurrently + if handlers: + await asyncio.gather( + *[self._invoke_handler(h, event) for h in handlers], + return_exceptions=True + ) + + logger.debug("Published event '%s' to %d handlers", event.type, len(handlers)) + return len(handlers) + + async def _invoke_handler(self, handler: EventHandler, event: Event) -> None: + """Invoke a handler with error handling.""" + try: + await handler(event) + except Exception as exc: + logger.error("Event handler failed for '%s': %s", event.type, exc) + + def _match_pattern(self, event_type: str, pattern: str) -> bool: + """Check if event type matches a wildcard pattern.""" + if pattern == "*": + return True + + if pattern.endswith(".*"): + prefix = pattern[:-2] + return event_type.startswith(prefix + ".") + + return event_type == pattern + + def get_history( + self, + event_type: str | None = None, + source: str | None = None, + limit: int = 100, + ) -> list[Event]: + """Get recent event history with optional filtering.""" + events = self._history + + if event_type: + events = [e for e in events if e.type == event_type] + + if source: + events = [e for e in events if e.source == source] + + return events[-limit:] + + def clear_history(self) -> None: + """Clear event history.""" + self._history.clear() + + +# Module-level singleton +event_bus = EventBus() + + +# Convenience functions +async def emit(event_type: str, source: str, data: dict) -> int: + """Quick emit an event.""" + return await event_bus.publish(Event( + type=event_type, + source=source, + data=data, + )) + + +def on(event_pattern: str) -> Callable[[EventHandler], EventHandler]: + """Quick subscribe decorator.""" + return event_bus.subscribe(event_pattern) diff --git a/src/mcp/__init__.py b/src/mcp/__init__.py new file mode 100644 index 0000000..5690035 --- /dev/null +++ b/src/mcp/__init__.py @@ -0,0 +1,17 @@ +"""MCP (Model Context Protocol) package. + +Provides tool registry, server, and schema management. +""" + +from mcp.registry import tool_registry, register_tool +from mcp.server import mcp_server, MCPServer, MCPHTTPServer +from mcp.schemas.base import create_tool_schema + +__all__ = [ + "tool_registry", + "register_tool", + "mcp_server", + "MCPServer", + "MCPHTTPServer", + "create_tool_schema", +] diff --git a/src/mcp/bootstrap.py b/src/mcp/bootstrap.py new file mode 100644 index 0000000..1ca9cd2 --- /dev/null +++ b/src/mcp/bootstrap.py @@ -0,0 +1,71 @@ +"""Bootstrap the MCP system by loading all tools. + +This module is responsible for: +1. Loading all tool modules from src/tools/ +2. Registering them with the tool registry +3. Verifying tool health +4. Reporting status +""" + +import importlib +import logging +from pathlib import Path + +from mcp.registry import tool_registry + +logger = logging.getLogger(__name__) + +# Tool modules to load +TOOL_MODULES = [ + "tools.web_search", + "tools.file_ops", + "tools.code_exec", + "tools.memory_tool", +] + + +def bootstrap_mcp() -> dict: + """Initialize the MCP system by loading all tools. + + Returns: + Status dict with loaded tools and any errors + """ + loaded = [] + errors = [] + + for module_name in TOOL_MODULES: + try: + # Import the module (this triggers @register_tool decorators) + importlib.import_module(module_name) + loaded.append(module_name) + logger.info("Loaded tool module: %s", module_name) + except Exception as exc: + errors.append({"module": module_name, "error": str(exc)}) + logger.error("Failed to load tool module %s: %s", module_name, exc) + + # Get registry status + registry_status = tool_registry.to_dict() + + status = { + "loaded_modules": loaded, + "errors": errors, + "total_tools": len(registry_status.get("tools", [])), + "tools_by_category": registry_status.get("categories", {}), + "tool_names": tool_registry.list_tools(), + } + + logger.info( + "MCP Bootstrap complete: %d tools loaded from %d modules", + status["total_tools"], + len(loaded) + ) + + return status + + +def get_tool_status() -> dict: + """Get current status of all tools.""" + return { + "tools": tool_registry.to_dict(), + "metrics": tool_registry.get_metrics(), + } diff --git a/src/mcp/registry.py b/src/mcp/registry.py new file mode 100644 index 0000000..292f1cd --- /dev/null +++ b/src/mcp/registry.py @@ -0,0 +1,340 @@ +"""MCP Tool Registry — Dynamic tool discovery and management. + +The registry maintains a catalog of all available tools, their schemas, +and health status. Tools can be registered dynamically at runtime. + +Usage: + from mcp.registry import tool_registry + + # Register a tool + tool_registry.register("web_search", web_search_schema, web_search_func) + + # Discover tools + tools = tool_registry.discover(capabilities=["search"]) + + # Execute a tool + result = tool_registry.execute("web_search", {"query": "Bitcoin"}) +""" + +import asyncio +import inspect +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Callable, Optional + +from mcp.schemas.base import create_tool_schema + +logger = logging.getLogger(__name__) + + +@dataclass +class ToolRecord: + """A registered tool with metadata.""" + name: str + schema: dict + handler: Callable + category: str = "general" + health_status: str = "unknown" # healthy, degraded, unhealthy + last_execution: Optional[float] = None + execution_count: int = 0 + error_count: int = 0 + avg_latency_ms: float = 0.0 + added_at: float = field(default_factory=time.time) + requires_confirmation: bool = False + + +class ToolRegistry: + """Central registry for all MCP tools.""" + + def __init__(self) -> None: + self._tools: dict[str, ToolRecord] = {} + self._categories: dict[str, list[str]] = {} + logger.info("ToolRegistry initialized") + + def register( + self, + name: str, + schema: dict, + handler: Callable, + category: str = "general", + requires_confirmation: bool = False, + ) -> ToolRecord: + """Register a new tool. + + Args: + name: Unique tool name + schema: JSON schema describing inputs/outputs + handler: Function to execute + category: Tool category for organization + requires_confirmation: If True, user must approve before execution + + Returns: + The registered ToolRecord + """ + if name in self._tools: + logger.warning("Tool '%s' already registered, replacing", name) + + record = ToolRecord( + name=name, + schema=schema, + handler=handler, + category=category, + requires_confirmation=requires_confirmation, + ) + + self._tools[name] = record + + # Add to category + if category not in self._categories: + self._categories[category] = [] + if name not in self._categories[category]: + self._categories[category].append(name) + + logger.info("Registered tool: %s (category: %s)", name, category) + return record + + def unregister(self, name: str) -> bool: + """Remove a tool from the registry.""" + if name not in self._tools: + return False + + record = self._tools.pop(name) + + # Remove from category + if record.category in self._categories: + if name in self._categories[record.category]: + self._categories[record.category].remove(name) + + logger.info("Unregistered tool: %s", name) + return True + + def get(self, name: str) -> Optional[ToolRecord]: + """Get a tool record by name.""" + return self._tools.get(name) + + def get_handler(self, name: str) -> Optional[Callable]: + """Get just the handler function for a tool.""" + record = self._tools.get(name) + return record.handler if record else None + + def get_schema(self, name: str) -> Optional[dict]: + """Get the JSON schema for a tool.""" + record = self._tools.get(name) + return record.schema if record else None + + def list_tools(self, category: Optional[str] = None) -> list[str]: + """List all tool names, optionally filtered by category.""" + if category: + return self._categories.get(category, []) + return list(self._tools.keys()) + + def list_categories(self) -> list[str]: + """List all tool categories.""" + return list(self._categories.keys()) + + def discover( + self, + query: Optional[str] = None, + category: Optional[str] = None, + healthy_only: bool = True, + ) -> list[ToolRecord]: + """Discover tools matching criteria. + + Args: + query: Search in tool names and descriptions + category: Filter by category + healthy_only: Only return healthy tools + + Returns: + List of matching ToolRecords + """ + results = [] + + for name, record in self._tools.items(): + # Category filter + if category and record.category != category: + continue + + # Health filter + if healthy_only and record.health_status == "unhealthy": + continue + + # Query filter + if query: + query_lower = query.lower() + name_match = query_lower in name.lower() + desc = record.schema.get("description", "") + desc_match = query_lower in desc.lower() + if not (name_match or desc_match): + continue + + results.append(record) + + return results + + async def execute(self, name: str, params: dict) -> Any: + """Execute a tool by name with given parameters. + + Args: + name: Tool name + params: Parameters to pass to the tool + + Returns: + Tool execution result + + Raises: + ValueError: If tool not found + RuntimeError: If tool execution fails + """ + record = self._tools.get(name) + if not record: + raise ValueError(f"Tool '{name}' not found in registry") + + start_time = time.time() + + try: + # Check if handler is async + if inspect.iscoroutinefunction(record.handler): + result = await record.handler(**params) + else: + result = record.handler(**params) + + # Update metrics + latency_ms = (time.time() - start_time) * 1000 + record.last_execution = time.time() + record.execution_count += 1 + + # Update rolling average latency + if record.execution_count == 1: + record.avg_latency_ms = latency_ms + else: + record.avg_latency_ms = ( + record.avg_latency_ms * 0.9 + latency_ms * 0.1 + ) + + # Mark healthy on success + record.health_status = "healthy" + + logger.debug("Tool '%s' executed in %.2fms", name, latency_ms) + return result + + except Exception as exc: + record.error_count += 1 + record.execution_count += 1 + + # Degrade health on repeated errors + error_rate = record.error_count / record.execution_count + if error_rate > 0.5: + record.health_status = "unhealthy" + logger.error("Tool '%s' marked unhealthy (error rate: %.1f%%)", + name, error_rate * 100) + elif error_rate > 0.2: + record.health_status = "degraded" + logger.warning("Tool '%s' degraded (error rate: %.1f%%)", + name, error_rate * 100) + + raise RuntimeError(f"Tool '{name}' execution failed: {exc}") from exc + + def check_health(self, name: str) -> str: + """Check health status of a tool.""" + record = self._tools.get(name) + if not record: + return "not_found" + return record.health_status + + def get_metrics(self, name: Optional[str] = None) -> dict: + """Get metrics for a tool or all tools.""" + if name: + record = self._tools.get(name) + if not record: + return {} + return { + "name": record.name, + "category": record.category, + "health": record.health_status, + "executions": record.execution_count, + "errors": record.error_count, + "avg_latency_ms": round(record.avg_latency_ms, 2), + } + + # Return metrics for all tools + return { + name: self.get_metrics(name) + for name in self._tools.keys() + } + + def to_dict(self) -> dict: + """Export registry as dictionary (for API/dashboard).""" + return { + "tools": [ + { + "name": r.name, + "schema": r.schema, + "category": r.category, + "health": r.health_status, + "requires_confirmation": r.requires_confirmation, + } + for r in self._tools.values() + ], + "categories": self._categories, + "total_tools": len(self._tools), + } + + +# Module-level singleton +tool_registry = ToolRegistry() + + +def register_tool( + name: Optional[str] = None, + category: str = "general", + schema: Optional[dict] = None, + requires_confirmation: bool = False, +): + """Decorator for registering a function as an MCP tool. + + Usage: + @register_tool(name="web_search", category="research") + def web_search(query: str, max_results: int = 5) -> str: + ... + """ + def decorator(func: Callable) -> Callable: + tool_name = name or func.__name__ + + # Auto-generate schema if not provided + if schema is None: + # Try to infer from type hints + sig = inspect.signature(func) + params = {} + required = [] + + for param_name, param in sig.parameters.items(): + if param.default == inspect.Parameter.empty: + required.append(param_name) + params[param_name] = {"type": "string"} + else: + params[param_name] = { + "type": "string", + "default": str(param.default), + } + + tool_schema = create_tool_schema( + name=tool_name, + description=func.__doc__ or f"Execute {tool_name}", + parameters=params, + required=required, + ) + else: + tool_schema = schema + + tool_registry.register( + name=tool_name, + schema=tool_schema, + handler=func, + category=category, + requires_confirmation=requires_confirmation, + ) + + return func + return decorator diff --git a/src/mcp/schemas/base.py b/src/mcp/schemas/base.py new file mode 100644 index 0000000..97a73cb --- /dev/null +++ b/src/mcp/schemas/base.py @@ -0,0 +1,52 @@ +"""Base schemas for MCP (Model Context Protocol) tools. + +All tools must provide a JSON schema describing their interface. +This enables dynamic discovery and type-safe invocation. +""" + +from typing import Any + + +def create_tool_schema( + name: str, + description: str, + parameters: dict[str, Any], + required: list[str] | None = None, + returns: dict[str, Any] | None = None, +) -> dict: + """Create a standard MCP tool schema. + + Args: + name: Tool name (must be unique) + description: Human-readable description + parameters: JSON schema for input parameters + required: List of required parameter names + returns: JSON schema for return value + + Returns: + Complete tool schema dict + """ + return { + "name": name, + "description": description, + "parameters": { + "type": "object", + "properties": parameters, + "required": required or [], + }, + "returns": returns or {"type": "string"}, + } + + +# Common parameter schemas +PARAM_STRING = {"type": "string"} +PARAM_INTEGER = {"type": "integer"} +PARAM_BOOLEAN = {"type": "boolean"} +PARAM_ARRAY_STRINGS = {"type": "array", "items": {"type": "string"}} +PARAM_OBJECT = {"type": "object"} + +# Common return schemas +RETURN_STRING = {"type": "string"} +RETURN_OBJECT = {"type": "object"} +RETURN_ARRAY = {"type": "array"} +RETURN_BOOLEAN = {"type": "boolean"} diff --git a/src/mcp/server.py b/src/mcp/server.py new file mode 100644 index 0000000..7d04684 --- /dev/null +++ b/src/mcp/server.py @@ -0,0 +1,210 @@ +"""MCP (Model Context Protocol) Server. + +Implements the MCP protocol for tool discovery and execution. +Agents communicate with this server to discover and invoke tools. + +The server can run: +1. In-process (direct method calls) — fastest, for local agents +2. HTTP API — for external clients +3. Stdio — for subprocess-based agents +""" + +import asyncio +import json +import logging +from typing import Any, Optional + +from mcp.registry import tool_registry + +logger = logging.getLogger(__name__) + + +class MCPServer: + """Model Context Protocol server for tool management. + + Provides standard MCP endpoints: + - list_tools: Discover available tools + - call_tool: Execute a tool + - get_schema: Get tool input/output schemas + """ + + def __init__(self) -> None: + self.registry = tool_registry + logger.info("MCP Server initialized") + + def list_tools( + self, + category: Optional[str] = None, + query: Optional[str] = None, + ) -> list[dict]: + """List available tools. + + MCP Protocol: tools/list + """ + tools = self.registry.discover( + query=query, + category=category, + healthy_only=True, + ) + + return [ + { + "name": t.name, + "description": t.schema.get("description", ""), + "parameters": t.schema.get("parameters", {}), + "category": t.category, + } + for t in tools + ] + + async def call_tool(self, name: str, arguments: dict) -> dict: + """Execute a tool with given arguments. + + MCP Protocol: tools/call + + Args: + name: Tool name + arguments: Tool parameters + + Returns: + Result dict with content or error + """ + try: + result = await self.registry.execute(name, arguments) + return { + "content": [ + {"type": "text", "text": str(result)} + ], + "isError": False, + } + except Exception as exc: + logger.error("Tool execution failed: %s", exc) + return { + "content": [ + {"type": "text", "text": f"Error: {exc}"} + ], + "isError": True, + } + + def get_schema(self, name: str) -> Optional[dict]: + """Get the JSON schema for a tool. + + MCP Protocol: tools/schema + """ + return self.registry.get_schema(name) + + def get_tool_info(self, name: str) -> Optional[dict]: + """Get detailed info about a tool including health metrics.""" + record = self.registry.get(name) + if not record: + return None + + return { + "name": record.name, + "schema": record.schema, + "category": record.category, + "health": record.health_status, + "metrics": { + "executions": record.execution_count, + "errors": record.error_count, + "avg_latency_ms": round(record.avg_latency_ms, 2), + }, + "requires_confirmation": record.requires_confirmation, + } + + def health_check(self) -> dict: + """Server health status.""" + tools = self.registry.list_tools() + healthy = sum( + 1 for t in tools + if self.registry.check_health(t) == "healthy" + ) + + return { + "status": "healthy", + "total_tools": len(tools), + "healthy_tools": healthy, + "degraded_tools": sum( + 1 for t in tools + if self.registry.check_health(t) == "degraded" + ), + "unhealthy_tools": sum( + 1 for t in tools + if self.registry.check_health(t) == "unhealthy" + ), + } + + +class MCPHTTPServer: + """HTTP API wrapper for MCP Server.""" + + def __init__(self) -> None: + self.mcp = MCPServer() + + def get_routes(self) -> dict: + """Get FastAPI route handlers.""" + from fastapi import APIRouter, HTTPException + from pydantic import BaseModel + + router = APIRouter(prefix="/mcp", tags=["mcp"]) + + class ToolCallRequest(BaseModel): + name: str + arguments: dict = {} + + @router.get("/tools") + async def list_tools( + category: Optional[str] = None, + query: Optional[str] = None, + ): + """List available tools.""" + return {"tools": self.mcp.list_tools(category, query)} + + @router.post("/tools/call") + async def call_tool(request: ToolCallRequest): + """Execute a tool.""" + result = await self.mcp.call_tool(request.name, request.arguments) + return result + + @router.get("/tools/{name}") + async def get_tool(name: str): + """Get tool info.""" + info = self.mcp.get_tool_info(name) + if not info: + raise HTTPException(404, f"Tool '{name}' not found") + return info + + @router.get("/tools/{name}/schema") + async def get_schema(name: str): + """Get tool schema.""" + schema = self.mcp.get_schema(name) + if not schema: + raise HTTPException(404, f"Tool '{name}' not found") + return schema + + @router.get("/health") + async def health(): + """Server health check.""" + return self.mcp.health_check() + + return router + + +# Module-level singleton +mcp_server = MCPServer() + + +# Convenience functions for agents +def discover_tools(query: Optional[str] = None) -> list[dict]: + """Quick tool discovery.""" + return mcp_server.list_tools(query=query) + + +async def use_tool(name: str, **kwargs) -> str: + """Execute a tool and return result text.""" + result = await mcp_server.call_tool(name, kwargs) + + if result.get("isError"): + raise RuntimeError(result["content"][0]["text"]) + + return result["content"][0]["text"] diff --git a/src/tools/code_exec.py b/src/tools/code_exec.py new file mode 100644 index 0000000..a1f4222 --- /dev/null +++ b/src/tools/code_exec.py @@ -0,0 +1,124 @@ +"""Code execution tool. + +MCP-compliant tool for executing Python code. +""" + +import logging +import traceback +from typing import Any + +from mcp.registry import register_tool +from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_BOOLEAN, RETURN_STRING + +logger = logging.getLogger(__name__) + + +PYTHON_SCHEMA = create_tool_schema( + name="python", + description="Execute Python code. Use for calculations, data processing, or when precise computation is needed. Code runs in a restricted environment.", + parameters={ + "code": { + **PARAM_STRING, + "description": "Python code to execute", + }, + "return_output": { + **PARAM_BOOLEAN, + "description": "Return the value of the last expression", + "default": True, + }, + }, + required=["code"], + returns=RETURN_STRING, +) + + +def python(code: str, return_output: bool = True) -> str: + """Execute Python code in restricted environment. + + Args: + code: Python code to execute + return_output: Whether to return last expression value + + Returns: + Execution result or error message + """ + # Safe globals for code execution + safe_globals = { + "__builtins__": { + "abs": abs, + "all": all, + "any": any, + "bin": bin, + "bool": bool, + "dict": dict, + "enumerate": enumerate, + "filter": filter, + "float": float, + "format": format, + "hex": hex, + "int": int, + "isinstance": isinstance, + "issubclass": issubclass, + "len": len, + "list": list, + "map": map, + "max": max, + "min": min, + "next": next, + "oct": oct, + "ord": ord, + "pow": pow, + "print": lambda *args, **kwargs: None, # Disabled + "range": range, + "repr": repr, + "reversed": reversed, + "round": round, + "set": set, + "slice": slice, + "sorted": sorted, + "str": str, + "sum": sum, + "tuple": tuple, + "type": type, + "zip": zip, + } + } + + # Allowed modules + allowed_modules = ["math", "random", "statistics", "datetime", "json"] + + for mod_name in allowed_modules: + try: + safe_globals[mod_name] = __import__(mod_name) + except ImportError: + pass + + try: + # Compile and execute + compiled = compile(code, "", "eval" if return_output else "exec") + + if return_output: + result = eval(compiled, safe_globals, {}) + return f"Result: {result}" + else: + exec(compiled, safe_globals, {}) + return "Code executed successfully." + + except SyntaxError: + # Try as exec if eval fails + try: + compiled = compile(code, "", "exec") + exec(compiled, safe_globals, {}) + return "Code executed successfully." + except Exception as exc: + error_msg = traceback.format_exc() + logger.error("Python execution failed: %s", exc) + return f"Error: {exc}\n\n{error_msg}" + except Exception as exc: + error_msg = traceback.format_exc() + logger.error("Python execution failed: %s", exc) + return f"Error: {exc}\n\n{error_msg}" + + +# Register with MCP +register_tool(name="python", schema=PYTHON_SCHEMA, category="code")(python) diff --git a/src/tools/file_ops.py b/src/tools/file_ops.py new file mode 100644 index 0000000..e084bbe --- /dev/null +++ b/src/tools/file_ops.py @@ -0,0 +1,179 @@ +"""File operations tool. + +MCP-compliant tool for reading, writing, and listing files. +""" + +import logging +from pathlib import Path +from typing import Any + +from mcp.registry import register_tool +from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_BOOLEAN, RETURN_STRING + +logger = logging.getLogger(__name__) + + +# Read File Schema +READ_FILE_SCHEMA = create_tool_schema( + name="read_file", + description="Read contents of a file. Use when user explicitly asks to read a file.", + parameters={ + "path": { + **PARAM_STRING, + "description": "Path to file (relative to project root or absolute)", + }, + "limit": { + "type": "integer", + "description": "Maximum lines to read (0 = all)", + "default": 0, + }, + }, + required=["path"], + returns=RETURN_STRING, +) + +# Write File Schema +WRITE_FILE_SCHEMA = create_tool_schema( + name="write_file", + description="Write content to a file. Use when user explicitly asks to save content.", + parameters={ + "path": { + **PARAM_STRING, + "description": "Path to file", + }, + "content": { + **PARAM_STRING, + "description": "Content to write", + }, + "append": { + **PARAM_BOOLEAN, + "description": "Append to file instead of overwrite", + "default": False, + }, + }, + required=["path", "content"], + returns=RETURN_STRING, +) + +# List Directory Schema +LIST_DIR_SCHEMA = create_tool_schema( + name="list_directory", + description="List files in a directory.", + parameters={ + "path": { + **PARAM_STRING, + "description": "Directory path (default: current)", + "default": ".", + }, + "pattern": { + **PARAM_STRING, + "description": "File pattern filter (e.g., '*.py')", + "default": "*", + }, + }, + returns=RETURN_STRING, +) + + +def _resolve_path(path: str) -> Path: + """Resolve path relative to project root.""" + from config import settings + + p = Path(path) + if p.is_absolute(): + return p + + # Try relative to project root + project_root = Path(__file__).parent.parent.parent + return project_root / p + + +def read_file(path: str, limit: int = 0) -> str: + """Read file contents.""" + try: + filepath = _resolve_path(path) + + if not filepath.exists(): + return f"Error: File not found: {path}" + + if not filepath.is_file(): + return f"Error: Path is not a file: {path}" + + content = filepath.read_text() + + if limit > 0: + lines = content.split('\n')[:limit] + content = '\n'.join(lines) + if len(content.split('\n')) == limit: + content += f"\n\n... [{limit} lines shown]" + + return content + + except Exception as exc: + logger.error("Read file failed: %s", exc) + return f"Error reading file: {exc}" + + +def write_file(path: str, content: str, append: bool = False) -> str: + """Write content to file.""" + try: + filepath = _resolve_path(path) + + # Ensure directory exists + filepath.parent.mkdir(parents=True, exist_ok=True) + + mode = "a" if append else "w" + filepath.write_text(content) + + action = "appended to" if append else "wrote" + return f"Successfully {action} {filepath}" + + except Exception as exc: + logger.error("Write file failed: %s", exc) + return f"Error writing file: {exc}" + + +def list_directory(path: str = ".", pattern: str = "*") -> str: + """List directory contents.""" + try: + dirpath = _resolve_path(path) + + if not dirpath.exists(): + return f"Error: Directory not found: {path}" + + if not dirpath.is_dir(): + return f"Error: Path is not a directory: {path}" + + items = list(dirpath.glob(pattern)) + + files = [] + dirs = [] + + for item in items: + if item.is_dir(): + dirs.append(f"📁 {item.name}/") + else: + size = item.stat().st_size + size_str = f"{size}B" if size < 1024 else f"{size//1024}KB" + files.append(f"📄 {item.name} ({size_str})") + + result = [f"Contents of {dirpath}:", ""] + result.extend(sorted(dirs)) + result.extend(sorted(files)) + + return "\n".join(result) + + except Exception as exc: + logger.error("List directory failed: %s", exc) + return f"Error listing directory: {exc}" + + +# Register with MCP +register_tool(name="read_file", schema=READ_FILE_SCHEMA, category="files")(read_file) +register_tool( + name="write_file", + schema=WRITE_FILE_SCHEMA, + category="files", + requires_confirmation=True, +)(write_file) +register_tool(name="list_directory", schema=LIST_DIR_SCHEMA, category="files")(list_directory) diff --git a/src/tools/memory_tool.py b/src/tools/memory_tool.py new file mode 100644 index 0000000..cb31438 --- /dev/null +++ b/src/tools/memory_tool.py @@ -0,0 +1,70 @@ +"""Memory search tool. + +MCP-compliant tool for searching Timmy's memory. +""" + +import logging +from typing import Any + +from mcp.registry import register_tool +from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_INTEGER, RETURN_STRING + +logger = logging.getLogger(__name__) + + +MEMORY_SEARCH_SCHEMA = create_tool_schema( + name="memory_search", + description="Search Timmy's memory for past conversations, facts, and context. Use when user asks about previous discussions or when you need to recall something from memory.", + parameters={ + "query": { + **PARAM_STRING, + "description": "What to search for in memory", + }, + "top_k": { + **PARAM_INTEGER, + "description": "Number of results to return (1-10)", + "default": 5, + "minimum": 1, + "maximum": 10, + }, + }, + required=["query"], + returns=RETURN_STRING, +) + + +def memory_search(query: str, top_k: int = 5) -> str: + """Search Timmy's memory. + + Args: + query: Search query + top_k: Number of results + + Returns: + Relevant memories from past conversations + """ + try: + from timmy.semantic_memory import memory_search as semantic_search + + results = semantic_search(query, top_k=top_k) + + if not results: + return "No relevant memories found." + + formatted = ["Relevant memories from past conversations:", ""] + + for i, (content, score) in enumerate(results, 1): + relevance = "🔥" if score > 0.8 else "⭐" if score > 0.5 else "📄" + formatted.append(f"{relevance} [{i}] (score: {score:.2f})") + formatted.append(f" {content[:300]}...") + formatted.append("") + + return "\n".join(formatted) + + except Exception as exc: + logger.error("Memory search failed: %s", exc) + return f"Memory search error: {exc}" + + +# Register with MCP +register_tool(name="memory_search", schema=MEMORY_SEARCH_SCHEMA, category="memory")(memory_search) diff --git a/src/tools/web_search.py b/src/tools/web_search.py new file mode 100644 index 0000000..9bcae37 --- /dev/null +++ b/src/tools/web_search.py @@ -0,0 +1,74 @@ +"""Web search tool using DuckDuckGo. + +MCP-compliant tool for searching the web. +""" + +import logging +from typing import Any + +from mcp.registry import register_tool +from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_INTEGER, RETURN_STRING + +logger = logging.getLogger(__name__) + + +WEB_SEARCH_SCHEMA = create_tool_schema( + name="web_search", + description="Search the web using DuckDuckGo. Use for current events, news, real-time data, and information not in your training data.", + parameters={ + "query": { + **PARAM_STRING, + "description": "Search query string", + }, + "max_results": { + **PARAM_INTEGER, + "description": "Maximum number of results (1-10)", + "default": 5, + "minimum": 1, + "maximum": 10, + }, + }, + required=["query"], + returns=RETURN_STRING, +) + + +def web_search(query: str, max_results: int = 5) -> str: + """Search the web using DuckDuckGo. + + Args: + query: Search query + max_results: Maximum results to return + + Returns: + Formatted search results + """ + try: + from duckduckgo_search import DDGS + + with DDGS() as ddgs: + results = list(ddgs.text(query, max_results=max_results)) + + if not results: + return "No results found." + + formatted = [] + for i, r in enumerate(results, 1): + title = r.get("title", "No title") + body = r.get("body", "No description") + href = r.get("href", "") + formatted.append(f"{i}. {title}\n {body[:150]}...\n {href}") + + return "\n\n".join(formatted) + + except Exception as exc: + logger.error("Web search failed: %s", exc) + return f"Search error: {exc}" + + +# Register with MCP +register_tool( + name="web_search", + schema=WEB_SEARCH_SCHEMA, + category="research", +)(web_search)