#!/usr/bin/env python3 """File Tools Module - LLM agent file manipulation tools.""" import errno import json import logging import os import threading from pathlib import Path from tools.file_operations import ShellFileOperations from agent.redact import redact_sensitive_text logger = logging.getLogger(__name__) _EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS} # --------------------------------------------------------------------------- # Read-size guard: cap the character count returned to the model. # We're model-agnostic so we can't count tokens; characters are a safe proxy. # 100K chars ≈ 25–35K tokens across typical tokenisers. Files larger than # this in a single read are a context-window hazard — the model should use # offset+limit to read the relevant section. # # Configurable via config.yaml: file_read_max_chars: 200000 # --------------------------------------------------------------------------- _DEFAULT_MAX_READ_CHARS = 100_000 _max_read_chars_cached: int | None = None def _get_max_read_chars() -> int: """Return the configured max characters per file read. Reads ``file_read_max_chars`` from config.yaml on first call, caches the result for the lifetime of the process. Falls back to the built-in default if the config is missing or invalid. """ global _max_read_chars_cached if _max_read_chars_cached is not None: return _max_read_chars_cached try: from hermes_cli.config import load_config cfg = load_config() val = cfg.get("file_read_max_chars") if isinstance(val, (int, float)) and val > 0: _max_read_chars_cached = int(val) return _max_read_chars_cached except Exception: pass _max_read_chars_cached = _DEFAULT_MAX_READ_CHARS return _max_read_chars_cached # If the total file size exceeds this AND the caller didn't specify a narrow # range (limit <= 200), we include a hint encouraging targeted reads. _LARGE_FILE_HINT_BYTES = 512_000 # 512 KB # --------------------------------------------------------------------------- # Device path blocklist — reading these hangs the process (infinite output # or blocking on input). Checked by path only (no I/O). # --------------------------------------------------------------------------- _BLOCKED_DEVICE_PATHS = frozenset({ # Infinite output — never reach EOF "/dev/zero", "/dev/random", "/dev/urandom", "/dev/full", # Blocks waiting for input "/dev/stdin", "/dev/tty", "/dev/console", # Nonsensical to read "/dev/stdout", "/dev/stderr", # fd aliases "/dev/fd/0", "/dev/fd/1", "/dev/fd/2", }) def _is_blocked_device(filepath: str) -> bool: """Return True if the path would hang the process (infinite output or blocking input). Uses the *literal* path — no symlink resolution — because the model specifies paths directly and realpath follows symlinks all the way through (e.g. /dev/stdin → /proc/self/fd/0 → /dev/pts/0), defeating the check. """ normalized = os.path.expanduser(filepath) if normalized in _BLOCKED_DEVICE_PATHS: return True # /proc/self/fd/0-2 and /proc//fd/0-2 are Linux aliases for stdio if normalized.startswith("/proc/") and normalized.endswith( ("/fd/0", "/fd/1", "/fd/2") ): return True return False # Paths that file tools should refuse to write to without going through the # terminal tool's approval system. These match prefixes after os.path.realpath. _SENSITIVE_PATH_PREFIXES = ("/etc/", "/boot/", "/usr/lib/systemd/") _SENSITIVE_EXACT_PATHS = {"/var/run/docker.sock", "/run/docker.sock"} def _check_sensitive_path(filepath: str) -> str | None: """Return an error message if the path targets a sensitive system location.""" try: resolved = os.path.realpath(os.path.expanduser(filepath)) except (OSError, ValueError): resolved = filepath for prefix in _SENSITIVE_PATH_PREFIXES: if resolved.startswith(prefix): return ( f"Refusing to write to sensitive system path: {filepath}\n" "Use the terminal tool with sudo if you need to modify system files." ) if resolved in _SENSITIVE_EXACT_PATHS: return ( f"Refusing to write to sensitive system path: {filepath}\n" "Use the terminal tool with sudo if you need to modify system files." ) return None def _is_expected_write_exception(exc: Exception) -> bool: """Return True for expected write denials that should not hit error logs.""" if isinstance(exc, PermissionError): return True if isinstance(exc, OSError) and exc.errno in _EXPECTED_WRITE_ERRNOS: return True return False _file_ops_lock = threading.Lock() _file_ops_cache: dict = {} # Track files read per task to detect re-read loops and deduplicate reads. # Per task_id we store: # "last_key": the key of the most recent read/search call (or None) # "consecutive": how many times that exact call has been repeated in a row # "read_history": set of (path, offset, limit) tuples for get_read_files_summary # "dedup": dict mapping (resolved_path, offset, limit) → mtime float # Used to skip re-reads of unchanged files. Reset on # context compression (the original content is summarised # away so the model needs the full content again). # "read_timestamps": dict mapping resolved_path → modification-time float # recorded when the file was last read (or written) by # this task. Used by write_file and patch to detect # external changes between the agent's read and write. # Updated after successful writes so consecutive edits # by the same task don't trigger false warnings. _read_tracker_lock = threading.Lock() _read_tracker: dict = {} def _get_file_ops(task_id: str = "default") -> ShellFileOperations: """Get or create ShellFileOperations for a terminal environment. Respects the TERMINAL_ENV setting -- if the task_id doesn't have an environment yet, creates one using the configured backend (local, docker, modal, etc.) rather than always defaulting to local. Thread-safe: uses the same per-task creation locks as terminal_tool to prevent duplicate sandbox creation from concurrent tool calls. """ from tools.terminal_tool import ( _active_environments, _env_lock, _create_environment, _get_env_config, _last_activity, _start_cleanup_thread, _creation_locks, _creation_locks_lock, ) import time # Fast path: check cache -- but also verify the underlying environment # is still alive (it may have been killed by the cleanup thread). with _file_ops_lock: cached = _file_ops_cache.get(task_id) if cached is not None: with _env_lock: if task_id in _active_environments: _last_activity[task_id] = time.time() return cached else: # Environment was cleaned up -- invalidate stale cache entry with _file_ops_lock: _file_ops_cache.pop(task_id, None) # Need to ensure the environment exists before building file_ops. # Acquire per-task lock so only one thread creates the sandbox. with _creation_locks_lock: if task_id not in _creation_locks: _creation_locks[task_id] = threading.Lock() task_lock = _creation_locks[task_id] with task_lock: # Double-check: another thread may have created it while we waited with _env_lock: if task_id in _active_environments: _last_activity[task_id] = time.time() terminal_env = _active_environments[task_id] else: terminal_env = None if terminal_env is None: from tools.terminal_tool import _task_env_overrides config = _get_env_config() env_type = config["env_type"] overrides = _task_env_overrides.get(task_id, {}) if env_type == "docker": image = overrides.get("docker_image") or config["docker_image"] elif env_type == "singularity": image = overrides.get("singularity_image") or config["singularity_image"] elif env_type == "modal": image = overrides.get("modal_image") or config["modal_image"] elif env_type == "daytona": image = overrides.get("daytona_image") or config["daytona_image"] else: image = "" cwd = overrides.get("cwd") or config["cwd"] logger.info("Creating new %s environment for task %s...", env_type, task_id[:8]) container_config = None if env_type in ("docker", "singularity", "modal", "daytona"): container_config = { "container_cpu": config.get("container_cpu", 1), "container_memory": config.get("container_memory", 5120), "container_disk": config.get("container_disk", 51200), "container_persistent": config.get("container_persistent", True), "docker_volumes": config.get("docker_volumes", []), } ssh_config = None if env_type == "ssh": ssh_config = { "host": config.get("ssh_host", ""), "user": config.get("ssh_user", ""), "port": config.get("ssh_port", 22), "key": config.get("ssh_key", ""), "persistent": config.get("ssh_persistent", False), } local_config = None if env_type == "local": local_config = { "persistent": config.get("local_persistent", False), } terminal_env = _create_environment( env_type=env_type, image=image, cwd=cwd, timeout=config["timeout"], ssh_config=ssh_config, container_config=container_config, local_config=local_config, task_id=task_id, host_cwd=config.get("host_cwd"), ) with _env_lock: _active_environments[task_id] = terminal_env _last_activity[task_id] = time.time() _start_cleanup_thread() logger.info("%s environment ready for task %s", env_type, task_id[:8]) # Build file_ops from the (guaranteed live) environment and cache it file_ops = ShellFileOperations(terminal_env) with _file_ops_lock: _file_ops_cache[task_id] = file_ops return file_ops def clear_file_ops_cache(task_id: str = None): """Clear the file operations cache.""" with _file_ops_lock: if task_id: _file_ops_cache.pop(task_id, None) else: _file_ops_cache.clear() def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str: """Read a file with pagination and line numbers.""" try: # ── Device path guard ───────────────────────────────────────── # Block paths that would hang the process (infinite output, # blocking on input). Pure path check — no I/O. if _is_blocked_device(path): return json.dumps({ "error": ( f"Cannot read '{path}': this is a device file that would " "block or produce infinite output." ), }) # ── Hermes internal path guard ──────────────────────────────── # Prevent prompt injection via catalog or hub metadata files. import pathlib as _pathlib from hermes_constants import get_hermes_home as _get_hh _resolved = _pathlib.Path(path).expanduser().resolve() _hermes_home = _get_hh().resolve() _blocked_dirs = [ _hermes_home / "skills" / ".hub" / "index-cache", _hermes_home / "skills" / ".hub", ] for _blocked in _blocked_dirs: try: _resolved.relative_to(_blocked) return json.dumps({ "error": ( f"Access denied: {path} is an internal Hermes cache file " "and cannot be read directly to prevent prompt injection. " "Use the skills_list or skill_view tools instead." ) }) except ValueError: pass # ── Dedup check ─────────────────────────────────────────────── # If we already read this exact (path, offset, limit) and the # file hasn't been modified since, return a lightweight stub # instead of re-sending the same content. Saves context tokens. resolved_str = str(_resolved) dedup_key = (resolved_str, offset, limit) with _read_tracker_lock: task_data = _read_tracker.setdefault(task_id, { "last_key": None, "consecutive": 0, "read_history": set(), "dedup": {}, }) cached_mtime = task_data.get("dedup", {}).get(dedup_key) if cached_mtime is not None: try: current_mtime = os.path.getmtime(resolved_str) if current_mtime == cached_mtime: return json.dumps({ "content": ( "File unchanged since last read. The content from " "the earlier read_file result in this conversation is " "still current — refer to that instead of re-reading." ), "path": path, "dedup": True, }, ensure_ascii=False) except OSError: pass # stat failed — fall through to full read # ── Perform the read ────────────────────────────────────────── file_ops = _get_file_ops(task_id) result = file_ops.read_file(path, offset, limit) if result.content: result.content = redact_sensitive_text(result.content) result_dict = result.to_dict() # ── Character-count guard ───────────────────────────────────── # We're model-agnostic so we can't count tokens; characters are # the best proxy we have. If the read produced an unreasonable # amount of content, reject it and tell the model to narrow down. # Note: we check the formatted content (with line-number prefixes), # not the raw file size, because that's what actually enters context. content_len = len(result.content or "") file_size = result_dict.get("file_size", 0) max_chars = _get_max_read_chars() if content_len > max_chars: total_lines = result_dict.get("total_lines", "unknown") return json.dumps({ "error": ( f"Read produced {content_len:,} characters which exceeds " f"the safety limit ({max_chars:,} chars). " "Use offset and limit to read a smaller range. " f"The file has {total_lines} lines total." ), "path": path, "total_lines": total_lines, "file_size": file_size, }, ensure_ascii=False) # Large-file hint: if the file is big and the caller didn't ask # for a narrow window, nudge toward targeted reads. if (file_size and file_size > _LARGE_FILE_HINT_BYTES and limit > 200 and result_dict.get("truncated")): result_dict.setdefault("_hint", ( f"This file is large ({file_size:,} bytes). " "Consider reading only the section you need with offset and limit " "to keep context usage efficient." )) # ── Track for consecutive-loop detection ────────────────────── read_key = ("read", path, offset, limit) with _read_tracker_lock: # Ensure "dedup" key exists (backward compat with old tracker state) if "dedup" not in task_data: task_data["dedup"] = {} task_data["read_history"].add((path, offset, limit)) if task_data["last_key"] == read_key: task_data["consecutive"] += 1 else: task_data["last_key"] = read_key task_data["consecutive"] = 1 count = task_data["consecutive"] # Store mtime at read time for two purposes: # 1. Dedup: skip identical re-reads of unchanged files. # 2. Staleness: warn on write/patch if the file changed since # the agent last read it (external edit, concurrent agent, etc.). try: _mtime_now = os.path.getmtime(resolved_str) task_data["dedup"][dedup_key] = _mtime_now task_data.setdefault("read_timestamps", {})[resolved_str] = _mtime_now except OSError: pass # Can't stat — skip tracking for this entry if count >= 4: # Hard block: stop returning content to break the loop return json.dumps({ "error": ( f"BLOCKED: You have read this exact file region {count} times in a row. " "The content has NOT changed. You already have this information. " "STOP re-reading and proceed with your task." ), "path": path, "already_read": count, }, ensure_ascii=False) elif count >= 3: result_dict["_warning"] = ( f"You have read this exact file region {count} times consecutively. " "The content has not changed since your last read. Use the information you already have. " "If you are stuck in a loop, stop reading and proceed with writing or responding." ) return json.dumps(result_dict, ensure_ascii=False) except Exception as e: return json.dumps({"error": str(e)}, ensure_ascii=False) def get_read_files_summary(task_id: str = "default") -> list: """Return a list of files read in this session for the given task. Used by context compression to preserve file-read history across compression boundaries. """ with _read_tracker_lock: task_data = _read_tracker.get(task_id, {}) read_history = task_data.get("read_history", set()) seen_paths: dict = {} for (path, offset, limit) in read_history: if path not in seen_paths: seen_paths[path] = [] seen_paths[path].append(f"lines {offset}-{offset + limit - 1}") return [ {"path": p, "regions": regions} for p, regions in sorted(seen_paths.items()) ] def clear_read_tracker(task_id: str = None): """Clear the read tracker. Call with a task_id to clear just that task, or without to clear all. Should be called when a session is destroyed to prevent memory leaks in long-running gateway processes. """ with _read_tracker_lock: if task_id: _read_tracker.pop(task_id, None) else: _read_tracker.clear() def reset_file_dedup(task_id: str = None): """Clear the deduplication cache for file reads. Called after context compression — the original read content has been summarised away, so the model needs the full content if it reads the same file again. Without this, reads after compression would return a "file unchanged" stub pointing at content that no longer exists in context. Call with a task_id to clear just that task, or without to clear all. """ with _read_tracker_lock: if task_id: task_data = _read_tracker.get(task_id) if task_data and "dedup" in task_data: task_data["dedup"].clear() else: for task_data in _read_tracker.values(): if "dedup" in task_data: task_data["dedup"].clear() def notify_other_tool_call(task_id: str = "default"): """Reset consecutive read/search counter for a task. Called by the tool dispatcher (model_tools.py) whenever a tool OTHER than read_file / search_files is executed. This ensures we only warn or block on *truly consecutive* repeated reads — if the agent does anything else in between (write, patch, terminal, etc.) the counter resets and the next read is treated as fresh. """ with _read_tracker_lock: task_data = _read_tracker.get(task_id) if task_data: task_data["last_key"] = None task_data["consecutive"] = 0 def _update_read_timestamp(filepath: str, task_id: str) -> None: """Record the file's current modification time after a successful write. Called after write_file and patch so that consecutive edits by the same task don't trigger false staleness warnings — each write refreshes the stored timestamp to match the file's new state. """ try: resolved = str(Path(filepath).expanduser().resolve()) current_mtime = os.path.getmtime(resolved) except (OSError, ValueError): return with _read_tracker_lock: task_data = _read_tracker.get(task_id) if task_data is not None: task_data.setdefault("read_timestamps", {})[resolved] = current_mtime def _check_file_staleness(filepath: str, task_id: str) -> str | None: """Check whether a file was modified since the agent last read it. Returns a warning string if the file is stale (mtime changed since the last read_file call for this task), or None if the file is fresh or was never read. Does not block — the write still proceeds. """ try: resolved = str(Path(filepath).expanduser().resolve()) except (OSError, ValueError): return None with _read_tracker_lock: task_data = _read_tracker.get(task_id) if not task_data: return None read_mtime = task_data.get("read_timestamps", {}).get(resolved) if read_mtime is None: return None # File was never read — nothing to compare against try: current_mtime = os.path.getmtime(resolved) except OSError: return None # Can't stat — file may have been deleted, let write handle it if current_mtime != read_mtime: return ( f"Warning: {filepath} was modified since you last read it " "(external edit or concurrent agent). The content you read may be " "stale. Consider re-reading the file to verify before writing." ) return None def write_file_tool(path: str, content: str, task_id: str = "default") -> str: """Write content to a file.""" sensitive_err = _check_sensitive_path(path) if sensitive_err: return json.dumps({"error": sensitive_err}, ensure_ascii=False) try: stale_warning = _check_file_staleness(path, task_id) file_ops = _get_file_ops(task_id) result = file_ops.write_file(path, content) result_dict = result.to_dict() if stale_warning: result_dict["_warning"] = stale_warning # Refresh the stored timestamp so consecutive writes by this # task don't trigger false staleness warnings. _update_read_timestamp(path, task_id) return json.dumps(result_dict, ensure_ascii=False) except Exception as e: if _is_expected_write_exception(e): logger.debug("write_file expected denial: %s: %s", type(e).__name__, e) else: logger.error("write_file error: %s: %s", type(e).__name__, e, exc_info=True) return json.dumps({"error": str(e)}, ensure_ascii=False) def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, new_string: str = None, replace_all: bool = False, patch: str = None, task_id: str = "default") -> str: """Patch a file using replace mode or V4A patch format.""" # Check sensitive paths for both replace (explicit path) and V4A patch (extract paths) _paths_to_check = [] if path: _paths_to_check.append(path) if mode == "patch" and patch: import re as _re for _m in _re.finditer(r'^\*\*\*\s+(?:Update|Add|Delete)\s+File:\s*(.+)$', patch, _re.MULTILINE): _paths_to_check.append(_m.group(1).strip()) for _p in _paths_to_check: sensitive_err = _check_sensitive_path(_p) if sensitive_err: return json.dumps({"error": sensitive_err}, ensure_ascii=False) try: # Check staleness for all files this patch will touch. stale_warnings = [] for _p in _paths_to_check: _sw = _check_file_staleness(_p, task_id) if _sw: stale_warnings.append(_sw) file_ops = _get_file_ops(task_id) if mode == "replace": if not path: return json.dumps({"error": "path required"}) if old_string is None or new_string is None: return json.dumps({"error": "old_string and new_string required"}) result = file_ops.patch_replace(path, old_string, new_string, replace_all) elif mode == "patch": if not patch: return json.dumps({"error": "patch content required"}) result = file_ops.patch_v4a(patch) else: return json.dumps({"error": f"Unknown mode: {mode}"}) result_dict = result.to_dict() if stale_warnings: result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings) # Refresh stored timestamps for all successfully-patched paths so # consecutive edits by this task don't trigger false warnings. if not result_dict.get("error"): for _p in _paths_to_check: _update_read_timestamp(_p, task_id) result_json = json.dumps(result_dict, ensure_ascii=False) # Hint when old_string not found — saves iterations where the agent # retries with stale content instead of re-reading the file. if result_dict.get("error") and "Could not find" in str(result_dict["error"]): result_json += "\n\n[Hint: old_string not found. Use read_file to verify the current content, or search_files to locate the text.]" return result_json except Exception as e: return json.dumps({"error": str(e)}, ensure_ascii=False) def search_tool(pattern: str, target: str = "content", path: str = ".", file_glob: str = None, limit: int = 50, offset: int = 0, output_mode: str = "content", context: int = 0, task_id: str = "default") -> str: """Search for content or files.""" try: # Track searches to detect *consecutive* repeated search loops. # Include pagination args so users can page through truncated # results without tripping the repeated-search guard. search_key = ( "search", pattern, target, str(path), file_glob or "", limit, offset, ) with _read_tracker_lock: task_data = _read_tracker.setdefault(task_id, { "last_key": None, "consecutive": 0, "read_history": set(), }) if task_data["last_key"] == search_key: task_data["consecutive"] += 1 else: task_data["last_key"] = search_key task_data["consecutive"] = 1 count = task_data["consecutive"] if count >= 4: return json.dumps({ "error": ( f"BLOCKED: You have run this exact search {count} times in a row. " "The results have NOT changed. You already have this information. " "STOP re-searching and proceed with your task." ), "pattern": pattern, "already_searched": count, }, ensure_ascii=False) file_ops = _get_file_ops(task_id) result = file_ops.search( pattern=pattern, path=path, target=target, file_glob=file_glob, limit=limit, offset=offset, output_mode=output_mode, context=context ) if hasattr(result, 'matches'): for m in result.matches: if hasattr(m, 'content') and m.content: m.content = redact_sensitive_text(m.content) result_dict = result.to_dict() if count >= 3: result_dict["_warning"] = ( f"You have run this exact search {count} times consecutively. " "The results have not changed. Use the information you already have." ) result_json = json.dumps(result_dict, ensure_ascii=False) # Hint when results were truncated — explicit next offset is clearer # than relying on the model to infer it from total_count vs match count. if result_dict.get("truncated"): next_offset = offset + limit result_json += f"\n\n[Hint: Results truncated. Use offset={next_offset} to see more, or narrow with a more specific pattern or file_glob.]" return result_json except Exception as e: return json.dumps({"error": str(e)}, ensure_ascii=False) FILE_TOOLS = [ {"name": "read_file", "function": read_file_tool}, {"name": "write_file", "function": write_file_tool}, {"name": "patch", "function": patch_tool}, {"name": "search_files", "function": search_tool} ] def get_file_tools(): """Get the list of file tool definitions.""" return FILE_TOOLS # --------------------------------------------------------------------------- # Schemas + Registry # --------------------------------------------------------------------------- from tools.registry import registry def _check_file_reqs(): """Lazy wrapper to avoid circular import with tools/__init__.py.""" from tools import check_file_requirements return check_file_requirements() READ_FILE_SCHEMA = { "name": "read_file", "description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.", "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "Path to the file to read (absolute, relative, or ~/path)"}, "offset": {"type": "integer", "description": "Line number to start reading from (1-indexed, default: 1)", "default": 1, "minimum": 1}, "limit": {"type": "integer", "description": "Maximum number of lines to read (default: 500, max: 2000)", "default": 500, "maximum": 2000} }, "required": ["path"] } } WRITE_FILE_SCHEMA = { "name": "write_file", "description": "Write content to a file, completely replacing existing content. Use this instead of echo/cat heredoc in terminal. Creates parent directories automatically. OVERWRITES the entire file — use 'patch' for targeted edits.", "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "Path to the file to write (will be created if it doesn't exist, overwritten if it does)"}, "content": {"type": "string", "description": "Complete content to write to the file"} }, "required": ["path", "content"] } } PATCH_SCHEMA = { "name": "patch", "description": "Targeted find-and-replace edits in files. Use this instead of sed/awk in terminal. Uses fuzzy matching (9 strategies) so minor whitespace/indentation differences won't break it. Returns a unified diff. Auto-runs syntax checks after editing.\n\nReplace mode (default): find a unique string and replace it.\nPatch mode: apply V4A multi-file patches for bulk changes.", "parameters": { "type": "object", "properties": { "mode": {"type": "string", "enum": ["replace", "patch"], "description": "Edit mode: 'replace' for targeted find-and-replace, 'patch' for V4A multi-file patches", "default": "replace"}, "path": {"type": "string", "description": "File path to edit (required for 'replace' mode)"}, "old_string": {"type": "string", "description": "Text to find in the file (required for 'replace' mode). Must be unique in the file unless replace_all=true. Include enough surrounding context to ensure uniqueness."}, "new_string": {"type": "string", "description": "Replacement text (required for 'replace' mode). Can be empty string to delete the matched text."}, "replace_all": {"type": "boolean", "description": "Replace all occurrences instead of requiring a unique match (default: false)", "default": False}, "patch": {"type": "string", "description": "V4A format patch content (required for 'patch' mode). Format:\n*** Begin Patch\n*** Update File: path/to/file\n@@ context hint @@\n context line\n-removed line\n+added line\n*** End Patch"} }, "required": ["mode"] } } SEARCH_FILES_SCHEMA = { "name": "search_files", "description": "Search file contents or find files by name. Use this instead of grep/rg/find/ls in terminal. Ripgrep-backed, faster than shell equivalents.\n\nContent search (target='content'): Regex search inside files. Output modes: full matches with line numbers, file paths only, or match counts.\n\nFile search (target='files'): Find files by glob pattern (e.g., '*.py', '*config*'). Also use this instead of ls — results sorted by modification time.", "parameters": { "type": "object", "properties": { "pattern": {"type": "string", "description": "Regex pattern for content search, or glob pattern (e.g., '*.py') for file search"}, "target": {"type": "string", "enum": ["content", "files"], "description": "'content' searches inside file contents, 'files' searches for files by name", "default": "content"}, "path": {"type": "string", "description": "Directory or file to search in (default: current working directory)", "default": "."}, "file_glob": {"type": "string", "description": "Filter files by pattern in grep mode (e.g., '*.py' to only search Python files)"}, "limit": {"type": "integer", "description": "Maximum number of results to return (default: 50)", "default": 50}, "offset": {"type": "integer", "description": "Skip first N results for pagination (default: 0)", "default": 0}, "output_mode": {"type": "string", "enum": ["content", "files_only", "count"], "description": "Output format for grep mode: 'content' shows matching lines with line numbers, 'files_only' lists file paths, 'count' shows match counts per file", "default": "content"}, "context": {"type": "integer", "description": "Number of context lines before and after each match (grep mode only)", "default": 0} }, "required": ["pattern"] } } def _handle_read_file(args, **kw): tid = kw.get("task_id") or "default" return read_file_tool(path=args.get("path", ""), offset=args.get("offset", 1), limit=args.get("limit", 500), task_id=tid) def _handle_write_file(args, **kw): tid = kw.get("task_id") or "default" return write_file_tool(path=args.get("path", ""), content=args.get("content", ""), task_id=tid) def _handle_patch(args, **kw): tid = kw.get("task_id") or "default" return patch_tool( mode=args.get("mode", "replace"), path=args.get("path"), old_string=args.get("old_string"), new_string=args.get("new_string"), replace_all=args.get("replace_all", False), patch=args.get("patch"), task_id=tid) def _handle_search_files(args, **kw): tid = kw.get("task_id") or "default" target_map = {"grep": "content", "find": "files"} raw_target = args.get("target", "content") target = target_map.get(raw_target, raw_target) return search_tool( pattern=args.get("pattern", ""), target=target, path=args.get("path", "."), file_glob=args.get("file_glob"), limit=args.get("limit", 50), offset=args.get("offset", 0), output_mode=args.get("output_mode", "content"), context=args.get("context", 0), task_id=tid) registry.register(name="read_file", toolset="file", schema=READ_FILE_SCHEMA, handler=_handle_read_file, check_fn=_check_file_reqs, emoji="📖") registry.register(name="write_file", toolset="file", schema=WRITE_FILE_SCHEMA, handler=_handle_write_file, check_fn=_check_file_reqs, emoji="✍️") registry.register(name="patch", toolset="file", schema=PATCH_SCHEMA, handler=_handle_patch, check_fn=_check_file_reqs, emoji="🔧") registry.register(name="search_files", toolset="file", schema=SEARCH_FILES_SCHEMA, handler=_handle_search_files, check_fn=_check_file_reqs, emoji="🔎")