# forked from Rockachopa/Timmy-time-dashboard
# Co-authored-by: Kimi Agent <kimi@timmy.local>
# Co-committed-by: Kimi Agent <kimi@timmy.local>
"""Tool integration for the agent swarm.
|
|
|
|
Provides agents with capabilities for:
|
|
- File read/write (local filesystem)
|
|
- Shell command execution (sandboxed)
|
|
- Python code execution
|
|
- Git operations
|
|
- Image / Music / Video generation (creative pipeline)
|
|
|
|
Tools are assigned to agents based on their specialties.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import logging
|
|
import math
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Lazy imports to handle test mocking
|
|
_ImportError = None
|
|
try:
|
|
from agno.tools import Toolkit
|
|
from agno.tools.file import FileTools
|
|
from agno.tools.python import PythonTools
|
|
from agno.tools.shell import ShellTools
|
|
|
|
_AGNO_TOOLS_AVAILABLE = True
|
|
except ImportError as e:
|
|
_AGNO_TOOLS_AVAILABLE = False
|
|
_ImportError = e
|
|
|
|
# Track tool usage stats
# Maps agent_id -> list of {"tool", "timestamp", "success"} call records,
# appended by _track_tool_usage and summarized by get_tool_stats.
_TOOL_USAGE: dict[str, list[dict]] = {}
|
|
|
|
|
|
@dataclass
class ToolStats:
    """Statistics for a single tool."""

    # Registered name of the tool these stats describe.
    tool_name: str
    # Total number of invocations recorded.
    call_count: int = 0
    # ISO-8601 timestamp of the most recent call, or None if never used.
    last_used: str | None = None
    # Number of failed invocations.
    errors: int = 0
|
|
|
|
|
|
@dataclass
class AgentTools:
    """Tools assigned to an agent."""

    # Stable agent identifier (e.g. "forge", "echo").
    agent_id: str
    # Human-readable agent name.
    agent_name: str
    # Agno toolkit holding the agent's registered tool functions.
    toolkit: Toolkit
    # Names of the tools registered in the toolkit.
    available_tools: list[str] = field(default_factory=list)
|
|
|
|
|
|
# Backward-compat alias
# AgentTools was previously named PersonaTools; keep the old name importable.
PersonaTools = AgentTools
|
|
|
|
|
|
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
    """Track tool usage for analytics."""
    # Record one call: which tool, when (UTC), and whether it succeeded.
    record = {
        "tool": tool_name,
        "timestamp": datetime.now(UTC).isoformat(),
        "success": success,
    }
    _TOOL_USAGE.setdefault(agent_id, []).append(record)
|
|
|
|
|
|
def get_tool_stats(agent_id: str | None = None) -> dict:
    """Get tool usage statistics.

    Args:
        agent_id: Optional agent ID to filter by. If None, returns stats for all agents.

    Returns:
        Dict with tool usage statistics.
    """
    if agent_id:
        calls = _TOOL_USAGE.get(agent_id, [])
        return {
            "agent_id": agent_id,
            "total_calls": len(calls),
            "tools_used": list({c["tool"] for c in calls}),
            "recent_calls": calls[-10:] if calls else [],
        }

    # No filter: summarize every agent's usage.
    return {
        aid: {
            "total_calls": len(calls),
            "tools_used": list({c["tool"] for c in calls}),
        }
        for aid, calls in _TOOL_USAGE.items()
    }
|
|
|
|
|
|
def _safe_eval(node, allowed_names: dict):
|
|
"""Walk an AST and evaluate only safe numeric operations."""
|
|
if isinstance(node, ast.Expression):
|
|
return _safe_eval(node.body, allowed_names)
|
|
if isinstance(node, ast.Constant):
|
|
if isinstance(node.value, (int, float, complex)):
|
|
return node.value
|
|
raise ValueError(f"Unsupported constant: {node.value!r}")
|
|
if isinstance(node, ast.UnaryOp):
|
|
operand = _safe_eval(node.operand, allowed_names)
|
|
if isinstance(node.op, ast.UAdd):
|
|
return +operand
|
|
if isinstance(node.op, ast.USub):
|
|
return -operand
|
|
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
|
|
if isinstance(node, ast.BinOp):
|
|
left = _safe_eval(node.left, allowed_names)
|
|
right = _safe_eval(node.right, allowed_names)
|
|
ops = {
|
|
ast.Add: lambda a, b: a + b,
|
|
ast.Sub: lambda a, b: a - b,
|
|
ast.Mult: lambda a, b: a * b,
|
|
ast.Div: lambda a, b: a / b,
|
|
ast.FloorDiv: lambda a, b: a // b,
|
|
ast.Mod: lambda a, b: a % b,
|
|
ast.Pow: lambda a, b: a**b,
|
|
}
|
|
op_fn = ops.get(type(node.op))
|
|
if op_fn is None:
|
|
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
|
|
return op_fn(left, right)
|
|
if isinstance(node, ast.Name):
|
|
if node.id in allowed_names:
|
|
return allowed_names[node.id]
|
|
raise ValueError(f"Unknown name: {node.id!r}")
|
|
if isinstance(node, ast.Attribute):
|
|
value = _safe_eval(node.value, allowed_names)
|
|
# Only allow attribute access on the math module
|
|
if value is math:
|
|
attr = getattr(math, node.attr, None)
|
|
if attr is not None:
|
|
return attr
|
|
raise ValueError(f"Attribute access not allowed: .{node.attr}")
|
|
if isinstance(node, ast.Call):
|
|
func = _safe_eval(node.func, allowed_names)
|
|
if not callable(func):
|
|
raise ValueError(f"Not callable: {func!r}")
|
|
args = [_safe_eval(a, allowed_names) for a in node.args]
|
|
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
|
|
return func(*args, **kwargs)
|
|
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
|
|
|
|
|
|
def calculator(expression: str) -> str:
|
|
"""Evaluate a mathematical expression and return the exact result.
|
|
|
|
Use this tool for ANY arithmetic: multiplication, division, square roots,
|
|
exponents, percentages, logarithms, trigonometry, etc.
|
|
|
|
Args:
|
|
expression: A valid Python math expression, e.g. '347 * 829',
|
|
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
|
|
|
|
Returns:
|
|
The exact result as a string.
|
|
"""
|
|
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
|
|
allowed_names["math"] = math
|
|
allowed_names["abs"] = abs
|
|
allowed_names["round"] = round
|
|
allowed_names["min"] = min
|
|
allowed_names["max"] = max
|
|
try:
|
|
tree = ast.parse(expression, mode="eval")
|
|
result = _safe_eval(tree, allowed_names)
|
|
return str(result)
|
|
except Exception as e: # broad catch intentional: arbitrary code execution
|
|
return f"Error evaluating '{expression}': {e}"
|
|
|
|
|
|
def _make_smart_read_file(file_tools: FileTools) -> Callable:
    """Wrap FileTools.read_file so directories auto-list their contents.

    When the user (or the LLM) passes a directory path to read_file,
    the raw Agno implementation throws an IsADirectoryError. This
    wrapper detects that case, lists the directory entries, and returns
    a helpful message so the model can pick the right file on its own.
    """
    raw_read = file_tools.read_file

    def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
        """Reads the contents of the file `file_name` and returns the contents if successful."""
        # LLMs often call read_file(path=...) instead of read_file(file_name=...)
        target = file_name or kwargs.get("path", "")
        if not target:
            return "Error: no file_name or path provided."
        # Resolve the path the same way FileTools does
        is_safe, resolved = file_tools.check_escape(target)
        if is_safe and resolved.is_dir():
            # Directory: list visible entries instead of erroring out.
            names = sorted(
                entry.name for entry in resolved.iterdir() if not entry.name.startswith(".")
            )
            listing = "\n".join(f" - {n}" for n in names) if names else " (empty directory)"
            return (
                f"'{target}' is a directory, not a file. "
                f"Files inside:\n{listing}\n\n"
                "Please call read_file with one of the files listed above."
            )
        return raw_read(target, encoding=encoding)

    # Preserve the original docstring for Agno tool schema generation
    smart_read_file.__doc__ = raw_read.__doc__
    return smart_read_file
|
|
|
|
|
|
def create_research_tools(base_dir: str | Path | None = None):
    """Create tools for the research agent (Echo).

    Includes: file reading

    Args:
        base_dir: Optional base directory for file operations; defaults to
            the configured repository root.

    Returns:
        A Toolkit named "research" with read_file and list_files registered.

    Raises:
        ImportError: If the agno tool dependencies are not installed.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")
    toolkit = Toolkit(name="research")

    # File reading (settings is already imported at module level; the previous
    # local re-import was redundant)
    base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
    file_tools = FileTools(base_dir=base_path)
    toolkit.register(_make_smart_read_file(file_tools), name="read_file")
    toolkit.register(file_tools.list_files, name="list_files")

    return toolkit
|
|
|
|
|
|
def create_code_tools(base_dir: str | Path | None = None):
    """Create tools for the code agent (Forge).

    Includes: shell commands, python execution, file read/write, Aider AI assist

    Args:
        base_dir: Optional base directory for file operations; defaults to
            the configured repository root.

    Returns:
        A Toolkit named "code".

    Raises:
        ImportError: If the agno tool dependencies are not installed.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")
    toolkit = Toolkit(name="code")

    # Shell commands (sandboxed)
    shell_tools = ShellTools()
    toolkit.register(shell_tools.run_shell_command, name="shell")

    # Python execution
    python_tools = PythonTools()
    toolkit.register(python_tools.run_python_code, name="python")

    # File operations (settings is already imported at module level; the
    # previous local re-import was redundant)
    base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
    file_tools = FileTools(base_dir=base_path)
    toolkit.register(_make_smart_read_file(file_tools), name="read_file")
    toolkit.register(file_tools.save_file, name="write_file")
    toolkit.register(file_tools.list_files, name="list_files")

    # Aider AI coding assistant (local with Ollama)
    aider_tool = create_aider_tool(base_path)
    toolkit.register(aider_tool.run_aider, name="aider")

    return toolkit
|
|
|
|
|
|
def create_aider_tool(base_path: Path):
    """Create an Aider tool for AI-assisted coding.

    Args:
        base_path: Working directory Aider operates in.

    Returns:
        An AiderTool instance whose ``run_aider`` method shells out to the
        ``aider`` CLI backed by a local Ollama model.
    """
    import subprocess

    class AiderTool:
        """Tool that calls Aider (local AI coding assistant) for code generation."""

        def __init__(self, base_dir: Path):
            self.base_dir = base_dir

        def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
            """Run Aider to generate code changes.

            Args:
                prompt: What you want Aider to do (e.g., "add a fibonacci function")
                model: Ollama model to use (default: qwen3:30b)

            Returns:
                Aider's response with the code changes made
            """
            try:
                # Run aider with the prompt; --no-git avoids touching repo history.
                result = subprocess.run(
                    [
                        "aider",
                        "--no-git",
                        "--model",
                        f"ollama/{model}",
                        "--quiet",
                        prompt,
                    ],
                    capture_output=True,
                    text=True,
                    timeout=120,
                    cwd=str(self.base_dir),
                )

                if result.returncode == 0:
                    return result.stdout if result.stdout else "Code changes applied successfully"
                return f"Aider error: {result.stderr}"
            except FileNotFoundError:
                # Fix: Aider's PyPI distribution is named "aider-chat", not "aider".
                return "Error: Aider not installed. Run: pip install aider-chat"
            except subprocess.TimeoutExpired:
                return "Error: Aider timed out after 120 seconds"
            except (OSError, subprocess.SubprocessError) as e:
                return f"Error running Aider: {str(e)}"

    return AiderTool(base_path)
|
|
|
|
|
|
def create_data_tools(base_dir: str | Path | None = None):
    """Create tools for the data agent (Seer).

    Includes: python execution, file reading, web search for data sources

    Args:
        base_dir: Optional base directory for file operations; defaults to
            the configured repository root.

    Returns:
        A Toolkit named "data".

    Raises:
        ImportError: If the agno tool dependencies are not installed.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")
    toolkit = Toolkit(name="data")

    # Python execution for analysis
    python_tools = PythonTools()
    toolkit.register(python_tools.run_python_code, name="python")

    # File reading (settings is already imported at module level; the previous
    # local re-import was redundant)
    base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
    file_tools = FileTools(base_dir=base_path)
    toolkit.register(_make_smart_read_file(file_tools), name="read_file")
    toolkit.register(file_tools.list_files, name="list_files")

    return toolkit
|
|
|
|
|
|
def create_writing_tools(base_dir: str | Path | None = None):
    """Create tools for the writing agent (Quill).

    Includes: file read/write
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")
    toolkit = Toolkit(name="writing")

    # File operations rooted at the caller-supplied dir or the repo root.
    root = Path(base_dir) if base_dir else Path(settings.repo_root)
    file_tools = FileTools(base_dir=root)
    for fn, tool_name in (
        (_make_smart_read_file(file_tools), "read_file"),
        (file_tools.save_file, "write_file"),
        (file_tools.list_files, "list_files"),
    ):
        toolkit.register(fn, name=tool_name)

    return toolkit
|
|
|
|
|
|
def create_security_tools(base_dir: str | Path | None = None):
    """Create tools for the security agent (Mace).

    Includes: shell commands (for scanning), file read
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")
    toolkit = Toolkit(name="security")

    # Shell for running security scans
    toolkit.register(ShellTools().run_shell_command, name="shell")

    # Read-only file access for inspecting logs and configs.
    root = Path(base_dir) if base_dir else Path(settings.repo_root)
    file_tools = FileTools(base_dir=root)
    for fn, tool_name in (
        (_make_smart_read_file(file_tools), "read_file"),
        (file_tools.list_files, "list_files"),
    ):
        toolkit.register(fn, name=tool_name)

    return toolkit
|
|
|
|
|
|
def create_devops_tools(base_dir: str | Path | None = None):
    """Create tools for the DevOps agent (Helm).

    Includes: shell commands, file read/write
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")
    toolkit = Toolkit(name="devops")

    # Shell for deployment commands
    toolkit.register(ShellTools().run_shell_command, name="shell")

    # File operations for config management
    root = Path(base_dir) if base_dir else Path(settings.repo_root)
    file_tools = FileTools(base_dir=root)
    for fn, tool_name in (
        (_make_smart_read_file(file_tools), "read_file"),
        (file_tools.save_file, "write_file"),
        (file_tools.list_files, "list_files"),
    ):
        toolkit.register(fn, name=tool_name)

    return toolkit
|
|
|
|
|
|
def consult_grok(query: str) -> str:
    """Consult Grok (xAI) for frontier reasoning on complex questions.

    Use this tool when a question requires advanced reasoning, real-time
    knowledge, or capabilities beyond the local model. Grok is a premium
    cloud backend — use sparingly and only for high-complexity queries.

    Args:
        query: The question or reasoning task to send to Grok.

    Returns:
        Grok's response text, or an error/status message.
    """
    # settings is imported at module level; only the backend import is local
    # (lazy, so the module loads without the Grok extras installed).
    from timmy.backends import get_grok_backend, grok_available

    if not grok_available():
        return (
            "Grok is not available. Enable with GROK_ENABLED=true "
            "and set XAI_API_KEY in your .env file."
        )

    backend = get_grok_backend()

    # Log to Spark if available (best-effort; Spark is optional)
    try:
        from spark.engine import spark_engine

        spark_engine.on_tool_executed(
            agent_id="default",
            tool_name="consult_grok",
            success=True,
        )
    except (ImportError, AttributeError) as exc:
        # Removed dead `pass` that followed the log call.
        logger.warning("Tool execution failed (consult_grok logging): %s", exc)

    # Generate Lightning invoice for monetization (unless free mode)
    invoice_info = ""
    if not settings.grok_free:
        try:
            from lightning.factory import get_backend as get_ln_backend

            ln = get_ln_backend()
            # Cap the invoice amount at 100 sats regardless of config.
            sats = min(settings.grok_max_sats_per_query, 100)
            inv = ln.create_invoice(sats, f"Grok query: {query[:50]}")
            invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
        except (ImportError, OSError, ValueError) as exc:
            logger.warning("Tool execution failed (Lightning invoice): %s", exc)

    result = backend.run(query)

    response = result.content
    if invoice_info:
        response += invoice_info

    return response
|
|
|
|
|
|
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
    """Register core execution and file tools."""
    # Code execution: Python first, then shell (same order as before).
    toolkit.register(PythonTools().run_python_code, name="python")
    toolkit.register(ShellTools().run_shell_command, name="shell")

    # File operations rooted at base_path.
    file_tools = FileTools(base_dir=base_path)
    for fn, tool_name in (
        (_make_smart_read_file(file_tools), "read_file"),
        (file_tools.save_file, "write_file"),
        (file_tools.list_files, "list_files"),
    ):
        toolkit.register(fn, name=tool_name)

    # Calculator — exact arithmetic (never let the LLM guess)
    toolkit.register(calculator, name="calculator")
|
|
|
|
|
|
def _register_grok_tool(toolkit: Toolkit) -> None:
    """Register Grok consultation tool if available."""
    try:
        from timmy.backends import grok_available

        # Only register when the backend reports itself as configured.
        if not grok_available():
            return
        toolkit.register(consult_grok, name="consult_grok")
        logger.info("Grok consultation tool registered")
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (Grok registration): %s", exc)
        logger.debug("Grok tool not available")
|
|
|
|
|
|
def _register_memory_tools(toolkit: Toolkit) -> None:
    """Register memory search, write, and forget tools."""
    try:
        from timmy.memory_system import memory_forget, memory_read, memory_search, memory_write

        # Registration order matches the original: search, write, read, forget.
        for tool, tool_name in (
            (memory_search, "memory_search"),
            (memory_write, "memory_write"),
            (memory_read, "memory_read"),
            (memory_forget, "memory_forget"),
        ):
            toolkit.register(tool, name=tool_name)
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (Memory tools registration): %s", exc)
        logger.debug("Memory tools not available")
|
|
|
|
|
|
def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
    """Register agentic loop tool for background multi-step task execution."""
    try:
        from timmy.agentic_loop import run_agentic_loop

        # Strong references to in-flight background tasks. The event loop keeps
        # only a weak reference to tasks, so without this set a spawned task
        # could be garbage-collected mid-flight (see asyncio.Task docs).
        background_tasks: set = set()

        def plan_and_execute(task: str) -> str:
            """Execute a complex multi-step task in the background with progress tracking.

            Use this when a task requires 3 or more sequential tool calls that may
            take significant time. The task will run in the background and stream
            progress updates to the user via WebSocket.

            Args:
                task: Full description of the multi-step task to execute.

            Returns:
                Task ID and confirmation that background execution has started.
            """
            import asyncio

            async def _launch():
                return await run_agentic_loop(task)

            # Spawn as a background task on the running event loop
            try:
                asyncio.get_running_loop()
                future = asyncio.ensure_future(_launch())
                # Hold a strong reference until the task finishes.
                background_tasks.add(future)
                future.add_done_callback(background_tasks.discard)
                logger.info("Agentic loop started (task=%s)", task[:80])
            except RuntimeError:
                # No running loop — run synchronously (shouldn't happen in prod)
                result = asyncio.run(_launch())
                return f"Task completed: {result.summary}"

            return (
                "Background task started. I'll execute this step-by-step "
                "and stream progress updates. You can monitor via the dashboard."
            )

        toolkit.register(plan_and_execute, name="plan_and_execute")
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (plan_and_execute registration): %s", exc)
        logger.debug("plan_and_execute tool not available")
|
|
|
|
|
|
def _register_introspection_tools(toolkit: Toolkit) -> None:
    """Register system introspection tools for runtime environment queries."""
    try:
        from timmy.tools_intro import (
            check_ollama_health,
            get_memory_status,
            get_system_info,
            run_self_tests,
        )

        for fn, tool_name in (
            (get_system_info, "get_system_info"),
            (check_ollama_health, "check_ollama_health"),
            (get_memory_status, "get_memory_status"),
            (run_self_tests, "run_self_tests"),
        ):
            toolkit.register(fn, name=tool_name)
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (Introspection tools registration): %s", exc)
        logger.debug("Introspection tools not available")

    # Gitea avatar management (optional MCP helper).
    try:
        from timmy.mcp_tools import update_gitea_avatar

        toolkit.register(update_gitea_avatar, name="update_gitea_avatar")
    except (ImportError, AttributeError) as exc:
        logger.debug("update_gitea_avatar tool not available: %s", exc)

    # Session-log search and self-reflection (optional).
    try:
        from timmy.session_logger import self_reflect, session_history

        toolkit.register(session_history, name="session_history")
        toolkit.register(self_reflect, name="self_reflect")
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (session_history registration): %s", exc)
        logger.debug("session_history tool not available")
|
|
|
|
|
|
def _register_delegation_tools(toolkit: Toolkit) -> None:
    """Register inter-agent delegation tools."""
    try:
        from timmy.tools_delegation import delegate_task, delegate_to_kimi, list_swarm_agents

        toolkit.register(delegate_task, name="delegate_task")
        toolkit.register(delegate_to_kimi, name="delegate_to_kimi")
        toolkit.register(list_swarm_agents, name="list_swarm_agents")
    except (ImportError, AttributeError) as exc:
        # Narrowed from bare `except Exception` for consistency with the
        # other _register_* helpers; registration is best-effort.
        logger.warning("Tool execution failed (Delegation tools registration): %s", exc)
        logger.debug("Delegation tools not available")
|
|
|
|
|
|
def _register_gematria_tool(toolkit: Toolkit) -> None:
    """Register the gematria computation tool."""
    # Best-effort: the gematria module is optional.
    try:
        from timmy.gematria import gematria

        toolkit.register(gematria, name="gematria")
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (Gematria registration): %s", exc)
        logger.debug("Gematria tool not available")
|
|
|
|
|
|
def _register_artifact_tools(toolkit: Toolkit) -> None:
    """Register artifact tools — notes and decision logging."""
    try:
        from timmy.memory_system import jot_note, log_decision

        for tool, tool_name in ((jot_note, "jot_note"), (log_decision, "log_decision")):
            toolkit.register(tool, name=tool_name)
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (Artifact tools registration): %s", exc)
        logger.debug("Artifact tools not available")
|
|
|
|
|
|
def _register_thinking_tools(toolkit: Toolkit) -> None:
    """Register thinking/introspection tools for self-reflection."""
    # Best-effort: the thinking module is optional.
    try:
        from timmy.thinking import search_thoughts

        toolkit.register(search_thoughts, name="thought_search")
    except (ImportError, AttributeError) as exc:
        logger.warning("Tool execution failed (Thinking tools registration): %s", exc)
        logger.debug("Thinking tools not available")
|
|
|
|
|
|
def create_full_toolkit(base_dir: str | Path | None = None):
    """Create a full toolkit with all available tools (for the orchestrator).

    Includes: web search, file read/write, shell commands, python execution,
    memory search for contextual recall, and Grok consultation.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        # Return None when tools aren't available (tests)
        return None

    from timmy.tool_safety import DANGEROUS_TOOLS

    toolkit = Toolkit(name="full")
    # Set requires_confirmation_tools AFTER construction (avoids agno WARNING
    # about tools not yet registered) but BEFORE register() calls (so each
    # Function gets requires_confirmation=True). Fixes #79.
    toolkit.requires_confirmation_tools = list(DANGEROUS_TOOLS)

    base_path = Path(base_dir) if base_dir else Path(settings.repo_root)

    # Core tools need the base path; the rest take only the toolkit.
    _register_core_tools(toolkit, base_path)
    for registrar in (
        _register_grok_tool,
        _register_memory_tools,
        _register_agentic_loop_tool,
        _register_introspection_tools,
        _register_delegation_tools,
        _register_gematria_tool,
        _register_artifact_tools,
        _register_thinking_tools,
    ):
        registrar(toolkit)

    # Gitea issue management is now provided by the gitea-mcp server
    # (wired in as MCPTools in agent.py, not registered here)

    return toolkit
|
|
|
|
|
|
def create_experiment_tools(base_dir: str | Path | None = None):
    """Create tools for the experiment agent (Lab).

    Includes: prepare_experiment, run_experiment, evaluate_result,
    plus shell + file ops for editing training code.

    Args:
        base_dir: Optional base directory; when set it is used both as the
            experiment workspace and the file-tool root.

    Returns:
        A Toolkit named "experiment".

    Raises:
        ImportError: If the agno tool dependencies are not installed.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")

    # settings is imported at module level; the previous local re-import
    # was redundant.
    toolkit = Toolkit(name="experiment")

    from timmy.autoresearch import evaluate_result, prepare_experiment, run_experiment

    workspace = (
        Path(base_dir) if base_dir else Path(settings.repo_root) / settings.autoresearch_workspace
    )

    def _prepare(repo_url: str = "https://github.com/karpathy/autoresearch.git") -> str:
        """Clone and prepare an autoresearch experiment workspace."""
        return prepare_experiment(workspace, repo_url)

    def _run(timeout: int = 0) -> str:
        """Run a single training experiment with wall-clock timeout."""
        # 0 means "use the configured time budget".
        t = timeout or settings.autoresearch_time_budget
        result = run_experiment(workspace, timeout=t, metric_name=settings.autoresearch_metric)
        if result["success"] and result["metric"] is not None:
            return (
                f"{settings.autoresearch_metric}: {result['metric']:.4f} ({result['duration_s']}s)"
            )
        return result.get("error") or "Experiment failed"

    def _evaluate(current: float, baseline: float) -> str:
        """Compare current metric against baseline."""
        return evaluate_result(current, baseline, metric_name=settings.autoresearch_metric)

    toolkit.register(_prepare, name="prepare_experiment")
    toolkit.register(_run, name="run_experiment")
    toolkit.register(_evaluate, name="evaluate_result")

    # Also give Lab access to file + shell tools for editing train.py
    shell_tools = ShellTools()
    toolkit.register(shell_tools.run_shell_command, name="shell")

    base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
    file_tools = FileTools(base_dir=base_path)
    toolkit.register(_make_smart_read_file(file_tools), name="read_file")
    toolkit.register(file_tools.save_file, name="write_file")
    toolkit.register(file_tools.list_files, name="list_files")

    return toolkit
|
|
|
|
|
|
# Mapping of agent IDs to their toolkits
# Each factory accepts an optional base_dir and returns a Toolkit (or None for
# the creative stubs when agno is unavailable) — annotation corrected from
# Callable[[], Toolkit], which did not reflect the base_dir parameter.
AGENT_TOOLKITS: dict[str, Callable[..., Toolkit | None]] = {
    "echo": create_research_tools,
    "mace": create_security_tools,
    "helm": create_devops_tools,
    "seer": create_data_tools,
    "forge": create_code_tools,
    "quill": create_writing_tools,
    "lab": create_experiment_tools,
    # Creative agents get minimal stubs; their real tools live in dedicated
    # creative.* modules (see _create_stub_toolkit).
    "pixel": lambda base_dir=None: _create_stub_toolkit("pixel"),
    "lyra": lambda base_dir=None: _create_stub_toolkit("lyra"),
    "reel": lambda base_dir=None: _create_stub_toolkit("reel"),
}
|
|
|
|
|
|
def _create_stub_toolkit(name: str):
    """Create a minimal Agno toolkit for creative agents.

    Creative agents use their own dedicated tool modules rather than
    Agno-wrapped functions. This stub ensures AGENT_TOOLKITS has an
    entry so ToolExecutor doesn't fall back to the full toolkit.
    """
    # Empty toolkit when agno is present; None mirrors create_full_toolkit.
    return Toolkit(name=name) if _AGNO_TOOLS_AVAILABLE else None
|
|
|
|
|
|
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None:
    """Get the appropriate toolkit for an agent.

    Args:
        agent_id: The agent ID (echo, mace, helm, seer, forge, quill)
        base_dir: Optional base directory for file operations

    Returns:
        A Toolkit instance or None if agent_id is not recognized
    """
    factory = AGENT_TOOLKITS.get(agent_id)
    return factory(base_dir) if factory else None
|
|
|
|
|
|
# Backward-compat alias
# Older call sites used "persona" terminology; keep both names importable.
get_tools_for_persona = get_tools_for_agent
PERSONA_TOOLKITS = AGENT_TOOLKITS
|
|
|
|
|
|
def _core_tool_catalog() -> dict:
|
|
"""Return core file and execution tools catalog entries."""
|
|
return {
|
|
"shell": {
|
|
"name": "Shell Commands",
|
|
"description": "Execute shell commands (sandboxed)",
|
|
"available_in": ["forge", "mace", "helm", "orchestrator"],
|
|
},
|
|
"python": {
|
|
"name": "Python Execution",
|
|
"description": "Execute Python code for analysis and scripting",
|
|
"available_in": ["forge", "seer", "orchestrator"],
|
|
},
|
|
"read_file": {
|
|
"name": "Read File",
|
|
"description": "Read contents of local files",
|
|
"available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "orchestrator"],
|
|
},
|
|
"write_file": {
|
|
"name": "Write File",
|
|
"description": "Write content to local files",
|
|
"available_in": ["forge", "quill", "helm", "orchestrator"],
|
|
},
|
|
"list_files": {
|
|
"name": "List Files",
|
|
"description": "List files in a directory",
|
|
"available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "orchestrator"],
|
|
},
|
|
}
|
|
|
|
|
|
def _analysis_tool_catalog() -> dict:
|
|
"""Return analysis and calculation tools catalog entries."""
|
|
return {
|
|
"calculator": {
|
|
"name": "Calculator",
|
|
"description": "Evaluate mathematical expressions with exact results",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
}
|
|
|
|
|
|
def _ai_tool_catalog() -> dict:
|
|
"""Return AI assistant and frontier reasoning tools catalog entries."""
|
|
return {
|
|
"consult_grok": {
|
|
"name": "Consult Grok",
|
|
"description": "Premium frontier reasoning via xAI Grok (opt-in, Lightning-payable)",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
"aider": {
|
|
"name": "Aider AI Assistant",
|
|
"description": "Local AI coding assistant using Ollama (qwen3:30b or deepseek-coder)",
|
|
"available_in": ["forge", "orchestrator"],
|
|
},
|
|
}
|
|
|
|
|
|
def _introspection_tool_catalog() -> dict:
|
|
"""Return system introspection tools catalog entries."""
|
|
return {
|
|
"get_system_info": {
|
|
"name": "System Info",
|
|
"description": "Introspect runtime environment - discover model, Python version, config",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
"check_ollama_health": {
|
|
"name": "Ollama Health",
|
|
"description": "Check if Ollama is accessible and what models are available",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
"get_memory_status": {
|
|
"name": "Memory Status",
|
|
"description": "Check status of memory tiers (hot memory, vault)",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
"session_history": {
|
|
"name": "Session History",
|
|
"description": "Search past conversation logs for messages, tool calls, errors, and decisions",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
"thought_search": {
|
|
"name": "Thought Search",
|
|
"description": "Query Timmy's own thought history for past reflections and insights",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
"self_reflect": {
|
|
"name": "Self-Reflect",
|
|
"description": "Review recent conversations to spot patterns, low-confidence answers, and errors",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
"update_gitea_avatar": {
|
|
"name": "Update Gitea Avatar",
|
|
"description": "Generate and upload a wizard-themed avatar to Timmy's Gitea profile",
|
|
"available_in": ["orchestrator"],
|
|
},
|
|
}
|
|
|
|
|
|
def _experiment_tool_catalog() -> dict:
|
|
"""Return ML experiment tools catalog entries."""
|
|
return {
|
|
"prepare_experiment": {
|
|
"name": "Prepare Experiment",
|
|
"description": "Clone autoresearch repo and run data preparation for ML experiments",
|
|
"available_in": ["lab", "orchestrator"],
|
|
},
|
|
"run_experiment": {
|
|
"name": "Run Experiment",
|
|
"description": "Execute a time-boxed ML training experiment and capture metrics",
|
|
"available_in": ["lab", "orchestrator"],
|
|
},
|
|
"evaluate_result": {
|
|
"name": "Evaluate Result",
|
|
"description": "Compare experiment metric against baseline to assess improvement",
|
|
"available_in": ["lab", "orchestrator"],
|
|
},
|
|
}
|
|
|
|
|
|
def _import_creative_catalogs(catalog: dict) -> None:
|
|
"""Import and merge creative tool catalogs from creative module."""
|
|
# ── Git tools ─────────────────────────────────────────────────────────────
|
|
try:
|
|
from creative.tools.git_tools import GIT_TOOL_CATALOG
|
|
|
|
for tool_id, info in GIT_TOOL_CATALOG.items():
|
|
catalog[tool_id] = {
|
|
"name": info["name"],
|
|
"description": info["description"],
|
|
"available_in": ["forge", "helm", "orchestrator"],
|
|
}
|
|
except ImportError:
|
|
pass
|
|
|
|
# ── Image tools ────────────────────────────────────────────────────────────
|
|
try:
|
|
from creative.tools.image_tools import IMAGE_TOOL_CATALOG
|
|
|
|
for tool_id, info in IMAGE_TOOL_CATALOG.items():
|
|
catalog[tool_id] = {
|
|
"name": info["name"],
|
|
"description": info["description"],
|
|
"available_in": ["pixel", "orchestrator"],
|
|
}
|
|
except ImportError:
|
|
pass
|
|
|
|
# ── Music tools ────────────────────────────────────────────────────────────
|
|
try:
|
|
from creative.tools.music_tools import MUSIC_TOOL_CATALOG
|
|
|
|
for tool_id, info in MUSIC_TOOL_CATALOG.items():
|
|
catalog[tool_id] = {
|
|
"name": info["name"],
|
|
"description": info["description"],
|
|
"available_in": ["lyra", "orchestrator"],
|
|
}
|
|
except ImportError:
|
|
pass
|
|
|
|
# ── Video tools ────────────────────────────────────────────────────────────
|
|
try:
|
|
from creative.tools.video_tools import VIDEO_TOOL_CATALOG
|
|
|
|
for tool_id, info in VIDEO_TOOL_CATALOG.items():
|
|
catalog[tool_id] = {
|
|
"name": info["name"],
|
|
"description": info["description"],
|
|
"available_in": ["reel", "orchestrator"],
|
|
}
|
|
except ImportError:
|
|
pass
|
|
|
|
# ── Creative pipeline ──────────────────────────────────────────────────────
|
|
try:
|
|
from creative.director import DIRECTOR_TOOL_CATALOG
|
|
|
|
for tool_id, info in DIRECTOR_TOOL_CATALOG.items():
|
|
catalog[tool_id] = {
|
|
"name": info["name"],
|
|
"description": info["description"],
|
|
"available_in": ["orchestrator"],
|
|
}
|
|
except ImportError:
|
|
pass
|
|
|
|
# ── Assembler tools ───────────────────────────────────────────────────────
|
|
try:
|
|
from creative.assembler import ASSEMBLER_TOOL_CATALOG
|
|
|
|
for tool_id, info in ASSEMBLER_TOOL_CATALOG.items():
|
|
catalog[tool_id] = {
|
|
"name": info["name"],
|
|
"description": info["description"],
|
|
"available_in": ["reel", "orchestrator"],
|
|
}
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
def get_all_available_tools() -> dict[str, dict]:
    """Get a catalog of all available tools and their descriptions.

    Returns:
        Dict mapping tool categories to their tools and descriptions.
    """
    catalog: dict[str, dict] = {}
    # Merge the static sections in the same order as before.
    for section in (
        _core_tool_catalog,
        _analysis_tool_catalog,
        _ai_tool_catalog,
        _introspection_tool_catalog,
        _experiment_tool_catalog,
    ):
        catalog.update(section())
    # Creative catalogs are merged in-place (optional modules).
    _import_creative_catalogs(catalog)
    return catalog
|