122 lines
4.2 KiB
Python
122 lines
4.2 KiB
Python
"""File operation tools and agent toolkit factories for file-heavy agents.
|
|
|
|
Provides:
|
|
- Smart read_file wrapper (auto-lists directories)
|
|
- Toolkit factories for Echo (research), Quill (writing), Seer (data)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from collections.abc import Callable
|
|
from pathlib import Path
|
|
|
|
from timmy.tools._base import (
|
|
_AGNO_TOOLS_AVAILABLE,
|
|
_ImportError,
|
|
FileTools,
|
|
PythonTools,
|
|
Toolkit,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _make_smart_read_file(file_tools: "FileTools") -> Callable:
    """Wrap ``file_tools.read_file`` so directory paths return a listing.

    When the user (or the LLM) passes a directory path to read_file, the
    raw Agno implementation is reported to fail with IsADirectoryError.
    The wrapper returned here detects that case, lists the non-hidden
    directory entries, and returns a message so the model can pick the
    right file on its own.

    Args:
        file_tools: Object exposing ``read_file(file_name, encoding=...)``
            and ``check_escape(path)``; the latter must return a
            ``(is_safe, resolved_path)`` pair.

    Returns:
        A ``smart_read_file(file_name="", encoding="utf-8", **kwargs)``
        callable that always returns a string: the file contents (as
        produced by the wrapped ``read_file``), a directory listing, or an
        ``"Error: ..."`` message.
    """
    original_read = file_tools.read_file

    def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
        """Reads the contents of the file `file_name` and returns the contents if successful."""
        # LLMs often call read_file(path=...) instead of read_file(file_name=...)
        file_name = file_name or kwargs.get("path", "")
        if not file_name:
            return "Error: no file_name or path provided."

        # Resolve the path the same way FileTools does
        is_safe, resolved = file_tools.check_escape(file_name)
        if is_safe and resolved.is_dir():
            try:
                entries = sorted(
                    p.name for p in resolved.iterdir() if not p.name.startswith(".")
                )
            except OSError as exc:
                # An unreadable directory (e.g. PermissionError) must not
                # escape as an exception: every other failure path of this
                # tool answers with a message string.
                return f"Error: could not list directory '{file_name}': {exc}"
            listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
            return (
                f"'{file_name}' is a directory, not a file. "
                f"Files inside:\n{listing}\n\n"
                "Please call read_file with one of the files listed above."
            )

        # Unsafe paths and regular files are left to the wrapped implementation.
        return original_read(file_name, encoding=encoding)

    # Preserve the original docstring for Agno tool schema generation, but
    # keep the wrapper's own docstring when the original has none so the
    # tool never ends up without a description.
    smart_read_file.__doc__ = original_read.__doc__ or smart_read_file.__doc__
    return smart_read_file
|
|
|
|
|
|
def create_research_tools(base_dir: str | Path | None = None):
    """Build the toolkit handed to the research agent (Echo).

    Registered tools: ``read_file`` (the directory-aware wrapper) and
    ``list_files``.

    Args:
        base_dir: Root directory given to ``FileTools``. When falsy,
            ``settings.repo_root`` is used instead.

    Returns:
        A ``Toolkit`` named ``"research"``.

    Raises:
        ImportError: If ``_AGNO_TOOLS_AVAILABLE`` is falsy.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")

    research_kit = Toolkit(name="research")

    # Function-scope import: config is only loaded when a toolkit is built.
    from config import settings

    if base_dir:
        root = Path(base_dir)
    else:
        root = Path(settings.repo_root)
    files = FileTools(base_dir=root)

    # File reading
    for tool, tool_name in (
        (_make_smart_read_file(files), "read_file"),
        (files.list_files, "list_files"),
    ):
        research_kit.register(tool, name=tool_name)

    return research_kit
|
|
|
|
|
|
def create_writing_tools(base_dir: str | Path | None = None):
    """Build the toolkit handed to the writing agent (Quill).

    Registered tools: ``read_file`` (the directory-aware wrapper),
    ``write_file`` (bound to ``FileTools.save_file``) and ``list_files``.

    Args:
        base_dir: Root directory given to ``FileTools``. When falsy,
            ``settings.repo_root`` is used instead.

    Returns:
        A ``Toolkit`` named ``"writing"``.

    Raises:
        ImportError: If ``_AGNO_TOOLS_AVAILABLE`` is falsy.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")

    writing_kit = Toolkit(name="writing")

    # Function-scope import: config is only loaded when a toolkit is built.
    from config import settings

    if base_dir:
        root = Path(base_dir)
    else:
        root = Path(settings.repo_root)
    files = FileTools(base_dir=root)

    # File operations
    for tool, tool_name in (
        (_make_smart_read_file(files), "read_file"),
        (files.save_file, "write_file"),
        (files.list_files, "list_files"),
    ):
        writing_kit.register(tool, name=tool_name)

    return writing_kit
|
|
|
|
|
|
def create_data_tools(base_dir: str | Path | None = None):
    """Build the toolkit handed to the data agent (Seer).

    Registered tools: ``python`` (bound to ``PythonTools.run_python_code``),
    ``read_file`` (the directory-aware wrapper) and ``list_files``. No web
    search tool is registered by this function.

    Args:
        base_dir: Root directory given to ``FileTools``. When falsy,
            ``settings.repo_root`` is used instead. ``PythonTools`` is
            constructed with its defaults and does not receive this value.

    Returns:
        A ``Toolkit`` named ``"data"``.

    Raises:
        ImportError: If ``_AGNO_TOOLS_AVAILABLE`` is falsy.
    """
    if not _AGNO_TOOLS_AVAILABLE:
        raise ImportError(f"Agno tools not available: {_ImportError}")

    data_kit = Toolkit(name="data")

    # Python execution for analysis
    py_runner = PythonTools()
    data_kit.register(py_runner.run_python_code, name="python")

    # Function-scope import: config is only loaded when a toolkit is built.
    from config import settings

    if base_dir:
        root = Path(base_dir)
    else:
        root = Path(settings.repo_root)
    files = FileTools(base_dir=root)

    # File reading
    for tool, tool_name in (
        (_make_smart_read_file(files), "read_file"),
        (files.list_files, "list_files"),
    ):
        data_kit.register(tool, name=tool_name)

    return data_kit
|