[claude] refactor: break up monolithic tools.py into a tools/ package (#1215) (#1221)

This commit is contained in:
2026-03-23 22:43:09 +00:00
parent 31c260cc95
commit d697c3d93e
5 changed files with 725 additions and 531 deletions

View File

@@ -0,0 +1,94 @@
"""Tool integration for the agent swarm.
Provides agents with capabilities for:
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
- Git operations
- Image / Music / Video generation (creative pipeline)
Tools are assigned to agents based on their specialties.
Sub-modules:
- _base: shared types, tracking state
- file_tools: file-operation toolkit factories (Echo, Quill, Seer)
- system_tools: calculator, AI tools, code/devops toolkit factories
- _registry: full toolkit construction, agent registry, tool catalog
"""
# Re-export everything for backward compatibility — callers that do
# ``from timmy.tools import <symbol>`` continue to work unchanged.
from timmy.tools._base import (
AgentTools,
PersonaTools,
ToolStats,
_AGNO_TOOLS_AVAILABLE,
_ImportError,
_TOOL_USAGE,
_track_tool_usage,
get_tool_stats,
)
from timmy.tools._registry import (
AGENT_TOOLKITS,
PERSONA_TOOLKITS,
_create_stub_toolkit,
_merge_catalog,
create_experiment_tools,
create_full_toolkit,
get_all_available_tools,
get_tools_for_agent,
get_tools_for_persona,
)
from timmy.tools.file_tools import (
_make_smart_read_file,
create_data_tools,
create_research_tools,
create_writing_tools,
)
from timmy.tools.system_tools import (
_safe_eval,
calculator,
consult_grok,
create_aider_tool,
create_code_tools,
create_devops_tools,
create_security_tools,
web_fetch,
)
__all__ = [
# _base
"AgentTools",
"PersonaTools",
"ToolStats",
"_AGNO_TOOLS_AVAILABLE",
"_ImportError",
"_TOOL_USAGE",
"_track_tool_usage",
"get_tool_stats",
# file_tools
"_make_smart_read_file",
"create_data_tools",
"create_research_tools",
"create_writing_tools",
# system_tools
"_safe_eval",
"calculator",
"consult_grok",
"create_aider_tool",
"create_code_tools",
"create_devops_tools",
"create_security_tools",
"web_fetch",
# _registry
"AGENT_TOOLKITS",
"PERSONA_TOOLKITS",
"_create_stub_toolkit",
"_merge_catalog",
"create_experiment_tools",
"create_full_toolkit",
"get_all_available_tools",
"get_tools_for_agent",
"get_tools_for_persona",
]

90
src/timmy/tools/_base.py Normal file
View File

@@ -0,0 +1,90 @@
"""Base types, shared state, and tracking for the Timmy tool system."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
logger = logging.getLogger(__name__)
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@dataclass
class ToolStats:
"""Statistics for a single tool."""
tool_name: str
call_count: int = 0
last_used: str | None = None
errors: int = 0
@dataclass
class AgentTools:
"""Tools assigned to an agent."""
agent_id: str
agent_name: str
toolkit: "Toolkit"
available_tools: list[str] = field(default_factory=list)
# Backward-compat alias
PersonaTools = AgentTools
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
"""Track tool usage for analytics."""
if agent_id not in _TOOL_USAGE:
_TOOL_USAGE[agent_id] = []
_TOOL_USAGE[agent_id].append(
{
"tool": tool_name,
"timestamp": datetime.now(UTC).isoformat(),
"success": success,
}
)
def get_tool_stats(agent_id: str | None = None) -> dict:
"""Get tool usage statistics.
Args:
agent_id: Optional agent ID to filter by. If None, returns stats for all agents.
Returns:
Dict with tool usage statistics.
"""
if agent_id:
usage = _TOOL_USAGE.get(agent_id, [])
return {
"agent_id": agent_id,
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
"recent_calls": usage[-10:] if usage else [],
}
# Return stats for all agents
all_stats = {}
for aid, usage in _TOOL_USAGE.items():
all_stats[aid] = {
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
}
return all_stats

View File

@@ -1,532 +1,48 @@
"""Tool integration for the agent swarm.
"""Tool registry, full toolkit construction, and tool catalog.
Provides agents with capabilities for:
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
- Git operations
- Image / Music / Video generation (creative pipeline)
Tools are assigned to agents based on their specialties.
Provides:
- Internal _register_* helpers for wiring tools into toolkits
- create_full_toolkit (orchestrator toolkit)
- create_experiment_tools (Lab agent toolkit)
- AGENT_TOOLKITS / get_tools_for_agent registry
- get_all_available_tools catalog
"""
from __future__ import annotations
import ast
import logging
import math
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from config import settings
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
)
from timmy.tools.file_tools import (
_make_smart_read_file,
create_data_tools,
create_research_tools,
create_writing_tools,
)
from timmy.tools.system_tools import (
calculator,
consult_grok,
create_code_tools,
create_devops_tools,
create_security_tools,
web_fetch,
)
logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@dataclass
class ToolStats:
"""Statistics for a single tool."""
tool_name: str
call_count: int = 0
last_used: str | None = None
errors: int = 0
@dataclass
class AgentTools:
"""Tools assigned to an agent."""
agent_id: str
agent_name: str
toolkit: Toolkit
available_tools: list[str] = field(default_factory=list)
# Backward-compat alias
PersonaTools = AgentTools
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
"""Track tool usage for analytics."""
if agent_id not in _TOOL_USAGE:
_TOOL_USAGE[agent_id] = []
_TOOL_USAGE[agent_id].append(
{
"tool": tool_name,
"timestamp": datetime.now(UTC).isoformat(),
"success": success,
}
)
def get_tool_stats(agent_id: str | None = None) -> dict:
"""Get tool usage statistics.
Args:
agent_id: Optional agent ID to filter by. If None, returns stats for all agents.
Returns:
Dict with tool usage statistics.
"""
if agent_id:
usage = _TOOL_USAGE.get(agent_id, [])
return {
"agent_id": agent_id,
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
"recent_calls": usage[-10:] if usage else [],
}
# Return stats for all agents
all_stats = {}
for aid, usage in _TOOL_USAGE.items():
all_stats[aid] = {
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
}
return all_stats
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
Use this tool for ANY arithmetic: multiplication, division, square roots,
exponents, percentages, logarithms, trigonometry, etc.
Args:
expression: A valid Python math expression, e.g. '347 * 829',
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
Returns:
The exact result as a string.
"""
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e: # broad catch intentional: arbitrary code execution
return f"Error evaluating '{expression}': {e}"
def _make_smart_read_file(file_tools: FileTools) -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,
the raw Agno implementation throws an IsADirectoryError. This
wrapper detects that case, lists the directory entries, and returns
a helpful message so the model can pick the right file on its own.
"""
original_read = file_tools.read_file
def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
"""Reads the contents of the file `file_name` and returns the contents if successful."""
# LLMs often call read_file(path=...) instead of read_file(file_name=...)
if not file_name:
file_name = kwargs.get("path", "")
if not file_name:
return "Error: no file_name or path provided."
# Resolve the path the same way FileTools does
_safe, resolved = file_tools.check_escape(file_name)
if _safe and resolved.is_dir():
entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith("."))
listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
return (
f"'{file_name}' is a directory, not a file. "
f"Files inside:\n{listing}\n\n"
"Please call read_file with one of the files listed above."
)
return original_read(file_name, encoding=encoding)
# Preserve the original docstring for Agno tool schema generation
smart_read_file.__doc__ = original_read.__doc__
return smart_read_file
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_code_tools(base_dir: str | Path | None = None):
"""Create tools for the code agent (Forge).
Includes: shell commands, python execution, file read/write, Aider AI assist
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="code")
# Shell commands (sandboxed)
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Aider AI coding assistant (local with Ollama)
aider_tool = create_aider_tool(base_path)
toolkit.register(aider_tool.run_aider, name="aider")
return toolkit
def create_aider_tool(base_path: Path):
"""Create an Aider tool for AI-assisted coding."""
import subprocess
class AiderTool:
"""Tool that calls Aider (local AI coding assistant) for code generation."""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
"""Run Aider to generate code changes.
Args:
prompt: What you want Aider to do (e.g., "add a fibonacci function")
model: Ollama model to use (default: qwen3:30b)
Returns:
Aider's response with the code changes made
"""
try:
# Run aider with the prompt
result = subprocess.run(
[
"aider",
"--no-git",
"--model",
f"ollama/{model}",
"--quiet",
prompt,
],
capture_output=True,
text=True,
timeout=120,
cwd=str(self.base_dir),
)
if result.returncode == 0:
return result.stdout if result.stdout else "Code changes applied successfully"
else:
return f"Aider error: {result.stderr}"
except FileNotFoundError:
return "Error: Aider not installed. Run: pip install aider"
except subprocess.TimeoutExpired:
return "Error: Aider timed out after 120 seconds"
except (OSError, subprocess.SubprocessError) as e:
return f"Error running Aider: {str(e)}"
return AiderTool(base_path)
def create_data_tools(base_dir: str | Path | None = None):
"""Create tools for the data agent (Seer).
Includes: python execution, file reading, web search for data sources
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="data")
# Python execution for analysis
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_writing_tools(base_dir: str | Path | None = None):
"""Create tools for the writing agent (Quill).
Includes: file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="writing")
# File operations
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="security")
# Shell for running security scans
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File reading for logs/configs
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_devops_tools(base_dir: str | Path | None = None):
"""Create tools for the DevOps agent (Helm).
Includes: shell commands, file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="devops")
# Shell for deployment commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations for config management
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def consult_grok(query: str) -> str:
"""Consult Grok (xAI) for frontier reasoning on complex questions.
Use this tool when a question requires advanced reasoning, real-time
knowledge, or capabilities beyond the local model. Grok is a premium
cloud backend use sparingly and only for high-complexity queries.
Args:
query: The question or reasoning task to send to Grok.
Returns:
Grok's response text, or an error/status message.
"""
from config import settings
from timmy.backends import get_grok_backend, grok_available
if not grok_available():
return (
"Grok is not available. Enable with GROK_ENABLED=true "
"and set XAI_API_KEY in your .env file."
)
backend = get_grok_backend()
# Log to Spark if available
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="default",
tool_name="consult_grok",
success=True,
)
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (consult_grok logging): %s", exc)
# Generate Lightning invoice for monetization (unless free mode)
invoice_info = ""
if not settings.grok_free:
try:
from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
response = result.content
if invoice_info:
response += invoice_info
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
# ---------------------------------------------------------------------------
# Internal _register_* helpers
# ---------------------------------------------------------------------------
def _register_web_fetch_tool(toolkit: Toolkit) -> None:
@@ -717,6 +233,11 @@ def _register_thinking_tools(toolkit: Toolkit) -> None:
raise
# ---------------------------------------------------------------------------
# Full toolkit factories
# ---------------------------------------------------------------------------
def create_full_toolkit(base_dir: str | Path | None = None):
"""Create a full toolkit with all available tools (for the orchestrator).
@@ -727,6 +248,7 @@ def create_full_toolkit(base_dir: str | Path | None = None):
# Return None when tools aren't available (tests)
return None
from config import settings
from timmy.tool_safety import DANGEROUS_TOOLS
toolkit = Toolkit(name="full")
@@ -808,19 +330,9 @@ def create_experiment_tools(base_dir: str | Path | None = None):
return toolkit
# Mapping of agent IDs to their toolkits
AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
"echo": create_research_tools,
"mace": create_security_tools,
"helm": create_devops_tools,
"seer": create_data_tools,
"forge": create_code_tools,
"quill": create_writing_tools,
"lab": create_experiment_tools,
"pixel": lambda base_dir=None: _create_stub_toolkit("pixel"),
"lyra": lambda base_dir=None: _create_stub_toolkit("lyra"),
"reel": lambda base_dir=None: _create_stub_toolkit("reel"),
}
# ---------------------------------------------------------------------------
# Agent toolkit registry
# ---------------------------------------------------------------------------
def _create_stub_toolkit(name: str):
@@ -836,7 +348,22 @@ def _create_stub_toolkit(name: str):
return toolkit
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None:
# Mapping of agent IDs to their toolkits
AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
"echo": create_research_tools,
"mace": create_security_tools,
"helm": create_devops_tools,
"seer": create_data_tools,
"forge": create_code_tools,
"quill": create_writing_tools,
"lab": create_experiment_tools,
"pixel": lambda base_dir=None: _create_stub_toolkit("pixel"),
"lyra": lambda base_dir=None: _create_stub_toolkit("lyra"),
"reel": lambda base_dir=None: _create_stub_toolkit("reel"),
}
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> "Toolkit | None":
"""Get the appropriate toolkit for an agent.
Args:
@@ -852,11 +379,16 @@ def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> To
return None
# Backward-compat alias
# Backward-compat aliases
get_tools_for_persona = get_tools_for_agent
PERSONA_TOOLKITS = AGENT_TOOLKITS
# ---------------------------------------------------------------------------
# Tool catalog
# ---------------------------------------------------------------------------
def _core_tool_catalog() -> dict:
"""Return core file and execution tools catalog entries."""
return {

View File

@@ -0,0 +1,121 @@
"""File operation tools and agent toolkit factories for file-heavy agents.
Provides:
- Smart read_file wrapper (auto-lists directories)
- Toolkit factories for Echo (research), Quill (writing), Seer (data)
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
Toolkit,
)
logger = logging.getLogger(__name__)
def _make_smart_read_file(file_tools: "FileTools") -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,
the raw Agno implementation throws an IsADirectoryError. This
wrapper detects that case, lists the directory entries, and returns
a helpful message so the model can pick the right file on its own.
"""
original_read = file_tools.read_file
def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
"""Reads the contents of the file `file_name` and returns the contents if successful."""
# LLMs often call read_file(path=...) instead of read_file(file_name=...)
if not file_name:
file_name = kwargs.get("path", "")
if not file_name:
return "Error: no file_name or path provided."
# Resolve the path the same way FileTools does
_safe, resolved = file_tools.check_escape(file_name)
if _safe and resolved.is_dir():
entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith("."))
listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
return (
f"'{file_name}' is a directory, not a file. "
f"Files inside:\n{listing}\n\n"
"Please call read_file with one of the files listed above."
)
return original_read(file_name, encoding=encoding)
# Preserve the original docstring for Agno tool schema generation
smart_read_file.__doc__ = original_read.__doc__
return smart_read_file
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_writing_tools(base_dir: str | Path | None = None):
"""Create tools for the writing agent (Quill).
Includes: file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="writing")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_data_tools(base_dir: str | Path | None = None):
"""Create tools for the data agent (Seer).
Includes: python execution, file reading, web search for data sources
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="data")
# Python execution for analysis
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit

View File

@@ -0,0 +1,357 @@
"""System, calculation, and AI consultation tools for Timmy agents.
Provides:
- Safe AST-based calculator
- consult_grok (xAI frontier reasoning)
- web_fetch (content extraction)
- Toolkit factories for Forge (code), Mace (security), Helm (devops)
"""
from __future__ import annotations
import ast
import logging
import math
import subprocess
from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
)
from timmy.tools.file_tools import _make_smart_read_file
logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
Use this tool for ANY arithmetic: multiplication, division, square roots,
exponents, percentages, logarithms, trigonometry, etc.
Args:
expression: A valid Python math expression, e.g. '347 * 829',
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
Returns:
The exact result as a string.
"""
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e: # broad catch intentional: arbitrary code execution
return f"Error evaluating '{expression}': {e}"
def consult_grok(query: str) -> str:
"""Consult Grok (xAI) for frontier reasoning on complex questions.
Use this tool when a question requires advanced reasoning, real-time
knowledge, or capabilities beyond the local model. Grok is a premium
cloud backend — use sparingly and only for high-complexity queries.
Args:
query: The question or reasoning task to send to Grok.
Returns:
Grok's response text, or an error/status message.
"""
from config import settings
from timmy.backends import get_grok_backend, grok_available
if not grok_available():
return (
"Grok is not available. Enable with GROK_ENABLED=true "
"and set XAI_API_KEY in your .env file."
)
backend = get_grok_backend()
# Log to Spark if available
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="default",
tool_name="consult_grok",
success=True,
)
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (consult_grok logging): %s", exc)
# Generate Lightning invoice for monetization (unless free mode)
invoice_info = ""
if not settings.grok_free:
try:
from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
response = result.content
if invoice_info:
response += invoice_info
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
def create_aider_tool(base_path: Path):
"""Create an Aider tool for AI-assisted coding."""
class AiderTool:
"""Tool that calls Aider (local AI coding assistant) for code generation."""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
"""Run Aider to generate code changes.
Args:
prompt: What you want Aider to do (e.g., "add a fibonacci function")
model: Ollama model to use (default: qwen3:30b)
Returns:
Aider's response with the code changes made
"""
try:
# Run aider with the prompt
result = subprocess.run(
[
"aider",
"--no-git",
"--model",
f"ollama/{model}",
"--quiet",
prompt,
],
capture_output=True,
text=True,
timeout=120,
cwd=str(self.base_dir),
)
if result.returncode == 0:
return result.stdout if result.stdout else "Code changes applied successfully"
else:
return f"Aider error: {result.stderr}"
except FileNotFoundError:
return "Error: Aider not installed. Run: pip install aider"
except subprocess.TimeoutExpired:
return "Error: Aider timed out after 120 seconds"
except (OSError, subprocess.SubprocessError) as e:
return f"Error running Aider: {str(e)}"
return AiderTool(base_path)
def create_code_tools(base_dir: str | Path | None = None):
"""Create tools for the code agent (Forge).
Includes: shell commands, python execution, file read/write, Aider AI assist
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="code")
# Shell commands (sandboxed)
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Aider AI coding assistant (local with Ollama)
aider_tool = create_aider_tool(base_path)
toolkit.register(aider_tool.run_aider, name="aider")
return toolkit
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="security")
# Shell for running security scans
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File reading for logs/configs
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_devops_tools(base_dir: str | Path | None = None):
"""Create tools for the DevOps agent (Helm).
Includes: shell commands, file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="devops")
# Shell for deployment commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations for config management
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit