1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/timmy/agents/base.py
2026-03-15 09:38:21 -04:00

222 lines
6.9 KiB
Python

"""Base agent class and configurable SubAgent.
BaseAgent provides:
- MCP tool registry access
- Event bus integration
- Memory integration
- Structured logging
SubAgent is the single seed class for ALL agents. Differentiation
comes entirely from config (agents.yaml), not from Python subclasses.
"""
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Any
import httpx
from agno.agent import Agent
from agno.models.ollama import Ollama
from config import settings
from infrastructure.events.bus import Event, EventBus
try:
from mcp.registry import tool_registry
except ImportError:
tool_registry = None
logger = logging.getLogger(__name__)
class BaseAgent(ABC):
    """Base class for all agents.

    Wraps an Agno ``Agent`` backed by an Ollama model, resolves the
    configured tool names against the MCP tool registry (when that module
    could be imported), and can optionally be attached to an event bus.

    Subclasses must implement :meth:`execute_task`.
    """

    def __init__(
        self,
        agent_id: str,
        name: str,
        role: str,
        system_prompt: str,
        tools: list[str] | None = None,
        model: str | None = None,
        max_history: int = 10,
    ) -> None:
        """Store the agent configuration and build the underlying Agno agent.

        Args:
            agent_id: Identifier used in event topics and status reports.
            name: Name given to the Agno agent and used in log messages.
            role: Role label; stored and reported by :meth:`get_status`.
            system_prompt: Passed to the Agno agent as its description.
            tools: Tool names to look up in the tool registry. ``None``
                (or an empty list) means no tools.
            model: Ollama model id. Falls back to ``settings.ollama_model``
                when not given.
            max_history: Passed to Agno as ``num_history_runs``.
        """
        self.agent_id = agent_id
        self.name = name
        self.role = role
        self.tools = tools or []
        self.model = model or settings.ollama_model
        self.max_history = max_history

        # Create Agno agent
        self.system_prompt = system_prompt
        self.agent = self._create_agent(system_prompt)

        # Event bus for communication; set later via connect_event_bus().
        self.event_bus: EventBus | None = None

        logger.info(
            "%s agent initialized (id: %s, model: %s)",
            name,
            agent_id,
            self.model,
        )

    def _create_agent(self, system_prompt: str) -> Agent:
        """Create the underlying Agno agent with per-agent model.

        Args:
            system_prompt: Text used as the Agno agent's description.

        Returns:
            A configured Agno ``Agent`` using an Ollama model.
        """
        # Resolve tool names to handlers. The registry import is optional,
        # so ``tool_registry`` may be None, in which case no tools are attached.
        tool_instances = []
        if tool_registry is not None:
            for tool_name in self.tools:
                handler = tool_registry.get_handler(tool_name)
                if handler:
                    tool_instances.append(handler)
                else:
                    # Surface misconfigured tool names instead of dropping
                    # them without a trace.
                    logger.warning(
                        "%s: tool %r not found in registry; skipping",
                        self.name,
                        tool_name,
                    )

        # Only override the context window when a positive value is configured.
        ollama_kwargs: dict[str, Any] = {}
        if settings.ollama_num_ctx > 0:
            ollama_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}

        return Agent(
            name=self.name,
            model=Ollama(
                id=self.model,
                host=settings.ollama_url,
                timeout=300,
                **ollama_kwargs,
            ),
            description=system_prompt,
            tools=tool_instances or None,
            add_history_to_context=True,
            num_history_runs=self.max_history,
            markdown=False,
            telemetry=settings.telemetry_enabled,
        )

    def connect_event_bus(self, bus: EventBus) -> None:
        """Connect to the event bus for inter-agent communication.

        Args:
            bus: Event bus to remember and register this agent's handlers on.
        """
        self.event_bus = bus

        # ``subscribe(pattern)`` returns a registrar that is then called
        # with the handler.
        bus.subscribe(f"agent.{self.agent_id}.*")(self._handle_direct_message)
        bus.subscribe("agent.task.assigned")(self._handle_task_assignment)

    async def _handle_direct_message(self, event: Event) -> None:
        """Handle direct messages to this agent (currently only logged)."""
        logger.debug("%s received message: %s", self.name, event.type)

    async def _handle_task_assignment(self, event: Event) -> None:
        """Run a task when an assignment event targets this agent.

        Events whose ``agent_id`` does not match this agent are ignored.
        """
        if event.data.get("agent_id") != self.agent_id:
            return

        task_id = event.data.get("task_id")
        # ``or ""`` also covers an explicit ``"description": None`` payload,
        # which would otherwise fail on the slice below.
        description = event.data.get("description") or ""
        logger.info("%s assigned task %s: %s", self.name, task_id, description[:50])

        # Execute the task
        await self.execute_task(task_id, description, event.data)

    @abstractmethod
    async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
        """Execute a task assigned to this agent.

        Must be implemented by subclasses.

        Args:
            task_id: Identifier of the task.
            description: Task description text.
            context: Data of the event that assigned the task.
        """

    async def run(self, message: str) -> str:
        """Run the agent with a message.

        Connection-level failures (``httpx.ConnectError``,
        ``httpx.ReadError``, ``ConnectionError``) are logged and re-raised
        immediately. Any other exception is retried, for up to three
        attempts in total with a one-second pause between attempts; the
        final failure is re-raised unchanged.

        When an event bus is connected, an ``agent.<agent_id>.response``
        event carrying the input and output is published on success.

        Args:
            message: Input passed to the underlying agent.

        Returns:
            Agent response
        """
        max_retries = 3
        retry_delay_seconds = 1

        for attempt in range(1, max_retries + 1):
            try:
                # NOTE(review): this is a synchronous call made from a
                # coroutine, so it appears to block the event loop while the
                # model responds — confirm whether it should be offloaded.
                result = self.agent.run(message, stream=False)
                response = result.content if hasattr(result, "content") else str(result)
                break  # Success, exit the retry loop
            except (httpx.ConnectError, httpx.ReadError, ConnectionError) as exc:
                logger.error("Ollama disconnected: %s", exc)
                raise
            except Exception as exc:
                if attempt >= max_retries:
                    logger.error(
                        "Agent run failed after %d attempts: %s",
                        max_retries,
                        exc,
                    )
                    # Bare raise keeps the original traceback and avoids
                    # chaining the exception to itself as its own cause.
                    raise
                logger.warning(
                    "Agent run failed on attempt %d/%d: %s. Retrying...",
                    attempt,
                    max_retries,
                    exc,
                )
                await asyncio.sleep(retry_delay_seconds)

        # Emit completion event
        if self.event_bus:
            await self.event_bus.publish(
                Event(
                    type=f"agent.{self.agent_id}.response",
                    source=self.agent_id,
                    data={"input": message, "output": response},
                )
            )

        return response

    def get_capabilities(self) -> list[str]:
        """Get list of capabilities (tool names) this agent provides.

        Returns:
            A copy of the configured tool names, so callers cannot mutate
            the agent's own list.
        """
        return list(self.tools)

    def get_status(self) -> dict:
        """Get current agent status.

        Returns:
            A dict with ``agent_id``, ``name``, ``role``, ``model``,
            ``status`` (always ``"ready"``) and a copy of ``tools``.
        """
        return {
            "agent_id": self.agent_id,
            "name": self.name,
            "role": self.role,
            "model": self.model,
            "status": "ready",
            "tools": list(self.tools),
        }
class SubAgent(BaseAgent):
    """Concrete, config-driven agent.

    This is the one class every agent in the system is instantiated from.
    Agents differ only in the values handed to the constructor (sourced
    from agents.yaml), so a new agent is added by editing YAML rather than
    by writing another Python subclass.
    """

    def __init__(
        self,
        agent_id: str,
        name: str,
        role: str,
        system_prompt: str,
        tools: list[str] | None = None,
        model: str | None = None,
        max_history: int = 10,
    ) -> None:
        """Forward every configuration value unchanged to ``BaseAgent``."""
        super().__init__(
            agent_id=agent_id,
            name=name,
            role=role,
            system_prompt=system_prompt,
            tools=tools,
            model=model,
            max_history=max_history,
        )

    async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
        """Run the agent on the task description and report the outcome.

        Args:
            task_id: Identifier echoed back in the result.
            description: Text sent to the agent as its message.
            context: Accepted for interface compatibility; not used here.

        Returns:
            A dict with ``task_id``, ``agent``, ``result`` and a ``status``
            of ``"completed"``.
        """
        outcome = await self.run(description)
        report = {
            "task_id": task_id,
            "agent": self.agent_id,
            "result": outcome,
            "status": "completed",
        }
        return report