Files
Timmy-time-dashboard/src/timmy/tools.py
Alexander Whitestone 6db1c16b7f
Some checks failed
Tests / lint (pull_request) Failing after 15s
Tests / test (pull_request) Has been skipped
refactor: Gracefully handle tool registration errors
Refs #938
2026-03-23 14:26:01 -04:00

1033 lines
36 KiB
Python

"""Tool integration for the agent swarm.
Provides agents with capabilities for:
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
- Git operations
- Image / Music / Video generation (creative pipeline)
Tools are assigned to agents based on their specialties.
"""
from __future__ import annotations
import ast
import logging
import math
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@dataclass
class ToolStats:
"""Statistics for a single tool."""
tool_name: str
call_count: int = 0
last_used: str | None = None
errors: int = 0
@dataclass
class AgentTools:
"""Tools assigned to an agent."""
agent_id: str
agent_name: str
toolkit: Toolkit
available_tools: list[str] = field(default_factory=list)
# Backward-compat alias
PersonaTools = AgentTools
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
"""Track tool usage for analytics."""
if agent_id not in _TOOL_USAGE:
_TOOL_USAGE[agent_id] = []
_TOOL_USAGE[agent_id].append(
{
"tool": tool_name,
"timestamp": datetime.now(UTC).isoformat(),
"success": success,
}
)
def get_tool_stats(agent_id: str | None = None) -> dict:
"""Get tool usage statistics.
Args:
agent_id: Optional agent ID to filter by. If None, returns stats for all agents.
Returns:
Dict with tool usage statistics.
"""
if agent_id:
usage = _TOOL_USAGE.get(agent_id, [])
return {
"agent_id": agent_id,
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
"recent_calls": usage[-10:] if usage else [],
}
# Return stats for all agents
all_stats = {}
for aid, usage in _TOOL_USAGE.items():
all_stats[aid] = {
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
}
return all_stats
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
Use this tool for ANY arithmetic: multiplication, division, square roots,
exponents, percentages, logarithms, trigonometry, etc.
Args:
expression: A valid Python math expression, e.g. '347 * 829',
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
Returns:
The exact result as a string.
"""
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e: # broad catch intentional: arbitrary code execution
return f"Error evaluating '{expression}': {e}"
def _make_smart_read_file(file_tools: FileTools) -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,
the raw Agno implementation throws an IsADirectoryError. This
wrapper detects that case, lists the directory entries, and returns
a helpful message so the model can pick the right file on its own.
"""
original_read = file_tools.read_file
def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
"""Reads the contents of the file `file_name` and returns the contents if successful."""
# LLMs often call read_file(path=...) instead of read_file(file_name=...)
if not file_name:
file_name = kwargs.get("path", "")
if not file_name:
return "Error: no file_name or path provided."
# Resolve the path the same way FileTools does
_safe, resolved = file_tools.check_escape(file_name)
if _safe and resolved.is_dir():
entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith("."))
listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
return (
f"'{file_name}' is a directory, not a file. "
f"Files inside:\n{listing}\n\n"
"Please call read_file with one of the files listed above."
)
return original_read(file_name, encoding=encoding)
# Preserve the original docstring for Agno tool schema generation
smart_read_file.__doc__ = original_read.__doc__
return smart_read_file
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_code_tools(base_dir: str | Path | None = None):
"""Create tools for the code agent (Forge).
Includes: shell commands, python execution, file read/write, Aider AI assist
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="code")
# Shell commands (sandboxed)
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Aider AI coding assistant (local with Ollama)
aider_tool = create_aider_tool(base_path)
toolkit.register(aider_tool.run_aider, name="aider")
return toolkit
def create_aider_tool(base_path: Path):
"""Create an Aider tool for AI-assisted coding."""
import subprocess
class AiderTool:
"""Tool that calls Aider (local AI coding assistant) for code generation."""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
"""Run Aider to generate code changes.
Args:
prompt: What you want Aider to do (e.g., "add a fibonacci function")
model: Ollama model to use (default: qwen3:30b)
Returns:
Aider's response with the code changes made
"""
try:
# Run aider with the prompt
result = subprocess.run(
[
"aider",
"--no-git",
"--model",
f"ollama/{model}",
"--quiet",
prompt,
],
capture_output=True,
text=True,
timeout=120,
cwd=str(self.base_dir),
)
if result.returncode == 0:
return result.stdout if result.stdout else "Code changes applied successfully"
else:
return f"Aider error: {result.stderr}"
except FileNotFoundError:
return "Error: Aider not installed. Run: pip install aider"
except subprocess.TimeoutExpired:
return "Error: Aider timed out after 120 seconds"
except (OSError, subprocess.SubprocessError) as e:
return f"Error running Aider: {str(e)}"
return AiderTool(base_path)
def create_data_tools(base_dir: str | Path | None = None):
"""Create tools for the data agent (Seer).
Includes: python execution, file reading, web search for data sources
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="data")
# Python execution for analysis
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_writing_tools(base_dir: str | Path | None = None):
"""Create tools for the writing agent (Quill).
Includes: file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="writing")
# File operations
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="security")
# Shell for running security scans
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File reading for logs/configs
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_devops_tools(base_dir: str | Path | None = None):
"""Create tools for the DevOps agent (Helm).
Includes: shell commands, file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="devops")
# Shell for deployment commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations for config management
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def consult_grok(query: str) -> str:
"""Consult Grok (xAI) for frontier reasoning on complex questions.
Use this tool when a question requires advanced reasoning, real-time
knowledge, or capabilities beyond the local model. Grok is a premium
cloud backend — use sparingly and only for high-complexity queries.
Args:
query: The question or reasoning task to send to Grok.
Returns:
Grok's response text, or an error/status message.
"""
from config import settings
from timmy.backends import get_grok_backend, grok_available
if not grok_available():
return (
"Grok is not available. Enable with GROK_ENABLED=true "
"and set XAI_API_KEY in your .env file."
)
backend = get_grok_backend()
# Log to Spark if available
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="default",
tool_name="consult_grok",
success=True,
)
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (consult_grok logging): %s", exc)
# Generate Lightning invoice for monetization (unless free mode)
invoice_info = ""
if not settings.grok_free:
try:
from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
response = result.content
if invoice_info:
response += invoice_info
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
def _register_web_fetch_tool(toolkit: Toolkit) -> None:
"""Register the web_fetch tool for full-page content extraction."""
try:
toolkit.register(web_fetch, name="web_fetch")
except Exception as exc:
logger.error("Failed to register web_fetch tool: %s", exc)
raise
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
"""Register core execution and file tools."""
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# Shell commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Calculator — exact arithmetic (never let the LLM guess)
toolkit.register(calculator, name="calculator")
def _register_grok_tool(toolkit: Toolkit) -> None:
"""Register Grok consultation tool if available."""
try:
from timmy.backends import grok_available
if grok_available():
toolkit.register(consult_grok, name="consult_grok")
logger.info("Grok consultation tool registered")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Grok tool: %s", exc)
raise
def _register_memory_tools(toolkit: Toolkit) -> None:
"""Register memory search, write, and forget tools."""
try:
from timmy.memory_system import memory_forget, memory_read, memory_search, memory_write
toolkit.register(memory_search, name="memory_search")
toolkit.register(memory_write, name="memory_write")
toolkit.register(memory_read, name="memory_read")
toolkit.register(memory_forget, name="memory_forget")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Memory tools: %s", exc)
raise
def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
"""Register agentic loop tool for background multi-step task execution."""
try:
from timmy.agentic_loop import run_agentic_loop
def plan_and_execute(task: str) -> str:
"""Execute a complex multi-step task in the background with progress tracking.
Use this when a task requires 3 or more sequential tool calls that may
take significant time. The task will run in the background and stream
progress updates to the user via WebSocket.
Args:
task: Full description of the multi-step task to execute.
Returns:
Task ID and confirmation that background execution has started.
"""
import asyncio
task_id = None
async def _launch():
nonlocal task_id
result = await run_agentic_loop(task)
return result
# Spawn as a background task on the running event loop
try:
asyncio.get_running_loop()
future = asyncio.ensure_future(_launch())
task_id = id(future)
logger.info("Agentic loop started (task=%s)", task[:80])
except RuntimeError:
# No running loop — run synchronously (shouldn't happen in prod)
result = asyncio.run(_launch())
return f"Task completed: {result.summary}"
return (
"Background task started. I'll execute this step-by-step "
"and stream progress updates. You can monitor via the dashboard."
)
toolkit.register(plan_and_execute, name="plan_and_execute")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register plan_and_execute tool: %s", exc)
raise
def _register_introspection_tools(toolkit: Toolkit) -> None:
"""Register system introspection tools for runtime environment queries."""
try:
from timmy.tools_intro import (
check_ollama_health,
get_memory_status,
get_system_info,
run_self_tests,
)
toolkit.register(get_system_info, name="get_system_info")
toolkit.register(check_ollama_health, name="check_ollama_health")
toolkit.register(get_memory_status, name="get_memory_status")
toolkit.register(run_self_tests, name="run_self_tests")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Introspection tools: %s", exc)
raise
try:
from timmy.mcp_tools import update_gitea_avatar
toolkit.register(update_gitea_avatar, name="update_gitea_avatar")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register update_gitea_avatar tool: %s", exc)
raise
try:
from timmy.session_logger import self_reflect, session_history
toolkit.register(session_history, name="session_history")
toolkit.register(self_reflect, name="self_reflect")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register session_history tool: %s", exc)
raise
def _register_delegation_tools(toolkit: Toolkit) -> None:
"""Register inter-agent delegation tools."""
try:
from timmy.tools_delegation import delegate_task, delegate_to_kimi, list_swarm_agents
toolkit.register(delegate_task, name="delegate_task")
toolkit.register(delegate_to_kimi, name="delegate_to_kimi")
toolkit.register(list_swarm_agents, name="list_swarm_agents")
except Exception as exc:
logger.error("Failed to register Delegation tools: %s", exc)
raise
def _register_gematria_tool(toolkit: Toolkit) -> None:
"""Register the gematria computation tool."""
try:
from timmy.gematria import gematria
toolkit.register(gematria, name="gematria")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Gematria tool: %s", exc)
raise
def _register_artifact_tools(toolkit: Toolkit) -> None:
"""Register artifact tools — notes and decision logging."""
try:
from timmy.memory_system import jot_note, log_decision
toolkit.register(jot_note, name="jot_note")
toolkit.register(log_decision, name="log_decision")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Artifact tools: %s", exc)
raise
def _register_thinking_tools(toolkit: Toolkit) -> None:
"""Register thinking/introspection tools for self-reflection."""
try:
from timmy.thinking import search_thoughts
toolkit.register(search_thoughts, name="thought_search")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Thinking tools: %s", exc)
raise
def create_full_toolkit(base_dir: str | Path | None = None):
"""Create a full toolkit with all available tools (for the orchestrator).
Includes: web search, file read/write, shell commands, python execution,
memory search for contextual recall, and Grok consultation.
"""
if not _AGNO_TOOLS_AVAILABLE:
# Return None when tools aren't available (tests)
return None
from timmy.tool_safety import DANGEROUS_TOOLS
toolkit = Toolkit(name="full")
# Set requires_confirmation_tools AFTER construction (avoids agno WARNING
# about tools not yet registered) but BEFORE register() calls (so each
# Function gets requires_confirmation=True). Fixes #79.
toolkit.requires_confirmation_tools = list(DANGEROUS_TOOLS)
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
_register_core_tools(toolkit, base_path)
_register_web_fetch_tool(toolkit)
_register_grok_tool(toolkit)
_register_memory_tools(toolkit)
_register_agentic_loop_tool(toolkit)
_register_introspection_tools(toolkit)
_register_delegation_tools(toolkit)
_register_gematria_tool(toolkit)
_register_artifact_tools(toolkit)
_register_thinking_tools(toolkit)
# Gitea issue management is now provided by the gitea-mcp server
# (wired in as MCPTools in agent.py, not registered here)
return toolkit
def create_experiment_tools(base_dir: str | Path | None = None):
"""Create tools for the experiment agent (Lab).
Includes: prepare_experiment, run_experiment, evaluate_result,
plus shell + file ops for editing training code.
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
from config import settings
toolkit = Toolkit(name="experiment")
from timmy.autoresearch import evaluate_result, prepare_experiment, run_experiment
workspace = (
Path(base_dir) if base_dir else Path(settings.repo_root) / settings.autoresearch_workspace
)
def _prepare(repo_url: str = "https://github.com/karpathy/autoresearch.git") -> str:
"""Clone and prepare an autoresearch experiment workspace."""
return prepare_experiment(workspace, repo_url)
def _run(timeout: int = 0) -> str:
"""Run a single training experiment with wall-clock timeout."""
t = timeout or settings.autoresearch_time_budget
result = run_experiment(workspace, timeout=t, metric_name=settings.autoresearch_metric)
if result["success"] and result["metric"] is not None:
return (
f"{settings.autoresearch_metric}: {result['metric']:.4f} ({result['duration_s']}s)"
)
return result.get("error") or "Experiment failed"
def _evaluate(current: float, baseline: float) -> str:
"""Compare current metric against baseline."""
return evaluate_result(current, baseline, metric_name=settings.autoresearch_metric)
toolkit.register(_prepare, name="prepare_experiment")
toolkit.register(_run, name="run_experiment")
toolkit.register(_evaluate, name="evaluate_result")
# Also give Lab access to file + shell tools for editing train.py
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
# Mapping of agent IDs to their toolkits
AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
"echo": create_research_tools,
"mace": create_security_tools,
"helm": create_devops_tools,
"seer": create_data_tools,
"forge": create_code_tools,
"quill": create_writing_tools,
"lab": create_experiment_tools,
"pixel": lambda base_dir=None: _create_stub_toolkit("pixel"),
"lyra": lambda base_dir=None: _create_stub_toolkit("lyra"),
"reel": lambda base_dir=None: _create_stub_toolkit("reel"),
}
def _create_stub_toolkit(name: str):
"""Create a minimal Agno toolkit for creative agents.
Creative agents use their own dedicated tool modules rather than
Agno-wrapped functions. This stub ensures AGENT_TOOLKITS has an
entry so ToolExecutor doesn't fall back to the full toolkit.
"""
if not _AGNO_TOOLS_AVAILABLE:
return None
toolkit = Toolkit(name=name)
return toolkit
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None:
"""Get the appropriate toolkit for an agent.
Args:
agent_id: The agent ID (echo, mace, helm, seer, forge, quill)
base_dir: Optional base directory for file operations
Returns:
A Toolkit instance or None if agent_id is not recognized
"""
factory = AGENT_TOOLKITS.get(agent_id)
if factory:
return factory(base_dir)
return None
# Backward-compat alias
get_tools_for_persona = get_tools_for_agent
PERSONA_TOOLKITS = AGENT_TOOLKITS
def _core_tool_catalog() -> dict:
"""Return core file and execution tools catalog entries."""
return {
"shell": {
"name": "Shell Commands",
"description": "Execute shell commands (sandboxed)",
"available_in": ["forge", "mace", "helm", "orchestrator"],
},
"python": {
"name": "Python Execution",
"description": "Execute Python code for analysis and scripting",
"available_in": ["forge", "seer", "orchestrator"],
},
"read_file": {
"name": "Read File",
"description": "Read contents of local files",
"available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "orchestrator"],
},
"write_file": {
"name": "Write File",
"description": "Write content to local files",
"available_in": ["forge", "quill", "helm", "orchestrator"],
},
"list_files": {
"name": "List Files",
"description": "List files in a directory",
"available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "orchestrator"],
},
}
def _analysis_tool_catalog() -> dict:
"""Return analysis and calculation tools catalog entries."""
return {
"calculator": {
"name": "Calculator",
"description": "Evaluate mathematical expressions with exact results",
"available_in": ["orchestrator"],
},
"web_fetch": {
"name": "Web Fetch",
"description": "Fetch a web page and extract clean readable text (trafilatura)",
"available_in": ["orchestrator"],
},
}
def _ai_tool_catalog() -> dict:
"""Return AI assistant and frontier reasoning tools catalog entries."""
return {
"consult_grok": {
"name": "Consult Grok",
"description": "Premium frontier reasoning via xAI Grok (opt-in, Lightning-payable)",
"available_in": ["orchestrator"],
},
"aider": {
"name": "Aider AI Assistant",
"description": "Local AI coding assistant using Ollama (qwen3:30b or deepseek-coder)",
"available_in": ["forge", "orchestrator"],
},
}
def _introspection_tool_catalog() -> dict:
"""Return system introspection tools catalog entries."""
return {
"get_system_info": {
"name": "System Info",
"description": "Introspect runtime environment - discover model, Python version, config",
"available_in": ["orchestrator"],
},
"check_ollama_health": {
"name": "Ollama Health",
"description": "Check if Ollama is accessible and what models are available",
"available_in": ["orchestrator"],
},
"get_memory_status": {
"name": "Memory Status",
"description": "Check status of memory tiers (hot memory, vault)",
"available_in": ["orchestrator"],
},
"session_history": {
"name": "Session History",
"description": "Search past conversation logs for messages, tool calls, errors, and decisions",
"available_in": ["orchestrator"],
},
"thought_search": {
"name": "Thought Search",
"description": "Query Timmy's own thought history for past reflections and insights",
"available_in": ["orchestrator"],
},
"self_reflect": {
"name": "Self-Reflect",
"description": "Review recent conversations to spot patterns, low-confidence answers, and errors",
"available_in": ["orchestrator"],
},
"update_gitea_avatar": {
"name": "Update Gitea Avatar",
"description": "Generate and upload a wizard-themed avatar to Timmy's Gitea profile",
"available_in": ["orchestrator"],
},
}
def _experiment_tool_catalog() -> dict:
"""Return ML experiment tools catalog entries."""
return {
"prepare_experiment": {
"name": "Prepare Experiment",
"description": "Clone autoresearch repo and run data preparation for ML experiments",
"available_in": ["lab", "orchestrator"],
},
"run_experiment": {
"name": "Run Experiment",
"description": "Execute a time-boxed ML training experiment and capture metrics",
"available_in": ["lab", "orchestrator"],
},
"evaluate_result": {
"name": "Evaluate Result",
"description": "Compare experiment metric against baseline to assess improvement",
"available_in": ["lab", "orchestrator"],
},
}
_CREATIVE_CATALOG_SOURCES: list[tuple[str, str, list[str]]] = [
("creative.tools.git_tools", "GIT_TOOL_CATALOG", ["forge", "helm", "orchestrator"]),
("creative.tools.image_tools", "IMAGE_TOOL_CATALOG", ["pixel", "orchestrator"]),
("creative.tools.music_tools", "MUSIC_TOOL_CATALOG", ["lyra", "orchestrator"]),
("creative.tools.video_tools", "VIDEO_TOOL_CATALOG", ["reel", "orchestrator"]),
("creative.director", "DIRECTOR_TOOL_CATALOG", ["orchestrator"]),
("creative.assembler", "ASSEMBLER_TOOL_CATALOG", ["reel", "orchestrator"]),
]
def _import_creative_catalogs(catalog: dict) -> None:
"""Import and merge creative tool catalogs from creative module."""
for module_path, attr_name, available_in in _CREATIVE_CATALOG_SOURCES:
_merge_catalog(catalog, module_path, attr_name, available_in)
def _merge_catalog(
catalog: dict, module_path: str, attr_name: str, available_in: list[str]
) -> None:
"""Import a single creative catalog and merge its entries."""
try:
from importlib import import_module
source_catalog = getattr(import_module(module_path), attr_name)
for tool_id, info in source_catalog.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": available_in,
}
except ImportError:
logger.debug("Optional catalog %s.%s not available", module_path, attr_name)
def get_all_available_tools() -> dict[str, dict]:
"""Get a catalog of all available tools and their descriptions.
Returns:
Dict mapping tool categories to their tools and descriptions.
"""
catalog = {}
catalog.update(_core_tool_catalog())
catalog.update(_analysis_tool_catalog())
catalog.update(_ai_tool_catalog())
catalog.update(_introspection_tool_catalog())
catalog.update(_experiment_tool_catalog())
_import_creative_catalogs(catalog)
return catalog