From d697c3d93ec690d38c45ce715ae83955a47e220f Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Mon, 23 Mar 2026 22:43:09 +0000 Subject: [PATCH] [claude] refactor: break up monolithic tools.py into a tools/ package (#1215) (#1221) --- src/timmy/tools/__init__.py | 94 ++++ src/timmy/tools/_base.py | 90 ++++ src/timmy/{tools.py => tools/_registry.py} | 594 +++------------------ src/timmy/tools/file_tools.py | 121 +++++ src/timmy/tools/system_tools.py | 357 +++++++++++++ 5 files changed, 725 insertions(+), 531 deletions(-) create mode 100644 src/timmy/tools/__init__.py create mode 100644 src/timmy/tools/_base.py rename src/timmy/{tools.py => tools/_registry.py} (51%) create mode 100644 src/timmy/tools/file_tools.py create mode 100644 src/timmy/tools/system_tools.py diff --git a/src/timmy/tools/__init__.py b/src/timmy/tools/__init__.py new file mode 100644 index 0000000..dd5d955 --- /dev/null +++ b/src/timmy/tools/__init__.py @@ -0,0 +1,94 @@ +"""Tool integration for the agent swarm. + +Provides agents with capabilities for: +- File read/write (local filesystem) +- Shell command execution (sandboxed) +- Python code execution +- Git operations +- Image / Music / Video generation (creative pipeline) + +Tools are assigned to agents based on their specialties. + +Sub-modules: +- _base: shared types, tracking state +- file_tools: file-operation toolkit factories (Echo, Quill, Seer) +- system_tools: calculator, AI tools, code/devops toolkit factories +- _registry: full toolkit construction, agent registry, tool catalog +""" + +# Re-export everything for backward compatibility — callers that do +# ``from timmy.tools import ...`` continue to work unchanged. 
+ +from timmy.tools._base import ( + AgentTools, + PersonaTools, + ToolStats, + _AGNO_TOOLS_AVAILABLE, + _ImportError, + _TOOL_USAGE, + _track_tool_usage, + get_tool_stats, +) +from timmy.tools._registry import ( + AGENT_TOOLKITS, + PERSONA_TOOLKITS, + _create_stub_toolkit, + _merge_catalog, + create_experiment_tools, + create_full_toolkit, + get_all_available_tools, + get_tools_for_agent, + get_tools_for_persona, +) +from timmy.tools.file_tools import ( + _make_smart_read_file, + create_data_tools, + create_research_tools, + create_writing_tools, +) +from timmy.tools.system_tools import ( + _safe_eval, + calculator, + consult_grok, + create_aider_tool, + create_code_tools, + create_devops_tools, + create_security_tools, + web_fetch, +) + +__all__ = [ + # _base + "AgentTools", + "PersonaTools", + "ToolStats", + "_AGNO_TOOLS_AVAILABLE", + "_ImportError", + "_TOOL_USAGE", + "_track_tool_usage", + "get_tool_stats", + # file_tools + "_make_smart_read_file", + "create_data_tools", + "create_research_tools", + "create_writing_tools", + # system_tools + "_safe_eval", + "calculator", + "consult_grok", + "create_aider_tool", + "create_code_tools", + "create_devops_tools", + "create_security_tools", + "web_fetch", + # _registry + "AGENT_TOOLKITS", + "PERSONA_TOOLKITS", + "_create_stub_toolkit", + "_merge_catalog", + "create_experiment_tools", + "create_full_toolkit", + "get_all_available_tools", + "get_tools_for_agent", + "get_tools_for_persona", +] diff --git a/src/timmy/tools/_base.py b/src/timmy/tools/_base.py new file mode 100644 index 0000000..98efb08 --- /dev/null +++ b/src/timmy/tools/_base.py @@ -0,0 +1,90 @@ +"""Base types, shared state, and tracking for the Timmy tool system.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import UTC, datetime + +logger = logging.getLogger(__name__) + +# Lazy imports to handle test mocking +_ImportError = None +try: + from agno.tools import Toolkit + from 
agno.tools.file import FileTools + from agno.tools.python import PythonTools + from agno.tools.shell import ShellTools + + _AGNO_TOOLS_AVAILABLE = True +except ImportError as e: + _AGNO_TOOLS_AVAILABLE = False + _ImportError = e + +# Track tool usage stats +_TOOL_USAGE: dict[str, list[dict]] = {} + + +@dataclass +class ToolStats: + """Statistics for a single tool.""" + + tool_name: str + call_count: int = 0 + last_used: str | None = None + errors: int = 0 + + +@dataclass +class AgentTools: + """Tools assigned to an agent.""" + + agent_id: str + agent_name: str + toolkit: "Toolkit" + available_tools: list[str] = field(default_factory=list) + + +# Backward-compat alias +PersonaTools = AgentTools + + +def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None: + """Track tool usage for analytics.""" + if agent_id not in _TOOL_USAGE: + _TOOL_USAGE[agent_id] = [] + _TOOL_USAGE[agent_id].append( + { + "tool": tool_name, + "timestamp": datetime.now(UTC).isoformat(), + "success": success, + } + ) + + +def get_tool_stats(agent_id: str | None = None) -> dict: + """Get tool usage statistics. + + Args: + agent_id: Optional agent ID to filter by. If None, returns stats for all agents. + + Returns: + Dict with tool usage statistics. 
+ """ + if agent_id: + usage = _TOOL_USAGE.get(agent_id, []) + return { + "agent_id": agent_id, + "total_calls": len(usage), + "tools_used": list(set(u["tool"] for u in usage)), + "recent_calls": usage[-10:] if usage else [], + } + + # Return stats for all agents + all_stats = {} + for aid, usage in _TOOL_USAGE.items(): + all_stats[aid] = { + "total_calls": len(usage), + "tools_used": list(set(u["tool"] for u in usage)), + } + return all_stats diff --git a/src/timmy/tools.py b/src/timmy/tools/_registry.py similarity index 51% rename from src/timmy/tools.py rename to src/timmy/tools/_registry.py index 035a915..e6bd6bb 100644 --- a/src/timmy/tools.py +++ b/src/timmy/tools/_registry.py @@ -1,532 +1,48 @@ -"""Tool integration for the agent swarm. +"""Tool registry, full toolkit construction, and tool catalog. -Provides agents with capabilities for: -- File read/write (local filesystem) -- Shell command execution (sandboxed) -- Python code execution -- Git operations -- Image / Music / Video generation (creative pipeline) - -Tools are assigned to agents based on their specialties. 
+Provides: +- Internal _register_* helpers for wiring tools into toolkits +- create_full_toolkit (orchestrator toolkit) +- create_experiment_tools (Lab agent toolkit) +- AGENT_TOOLKITS / get_tools_for_agent registry +- get_all_available_tools catalog """ from __future__ import annotations -import ast import logging -import math from collections.abc import Callable -from dataclasses import dataclass, field -from datetime import UTC, datetime from pathlib import Path -from config import settings +from timmy.tools._base import ( + _AGNO_TOOLS_AVAILABLE, + _ImportError, + FileTools, + PythonTools, + ShellTools, + Toolkit, +) +from timmy.tools.file_tools import ( + _make_smart_read_file, + create_data_tools, + create_research_tools, + create_writing_tools, +) +from timmy.tools.system_tools import ( + calculator, + consult_grok, + create_code_tools, + create_devops_tools, + create_security_tools, + web_fetch, +) logger = logging.getLogger(__name__) -# Max characters of user query included in Lightning invoice memo -_INVOICE_MEMO_MAX_LEN = 50 -# Lazy imports to handle test mocking -_ImportError = None -try: - from agno.tools import Toolkit - from agno.tools.file import FileTools - from agno.tools.python import PythonTools - from agno.tools.shell import ShellTools - - _AGNO_TOOLS_AVAILABLE = True -except ImportError as e: - _AGNO_TOOLS_AVAILABLE = False - _ImportError = e - -# Track tool usage stats -_TOOL_USAGE: dict[str, list[dict]] = {} - - -@dataclass -class ToolStats: - """Statistics for a single tool.""" - - tool_name: str - call_count: int = 0 - last_used: str | None = None - errors: int = 0 - - -@dataclass -class AgentTools: - """Tools assigned to an agent.""" - - agent_id: str - agent_name: str - toolkit: Toolkit - available_tools: list[str] = field(default_factory=list) - - -# Backward-compat alias -PersonaTools = AgentTools - - -def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None: - """Track tool usage for analytics.""" - if 
agent_id not in _TOOL_USAGE: - _TOOL_USAGE[agent_id] = [] - _TOOL_USAGE[agent_id].append( - { - "tool": tool_name, - "timestamp": datetime.now(UTC).isoformat(), - "success": success, - } - ) - - -def get_tool_stats(agent_id: str | None = None) -> dict: - """Get tool usage statistics. - - Args: - agent_id: Optional agent ID to filter by. If None, returns stats for all agents. - - Returns: - Dict with tool usage statistics. - """ - if agent_id: - usage = _TOOL_USAGE.get(agent_id, []) - return { - "agent_id": agent_id, - "total_calls": len(usage), - "tools_used": list(set(u["tool"] for u in usage)), - "recent_calls": usage[-10:] if usage else [], - } - - # Return stats for all agents - all_stats = {} - for aid, usage in _TOOL_USAGE.items(): - all_stats[aid] = { - "total_calls": len(usage), - "tools_used": list(set(u["tool"] for u in usage)), - } - return all_stats - - -def _safe_eval(node, allowed_names: dict): - """Walk an AST and evaluate only safe numeric operations.""" - if isinstance(node, ast.Expression): - return _safe_eval(node.body, allowed_names) - if isinstance(node, ast.Constant): - if isinstance(node.value, (int, float, complex)): - return node.value - raise ValueError(f"Unsupported constant: {node.value!r}") - if isinstance(node, ast.UnaryOp): - operand = _safe_eval(node.operand, allowed_names) - if isinstance(node.op, ast.UAdd): - return +operand - if isinstance(node.op, ast.USub): - return -operand - raise ValueError(f"Unsupported unary op: {type(node.op).__name__}") - if isinstance(node, ast.BinOp): - left = _safe_eval(node.left, allowed_names) - right = _safe_eval(node.right, allowed_names) - ops = { - ast.Add: lambda a, b: a + b, - ast.Sub: lambda a, b: a - b, - ast.Mult: lambda a, b: a * b, - ast.Div: lambda a, b: a / b, - ast.FloorDiv: lambda a, b: a // b, - ast.Mod: lambda a, b: a % b, - ast.Pow: lambda a, b: a**b, - } - op_fn = ops.get(type(node.op)) - if op_fn is None: - raise ValueError(f"Unsupported binary op: {type(node.op).__name__}") - 
return op_fn(left, right) - if isinstance(node, ast.Name): - if node.id in allowed_names: - return allowed_names[node.id] - raise ValueError(f"Unknown name: {node.id!r}") - if isinstance(node, ast.Attribute): - value = _safe_eval(node.value, allowed_names) - # Only allow attribute access on the math module - if value is math: - attr = getattr(math, node.attr, None) - if attr is not None: - return attr - raise ValueError(f"Attribute access not allowed: .{node.attr}") - if isinstance(node, ast.Call): - func = _safe_eval(node.func, allowed_names) - if not callable(func): - raise ValueError(f"Not callable: {func!r}") - args = [_safe_eval(a, allowed_names) for a in node.args] - kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords} - return func(*args, **kwargs) - raise ValueError(f"Unsupported syntax: {type(node).__name__}") - - -def calculator(expression: str) -> str: - """Evaluate a mathematical expression and return the exact result. - - Use this tool for ANY arithmetic: multiplication, division, square roots, - exponents, percentages, logarithms, trigonometry, etc. - - Args: - expression: A valid Python math expression, e.g. '347 * 829', - 'math.sqrt(17161)', '2**10', 'math.log(100, 10)'. - - Returns: - The exact result as a string. - """ - allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")} - allowed_names["math"] = math - allowed_names["abs"] = abs - allowed_names["round"] = round - allowed_names["min"] = min - allowed_names["max"] = max - try: - tree = ast.parse(expression, mode="eval") - result = _safe_eval(tree, allowed_names) - return str(result) - except Exception as e: # broad catch intentional: arbitrary code execution - return f"Error evaluating '{expression}': {e}" - - -def _make_smart_read_file(file_tools: FileTools) -> Callable: - """Wrap FileTools.read_file so directories auto-list their contents. 
- - When the user (or the LLM) passes a directory path to read_file, - the raw Agno implementation throws an IsADirectoryError. This - wrapper detects that case, lists the directory entries, and returns - a helpful message so the model can pick the right file on its own. - """ - original_read = file_tools.read_file - - def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str: - """Reads the contents of the file `file_name` and returns the contents if successful.""" - # LLMs often call read_file(path=...) instead of read_file(file_name=...) - if not file_name: - file_name = kwargs.get("path", "") - if not file_name: - return "Error: no file_name or path provided." - # Resolve the path the same way FileTools does - _safe, resolved = file_tools.check_escape(file_name) - if _safe and resolved.is_dir(): - entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith(".")) - listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)" - return ( - f"'{file_name}' is a directory, not a file. " - f"Files inside:\n{listing}\n\n" - "Please call read_file with one of the files listed above." - ) - return original_read(file_name, encoding=encoding) - - # Preserve the original docstring for Agno tool schema generation - smart_read_file.__doc__ = original_read.__doc__ - return smart_read_file - - -def create_research_tools(base_dir: str | Path | None = None): - """Create tools for the research agent (Echo). 
- - Includes: file reading - """ - if not _AGNO_TOOLS_AVAILABLE: - raise ImportError(f"Agno tools not available: {_ImportError}") - toolkit = Toolkit(name="research") - - # File reading - from config import settings - - base_path = Path(base_dir) if base_dir else Path(settings.repo_root) - file_tools = FileTools(base_dir=base_path) - toolkit.register(_make_smart_read_file(file_tools), name="read_file") - toolkit.register(file_tools.list_files, name="list_files") - - return toolkit - - -def create_code_tools(base_dir: str | Path | None = None): - """Create tools for the code agent (Forge). - - Includes: shell commands, python execution, file read/write, Aider AI assist - """ - if not _AGNO_TOOLS_AVAILABLE: - raise ImportError(f"Agno tools not available: {_ImportError}") - toolkit = Toolkit(name="code") - - # Shell commands (sandboxed) - shell_tools = ShellTools() - toolkit.register(shell_tools.run_shell_command, name="shell") - - # Python execution - python_tools = PythonTools() - toolkit.register(python_tools.run_python_code, name="python") - - # File operations - from config import settings - - base_path = Path(base_dir) if base_dir else Path(settings.repo_root) - file_tools = FileTools(base_dir=base_path) - toolkit.register(_make_smart_read_file(file_tools), name="read_file") - toolkit.register(file_tools.save_file, name="write_file") - toolkit.register(file_tools.list_files, name="list_files") - - # Aider AI coding assistant (local with Ollama) - aider_tool = create_aider_tool(base_path) - toolkit.register(aider_tool.run_aider, name="aider") - - return toolkit - - -def create_aider_tool(base_path: Path): - """Create an Aider tool for AI-assisted coding.""" - import subprocess - - class AiderTool: - """Tool that calls Aider (local AI coding assistant) for code generation.""" - - def __init__(self, base_dir: Path): - self.base_dir = base_dir - - def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str: - """Run Aider to generate code changes. 
- - Args: - prompt: What you want Aider to do (e.g., "add a fibonacci function") - model: Ollama model to use (default: qwen3:30b) - - Returns: - Aider's response with the code changes made - """ - try: - # Run aider with the prompt - result = subprocess.run( - [ - "aider", - "--no-git", - "--model", - f"ollama/{model}", - "--quiet", - prompt, - ], - capture_output=True, - text=True, - timeout=120, - cwd=str(self.base_dir), - ) - - if result.returncode == 0: - return result.stdout if result.stdout else "Code changes applied successfully" - else: - return f"Aider error: {result.stderr}" - except FileNotFoundError: - return "Error: Aider not installed. Run: pip install aider" - except subprocess.TimeoutExpired: - return "Error: Aider timed out after 120 seconds" - except (OSError, subprocess.SubprocessError) as e: - return f"Error running Aider: {str(e)}" - - return AiderTool(base_path) - - -def create_data_tools(base_dir: str | Path | None = None): - """Create tools for the data agent (Seer). - - Includes: python execution, file reading, web search for data sources - """ - if not _AGNO_TOOLS_AVAILABLE: - raise ImportError(f"Agno tools not available: {_ImportError}") - toolkit = Toolkit(name="data") - - # Python execution for analysis - python_tools = PythonTools() - toolkit.register(python_tools.run_python_code, name="python") - - # File reading - from config import settings - - base_path = Path(base_dir) if base_dir else Path(settings.repo_root) - file_tools = FileTools(base_dir=base_path) - toolkit.register(_make_smart_read_file(file_tools), name="read_file") - toolkit.register(file_tools.list_files, name="list_files") - - return toolkit - - -def create_writing_tools(base_dir: str | Path | None = None): - """Create tools for the writing agent (Quill). 
- - Includes: file read/write - """ - if not _AGNO_TOOLS_AVAILABLE: - raise ImportError(f"Agno tools not available: {_ImportError}") - toolkit = Toolkit(name="writing") - - # File operations - base_path = Path(base_dir) if base_dir else Path(settings.repo_root) - file_tools = FileTools(base_dir=base_path) - toolkit.register(_make_smart_read_file(file_tools), name="read_file") - toolkit.register(file_tools.save_file, name="write_file") - toolkit.register(file_tools.list_files, name="list_files") - - return toolkit - - -def create_security_tools(base_dir: str | Path | None = None): - """Create tools for the security agent (Mace). - - Includes: shell commands (for scanning), file read - """ - if not _AGNO_TOOLS_AVAILABLE: - raise ImportError(f"Agno tools not available: {_ImportError}") - toolkit = Toolkit(name="security") - - # Shell for running security scans - shell_tools = ShellTools() - toolkit.register(shell_tools.run_shell_command, name="shell") - - # File reading for logs/configs - base_path = Path(base_dir) if base_dir else Path(settings.repo_root) - file_tools = FileTools(base_dir=base_path) - toolkit.register(_make_smart_read_file(file_tools), name="read_file") - toolkit.register(file_tools.list_files, name="list_files") - - return toolkit - - -def create_devops_tools(base_dir: str | Path | None = None): - """Create tools for the DevOps agent (Helm). 
- - Includes: shell commands, file read/write - """ - if not _AGNO_TOOLS_AVAILABLE: - raise ImportError(f"Agno tools not available: {_ImportError}") - toolkit = Toolkit(name="devops") - - # Shell for deployment commands - shell_tools = ShellTools() - toolkit.register(shell_tools.run_shell_command, name="shell") - - # File operations for config management - base_path = Path(base_dir) if base_dir else Path(settings.repo_root) - file_tools = FileTools(base_dir=base_path) - toolkit.register(_make_smart_read_file(file_tools), name="read_file") - toolkit.register(file_tools.save_file, name="write_file") - toolkit.register(file_tools.list_files, name="list_files") - - return toolkit - - -def consult_grok(query: str) -> str: - """Consult Grok (xAI) for frontier reasoning on complex questions. - - Use this tool when a question requires advanced reasoning, real-time - knowledge, or capabilities beyond the local model. Grok is a premium - cloud backend — use sparingly and only for high-complexity queries. - - Args: - query: The question or reasoning task to send to Grok. - - Returns: - Grok's response text, or an error/status message. - """ - from config import settings - from timmy.backends import get_grok_backend, grok_available - - if not grok_available(): - return ( - "Grok is not available. Enable with GROK_ENABLED=true " - "and set XAI_API_KEY in your .env file." 
- ) - - backend = get_grok_backend() - - # Log to Spark if available - try: - from spark.engine import spark_engine - - spark_engine.on_tool_executed( - agent_id="default", - tool_name="consult_grok", - success=True, - ) - except (ImportError, AttributeError) as exc: - logger.warning("Tool execution failed (consult_grok logging): %s", exc) - - # Generate Lightning invoice for monetization (unless free mode) - invoice_info = "" - if not settings.grok_free: - try: - from lightning.factory import get_backend as get_ln_backend - - ln = get_ln_backend() - sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap) - inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}") - invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]" - except (ImportError, OSError, ValueError) as exc: - logger.error("Lightning invoice creation failed: %s", exc) - return "Error: Failed to create Lightning invoice. Please check logs." - - result = backend.run(query) - - response = result.content - if invoice_info: - response += invoice_info - - return response - - -def web_fetch(url: str, max_tokens: int = 4000) -> str: - """Fetch a web page and return its main text content. - - Downloads the URL, extracts readable text using trafilatura, and - truncates to a token budget. Use this to read full articles, docs, - or blog posts that web_search only returns snippets for. - - Args: - url: The URL to fetch (must start with http:// or https://). - max_tokens: Maximum approximate token budget (default 4000). - Text is truncated to max_tokens * 4 characters. - - Returns: - Extracted text content, or an error message on failure. - """ - if not url or not url.startswith(("http://", "https://")): - return f"Error: invalid URL — must start with http:// or https://: {url!r}" - - try: - import requests as _requests - except ImportError: - return "Error: 'requests' package is not installed. 
Install with: pip install requests" - - try: - import trafilatura - except ImportError: - return ( - "Error: 'trafilatura' package is not installed. Install with: pip install trafilatura" - ) - - try: - resp = _requests.get( - url, - timeout=15, - headers={"User-Agent": "TimmyResearchBot/1.0"}, - ) - resp.raise_for_status() - except _requests.exceptions.Timeout: - return f"Error: request timed out after 15 seconds for {url}" - except _requests.exceptions.HTTPError as exc: - return f"Error: HTTP {exc.response.status_code} for {url}" - except _requests.exceptions.RequestException as exc: - return f"Error: failed to fetch {url} — {exc}" - - text = trafilatura.extract(resp.text, include_tables=True, include_links=True) - if not text: - return f"Error: could not extract readable content from {url}" - - char_budget = max_tokens * 4 - if len(text) > char_budget: - text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]" - - return text +# --------------------------------------------------------------------------- +# Internal _register_* helpers +# --------------------------------------------------------------------------- def _register_web_fetch_tool(toolkit: Toolkit) -> None: @@ -717,6 +233,11 @@ def _register_thinking_tools(toolkit: Toolkit) -> None: raise +# --------------------------------------------------------------------------- +# Full toolkit factories +# --------------------------------------------------------------------------- + + def create_full_toolkit(base_dir: str | Path | None = None): """Create a full toolkit with all available tools (for the orchestrator). 
@@ -727,6 +248,7 @@ def create_full_toolkit(base_dir: str | Path | None = None): # Return None when tools aren't available (tests) return None + from config import settings from timmy.tool_safety import DANGEROUS_TOOLS toolkit = Toolkit(name="full") @@ -808,19 +330,9 @@ def create_experiment_tools(base_dir: str | Path | None = None): return toolkit -# Mapping of agent IDs to their toolkits -AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = { - "echo": create_research_tools, - "mace": create_security_tools, - "helm": create_devops_tools, - "seer": create_data_tools, - "forge": create_code_tools, - "quill": create_writing_tools, - "lab": create_experiment_tools, - "pixel": lambda base_dir=None: _create_stub_toolkit("pixel"), - "lyra": lambda base_dir=None: _create_stub_toolkit("lyra"), - "reel": lambda base_dir=None: _create_stub_toolkit("reel"), -} +# --------------------------------------------------------------------------- +# Agent toolkit registry +# --------------------------------------------------------------------------- def _create_stub_toolkit(name: str): @@ -836,7 +348,22 @@ def _create_stub_toolkit(name: str): return toolkit -def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None: +# Mapping of agent IDs to their toolkits +AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = { + "echo": create_research_tools, + "mace": create_security_tools, + "helm": create_devops_tools, + "seer": create_data_tools, + "forge": create_code_tools, + "quill": create_writing_tools, + "lab": create_experiment_tools, + "pixel": lambda base_dir=None: _create_stub_toolkit("pixel"), + "lyra": lambda base_dir=None: _create_stub_toolkit("lyra"), + "reel": lambda base_dir=None: _create_stub_toolkit("reel"), +} + + +def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> "Toolkit | None": """Get the appropriate toolkit for an agent. 
Args: @@ -852,11 +379,16 @@ def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> To return None -# Backward-compat alias +# Backward-compat aliases get_tools_for_persona = get_tools_for_agent PERSONA_TOOLKITS = AGENT_TOOLKITS +# --------------------------------------------------------------------------- +# Tool catalog +# --------------------------------------------------------------------------- + + def _core_tool_catalog() -> dict: """Return core file and execution tools catalog entries.""" return { diff --git a/src/timmy/tools/file_tools.py b/src/timmy/tools/file_tools.py new file mode 100644 index 0000000..30a85f7 --- /dev/null +++ b/src/timmy/tools/file_tools.py @@ -0,0 +1,121 @@ +"""File operation tools and agent toolkit factories for file-heavy agents. + +Provides: +- Smart read_file wrapper (auto-lists directories) +- Toolkit factories for Echo (research), Quill (writing), Seer (data) +""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from pathlib import Path + +from timmy.tools._base import ( + _AGNO_TOOLS_AVAILABLE, + _ImportError, + FileTools, + PythonTools, + Toolkit, +) + +logger = logging.getLogger(__name__) + + +def _make_smart_read_file(file_tools: "FileTools") -> Callable: + """Wrap FileTools.read_file so directories auto-list their contents. + + When the user (or the LLM) passes a directory path to read_file, + the raw Agno implementation throws an IsADirectoryError. This + wrapper detects that case, lists the directory entries, and returns + a helpful message so the model can pick the right file on its own. + """ + original_read = file_tools.read_file + + def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str: + """Reads the contents of the file `file_name` and returns the contents if successful.""" + # LLMs often call read_file(path=...) instead of read_file(file_name=...) 
+ if not file_name: + file_name = kwargs.get("path", "") + if not file_name: + return "Error: no file_name or path provided." + # Resolve the path the same way FileTools does + _safe, resolved = file_tools.check_escape(file_name) + if _safe and resolved.is_dir(): + entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith(".")) + listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)" + return ( + f"'{file_name}' is a directory, not a file. " + f"Files inside:\n{listing}\n\n" + "Please call read_file with one of the files listed above." + ) + return original_read(file_name, encoding=encoding) + + # Preserve the original docstring for Agno tool schema generation + smart_read_file.__doc__ = original_read.__doc__ + return smart_read_file + + +def create_research_tools(base_dir: str | Path | None = None): + """Create tools for the research agent (Echo). + + Includes: file reading + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="research") + + # File reading + from config import settings + + base_path = Path(base_dir) if base_dir else Path(settings.repo_root) + file_tools = FileTools(base_dir=base_path) + toolkit.register(_make_smart_read_file(file_tools), name="read_file") + toolkit.register(file_tools.list_files, name="list_files") + + return toolkit + + +def create_writing_tools(base_dir: str | Path | None = None): + """Create tools for the writing agent (Quill). 
+ + Includes: file read/write + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="writing") + + # File operations + from config import settings + + base_path = Path(base_dir) if base_dir else Path(settings.repo_root) + file_tools = FileTools(base_dir=base_path) + toolkit.register(_make_smart_read_file(file_tools), name="read_file") + toolkit.register(file_tools.save_file, name="write_file") + toolkit.register(file_tools.list_files, name="list_files") + + return toolkit + + +def create_data_tools(base_dir: str | Path | None = None): + """Create tools for the data agent (Seer). + + Includes: python execution, file reading, web search for data sources + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="data") + + # Python execution for analysis + python_tools = PythonTools() + toolkit.register(python_tools.run_python_code, name="python") + + # File reading + from config import settings + + base_path = Path(base_dir) if base_dir else Path(settings.repo_root) + file_tools = FileTools(base_dir=base_path) + toolkit.register(_make_smart_read_file(file_tools), name="read_file") + toolkit.register(file_tools.list_files, name="list_files") + + return toolkit diff --git a/src/timmy/tools/system_tools.py b/src/timmy/tools/system_tools.py new file mode 100644 index 0000000..2b161b4 --- /dev/null +++ b/src/timmy/tools/system_tools.py @@ -0,0 +1,357 @@ +"""System, calculation, and AI consultation tools for Timmy agents. 
+ +Provides: +- Safe AST-based calculator +- consult_grok (xAI frontier reasoning) +- web_fetch (content extraction) +- Toolkit factories for Forge (code), Mace (security), Helm (devops) +""" + +from __future__ import annotations + +import ast +import logging +import math +import subprocess +from pathlib import Path + +from timmy.tools._base import ( + _AGNO_TOOLS_AVAILABLE, + _ImportError, + FileTools, + PythonTools, + ShellTools, + Toolkit, +) +from timmy.tools.file_tools import _make_smart_read_file + +logger = logging.getLogger(__name__) + +# Max characters of user query included in Lightning invoice memo +_INVOICE_MEMO_MAX_LEN = 50 + + +def _safe_eval(node, allowed_names: dict): + """Walk an AST and evaluate only safe numeric operations.""" + if isinstance(node, ast.Expression): + return _safe_eval(node.body, allowed_names) + if isinstance(node, ast.Constant): + if isinstance(node.value, (int, float, complex)): + return node.value + raise ValueError(f"Unsupported constant: {node.value!r}") + if isinstance(node, ast.UnaryOp): + operand = _safe_eval(node.operand, allowed_names) + if isinstance(node.op, ast.UAdd): + return +operand + if isinstance(node.op, ast.USub): + return -operand + raise ValueError(f"Unsupported unary op: {type(node.op).__name__}") + if isinstance(node, ast.BinOp): + left = _safe_eval(node.left, allowed_names) + right = _safe_eval(node.right, allowed_names) + ops = { + ast.Add: lambda a, b: a + b, + ast.Sub: lambda a, b: a - b, + ast.Mult: lambda a, b: a * b, + ast.Div: lambda a, b: a / b, + ast.FloorDiv: lambda a, b: a // b, + ast.Mod: lambda a, b: a % b, + ast.Pow: lambda a, b: a**b, + } + op_fn = ops.get(type(node.op)) + if op_fn is None: + raise ValueError(f"Unsupported binary op: {type(node.op).__name__}") + return op_fn(left, right) + if isinstance(node, ast.Name): + if node.id in allowed_names: + return allowed_names[node.id] + raise ValueError(f"Unknown name: {node.id!r}") + if isinstance(node, ast.Attribute): + value = 
_safe_eval(node.value, allowed_names) + # Only allow attribute access on the math module + if value is math: + attr = getattr(math, node.attr, None) + if attr is not None: + return attr + raise ValueError(f"Attribute access not allowed: .{node.attr}") + if isinstance(node, ast.Call): + func = _safe_eval(node.func, allowed_names) + if not callable(func): + raise ValueError(f"Not callable: {func!r}") + args = [_safe_eval(a, allowed_names) for a in node.args] + kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords} + return func(*args, **kwargs) + raise ValueError(f"Unsupported syntax: {type(node).__name__}") + + +def calculator(expression: str) -> str: + """Evaluate a mathematical expression and return the exact result. + + Use this tool for ANY arithmetic: multiplication, division, square roots, + exponents, percentages, logarithms, trigonometry, etc. + + Args: + expression: A valid Python math expression, e.g. '347 * 829', + 'math.sqrt(17161)', '2**10', 'math.log(100, 10)'. + + Returns: + The exact result as a string. + """ + allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")} + allowed_names["math"] = math + allowed_names["abs"] = abs + allowed_names["round"] = round + allowed_names["min"] = min + allowed_names["max"] = max + try: + tree = ast.parse(expression, mode="eval") + result = _safe_eval(tree, allowed_names) + return str(result) + except Exception as e: # broad catch intentional: arbitrary code execution + return f"Error evaluating '{expression}': {e}" + + +def consult_grok(query: str) -> str: + """Consult Grok (xAI) for frontier reasoning on complex questions. + + Use this tool when a question requires advanced reasoning, real-time + knowledge, or capabilities beyond the local model. Grok is a premium + cloud backend — use sparingly and only for high-complexity queries. + + Args: + query: The question or reasoning task to send to Grok. + + Returns: + Grok's response text, or an error/status message. 
+ """ + from config import settings + from timmy.backends import get_grok_backend, grok_available + + if not grok_available(): + return ( + "Grok is not available. Enable with GROK_ENABLED=true " + "and set XAI_API_KEY in your .env file." + ) + + backend = get_grok_backend() + + # Log to Spark if available + try: + from spark.engine import spark_engine + + spark_engine.on_tool_executed( + agent_id="default", + tool_name="consult_grok", + success=True, + ) + except (ImportError, AttributeError) as exc: + logger.warning("Tool execution failed (consult_grok logging): %s", exc) + + # Generate Lightning invoice for monetization (unless free mode) + invoice_info = "" + if not settings.grok_free: + try: + from lightning.factory import get_backend as get_ln_backend + + ln = get_ln_backend() + sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap) + inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}") + invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]" + except (ImportError, OSError, ValueError) as exc: + logger.error("Lightning invoice creation failed: %s", exc) + return "Error: Failed to create Lightning invoice. Please check logs." + + result = backend.run(query) + + response = result.content + if invoice_info: + response += invoice_info + + return response + + +def web_fetch(url: str, max_tokens: int = 4000) -> str: + """Fetch a web page and return its main text content. + + Downloads the URL, extracts readable text using trafilatura, and + truncates to a token budget. Use this to read full articles, docs, + or blog posts that web_search only returns snippets for. + + Args: + url: The URL to fetch (must start with http:// or https://). + max_tokens: Maximum approximate token budget (default 4000). + Text is truncated to max_tokens * 4 characters. + + Returns: + Extracted text content, or an error message on failure. 
+ """ + if not url or not url.startswith(("http://", "https://")): + return f"Error: invalid URL — must start with http:// or https://: {url!r}" + + try: + import requests as _requests + except ImportError: + return "Error: 'requests' package is not installed. Install with: pip install requests" + + try: + import trafilatura + except ImportError: + return ( + "Error: 'trafilatura' package is not installed. Install with: pip install trafilatura" + ) + + try: + resp = _requests.get( + url, + timeout=15, + headers={"User-Agent": "TimmyResearchBot/1.0"}, + ) + resp.raise_for_status() + except _requests.exceptions.Timeout: + return f"Error: request timed out after 15 seconds for {url}" + except _requests.exceptions.HTTPError as exc: + return f"Error: HTTP {exc.response.status_code} for {url}" + except _requests.exceptions.RequestException as exc: + return f"Error: failed to fetch {url} — {exc}" + + text = trafilatura.extract(resp.text, include_tables=True, include_links=True) + if not text: + return f"Error: could not extract readable content from {url}" + + char_budget = max_tokens * 4 + if len(text) > char_budget: + text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]" + + return text + + +def create_aider_tool(base_path: Path): + """Create an Aider tool for AI-assisted coding.""" + + class AiderTool: + """Tool that calls Aider (local AI coding assistant) for code generation.""" + + def __init__(self, base_dir: Path): + self.base_dir = base_dir + + def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str: + """Run Aider to generate code changes. 
def create_code_tools(base_dir: str | Path | None = None):
    """Build the toolkit used by the code agent (Forge).

    Registers shell access, Python execution, file read/write, and the
    Aider AI coding assistant.

    Args:
        base_dir: Root directory for file operations; defaults to the
            repository root from settings.

    Returns:
        A Toolkit named "code".

    Raises:
        ImportError: If the agno tool classes could not be imported.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")

    kit = Toolkit(name="code")

    # Sandboxed shell access.
    kit.register(ShellTools().run_shell_command, name="shell")

    # Direct Python execution.
    kit.register(PythonTools().run_python_code, name="python")

    # File read/write rooted at the configured repo directory.
    from config import settings

    root = Path(base_dir) if base_dir else Path(settings.repo_root)
    files = FileTools(base_dir=root)
    kit.register(_make_smart_read_file(files), name="read_file")
    kit.register(files.save_file, name="write_file")
    kit.register(files.list_files, name="list_files")

    # Aider AI coding assistant (runs locally against Ollama).
    kit.register(create_aider_tool(root).run_aider, name="aider")

    return kit
def create_security_tools(base_dir: str | Path | None = None):
    """Build the toolkit used by the security agent (Mace).

    Registers shell access (for running security scans) and read-only
    file access (for inspecting logs and configs).

    Args:
        base_dir: Root directory for file operations; defaults to the
            repository root from settings.

    Returns:
        A Toolkit named "security".

    Raises:
        ImportError: If the agno tool classes could not be imported.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")

    kit = Toolkit(name="security")

    # Shell access for running scanners.
    kit.register(ShellTools().run_shell_command, name="shell")

    # Read-only file access for logs and configuration.
    from config import settings

    root = Path(base_dir) if base_dir else Path(settings.repo_root)
    files = FileTools(base_dir=root)
    kit.register(_make_smart_read_file(files), name="read_file")
    kit.register(files.list_files, name="list_files")

    return kit


def create_devops_tools(base_dir: str | Path | None = None):
    """Build the toolkit used by the DevOps agent (Helm).

    Registers shell access for deployment commands and file read/write
    for configuration management.

    Args:
        base_dir: Root directory for file operations; defaults to the
            repository root from settings.

    Returns:
        A Toolkit named "devops".

    Raises:
        ImportError: If the agno tool classes could not be imported.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")

    kit = Toolkit(name="devops")

    # Shell access for deployment commands.
    kit.register(ShellTools().run_shell_command, name="shell")

    # File read/write for managing configuration.
    from config import settings

    root = Path(base_dir) if base_dir else Path(settings.repo_root)
    files = FileTools(base_dir=root)
    kit.register(_make_smart_read_file(files), name="read_file")
    kit.register(files.save_file, name="write_file")
    kit.register(files.list_files, name="list_files")

    return kit