"""Tool integration for the agent swarm. Provides agents with capabilities for: - File read/write (local filesystem) - Shell command execution (sandboxed) - Python code execution - Git operations - Image / Music / Video generation (creative pipeline) Tools are assigned to agents based on their specialties. """ from __future__ import annotations import ast import logging import math from collections.abc import Callable from dataclasses import dataclass, field from datetime import UTC, datetime from pathlib import Path from config import settings logger = logging.getLogger(__name__) # Lazy imports to handle test mocking _ImportError = None try: from agno.tools import Toolkit from agno.tools.file import FileTools from agno.tools.python import PythonTools from agno.tools.shell import ShellTools _AGNO_TOOLS_AVAILABLE = True except ImportError as e: _AGNO_TOOLS_AVAILABLE = False _ImportError = e # Track tool usage stats _TOOL_USAGE: dict[str, list[dict]] = {} @dataclass class ToolStats: """Statistics for a single tool.""" tool_name: str call_count: int = 0 last_used: str | None = None errors: int = 0 @dataclass class AgentTools: """Tools assigned to an agent.""" agent_id: str agent_name: str toolkit: Toolkit available_tools: list[str] = field(default_factory=list) # Backward-compat alias PersonaTools = AgentTools def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None: """Track tool usage for analytics.""" if agent_id not in _TOOL_USAGE: _TOOL_USAGE[agent_id] = [] _TOOL_USAGE[agent_id].append( { "tool": tool_name, "timestamp": datetime.now(UTC).isoformat(), "success": success, } ) def get_tool_stats(agent_id: str | None = None) -> dict: """Get tool usage statistics. Args: agent_id: Optional agent ID to filter by. If None, returns stats for all agents. Returns: Dict with tool usage statistics. """ if agent_id: usage = _TOOL_USAGE.get(agent_id, []) return { "agent_id": agent_id, "total_calls": len(usage), "tools_used": list(set(u["tool"] for u in usage)), "recent_calls": usage[-10:] if usage else [], } # Return stats for all agents all_stats = {} for aid, usage in _TOOL_USAGE.items(): all_stats[aid] = { "total_calls": len(usage), "tools_used": list(set(u["tool"] for u in usage)), } return all_stats def _safe_eval(node, allowed_names: dict): """Walk an AST and evaluate only safe numeric operations.""" if isinstance(node, ast.Expression): return _safe_eval(node.body, allowed_names) if isinstance(node, ast.Constant): if isinstance(node.value, (int, float, complex)): return node.value raise ValueError(f"Unsupported constant: {node.value!r}") if isinstance(node, ast.UnaryOp): operand = _safe_eval(node.operand, allowed_names) if isinstance(node.op, ast.UAdd): return +operand if isinstance(node.op, ast.USub): return -operand raise ValueError(f"Unsupported unary op: {type(node.op).__name__}") if isinstance(node, ast.BinOp): left = _safe_eval(node.left, allowed_names) right = _safe_eval(node.right, allowed_names) ops = { ast.Add: lambda a, b: a + b, ast.Sub: lambda a, b: a - b, ast.Mult: lambda a, b: a * b, ast.Div: lambda a, b: a / b, ast.FloorDiv: lambda a, b: a // b, ast.Mod: lambda a, b: a % b, ast.Pow: lambda a, b: a**b, } op_fn = ops.get(type(node.op)) if op_fn is None: raise ValueError(f"Unsupported binary op: {type(node.op).__name__}") return op_fn(left, right) if isinstance(node, ast.Name): if node.id in allowed_names: return allowed_names[node.id] raise ValueError(f"Unknown name: {node.id!r}") if isinstance(node, ast.Attribute): value = _safe_eval(node.value, allowed_names) # Only allow attribute access on the math module if value is math: attr = getattr(math, node.attr, None) if attr is not None: return attr raise ValueError(f"Attribute access not allowed: .{node.attr}") if isinstance(node, ast.Call): func = _safe_eval(node.func, allowed_names) if not callable(func): raise ValueError(f"Not callable: {func!r}") args = [_safe_eval(a, allowed_names) for a in node.args] kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords} return func(*args, **kwargs) raise ValueError(f"Unsupported syntax: {type(node).__name__}") def calculator(expression: str) -> str: """Evaluate a mathematical expression and return the exact result. Use this tool for ANY arithmetic: multiplication, division, square roots, exponents, percentages, logarithms, trigonometry, etc. Args: expression: A valid Python math expression, e.g. '347 * 829', 'math.sqrt(17161)', '2**10', 'math.log(100, 10)'. Returns: The exact result as a string. """ allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")} allowed_names["math"] = math allowed_names["abs"] = abs allowed_names["round"] = round allowed_names["min"] = min allowed_names["max"] = max try: tree = ast.parse(expression, mode="eval") result = _safe_eval(tree, allowed_names) return str(result) except Exception as e: # broad catch intentional: arbitrary code execution return f"Error evaluating '{expression}': {e}" def _make_smart_read_file(file_tools: FileTools) -> Callable: """Wrap FileTools.read_file so directories auto-list their contents. When the user (or the LLM) passes a directory path to read_file, the raw Agno implementation throws an IsADirectoryError. This wrapper detects that case, lists the directory entries, and returns a helpful message so the model can pick the right file on its own. """ original_read = file_tools.read_file def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str: """Reads the contents of the file `file_name` and returns the contents if successful.""" # LLMs often call read_file(path=...) instead of read_file(file_name=...) if not file_name: file_name = kwargs.get("path", "") if not file_name: return "Error: no file_name or path provided." # Resolve the path the same way FileTools does _safe, resolved = file_tools.check_escape(file_name) if _safe and resolved.is_dir(): entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith(".")) listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)" return ( f"'{file_name}' is a directory, not a file. " f"Files inside:\n{listing}\n\n" "Please call read_file with one of the files listed above." ) return original_read(file_name, encoding=encoding) # Preserve the original docstring for Agno tool schema generation smart_read_file.__doc__ = original_read.__doc__ return smart_read_file def create_research_tools(base_dir: str | Path | None = None): """Create tools for the research agent (Echo). Includes: file reading """ if not _AGNO_TOOLS_AVAILABLE: raise ImportError(f"Agno tools not available: {_ImportError}") toolkit = Toolkit(name="research") # File reading from config import settings base_path = Path(base_dir) if base_dir else Path(settings.repo_root) file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.list_files, name="list_files") return toolkit def create_code_tools(base_dir: str | Path | None = None): """Create tools for the code agent (Forge). Includes: shell commands, python execution, file read/write, Aider AI assist """ if not _AGNO_TOOLS_AVAILABLE: raise ImportError(f"Agno tools not available: {_ImportError}") toolkit = Toolkit(name="code") # Shell commands (sandboxed) shell_tools = ShellTools() toolkit.register(shell_tools.run_shell_command, name="shell") # Python execution python_tools = PythonTools() toolkit.register(python_tools.run_python_code, name="python") # File operations from config import settings base_path = Path(base_dir) if base_dir else Path(settings.repo_root) file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.save_file, name="write_file") toolkit.register(file_tools.list_files, name="list_files") # Aider AI coding assistant (local with Ollama) aider_tool = create_aider_tool(base_path) toolkit.register(aider_tool.run_aider, name="aider") return toolkit def create_aider_tool(base_path: Path): """Create an Aider tool for AI-assisted coding.""" import subprocess class AiderTool: """Tool that calls Aider (local AI coding assistant) for code generation.""" def __init__(self, base_dir: Path): self.base_dir = base_dir def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str: """Run Aider to generate code changes. Args: prompt: What you want Aider to do (e.g., "add a fibonacci function") model: Ollama model to use (default: qwen3:30b) Returns: Aider's response with the code changes made """ try: # Run aider with the prompt result = subprocess.run( [ "aider", "--no-git", "--model", f"ollama/{model}", "--quiet", prompt, ], capture_output=True, text=True, timeout=120, cwd=str(self.base_dir), ) if result.returncode == 0: return result.stdout if result.stdout else "Code changes applied successfully" else: return f"Aider error: {result.stderr}" except FileNotFoundError: return "Error: Aider not installed. Run: pip install aider" except subprocess.TimeoutExpired: return "Error: Aider timed out after 120 seconds" except (OSError, subprocess.SubprocessError) as e: return f"Error running Aider: {str(e)}" return AiderTool(base_path) def create_data_tools(base_dir: str | Path | None = None): """Create tools for the data agent (Seer). Includes: python execution, file reading, web search for data sources """ if not _AGNO_TOOLS_AVAILABLE: raise ImportError(f"Agno tools not available: {_ImportError}") toolkit = Toolkit(name="data") # Python execution for analysis python_tools = PythonTools() toolkit.register(python_tools.run_python_code, name="python") # File reading from config import settings base_path = Path(base_dir) if base_dir else Path(settings.repo_root) file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.list_files, name="list_files") return toolkit def create_writing_tools(base_dir: str | Path | None = None): """Create tools for the writing agent (Quill). Includes: file read/write """ if not _AGNO_TOOLS_AVAILABLE: raise ImportError(f"Agno tools not available: {_ImportError}") toolkit = Toolkit(name="writing") # File operations base_path = Path(base_dir) if base_dir else Path(settings.repo_root) file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.save_file, name="write_file") toolkit.register(file_tools.list_files, name="list_files") return toolkit def create_security_tools(base_dir: str | Path | None = None): """Create tools for the security agent (Mace). Includes: shell commands (for scanning), file read """ if not _AGNO_TOOLS_AVAILABLE: raise ImportError(f"Agno tools not available: {_ImportError}") toolkit = Toolkit(name="security") # Shell for running security scans shell_tools = ShellTools() toolkit.register(shell_tools.run_shell_command, name="shell") # File reading for logs/configs base_path = Path(base_dir) if base_dir else Path(settings.repo_root) file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.list_files, name="list_files") return toolkit def create_devops_tools(base_dir: str | Path | None = None): """Create tools for the DevOps agent (Helm). Includes: shell commands, file read/write """ if not _AGNO_TOOLS_AVAILABLE: raise ImportError(f"Agno tools not available: {_ImportError}") toolkit = Toolkit(name="devops") # Shell for deployment commands shell_tools = ShellTools() toolkit.register(shell_tools.run_shell_command, name="shell") # File operations for config management base_path = Path(base_dir) if base_dir else Path(settings.repo_root) file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.save_file, name="write_file") toolkit.register(file_tools.list_files, name="list_files") return toolkit def consult_grok(query: str) -> str: """Consult Grok (xAI) for frontier reasoning on complex questions. Use this tool when a question requires advanced reasoning, real-time knowledge, or capabilities beyond the local model. Grok is a premium cloud backend — use sparingly and only for high-complexity queries. Args: query: The question or reasoning task to send to Grok. Returns: Grok's response text, or an error/status message. """ from config import settings from timmy.backends import get_grok_backend, grok_available if not grok_available(): return ( "Grok is not available. Enable with GROK_ENABLED=true " "and set XAI_API_KEY in your .env file." ) backend = get_grok_backend() # Log to Spark if available try: from spark.engine import spark_engine spark_engine.on_tool_executed( agent_id="default", tool_name="consult_grok", success=True, ) except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (consult_grok logging): %s", exc) pass # Generate Lightning invoice for monetization (unless free mode) invoice_info = "" if not settings.grok_free: try: from lightning.factory import get_backend as get_ln_backend ln = get_ln_backend() sats = min(settings.grok_max_sats_per_query, 100) inv = ln.create_invoice(sats, f"Grok query: {query[:50]}") invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]" except (ImportError, OSError, ValueError) as exc: logger.warning("Tool execution failed (Lightning invoice): %s", exc) pass result = backend.run(query) response = result.content if invoice_info: response += invoice_info return response def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None: """Register core execution and file tools.""" # Python execution python_tools = PythonTools() toolkit.register(python_tools.run_python_code, name="python") # Shell commands shell_tools = ShellTools() toolkit.register(shell_tools.run_shell_command, name="shell") # File operations file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.save_file, name="write_file") toolkit.register(file_tools.list_files, name="list_files") # Calculator — exact arithmetic (never let the LLM guess) toolkit.register(calculator, name="calculator") def _register_grok_tool(toolkit: Toolkit) -> None: """Register Grok consultation tool if available.""" try: from timmy.backends import grok_available if grok_available(): toolkit.register(consult_grok, name="consult_grok") logger.info("Grok consultation tool registered") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (Grok registration): %s", exc) logger.debug("Grok tool not available") def _register_memory_tools(toolkit: Toolkit) -> None: """Register memory search, write, and forget tools.""" try: from timmy.memory_system import memory_forget, memory_read, memory_search, memory_write toolkit.register(memory_search, name="memory_search") toolkit.register(memory_write, name="memory_write") toolkit.register(memory_read, name="memory_read") toolkit.register(memory_forget, name="memory_forget") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (Memory tools registration): %s", exc) logger.debug("Memory tools not available") def _register_agentic_loop_tool(toolkit: Toolkit) -> None: """Register agentic loop tool for background multi-step task execution.""" try: from timmy.agentic_loop import run_agentic_loop def plan_and_execute(task: str) -> str: """Execute a complex multi-step task in the background with progress tracking. Use this when a task requires 3 or more sequential tool calls that may take significant time. The task will run in the background and stream progress updates to the user via WebSocket. Args: task: Full description of the multi-step task to execute. Returns: Task ID and confirmation that background execution has started. """ import asyncio task_id = None async def _launch(): nonlocal task_id result = await run_agentic_loop(task) return result # Spawn as a background task on the running event loop try: asyncio.get_running_loop() future = asyncio.ensure_future(_launch()) task_id = id(future) logger.info("Agentic loop started (task=%s)", task[:80]) except RuntimeError: # No running loop — run synchronously (shouldn't happen in prod) result = asyncio.run(_launch()) return f"Task completed: {result.summary}" return ( "Background task started. I'll execute this step-by-step " "and stream progress updates. You can monitor via the dashboard." ) toolkit.register(plan_and_execute, name="plan_and_execute") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (plan_and_execute registration): %s", exc) logger.debug("plan_and_execute tool not available") def _register_introspection_tools(toolkit: Toolkit) -> None: """Register system introspection tools for runtime environment queries.""" try: from timmy.tools_intro import ( check_ollama_health, get_memory_status, get_system_info, run_self_tests, ) toolkit.register(get_system_info, name="get_system_info") toolkit.register(check_ollama_health, name="check_ollama_health") toolkit.register(get_memory_status, name="get_memory_status") toolkit.register(run_self_tests, name="run_self_tests") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (Introspection tools registration): %s", exc) logger.debug("Introspection tools not available") try: from timmy.session_logger import session_history toolkit.register(session_history, name="session_history") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (session_history registration): %s", exc) logger.debug("session_history tool not available") def _register_delegation_tools(toolkit: Toolkit) -> None: """Register inter-agent delegation tools.""" try: from timmy.tools_delegation import delegate_task, delegate_to_kimi, list_swarm_agents toolkit.register(delegate_task, name="delegate_task") toolkit.register(delegate_to_kimi, name="delegate_to_kimi") toolkit.register(list_swarm_agents, name="list_swarm_agents") except Exception as exc: logger.warning("Tool execution failed (Delegation tools registration): %s", exc) logger.debug("Delegation tools not available") def _register_gematria_tool(toolkit: Toolkit) -> None: """Register the gematria computation tool.""" try: from timmy.gematria import gematria toolkit.register(gematria, name="gematria") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (Gematria registration): %s", exc) logger.debug("Gematria tool not available") def _register_artifact_tools(toolkit: Toolkit) -> None: """Register artifact tools — notes and decision logging.""" try: from timmy.memory_system import jot_note, log_decision toolkit.register(jot_note, name="jot_note") toolkit.register(log_decision, name="log_decision") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (Artifact tools registration): %s", exc) logger.debug("Artifact tools not available") def _register_thinking_tools(toolkit: Toolkit) -> None: """Register thinking/introspection tools for self-reflection.""" try: from timmy.thinking import search_thoughts toolkit.register(search_thoughts, name="thought_search") except (ImportError, AttributeError) as exc: logger.warning("Tool execution failed (Thinking tools registration): %s", exc) logger.debug("Thinking tools not available") def create_full_toolkit(base_dir: str | Path | None = None): """Create a full toolkit with all available tools (for the orchestrator). Includes: web search, file read/write, shell commands, python execution, memory search for contextual recall, and Grok consultation. """ if not _AGNO_TOOLS_AVAILABLE: # Return None when tools aren't available (tests) return None from timmy.tool_safety import DANGEROUS_TOOLS toolkit = Toolkit(name="full") # Set requires_confirmation_tools AFTER construction (avoids agno WARNING # about tools not yet registered) but BEFORE register() calls (so each # Function gets requires_confirmation=True). Fixes #79. toolkit.requires_confirmation_tools = list(DANGEROUS_TOOLS) base_path = Path(base_dir) if base_dir else Path(settings.repo_root) _register_core_tools(toolkit, base_path) _register_grok_tool(toolkit) _register_memory_tools(toolkit) _register_agentic_loop_tool(toolkit) _register_introspection_tools(toolkit) _register_delegation_tools(toolkit) _register_gematria_tool(toolkit) _register_artifact_tools(toolkit) _register_thinking_tools(toolkit) # Gitea issue management is now provided by the gitea-mcp server # (wired in as MCPTools in agent.py, not registered here) return toolkit def create_experiment_tools(base_dir: str | Path | None = None): """Create tools for the experiment agent (Lab). Includes: prepare_experiment, run_experiment, evaluate_result, plus shell + file ops for editing training code. """ if not _AGNO_TOOLS_AVAILABLE: raise ImportError(f"Agno tools not available: {_ImportError}") from config import settings toolkit = Toolkit(name="experiment") from timmy.autoresearch import evaluate_result, prepare_experiment, run_experiment workspace = ( Path(base_dir) if base_dir else Path(settings.repo_root) / settings.autoresearch_workspace ) def _prepare(repo_url: str = "https://github.com/karpathy/autoresearch.git") -> str: """Clone and prepare an autoresearch experiment workspace.""" return prepare_experiment(workspace, repo_url) def _run(timeout: int = 0) -> str: """Run a single training experiment with wall-clock timeout.""" t = timeout or settings.autoresearch_time_budget result = run_experiment(workspace, timeout=t, metric_name=settings.autoresearch_metric) if result["success"] and result["metric"] is not None: return ( f"{settings.autoresearch_metric}: {result['metric']:.4f} ({result['duration_s']}s)" ) return result.get("error") or "Experiment failed" def _evaluate(current: float, baseline: float) -> str: """Compare current metric against baseline.""" return evaluate_result(current, baseline, metric_name=settings.autoresearch_metric) toolkit.register(_prepare, name="prepare_experiment") toolkit.register(_run, name="run_experiment") toolkit.register(_evaluate, name="evaluate_result") # Also give Lab access to file + shell tools for editing train.py shell_tools = ShellTools() toolkit.register(shell_tools.run_shell_command, name="shell") base_path = Path(base_dir) if base_dir else Path(settings.repo_root) file_tools = FileTools(base_dir=base_path) toolkit.register(_make_smart_read_file(file_tools), name="read_file") toolkit.register(file_tools.save_file, name="write_file") toolkit.register(file_tools.list_files, name="list_files") return toolkit # Mapping of agent IDs to their toolkits AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = { "echo": create_research_tools, "mace": create_security_tools, "helm": create_devops_tools, "seer": create_data_tools, "forge": create_code_tools, "quill": create_writing_tools, "lab": create_experiment_tools, "pixel": lambda base_dir=None: _create_stub_toolkit("pixel"), "lyra": lambda base_dir=None: _create_stub_toolkit("lyra"), "reel": lambda base_dir=None: _create_stub_toolkit("reel"), } def _create_stub_toolkit(name: str): """Create a minimal Agno toolkit for creative agents. Creative agents use their own dedicated tool modules rather than Agno-wrapped functions. This stub ensures AGENT_TOOLKITS has an entry so ToolExecutor doesn't fall back to the full toolkit. """ if not _AGNO_TOOLS_AVAILABLE: return None toolkit = Toolkit(name=name) return toolkit def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None: """Get the appropriate toolkit for an agent. Args: agent_id: The agent ID (echo, mace, helm, seer, forge, quill) base_dir: Optional base directory for file operations Returns: A Toolkit instance or None if agent_id is not recognized """ factory = AGENT_TOOLKITS.get(agent_id) if factory: return factory(base_dir) return None # Backward-compat alias get_tools_for_persona = get_tools_for_agent PERSONA_TOOLKITS = AGENT_TOOLKITS def _core_tool_catalog() -> dict: """Return core file and execution tools catalog entries.""" return { "shell": { "name": "Shell Commands", "description": "Execute shell commands (sandboxed)", "available_in": ["forge", "mace", "helm", "orchestrator"], }, "python": { "name": "Python Execution", "description": "Execute Python code for analysis and scripting", "available_in": ["forge", "seer", "orchestrator"], }, "read_file": { "name": "Read File", "description": "Read contents of local files", "available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "orchestrator"], }, "write_file": { "name": "Write File", "description": "Write content to local files", "available_in": ["forge", "quill", "helm", "orchestrator"], }, "list_files": { "name": "List Files", "description": "List files in a directory", "available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "orchestrator"], }, } def _analysis_tool_catalog() -> dict: """Return analysis and calculation tools catalog entries.""" return { "calculator": { "name": "Calculator", "description": "Evaluate mathematical expressions with exact results", "available_in": ["orchestrator"], }, } def _ai_tool_catalog() -> dict: """Return AI assistant and frontier reasoning tools catalog entries.""" return { "consult_grok": { "name": "Consult Grok", "description": "Premium frontier reasoning via xAI Grok (opt-in, Lightning-payable)", "available_in": ["orchestrator"], }, "aider": { "name": "Aider AI Assistant", "description": "Local AI coding assistant using Ollama (qwen3:30b or deepseek-coder)", "available_in": ["forge", "orchestrator"], }, } def _introspection_tool_catalog() -> dict: """Return system introspection tools catalog entries.""" return { "get_system_info": { "name": "System Info", "description": "Introspect runtime environment - discover model, Python version, config", "available_in": ["orchestrator"], }, "check_ollama_health": { "name": "Ollama Health", "description": "Check if Ollama is accessible and what models are available", "available_in": ["orchestrator"], }, "get_memory_status": { "name": "Memory Status", "description": "Check status of memory tiers (hot memory, vault)", "available_in": ["orchestrator"], }, "session_history": { "name": "Session History", "description": "Search past conversation logs for messages, tool calls, errors, and decisions", "available_in": ["orchestrator"], }, "thought_search": { "name": "Thought Search", "description": "Query Timmy's own thought history for past reflections and insights", "available_in": ["orchestrator"], }, } def _experiment_tool_catalog() -> dict: """Return ML experiment tools catalog entries.""" return { "prepare_experiment": { "name": "Prepare Experiment", "description": "Clone autoresearch repo and run data preparation for ML experiments", "available_in": ["lab", "orchestrator"], }, "run_experiment": { "name": "Run Experiment", "description": "Execute a time-boxed ML training experiment and capture metrics", "available_in": ["lab", "orchestrator"], }, "evaluate_result": { "name": "Evaluate Result", "description": "Compare experiment metric against baseline to assess improvement", "available_in": ["lab", "orchestrator"], }, } def _import_creative_catalogs(catalog: dict) -> None: """Import and merge creative tool catalogs from creative module.""" # ── Git tools ───────────────────────────────────────────────────────────── try: from creative.tools.git_tools import GIT_TOOL_CATALOG for tool_id, info in GIT_TOOL_CATALOG.items(): catalog[tool_id] = { "name": info["name"], "description": info["description"], "available_in": ["forge", "helm", "orchestrator"], } except ImportError: pass # ── Image tools ──────────────────────────────────────────────────────────── try: from creative.tools.image_tools import IMAGE_TOOL_CATALOG for tool_id, info in IMAGE_TOOL_CATALOG.items(): catalog[tool_id] = { "name": info["name"], "description": info["description"], "available_in": ["pixel", "orchestrator"], } except ImportError: pass # ── Music tools ──────────────────────────────────────────────────────────── try: from creative.tools.music_tools import MUSIC_TOOL_CATALOG for tool_id, info in MUSIC_TOOL_CATALOG.items(): catalog[tool_id] = { "name": info["name"], "description": info["description"], "available_in": ["lyra", "orchestrator"], } except ImportError: pass # ── Video tools ──────────────────────────────────────────────────────────── try: from creative.tools.video_tools import VIDEO_TOOL_CATALOG for tool_id, info in VIDEO_TOOL_CATALOG.items(): catalog[tool_id] = { "name": info["name"], "description": info["description"], "available_in": ["reel", "orchestrator"], } except ImportError: pass # ── Creative pipeline ────────────────────────────────────────────────────── try: from creative.director import DIRECTOR_TOOL_CATALOG for tool_id, info in DIRECTOR_TOOL_CATALOG.items(): catalog[tool_id] = { "name": info["name"], "description": info["description"], "available_in": ["orchestrator"], } except ImportError: pass # ── Assembler tools ─────────────────────────────────────────────────────── try: from creative.assembler import ASSEMBLER_TOOL_CATALOG for tool_id, info in ASSEMBLER_TOOL_CATALOG.items(): catalog[tool_id] = { "name": info["name"], "description": info["description"], "available_in": ["reel", "orchestrator"], } except ImportError: pass def get_all_available_tools() -> dict[str, dict]: """Get a catalog of all available tools and their descriptions. Returns: Dict mapping tool categories to their tools and descriptions. """ catalog = {} catalog.update(_core_tool_catalog()) catalog.update(_analysis_tool_catalog()) catalog.update(_ai_tool_catalog()) catalog.update(_introspection_tool_catalog()) catalog.update(_experiment_tool_catalog()) _import_creative_catalogs(catalog) return catalog