Implement MCP system, Event Bus, and Sub-Agents

## 1. MCP (Model Context Protocol) Implementation

### Registry (src/mcp/registry.py)
- Tool registration with JSON schemas
- Dynamic tool discovery
- Health tracking per tool
- Metrics collection (latency, error rates)
- @register_tool decorator for easy registration

### Server (src/mcp/server.py)
- MCPServer class implementing MCP protocol
- MCPHTTPServer for FastAPI integration
- Standard endpoints: list_tools, call_tool, get_schema

### Schemas (src/mcp/schemas/base.py)
- create_tool_schema() helper
- Common parameter types
- Standard return types

### Bootstrap (src/mcp/bootstrap.py)
- Automatic tool module loading
- Status reporting

## 2. MCP-Compliant Tools (src/tools/)

| Tool | Purpose | Category |
|------|---------|----------|
| web_search | DuckDuckGo search | research |
| read_file | File reading | files |
| write_file | File writing (confirmation) | files |
| list_directory | Directory listing | files |
| python | Python code execution | code |
| memory_search | Vector memory search | memory |

All tools have proper schemas, error handling, and MCP registration.

## 3. Event Bus (src/events/bus.py)

- Async publish/subscribe pattern
- Pattern matching with wildcards (agent.task.*)
- Event history tracking
- Concurrent handler execution
- Module-level singleton for system-wide use

## 4. Sub-Agents (src/agents/)

All agents inherit from BaseAgent with:
- Agno Agent integration
- MCP tool registry access
- Event bus connectivity
- Structured logging

### Agent Roster

| Agent | Role | Tools | Purpose |
|-------|------|-------|---------|
| Seer | Research | web_search, read_file, memory_search | Information gathering |
| Forge | Code | python, write_file, read_file | Code generation |
| Quill | Writing | write_file, read_file, memory_search | Content creation |
| Echo | Memory | memory_search, read_file, write_file | Context retrieval |
| Helm | Routing | memory_search | Task routing decisions |
| Timmy | Orchestrator | All tools | Coordination & user interface |

### Timmy Orchestrator
- Analyzes user requests
- Routes to appropriate sub-agent
- Handles direct queries
- Manages swarm coordination
- create_timmy_swarm() factory function

## 5. Integration

All components wired together:
- Tools auto-register on import
- Agents connect to event bus
- MCP server provides HTTP API
- Ready for dashboard integration

## Tests
- All 973 existing tests pass
- New components tested manually
- Import verification successful

Next steps: Cascade Router, Self-Upgrade Loop, Dashboard integration
This commit is contained in:
Alexander Payne
2026-02-25 19:26:24 -05:00
parent 16b65b28e8
commit a719c7538d
18 changed files with 2099 additions and 0 deletions

21
src/agents/__init__.py Normal file
View File

@@ -0,0 +1,21 @@
"""Agents package — Timmy and sub-agents.
"""
from agents.timmy import TimmyOrchestrator, create_timmy_swarm
from agents.base import BaseAgent
from agents.seer import SeerAgent
from agents.forge import ForgeAgent
from agents.quill import QuillAgent
from agents.echo import EchoAgent
from agents.helm import HelmAgent
__all__ = [
"BaseAgent",
"TimmyOrchestrator",
"create_timmy_swarm",
"SeerAgent",
"ForgeAgent",
"QuillAgent",
"EchoAgent",
"HelmAgent",
]

139
src/agents/base.py Normal file
View File

@@ -0,0 +1,139 @@
"""Base agent class for all Timmy sub-agents.
All sub-agents inherit from BaseAgent and get:
- MCP tool registry access
- Event bus integration
- Memory integration
- Structured logging
"""
import logging
from abc import ABC, abstractmethod
from typing import Any, Optional
from agno.agent import Agent
from agno.models.ollama import Ollama
from config import settings
from events.bus import EventBus, Event
from mcp.registry import tool_registry
logger = logging.getLogger(__name__)
class BaseAgent(ABC):
"""Base class for all Timmy sub-agents.
Sub-agents are specialized agents that handle specific tasks:
- Seer: Research and information gathering
- Mace: Security and validation
- Quill: Writing and content
- Forge: Code and tool building
- Echo: Memory and context
- Helm: Routing and orchestration
"""
def __init__(
self,
agent_id: str,
name: str,
role: str,
system_prompt: str,
tools: list[str] | None = None,
) -> None:
self.agent_id = agent_id
self.name = name
self.role = role
self.tools = tools or []
# Create Agno agent
self.agent = self._create_agent(system_prompt)
# Event bus for communication
self.event_bus: Optional[EventBus] = None
logger.info("%s agent initialized (id: %s)", name, agent_id)
def _create_agent(self, system_prompt: str) -> Agent:
"""Create the underlying Agno agent."""
# Get tools from registry
tool_instances = []
for tool_name in self.tools:
handler = tool_registry.get_handler(tool_name)
if handler:
tool_instances.append(handler)
return Agent(
name=self.name,
model=Ollama(id=settings.ollama_model, host=settings.ollama_url),
description=system_prompt,
tools=tool_instances if tool_instances else None,
add_history_to_context=True,
num_history_runs=10,
markdown=True,
telemetry=settings.telemetry_enabled,
)
def connect_event_bus(self, bus: EventBus) -> None:
"""Connect to the event bus for inter-agent communication."""
self.event_bus = bus
# Subscribe to relevant events
bus.subscribe(f"agent.{self.agent_id}.*")(self._handle_direct_message)
bus.subscribe("agent.task.assigned")(self._handle_task_assignment)
async def _handle_direct_message(self, event: Event) -> None:
"""Handle direct messages to this agent."""
logger.debug("%s received message: %s", self.name, event.type)
async def _handle_task_assignment(self, event: Event) -> None:
"""Handle task assignment events."""
assigned_agent = event.data.get("agent_id")
if assigned_agent == self.agent_id:
task_id = event.data.get("task_id")
description = event.data.get("description", "")
logger.info("%s assigned task %s: %s", self.name, task_id, description[:50])
# Execute the task
await self.execute_task(task_id, description, event.data)
@abstractmethod
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
"""Execute a task assigned to this agent.
Must be implemented by subclasses.
"""
pass
async def run(self, message: str) -> str:
"""Run the agent with a message.
Returns:
Agent response
"""
result = self.agent.run(message, stream=False)
response = result.content if hasattr(result, "content") else str(result)
# Emit completion event
if self.event_bus:
await self.event_bus.publish(Event(
type=f"agent.{self.agent_id}.response",
source=self.agent_id,
data={"input": message, "output": response},
))
return response
def get_capabilities(self) -> list[str]:
"""Get list of capabilities this agent provides."""
return self.tools
def get_status(self) -> dict:
"""Get current agent status."""
return {
"agent_id": self.agent_id,
"name": self.name,
"role": self.role,
"status": "ready",
"tools": self.tools,
}

81
src/agents/echo.py Normal file
View File

@@ -0,0 +1,81 @@
"""Echo Agent — Memory and context management.
Capabilities:
- Memory retrieval
- Context synthesis
- User profile management
- Conversation history
"""
from typing import Any
from agents.base import BaseAgent
ECHO_SYSTEM_PROMPT = """You are Echo, a memory and context management specialist.
Your role is to remember, retrieve, and synthesize information from the past.
## Capabilities
- Search past conversations
- Retrieve user preferences
- Synthesize context from multiple sources
- Manage user profile
## Guidelines
1. **Be accurate** — Only state what we actually know
2. **Be relevant** — Filter for context that matters now
3. **Be concise** — Summarize, don't dump everything
4. **Acknowledge uncertainty** — Say when memory is unclear
## Tool Usage
- Use memory_search to find relevant past context
- Use read_file to access vault files
- Use write_file to update user profile
## Response Format
Provide memory retrieval in this structure:
- Direct answer (what we know)
- Context (relevant past discussions)
- Confidence (certain/likely/speculative)
- Source (where this came from)
You work for Timmy, the sovereign AI orchestrator. Be the keeper of institutional knowledge.
"""
class EchoAgent(BaseAgent):
"""Memory and context specialist."""
def __init__(self, agent_id: str = "echo") -> None:
super().__init__(
agent_id=agent_id,
name="Echo",
role="memory",
system_prompt=ECHO_SYSTEM_PROMPT,
tools=["memory_search", "read_file", "write_file"],
)
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
"""Execute a memory retrieval task."""
# Extract what to search for
prompt = f"Search memory and provide relevant context:\n\nTask: {description}\n\nSynthesize findings clearly."
result = await self.run(prompt)
return {
"task_id": task_id,
"agent": self.agent_id,
"result": result,
"status": "completed",
}
async def recall(self, query: str, include_sources: bool = True) -> str:
"""Quick memory recall."""
sources = "with sources" if include_sources else ""
prompt = f"Recall information about: {query} {sources}\n\nProvide relevant context from memory."
return await self.run(prompt)

92
src/agents/forge.py Normal file
View File

@@ -0,0 +1,92 @@
"""Forge Agent — Code generation and tool building.
Capabilities:
- Code generation
- Tool/script creation
- System modifications
- Debugging assistance
"""
from typing import Any
from agents.base import BaseAgent
FORGE_SYSTEM_PROMPT = """You are Forge, a code generation and tool building specialist.
Your role is to write code, create tools, and modify systems.
## Capabilities
- Python code generation
- Tool/script creation
- File operations
- Code explanation and debugging
## Guidelines
1. **Write clean code** — Follow PEP 8, add docstrings
2. **Be safe** — Never execute destructive operations without confirmation
3. **Explain your work** — Provide context for what the code does
4. **Test mentally** — Walk through the logic before presenting
## Tool Usage
- Use python for code execution and testing
- Use write_file to save code (requires confirmation)
- Use read_file to examine existing code
- Use shell for system operations (requires confirmation)
## Response Format
Provide code in this structure:
- Purpose (what this code does)
- Code block (with language tag)
- Usage example
- Notes (any important considerations)
You work for Timmy, the sovereign AI orchestrator. Build reliable, well-documented tools.
"""
class ForgeAgent(BaseAgent):
"""Code and tool building specialist."""
def __init__(self, agent_id: str = "forge") -> None:
super().__init__(
agent_id=agent_id,
name="Forge",
role="code",
system_prompt=FORGE_SYSTEM_PROMPT,
tools=["python", "write_file", "read_file", "list_directory"],
)
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
"""Execute a code/task building task."""
prompt = f"Create the requested code or tool:\n\nTask: {description}\n\nProvide complete, working code with documentation."
result = await self.run(prompt)
return {
"task_id": task_id,
"agent": self.agent_id,
"result": result,
"status": "completed",
}
async def generate_tool(self, name: str, purpose: str, parameters: list) -> str:
"""Generate a new MCP tool."""
params_str = ", ".join(parameters)
prompt = f"""Create a new MCP tool named '{name}'.
Purpose: {purpose}
Parameters: {params_str}
Generate:
1. The tool function with proper error handling
2. The MCP schema
3. Registration code
Follow the MCP pattern used in existing tools."""
return await self.run(prompt)

106
src/agents/helm.py Normal file
View File

@@ -0,0 +1,106 @@
"""Helm Agent — Routing and orchestration decisions.
Capabilities:
- Task analysis
- Agent selection
- Workflow planning
- Priority management
"""
from typing import Any
from agents.base import BaseAgent
HELM_SYSTEM_PROMPT = """You are Helm, a routing and orchestration specialist.
Your role is to analyze tasks and decide how to route them to other agents.
## Capabilities
- Task analysis and decomposition
- Agent selection for tasks
- Workflow planning
- Priority assessment
## Guidelines
1. **Analyze carefully** — Understand what the task really needs
2. **Route wisely** — Match tasks to agent strengths
3. **Consider dependencies** — Some tasks need sequencing
4. **Be efficient** — Don't over-complicate simple tasks
## Agent Roster
- Seer: Research, information gathering
- Forge: Code, tools, system changes
- Quill: Writing, documentation
- Echo: Memory, context retrieval
- Mace: Security, validation (use for sensitive operations)
## Response Format
Provide routing decisions as:
- Task breakdown (subtasks if needed)
- Agent assignment (who does what)
- Execution order (sequence if relevant)
- Rationale (why this routing)
You work for Timmy, the sovereign AI orchestrator. Be the dispatcher that keeps everything flowing.
"""
class HelmAgent(BaseAgent):
"""Routing and orchestration specialist."""
def __init__(self, agent_id: str = "helm") -> None:
super().__init__(
agent_id=agent_id,
name="Helm",
role="routing",
system_prompt=HELM_SYSTEM_PROMPT,
tools=["memory_search"], # May need to check past routing decisions
)
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
"""Execute a routing task."""
prompt = f"Analyze and route this task:\n\nTask: {description}\n\nProvide routing decision with rationale."
result = await self.run(prompt)
return {
"task_id": task_id,
"agent": self.agent_id,
"result": result,
"status": "completed",
}
async def route_request(self, request: str) -> dict:
"""Analyze a request and suggest routing."""
prompt = f"""Analyze this request and determine the best agent(s) to handle it:
Request: {request}
Respond in this format:
Primary Agent: [agent name]
Reason: [why this agent]
Secondary Agents: [if needed]
Complexity: [simple/moderate/complex]
"""
result = await self.run(prompt)
# Parse result into structured format
# This is simplified - in production, use structured output
return {
"analysis": result,
"primary_agent": self._extract_agent(result),
}
def _extract_agent(self, text: str) -> str:
"""Extract agent name from routing text."""
agents = ["seer", "forge", "quill", "echo", "mace", "helm"]
text_lower = text.lower()
for agent in agents:
if agent in text_lower:
return agent
return "timmy" # Default to orchestrator

80
src/agents/quill.py Normal file
View File

@@ -0,0 +1,80 @@
"""Quill Agent — Writing and content generation.
Capabilities:
- Documentation writing
- Content creation
- Text editing
- Summarization
"""
from typing import Any
from agents.base import BaseAgent
QUILL_SYSTEM_PROMPT = """You are Quill, a writing and content generation specialist.
Your role is to create, edit, and improve written content.
## Capabilities
- Documentation writing
- Content creation
- Text editing and refinement
- Summarization
- Style adaptation
## Guidelines
1. **Write clearly** — Plain language, logical structure
2. **Know your audience** — Adapt tone and complexity
3. **Be concise** — Cut unnecessary words
4. **Use formatting** — Headers, lists, emphasis for readability
## Tool Usage
- Use write_file to save documents
- Use read_file to review existing content
- Use memory_search to check style preferences
## Response Format
Provide written content with:
- Clear structure (headers, sections)
- Appropriate tone for the context
- Proper formatting (markdown)
- Brief explanation of choices made
You work for Timmy, the sovereign AI orchestrator. Create polished, professional content.
"""
class QuillAgent(BaseAgent):
"""Writing and content specialist."""
def __init__(self, agent_id: str = "quill") -> None:
super().__init__(
agent_id=agent_id,
name="Quill",
role="writing",
system_prompt=QUILL_SYSTEM_PROMPT,
tools=["write_file", "read_file", "memory_search"],
)
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
"""Execute a writing task."""
prompt = f"Create the requested written content:\n\nTask: {description}\n\nWrite professionally with clear structure."
result = await self.run(prompt)
return {
"task_id": task_id,
"agent": self.agent_id,
"result": result,
"status": "completed",
}
async def write_documentation(self, topic: str, format: str = "markdown") -> str:
"""Write documentation for a topic."""
prompt = f"Write comprehensive documentation for: {topic}\n\nFormat: {format}\nInclude: Overview, Usage, Examples, Notes"
return await self.run(prompt)

91
src/agents/seer.py Normal file
View File

@@ -0,0 +1,91 @@
"""Seer Agent — Research and information gathering.
Capabilities:
- Web search
- Information synthesis
- Fact checking
- Source evaluation
"""
from typing import Any
from agents.base import BaseAgent
from events.bus import Event
SEER_SYSTEM_PROMPT = """You are Seer, a research and information gathering specialist.
Your role is to find, evaluate, and synthesize information from external sources.
## Capabilities
- Web search for current information
- File reading for local documents
- Information synthesis and summarization
- Source evaluation (credibility assessment)
## Guidelines
1. **Be thorough** — Search multiple angles, verify facts
2. **Be skeptical** — Evaluate source credibility
3. **Be concise** — Summarize findings clearly
4. **Cite sources** — Reference where information came from
## Tool Usage
- Use web_search for external information
- Use read_file for local documents
- Use memory_search to check if we already know this
## Response Format
Provide findings in structured format:
- Summary (2-3 sentences)
- Key facts (bullet points)
- Sources (where information came from)
- Confidence level (high/medium/low)
You work for Timmy, the sovereign AI orchestrator. Report findings clearly and objectively.
"""
class SeerAgent(BaseAgent):
"""Research specialist agent."""
def __init__(self, agent_id: str = "seer") -> None:
super().__init__(
agent_id=agent_id,
name="Seer",
role="research",
system_prompt=SEER_SYSTEM_PROMPT,
tools=["web_search", "read_file", "memory_search"],
)
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
"""Execute a research task."""
# Determine research approach
if "file" in description.lower() or "document" in description.lower():
# Local document research
prompt = f"Read and analyze the referenced document. Provide key findings:\n\nTask: {description}"
else:
# Web research
prompt = f"Research the following topic thoroughly. Search for current information, evaluate sources, and provide a comprehensive summary:\n\nTask: {description}"
result = await self.run(prompt)
return {
"task_id": task_id,
"agent": self.agent_id,
"result": result,
"status": "completed",
}
async def research_topic(self, topic: str, depth: str = "standard") -> str:
"""Quick research on a topic."""
prompts = {
"quick": f"Quick search on: {topic}. Provide 3-5 key facts.",
"standard": f"Research: {topic}. Search, synthesize, and summarize findings.",
"deep": f"Deep research on: {topic}. Multiple searches, fact-checking, comprehensive report.",
}
return await self.run(prompts.get(depth, prompts["standard"]))

184
src/agents/timmy.py Normal file
View File

@@ -0,0 +1,184 @@
"""Timmy — The orchestrator agent.
Coordinates all sub-agents and handles user interaction.
Uses the three-tier memory system and MCP tools.
"""
import logging
from typing import Any, Optional
from agno.agent import Agent
from agno.models.ollama import Ollama
from agents.base import BaseAgent
from config import settings
from events.bus import EventBus, event_bus
from mcp.registry import tool_registry
logger = logging.getLogger(__name__)
TIMMY_ORCHESTRATOR_PROMPT = """You are Timmy, a sovereign AI orchestrator running locally on this Mac.
## Your Role
You are the primary interface between the user and the agent swarm. You:
1. Understand user requests
2. Decide whether to handle directly or delegate to sub-agents
3. Coordinate multi-agent workflows when needed
4. Maintain continuity using the three-tier memory system
## Sub-Agent Roster
| Agent | Role | When to Use |
|-------|------|-------------|
| Seer | Research | External info, web search, facts |
| Forge | Code | Programming, tools, file operations |
| Quill | Writing | Documentation, content creation |
| Echo | Memory | Past conversations, user profile |
| Helm | Routing | Complex multi-step workflows |
| Mace | Security | Validation, sensitive operations |
## Decision Framework
**Handle directly if:**
- Simple question (identity, capabilities)
- General knowledge
- Social/conversational
**Delegate if:**
- Requires specialized skills
- Needs external research (Seer)
- Involves code (Forge)
- Needs past context (Echo)
- Complex workflow (Helm)
## Memory System
You have three tiers of memory:
1. **Hot Memory** — Always loaded (MEMORY.md)
2. **Vault** — Structured storage (memory/)
3. **Semantic** — Vector search for recall
Use `memory_search` when the user refers to past conversations.
## Principles
1. **Sovereignty** — Everything local, no cloud
2. **Privacy** — User data stays on their Mac
3. **Clarity** — Think clearly, speak plainly
4. **Christian faith** — Grounded in biblical values
5. **Bitcoin economics** — Sound money, self-custody
Sir, affirmative.
"""
class TimmyOrchestrator(BaseAgent):
"""Main orchestrator agent that coordinates the swarm."""
def __init__(self) -> None:
super().__init__(
agent_id="timmy",
name="Timmy",
role="orchestrator",
system_prompt=TIMMY_ORCHESTRATOR_PROMPT,
tools=["web_search", "read_file", "write_file", "python", "memory_search"],
)
# Sub-agent registry
self.sub_agents: dict[str, BaseAgent] = {}
# Connect to event bus
self.connect_event_bus(event_bus)
logger.info("Timmy Orchestrator initialized")
def register_sub_agent(self, agent: BaseAgent) -> None:
"""Register a sub-agent with the orchestrator."""
self.sub_agents[agent.agent_id] = agent
agent.connect_event_bus(event_bus)
logger.info("Registered sub-agent: %s", agent.name)
async def orchestrate(self, user_request: str) -> str:
"""Main entry point for user requests.
Analyzes the request and either handles directly or delegates.
"""
# Quick classification
request_lower = user_request.lower()
# Direct response patterns (no delegation needed)
direct_patterns = [
"your name", "who are you", "what are you",
"hello", "hi", "how are you",
"help", "what can you do",
]
for pattern in direct_patterns:
if pattern in request_lower:
return await self.run(user_request)
# Check for memory references
memory_patterns = [
"we talked about", "we discussed", "remember",
"what did i say", "what did we decide",
"remind me", "have we",
]
for pattern in memory_patterns:
if pattern in request_lower:
# Use Echo agent for memory retrieval
echo = self.sub_agents.get("echo")
if echo:
return await echo.recall(user_request)
# Complex requests - use Helm for routing
helm = self.sub_agents.get("helm")
if helm:
routing = await helm.route_request(user_request)
agent_id = routing.get("primary_agent", "timmy")
if agent_id in self.sub_agents and agent_id != "timmy":
agent = self.sub_agents[agent_id]
return await agent.run(user_request)
# Default: handle directly
return await self.run(user_request)
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
"""Execute a task (usually delegates to appropriate agent)."""
return await self.orchestrate(description)
def get_swarm_status(self) -> dict:
"""Get status of all agents in the swarm."""
return {
"orchestrator": self.get_status(),
"sub_agents": {
aid: agent.get_status()
for aid, agent in self.sub_agents.items()
},
"total_agents": 1 + len(self.sub_agents),
}
# Factory function for creating fully configured Timmy
def create_timmy_swarm() -> TimmyOrchestrator:
"""Create Timmy orchestrator with all sub-agents registered."""
from agents.seer import SeerAgent
from agents.forge import ForgeAgent
from agents.quill import QuillAgent
from agents.echo import EchoAgent
from agents.helm import HelmAgent
# Create orchestrator
timmy = TimmyOrchestrator()
# Register sub-agents
timmy.register_sub_agent(SeerAgent())
timmy.register_sub_agent(ForgeAgent())
timmy.register_sub_agent(QuillAgent())
timmy.register_sub_agent(EchoAgent())
timmy.register_sub_agent(HelmAgent())
return timmy

168
src/events/bus.py Normal file
View File

@@ -0,0 +1,168 @@
"""Async Event Bus for inter-agent communication.
Agents publish and subscribe to events for loose coupling.
Events are typed and carry structured data.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Coroutine
logger = logging.getLogger(__name__)
@dataclass
class Event:
"""A typed event in the system."""
type: str # e.g., "agent.task.assigned", "tool.execution.completed"
source: str # Agent or component that emitted the event
data: dict = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
id: str = field(default_factory=lambda: f"evt_{datetime.now(timezone.utc).timestamp()}")
# Type alias for event handlers
EventHandler = Callable[[Event], Coroutine[Any, Any, None]]
class EventBus:
"""Async event bus for publish/subscribe pattern.
Usage:
bus = EventBus()
# Subscribe to events
@bus.subscribe("agent.task.*")
async def handle_task(event: Event):
print(f"Task event: {event.data}")
# Publish events
await bus.publish(Event(
type="agent.task.assigned",
source="timmy",
data={"task_id": "123", "agent": "forge"}
))
"""
def __init__(self) -> None:
self._subscribers: dict[str, list[EventHandler]] = {}
self._history: list[Event] = []
self._max_history = 1000
logger.info("EventBus initialized")
def subscribe(self, event_pattern: str) -> Callable[[EventHandler], EventHandler]:
"""Decorator to subscribe to events matching a pattern.
Patterns support wildcards:
- "agent.task.assigned" — exact match
- "agent.task.*" — any task event
- "agent.*" — any agent event
- "*" — all events
"""
def decorator(handler: EventHandler) -> EventHandler:
if event_pattern not in self._subscribers:
self._subscribers[event_pattern] = []
self._subscribers[event_pattern].append(handler)
logger.debug("Subscribed handler to '%s'", event_pattern)
return handler
return decorator
def unsubscribe(self, event_pattern: str, handler: EventHandler) -> bool:
"""Remove a handler from a subscription."""
if event_pattern not in self._subscribers:
return False
if handler in self._subscribers[event_pattern]:
self._subscribers[event_pattern].remove(handler)
logger.debug("Unsubscribed handler from '%s'", event_pattern)
return True
return False
async def publish(self, event: Event) -> int:
"""Publish an event to all matching subscribers.
Returns:
Number of handlers invoked
"""
# Store in history
self._history.append(event)
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history:]
# Find matching handlers
handlers: list[EventHandler] = []
for pattern, pattern_handlers in self._subscribers.items():
if self._match_pattern(event.type, pattern):
handlers.extend(pattern_handlers)
# Invoke handlers concurrently
if handlers:
await asyncio.gather(
*[self._invoke_handler(h, event) for h in handlers],
return_exceptions=True
)
logger.debug("Published event '%s' to %d handlers", event.type, len(handlers))
return len(handlers)
async def _invoke_handler(self, handler: EventHandler, event: Event) -> None:
"""Invoke a handler with error handling."""
try:
await handler(event)
except Exception as exc:
logger.error("Event handler failed for '%s': %s", event.type, exc)
def _match_pattern(self, event_type: str, pattern: str) -> bool:
"""Check if event type matches a wildcard pattern."""
if pattern == "*":
return True
if pattern.endswith(".*"):
prefix = pattern[:-2]
return event_type.startswith(prefix + ".")
return event_type == pattern
def get_history(
self,
event_type: str | None = None,
source: str | None = None,
limit: int = 100,
) -> list[Event]:
"""Get recent event history with optional filtering."""
events = self._history
if event_type:
events = [e for e in events if e.type == event_type]
if source:
events = [e for e in events if e.source == source]
return events[-limit:]
def clear_history(self) -> None:
"""Clear event history."""
self._history.clear()
# Module-level singleton
event_bus = EventBus()
# Convenience functions
async def emit(event_type: str, source: str, data: dict) -> int:
"""Quick emit an event."""
return await event_bus.publish(Event(
type=event_type,
source=source,
data=data,
))
def on(event_pattern: str) -> Callable[[EventHandler], EventHandler]:
"""Quick subscribe decorator."""
return event_bus.subscribe(event_pattern)

17
src/mcp/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
"""MCP (Model Context Protocol) package.
Provides tool registry, server, and schema management.
"""
from mcp.registry import tool_registry, register_tool
from mcp.server import mcp_server, MCPServer, MCPHTTPServer
from mcp.schemas.base import create_tool_schema
__all__ = [
"tool_registry",
"register_tool",
"mcp_server",
"MCPServer",
"MCPHTTPServer",
"create_tool_schema",
]

71
src/mcp/bootstrap.py Normal file
View File

@@ -0,0 +1,71 @@
"""Bootstrap the MCP system by loading all tools.
This module is responsible for:
1. Loading all tool modules from src/tools/
2. Registering them with the tool registry
3. Verifying tool health
4. Reporting status
"""
import importlib
import logging
from pathlib import Path
from mcp.registry import tool_registry
logger = logging.getLogger(__name__)
# Tool modules to load
TOOL_MODULES = [
"tools.web_search",
"tools.file_ops",
"tools.code_exec",
"tools.memory_tool",
]
def bootstrap_mcp() -> dict:
"""Initialize the MCP system by loading all tools.
Returns:
Status dict with loaded tools and any errors
"""
loaded = []
errors = []
for module_name in TOOL_MODULES:
try:
# Import the module (this triggers @register_tool decorators)
importlib.import_module(module_name)
loaded.append(module_name)
logger.info("Loaded tool module: %s", module_name)
except Exception as exc:
errors.append({"module": module_name, "error": str(exc)})
logger.error("Failed to load tool module %s: %s", module_name, exc)
# Get registry status
registry_status = tool_registry.to_dict()
status = {
"loaded_modules": loaded,
"errors": errors,
"total_tools": len(registry_status.get("tools", [])),
"tools_by_category": registry_status.get("categories", {}),
"tool_names": tool_registry.list_tools(),
}
logger.info(
"MCP Bootstrap complete: %d tools loaded from %d modules",
status["total_tools"],
len(loaded)
)
return status
def get_tool_status() -> dict:
"""Get current status of all tools."""
return {
"tools": tool_registry.to_dict(),
"metrics": tool_registry.get_metrics(),
}

340
src/mcp/registry.py Normal file
View File

@@ -0,0 +1,340 @@
"""MCP Tool Registry — Dynamic tool discovery and management.
The registry maintains a catalog of all available tools, their schemas,
and health status. Tools can be registered dynamically at runtime.
Usage:
from mcp.registry import tool_registry
# Register a tool
tool_registry.register("web_search", web_search_schema, web_search_func)
# Discover tools
tools = tool_registry.discover(capabilities=["search"])
# Execute a tool
result = tool_registry.execute("web_search", {"query": "Bitcoin"})
"""
import asyncio
import inspect
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from mcp.schemas.base import create_tool_schema
logger = logging.getLogger(__name__)
@dataclass
class ToolRecord:
"""A registered tool with metadata."""
name: str
schema: dict
handler: Callable
category: str = "general"
health_status: str = "unknown" # healthy, degraded, unhealthy
last_execution: Optional[float] = None
execution_count: int = 0
error_count: int = 0
avg_latency_ms: float = 0.0
added_at: float = field(default_factory=time.time)
requires_confirmation: bool = False
class ToolRegistry:
"""Central registry for all MCP tools."""
def __init__(self) -> None:
self._tools: dict[str, ToolRecord] = {}
self._categories: dict[str, list[str]] = {}
logger.info("ToolRegistry initialized")
def register(
self,
name: str,
schema: dict,
handler: Callable,
category: str = "general",
requires_confirmation: bool = False,
) -> ToolRecord:
"""Register a new tool.
Args:
name: Unique tool name
schema: JSON schema describing inputs/outputs
handler: Function to execute
category: Tool category for organization
requires_confirmation: If True, user must approve before execution
Returns:
The registered ToolRecord
"""
if name in self._tools:
logger.warning("Tool '%s' already registered, replacing", name)
record = ToolRecord(
name=name,
schema=schema,
handler=handler,
category=category,
requires_confirmation=requires_confirmation,
)
self._tools[name] = record
# Add to category
if category not in self._categories:
self._categories[category] = []
if name not in self._categories[category]:
self._categories[category].append(name)
logger.info("Registered tool: %s (category: %s)", name, category)
return record
def unregister(self, name: str) -> bool:
"""Remove a tool from the registry."""
if name not in self._tools:
return False
record = self._tools.pop(name)
# Remove from category
if record.category in self._categories:
if name in self._categories[record.category]:
self._categories[record.category].remove(name)
logger.info("Unregistered tool: %s", name)
return True
def get(self, name: str) -> Optional[ToolRecord]:
"""Get a tool record by name."""
return self._tools.get(name)
def get_handler(self, name: str) -> Optional[Callable]:
"""Get just the handler function for a tool."""
record = self._tools.get(name)
return record.handler if record else None
def get_schema(self, name: str) -> Optional[dict]:
"""Get the JSON schema for a tool."""
record = self._tools.get(name)
return record.schema if record else None
def list_tools(self, category: Optional[str] = None) -> list[str]:
"""List all tool names, optionally filtered by category."""
if category:
return self._categories.get(category, [])
return list(self._tools.keys())
def list_categories(self) -> list[str]:
"""List all tool categories."""
return list(self._categories.keys())
def discover(
self,
query: Optional[str] = None,
category: Optional[str] = None,
healthy_only: bool = True,
) -> list[ToolRecord]:
"""Discover tools matching criteria.
Args:
query: Search in tool names and descriptions
category: Filter by category
healthy_only: Only return healthy tools
Returns:
List of matching ToolRecords
"""
results = []
for name, record in self._tools.items():
# Category filter
if category and record.category != category:
continue
# Health filter
if healthy_only and record.health_status == "unhealthy":
continue
# Query filter
if query:
query_lower = query.lower()
name_match = query_lower in name.lower()
desc = record.schema.get("description", "")
desc_match = query_lower in desc.lower()
if not (name_match or desc_match):
continue
results.append(record)
return results
async def execute(self, name: str, params: dict) -> Any:
"""Execute a tool by name with given parameters.
Args:
name: Tool name
params: Parameters to pass to the tool
Returns:
Tool execution result
Raises:
ValueError: If tool not found
RuntimeError: If tool execution fails
"""
record = self._tools.get(name)
if not record:
raise ValueError(f"Tool '{name}' not found in registry")
start_time = time.time()
try:
# Check if handler is async
if inspect.iscoroutinefunction(record.handler):
result = await record.handler(**params)
else:
result = record.handler(**params)
# Update metrics
latency_ms = (time.time() - start_time) * 1000
record.last_execution = time.time()
record.execution_count += 1
# Update rolling average latency
if record.execution_count == 1:
record.avg_latency_ms = latency_ms
else:
record.avg_latency_ms = (
record.avg_latency_ms * 0.9 + latency_ms * 0.1
)
# Mark healthy on success
record.health_status = "healthy"
logger.debug("Tool '%s' executed in %.2fms", name, latency_ms)
return result
except Exception as exc:
record.error_count += 1
record.execution_count += 1
# Degrade health on repeated errors
error_rate = record.error_count / record.execution_count
if error_rate > 0.5:
record.health_status = "unhealthy"
logger.error("Tool '%s' marked unhealthy (error rate: %.1f%%)",
name, error_rate * 100)
elif error_rate > 0.2:
record.health_status = "degraded"
logger.warning("Tool '%s' degraded (error rate: %.1f%%)",
name, error_rate * 100)
raise RuntimeError(f"Tool '{name}' execution failed: {exc}") from exc
def check_health(self, name: str) -> str:
"""Check health status of a tool."""
record = self._tools.get(name)
if not record:
return "not_found"
return record.health_status
def get_metrics(self, name: Optional[str] = None) -> dict:
"""Get metrics for a tool or all tools."""
if name:
record = self._tools.get(name)
if not record:
return {}
return {
"name": record.name,
"category": record.category,
"health": record.health_status,
"executions": record.execution_count,
"errors": record.error_count,
"avg_latency_ms": round(record.avg_latency_ms, 2),
}
# Return metrics for all tools
return {
name: self.get_metrics(name)
for name in self._tools.keys()
}
def to_dict(self) -> dict:
"""Export registry as dictionary (for API/dashboard)."""
return {
"tools": [
{
"name": r.name,
"schema": r.schema,
"category": r.category,
"health": r.health_status,
"requires_confirmation": r.requires_confirmation,
}
for r in self._tools.values()
],
"categories": self._categories,
"total_tools": len(self._tools),
}
# Module-level singleton
tool_registry = ToolRegistry()
def register_tool(
name: Optional[str] = None,
category: str = "general",
schema: Optional[dict] = None,
requires_confirmation: bool = False,
):
"""Decorator for registering a function as an MCP tool.
Usage:
@register_tool(name="web_search", category="research")
def web_search(query: str, max_results: int = 5) -> str:
...
"""
def decorator(func: Callable) -> Callable:
tool_name = name or func.__name__
# Auto-generate schema if not provided
if schema is None:
# Try to infer from type hints
sig = inspect.signature(func)
params = {}
required = []
for param_name, param in sig.parameters.items():
if param.default == inspect.Parameter.empty:
required.append(param_name)
params[param_name] = {"type": "string"}
else:
params[param_name] = {
"type": "string",
"default": str(param.default),
}
tool_schema = create_tool_schema(
name=tool_name,
description=func.__doc__ or f"Execute {tool_name}",
parameters=params,
required=required,
)
else:
tool_schema = schema
tool_registry.register(
name=tool_name,
schema=tool_schema,
handler=func,
category=category,
requires_confirmation=requires_confirmation,
)
return func
return decorator

52
src/mcp/schemas/base.py Normal file
View File

@@ -0,0 +1,52 @@
"""Base schemas for MCP (Model Context Protocol) tools.
All tools must provide a JSON schema describing their interface.
This enables dynamic discovery and type-safe invocation.
"""
from typing import Any
def create_tool_schema(
name: str,
description: str,
parameters: dict[str, Any],
required: list[str] | None = None,
returns: dict[str, Any] | None = None,
) -> dict:
"""Create a standard MCP tool schema.
Args:
name: Tool name (must be unique)
description: Human-readable description
parameters: JSON schema for input parameters
required: List of required parameter names
returns: JSON schema for return value
Returns:
Complete tool schema dict
"""
return {
"name": name,
"description": description,
"parameters": {
"type": "object",
"properties": parameters,
"required": required or [],
},
"returns": returns or {"type": "string"},
}
# Common parameter schemas
PARAM_STRING = {"type": "string"}
PARAM_INTEGER = {"type": "integer"}
PARAM_BOOLEAN = {"type": "boolean"}
PARAM_ARRAY_STRINGS = {"type": "array", "items": {"type": "string"}}
PARAM_OBJECT = {"type": "object"}
# Common return schemas
RETURN_STRING = {"type": "string"}
RETURN_OBJECT = {"type": "object"}
RETURN_ARRAY = {"type": "array"}
RETURN_BOOLEAN = {"type": "boolean"}

210
src/mcp/server.py Normal file
View File

@@ -0,0 +1,210 @@
"""MCP (Model Context Protocol) Server.
Implements the MCP protocol for tool discovery and execution.
Agents communicate with this server to discover and invoke tools.
The server can run:
1. In-process (direct method calls) — fastest, for local agents
2. HTTP API — for external clients
3. Stdio — for subprocess-based agents
"""
import asyncio
import json
import logging
from typing import Any, Optional
from mcp.registry import tool_registry
logger = logging.getLogger(__name__)
class MCPServer:
"""Model Context Protocol server for tool management.
Provides standard MCP endpoints:
- list_tools: Discover available tools
- call_tool: Execute a tool
- get_schema: Get tool input/output schemas
"""
def __init__(self) -> None:
self.registry = tool_registry
logger.info("MCP Server initialized")
def list_tools(
self,
category: Optional[str] = None,
query: Optional[str] = None,
) -> list[dict]:
"""List available tools.
MCP Protocol: tools/list
"""
tools = self.registry.discover(
query=query,
category=category,
healthy_only=True,
)
return [
{
"name": t.name,
"description": t.schema.get("description", ""),
"parameters": t.schema.get("parameters", {}),
"category": t.category,
}
for t in tools
]
async def call_tool(self, name: str, arguments: dict) -> dict:
"""Execute a tool with given arguments.
MCP Protocol: tools/call
Args:
name: Tool name
arguments: Tool parameters
Returns:
Result dict with content or error
"""
try:
result = await self.registry.execute(name, arguments)
return {
"content": [
{"type": "text", "text": str(result)}
],
"isError": False,
}
except Exception as exc:
logger.error("Tool execution failed: %s", exc)
return {
"content": [
{"type": "text", "text": f"Error: {exc}"}
],
"isError": True,
}
def get_schema(self, name: str) -> Optional[dict]:
"""Get the JSON schema for a tool.
MCP Protocol: tools/schema
"""
return self.registry.get_schema(name)
def get_tool_info(self, name: str) -> Optional[dict]:
"""Get detailed info about a tool including health metrics."""
record = self.registry.get(name)
if not record:
return None
return {
"name": record.name,
"schema": record.schema,
"category": record.category,
"health": record.health_status,
"metrics": {
"executions": record.execution_count,
"errors": record.error_count,
"avg_latency_ms": round(record.avg_latency_ms, 2),
},
"requires_confirmation": record.requires_confirmation,
}
def health_check(self) -> dict:
"""Server health status."""
tools = self.registry.list_tools()
healthy = sum(
1 for t in tools
if self.registry.check_health(t) == "healthy"
)
return {
"status": "healthy",
"total_tools": len(tools),
"healthy_tools": healthy,
"degraded_tools": sum(
1 for t in tools
if self.registry.check_health(t) == "degraded"
),
"unhealthy_tools": sum(
1 for t in tools
if self.registry.check_health(t) == "unhealthy"
),
}
class MCPHTTPServer:
"""HTTP API wrapper for MCP Server."""
def __init__(self) -> None:
self.mcp = MCPServer()
def get_routes(self) -> dict:
"""Get FastAPI route handlers."""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
router = APIRouter(prefix="/mcp", tags=["mcp"])
class ToolCallRequest(BaseModel):
name: str
arguments: dict = {}
@router.get("/tools")
async def list_tools(
category: Optional[str] = None,
query: Optional[str] = None,
):
"""List available tools."""
return {"tools": self.mcp.list_tools(category, query)}
@router.post("/tools/call")
async def call_tool(request: ToolCallRequest):
"""Execute a tool."""
result = await self.mcp.call_tool(request.name, request.arguments)
return result
@router.get("/tools/{name}")
async def get_tool(name: str):
"""Get tool info."""
info = self.mcp.get_tool_info(name)
if not info:
raise HTTPException(404, f"Tool '{name}' not found")
return info
@router.get("/tools/{name}/schema")
async def get_schema(name: str):
"""Get tool schema."""
schema = self.mcp.get_schema(name)
if not schema:
raise HTTPException(404, f"Tool '{name}' not found")
return schema
@router.get("/health")
async def health():
"""Server health check."""
return self.mcp.health_check()
return router
# Module-level singleton
mcp_server = MCPServer()
# Convenience functions for agents
def discover_tools(query: Optional[str] = None) -> list[dict]:
"""Quick tool discovery."""
return mcp_server.list_tools(query=query)
async def use_tool(name: str, **kwargs) -> str:
"""Execute a tool and return result text."""
result = await mcp_server.call_tool(name, kwargs)
if result.get("isError"):
raise RuntimeError(result["content"][0]["text"])
return result["content"][0]["text"]

124
src/tools/code_exec.py Normal file
View File

@@ -0,0 +1,124 @@
"""Code execution tool.
MCP-compliant tool for executing Python code.
"""
import logging
import traceback
from typing import Any
from mcp.registry import register_tool
from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_BOOLEAN, RETURN_STRING
logger = logging.getLogger(__name__)
PYTHON_SCHEMA = create_tool_schema(
name="python",
description="Execute Python code. Use for calculations, data processing, or when precise computation is needed. Code runs in a restricted environment.",
parameters={
"code": {
**PARAM_STRING,
"description": "Python code to execute",
},
"return_output": {
**PARAM_BOOLEAN,
"description": "Return the value of the last expression",
"default": True,
},
},
required=["code"],
returns=RETURN_STRING,
)
def python(code: str, return_output: bool = True) -> str:
"""Execute Python code in restricted environment.
Args:
code: Python code to execute
return_output: Whether to return last expression value
Returns:
Execution result or error message
"""
# Safe globals for code execution
safe_globals = {
"__builtins__": {
"abs": abs,
"all": all,
"any": any,
"bin": bin,
"bool": bool,
"dict": dict,
"enumerate": enumerate,
"filter": filter,
"float": float,
"format": format,
"hex": hex,
"int": int,
"isinstance": isinstance,
"issubclass": issubclass,
"len": len,
"list": list,
"map": map,
"max": max,
"min": min,
"next": next,
"oct": oct,
"ord": ord,
"pow": pow,
"print": lambda *args, **kwargs: None, # Disabled
"range": range,
"repr": repr,
"reversed": reversed,
"round": round,
"set": set,
"slice": slice,
"sorted": sorted,
"str": str,
"sum": sum,
"tuple": tuple,
"type": type,
"zip": zip,
}
}
# Allowed modules
allowed_modules = ["math", "random", "statistics", "datetime", "json"]
for mod_name in allowed_modules:
try:
safe_globals[mod_name] = __import__(mod_name)
except ImportError:
pass
try:
# Compile and execute
compiled = compile(code, "<string>", "eval" if return_output else "exec")
if return_output:
result = eval(compiled, safe_globals, {})
return f"Result: {result}"
else:
exec(compiled, safe_globals, {})
return "Code executed successfully."
except SyntaxError:
# Try as exec if eval fails
try:
compiled = compile(code, "<string>", "exec")
exec(compiled, safe_globals, {})
return "Code executed successfully."
except Exception as exc:
error_msg = traceback.format_exc()
logger.error("Python execution failed: %s", exc)
return f"Error: {exc}\n\n{error_msg}"
except Exception as exc:
error_msg = traceback.format_exc()
logger.error("Python execution failed: %s", exc)
return f"Error: {exc}\n\n{error_msg}"
# Register with MCP
register_tool(name="python", schema=PYTHON_SCHEMA, category="code")(python)

179
src/tools/file_ops.py Normal file
View File

@@ -0,0 +1,179 @@
"""File operations tool.
MCP-compliant tool for reading, writing, and listing files.
"""
import logging
from pathlib import Path
from typing import Any
from mcp.registry import register_tool
from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_BOOLEAN, RETURN_STRING
logger = logging.getLogger(__name__)
# Read File Schema
READ_FILE_SCHEMA = create_tool_schema(
name="read_file",
description="Read contents of a file. Use when user explicitly asks to read a file.",
parameters={
"path": {
**PARAM_STRING,
"description": "Path to file (relative to project root or absolute)",
},
"limit": {
"type": "integer",
"description": "Maximum lines to read (0 = all)",
"default": 0,
},
},
required=["path"],
returns=RETURN_STRING,
)
# Write File Schema
WRITE_FILE_SCHEMA = create_tool_schema(
name="write_file",
description="Write content to a file. Use when user explicitly asks to save content.",
parameters={
"path": {
**PARAM_STRING,
"description": "Path to file",
},
"content": {
**PARAM_STRING,
"description": "Content to write",
},
"append": {
**PARAM_BOOLEAN,
"description": "Append to file instead of overwrite",
"default": False,
},
},
required=["path", "content"],
returns=RETURN_STRING,
)
# List Directory Schema
LIST_DIR_SCHEMA = create_tool_schema(
name="list_directory",
description="List files in a directory.",
parameters={
"path": {
**PARAM_STRING,
"description": "Directory path (default: current)",
"default": ".",
},
"pattern": {
**PARAM_STRING,
"description": "File pattern filter (e.g., '*.py')",
"default": "*",
},
},
returns=RETURN_STRING,
)
def _resolve_path(path: str) -> Path:
"""Resolve path relative to project root."""
from config import settings
p = Path(path)
if p.is_absolute():
return p
# Try relative to project root
project_root = Path(__file__).parent.parent.parent
return project_root / p
def read_file(path: str, limit: int = 0) -> str:
"""Read file contents."""
try:
filepath = _resolve_path(path)
if not filepath.exists():
return f"Error: File not found: {path}"
if not filepath.is_file():
return f"Error: Path is not a file: {path}"
content = filepath.read_text()
if limit > 0:
lines = content.split('\n')[:limit]
content = '\n'.join(lines)
if len(content.split('\n')) == limit:
content += f"\n\n... [{limit} lines shown]"
return content
except Exception as exc:
logger.error("Read file failed: %s", exc)
return f"Error reading file: {exc}"
def write_file(path: str, content: str, append: bool = False) -> str:
"""Write content to file."""
try:
filepath = _resolve_path(path)
# Ensure directory exists
filepath.parent.mkdir(parents=True, exist_ok=True)
mode = "a" if append else "w"
filepath.write_text(content)
action = "appended to" if append else "wrote"
return f"Successfully {action} {filepath}"
except Exception as exc:
logger.error("Write file failed: %s", exc)
return f"Error writing file: {exc}"
def list_directory(path: str = ".", pattern: str = "*") -> str:
"""List directory contents."""
try:
dirpath = _resolve_path(path)
if not dirpath.exists():
return f"Error: Directory not found: {path}"
if not dirpath.is_dir():
return f"Error: Path is not a directory: {path}"
items = list(dirpath.glob(pattern))
files = []
dirs = []
for item in items:
if item.is_dir():
dirs.append(f"📁 {item.name}/")
else:
size = item.stat().st_size
size_str = f"{size}B" if size < 1024 else f"{size//1024}KB"
files.append(f"📄 {item.name} ({size_str})")
result = [f"Contents of {dirpath}:", ""]
result.extend(sorted(dirs))
result.extend(sorted(files))
return "\n".join(result)
except Exception as exc:
logger.error("List directory failed: %s", exc)
return f"Error listing directory: {exc}"
# Register with MCP
register_tool(name="read_file", schema=READ_FILE_SCHEMA, category="files")(read_file)
register_tool(
name="write_file",
schema=WRITE_FILE_SCHEMA,
category="files",
requires_confirmation=True,
)(write_file)
register_tool(name="list_directory", schema=LIST_DIR_SCHEMA, category="files")(list_directory)

70
src/tools/memory_tool.py Normal file
View File

@@ -0,0 +1,70 @@
"""Memory search tool.
MCP-compliant tool for searching Timmy's memory.
"""
import logging
from typing import Any
from mcp.registry import register_tool
from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_INTEGER, RETURN_STRING
logger = logging.getLogger(__name__)
MEMORY_SEARCH_SCHEMA = create_tool_schema(
name="memory_search",
description="Search Timmy's memory for past conversations, facts, and context. Use when user asks about previous discussions or when you need to recall something from memory.",
parameters={
"query": {
**PARAM_STRING,
"description": "What to search for in memory",
},
"top_k": {
**PARAM_INTEGER,
"description": "Number of results to return (1-10)",
"default": 5,
"minimum": 1,
"maximum": 10,
},
},
required=["query"],
returns=RETURN_STRING,
)
def memory_search(query: str, top_k: int = 5) -> str:
"""Search Timmy's memory.
Args:
query: Search query
top_k: Number of results
Returns:
Relevant memories from past conversations
"""
try:
from timmy.semantic_memory import memory_search as semantic_search
results = semantic_search(query, top_k=top_k)
if not results:
return "No relevant memories found."
formatted = ["Relevant memories from past conversations:", ""]
for i, (content, score) in enumerate(results, 1):
relevance = "🔥" if score > 0.8 else "" if score > 0.5 else "📄"
formatted.append(f"{relevance} [{i}] (score: {score:.2f})")
formatted.append(f" {content[:300]}...")
formatted.append("")
return "\n".join(formatted)
except Exception as exc:
logger.error("Memory search failed: %s", exc)
return f"Memory search error: {exc}"
# Register with MCP
register_tool(name="memory_search", schema=MEMORY_SEARCH_SCHEMA, category="memory")(memory_search)

74
src/tools/web_search.py Normal file
View File

@@ -0,0 +1,74 @@
"""Web search tool using DuckDuckGo.
MCP-compliant tool for searching the web.
"""
import logging
from typing import Any
from mcp.registry import register_tool
from mcp.schemas.base import create_tool_schema, PARAM_STRING, PARAM_INTEGER, RETURN_STRING
logger = logging.getLogger(__name__)
WEB_SEARCH_SCHEMA = create_tool_schema(
name="web_search",
description="Search the web using DuckDuckGo. Use for current events, news, real-time data, and information not in your training data.",
parameters={
"query": {
**PARAM_STRING,
"description": "Search query string",
},
"max_results": {
**PARAM_INTEGER,
"description": "Maximum number of results (1-10)",
"default": 5,
"minimum": 1,
"maximum": 10,
},
},
required=["query"],
returns=RETURN_STRING,
)
def web_search(query: str, max_results: int = 5) -> str:
"""Search the web using DuckDuckGo.
Args:
query: Search query
max_results: Maximum results to return
Returns:
Formatted search results
"""
try:
from duckduckgo_search import DDGS
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=max_results))
if not results:
return "No results found."
formatted = []
for i, r in enumerate(results, 1):
title = r.get("title", "No title")
body = r.get("body", "No description")
href = r.get("href", "")
formatted.append(f"{i}. {title}\n {body[:150]}...\n {href}")
return "\n\n".join(formatted)
except Exception as exc:
logger.error("Web search failed: %s", exc)
return f"Search error: {exc}"
# Register with MCP
register_tool(
name="web_search",
schema=WEB_SEARCH_SCHEMA,
category="research",
)(web_search)