"""YAML-driven agent factory. Reads config/agents.yaml and builds agent instances from a single seed class (SubAgent). All agent differentiation lives in YAML — no Python changes needed to add, remove, or reconfigure agents. Usage: from timmy.agents.loader import load_agents, get_agent, list_agents from timmy.agents.loader import get_routing_config, route_request agents = load_agents() # dict of agent_id -> SubAgent forge = get_agent("coder") # single agent by id target = route_request("fix bug") # pattern-based routing """ from __future__ import annotations import logging import re from pathlib import Path from typing import Any import yaml from config import settings logger = logging.getLogger(__name__) # Module-level cache _agents: dict[str, Any] | None = None _config: dict[str, Any] | None = None # Default config path (relative to repo root) _CONFIG_FILENAME = "config/agents.yaml" def _find_config_path() -> Path: """Locate agents.yaml relative to the repo root.""" repo_root = Path(settings.repo_root) config_path = repo_root / _CONFIG_FILENAME if not config_path.exists(): raise FileNotFoundError( f"Agent config not found: {config_path}\nCreate {_CONFIG_FILENAME} in your repo root." ) return config_path def _load_config(force_reload: bool = False) -> dict[str, Any]: """Load and cache the agents.yaml config.""" global _config if _config is not None and not force_reload: return _config config_path = _find_config_path() with open(config_path) as f: _config = yaml.safe_load(f) logger.info("Loaded agent config from %s", config_path) return _config def _resolve_model(agent_model: str | None, defaults: dict) -> str: """Resolve agent model, falling back to defaults then settings.""" if agent_model: return agent_model default_model = defaults.get("model") if default_model: return default_model return settings.ollama_model def _resolve_prompt_tier(agent_tier: str | None, defaults: dict) -> str: """Resolve prompt tier, falling back to defaults.""" return agent_tier or defaults.get("prompt_tier", "lite") def _build_system_prompt(agent_cfg: dict, prompt_tier: str) -> str: """Build the full system prompt for an agent. Combines the agent's custom prompt with the appropriate base prompt (full or lite) from the prompts module. """ from timmy.prompts import get_system_prompt # Get base prompt for the tier tools_enabled = prompt_tier == "full" base_prompt = get_system_prompt(tools_enabled=tools_enabled) # Prepend the agent's custom prompt custom_prompt = agent_cfg.get("prompt", "").strip() if custom_prompt: return f"{custom_prompt}\n\n{base_prompt}" return base_prompt def load_agents(force_reload: bool = False) -> dict[str, Any]: """Load all agents from YAML config. Returns a dict of agent_id -> SubAgent instances. Agents are cached after first load; pass force_reload=True to re-read. """ global _agents if _agents is not None and not force_reload: return _agents from timmy.agents.base import SubAgent config = _load_config(force_reload=force_reload) defaults = config.get("defaults", {}) agents_cfg = config.get("agents", {}) _agents = {} for agent_id, agent_cfg in agents_cfg.items(): model = _resolve_model(agent_cfg.get("model"), defaults) prompt_tier = _resolve_prompt_tier(agent_cfg.get("prompt_tier"), defaults) system_prompt = _build_system_prompt(agent_cfg, prompt_tier) max_history = agent_cfg.get("max_history", defaults.get("max_history", 10)) tools = agent_cfg.get("tools", defaults.get("tools", [])) agent = SubAgent( agent_id=agent_id, name=agent_cfg.get("name", agent_id.title()), role=agent_cfg.get("role", "general"), system_prompt=system_prompt, tools=tools, model=model, max_history=max_history, ) _agents[agent_id] = agent logger.info( "Loaded agent: %s (model=%s, tools=%d, tier=%s)", agent_id, model, len(tools), prompt_tier, ) logger.info("Total agents loaded: %d", len(_agents)) return _agents def get_agent(agent_id: str) -> Any: """Get a single agent by ID. Loads config if not already loaded.""" agents = load_agents() agent = agents.get(agent_id) if agent is None: available = ", ".join(sorted(agents.keys())) raise KeyError(f"Unknown agent: {agent_id!r}. Available: {available}") return agent def list_agents() -> list[dict[str, Any]]: """List all agents with their metadata (for tools_intro, delegation, etc.).""" config = _load_config() defaults = config.get("defaults", {}) agents_cfg = config.get("agents", {}) result = [] for agent_id, agent_cfg in agents_cfg.items(): result.append( { "id": agent_id, "name": agent_cfg.get("name", agent_id.title()), "role": agent_cfg.get("role", "general"), "model": _resolve_model(agent_cfg.get("model"), defaults), "tools": agent_cfg.get("tools", defaults.get("tools", [])), "status": "available", } ) return result # ── Routing ──────────────────────────────────────────────────────────────── def get_routing_config() -> dict[str, Any]: """Get the routing configuration.""" config = _load_config() return config.get("routing", {"method": "pattern", "patterns": {}}) def _matches_pattern(pattern: str, message: str) -> bool: """Check if a pattern matches using word-boundary matching. For single-word patterns, uses \b word boundaries. For multi-word patterns, all words must appear as whole words (in any order). """ pattern_lower = pattern.lower() message_lower = message.lower() words = pattern_lower.split() for word in words: # Use word boundary regex to match whole words only if not re.search(rf"\b{re.escape(word)}\b", message_lower): return False return True def route_request(user_message: str) -> str | None: """Route a user request to an agent using pattern matching. Returns the agent_id of the best match, or None if no pattern matches (meaning the orchestrator should handle it directly). """ routing = get_routing_config() if routing.get("method") != "pattern": return None patterns = routing.get("patterns", {}) for agent_id, keywords in patterns.items(): for keyword in keywords: if _matches_pattern(keyword, user_message): logger.debug("Routed to %s (matched: %r)", agent_id, keyword) return agent_id return None def route_request_with_match(user_message: str) -> tuple[str | None, str | None]: """Route a user request and return both the agent and the matched pattern. Returns a tuple of (agent_id, matched_pattern). If no match, returns (None, None). """ routing = get_routing_config() if routing.get("method") != "pattern": return None, None patterns = routing.get("patterns", {}) for agent_id, keywords in patterns.items(): for keyword in keywords: if _matches_pattern(keyword, user_message): return agent_id, keyword return None, None def reload_agents() -> dict[str, Any]: """Force reload agents from YAML. Call after editing agents.yaml.""" global _agents, _config _agents = None _config = None return load_agents(force_reload=True)