1169 lines
44 KiB
Python
1169 lines
44 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
File Operations Module
|
|
|
|
Provides file manipulation capabilities (read, write, patch, search) that work
|
|
across all terminal backends (local, docker, singularity, ssh, modal, daytona).
|
|
|
|
The key insight is that all file operations can be expressed as shell commands,
|
|
so we wrap the terminal backend's execute() interface to provide a unified file API.
|
|
|
|
Usage:
|
|
from tools.file_operations import ShellFileOperations
|
|
from tools.terminal_tool import _active_environments
|
|
|
|
# Get file operations for a terminal environment
|
|
file_ops = ShellFileOperations(terminal_env)
|
|
|
|
# Read a file
|
|
result = file_ops.read_file("/path/to/file.py")
|
|
|
|
# Write a file
|
|
result = file_ops.write_file("/path/to/new.py", "print('hello')")
|
|
|
|
# Search for content
|
|
result = file_ops.search("TODO", path=".", file_glob="*.py")
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import difflib
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional, List, Dict, Any
|
|
from pathlib import Path
|
|
from hermes_constants import get_hermes_home
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Write-path deny list — blocks writes to sensitive system/credential files
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Home directory of the user running this process; the per-user entries
# below are all built relative to it.
_HOME = str(Path.home())

# Exact files that must never be written. Every entry is passed through
# os.path.realpath() so it can be compared directly with the
# realpath()-resolved target computed in _is_write_denied().
WRITE_DENIED_PATHS = {
    os.path.realpath(p) for p in [
        os.path.join(_HOME, ".ssh", "authorized_keys"),
        os.path.join(_HOME, ".ssh", "id_rsa"),
        os.path.join(_HOME, ".ssh", "id_ed25519"),
        os.path.join(_HOME, ".ssh", "config"),
        str(get_hermes_home() / ".env"),
        os.path.join(_HOME, ".bashrc"),
        os.path.join(_HOME, ".zshrc"),
        os.path.join(_HOME, ".profile"),
        os.path.join(_HOME, ".bash_profile"),
        os.path.join(_HOME, ".zprofile"),
        os.path.join(_HOME, ".netrc"),
        os.path.join(_HOME, ".pgpass"),
        os.path.join(_HOME, ".npmrc"),
        os.path.join(_HOME, ".pypirc"),
        "/etc/sudoers",
        "/etc/passwd",
        "/etc/shadow",
    ]
}

# Directory trees in which every path is write-protected. os.sep is appended
# after resolution so the startswith() test in _is_write_denied() matches
# only paths *inside* the directory (e.g. "~/.ssh/x"), not siblings that
# merely share the prefix (e.g. "~/.sshfoo").
WRITE_DENIED_PREFIXES = [
    os.path.realpath(p) + os.sep for p in [
        os.path.join(_HOME, ".ssh"),
        os.path.join(_HOME, ".aws"),
        os.path.join(_HOME, ".gnupg"),
        os.path.join(_HOME, ".kube"),
        "/etc/sudoers.d",
        "/etc/systemd",
        os.path.join(_HOME, ".docker"),
        os.path.join(_HOME, ".azure"),
        os.path.join(_HOME, ".config", "gh"),
    ]
]
|
|
|
|
|
|
def _get_safe_write_root() -> Optional[str]:
|
|
"""Return the resolved HERMES_WRITE_SAFE_ROOT path, or None if unset.
|
|
|
|
When set, all write_file/patch operations are constrained to this
|
|
directory tree. Writes outside it are denied even if the target is
|
|
not on the static deny list. Opt-in hardening for gateway/messaging
|
|
deployments that should only touch a workspace checkout.
|
|
"""
|
|
root = os.getenv("HERMES_WRITE_SAFE_ROOT", "")
|
|
if not root:
|
|
return None
|
|
try:
|
|
return os.path.realpath(os.path.expanduser(root))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _is_write_denied(path: str) -> bool:
    """Decide whether a write to ``path`` must be refused.

    The path is user-expanded and realpath-resolved first, so ``~`` and
    symlinks are taken into account before any comparison.

    Args:
        path: Target path of the intended write.

    Returns:
        True when the resolved path is a protected file, lies under a
        protected directory, or falls outside the configured safe root
        (if one is set); False otherwise.
    """
    target = os.path.realpath(os.path.expanduser(str(path)))

    # 1) Static deny list: exact files first, then protected directory trees.
    on_static_list = target in WRITE_DENIED_PATHS or any(
        target.startswith(denied_dir) for denied_dir in WRITE_DENIED_PREFIXES
    )
    if on_static_list:
        return True

    # 2) Optional safe-root sandbox: with a root configured, anything that is
    #    neither the root itself nor below it is denied.
    sandbox = _get_safe_write_root()
    if not sandbox:
        return False
    inside_sandbox = target == sandbox or target.startswith(sandbox + os.sep)
    return not inside_sandbox
|
|
|
|
|
|
# =============================================================================
|
|
# Result Data Classes
|
|
# =============================================================================
|
|
|
|
@dataclass
class ReadResult:
    """Outcome of a file read: text content, binary/image metadata, or an error."""
    content: str = ""
    total_lines: int = 0
    file_size: int = 0
    truncated: bool = False
    hint: Optional[str] = None
    is_binary: bool = False
    is_image: bool = False
    base64_content: Optional[str] = None
    mime_type: Optional[str] = None
    dimensions: Optional[str] = None  # For images: "WIDTHxHEIGHT"
    error: Optional[str] = None
    similar_files: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the fields as a dict, omitting ``None`` values and empty lists."""
        payload = {}
        for key, value in vars(self).items():
            if value is None or value == []:
                continue
            payload[key] = value
        return payload
|
|
|
|
|
|
@dataclass
class WriteResult:
    """Outcome of a file write: size written, directory creation flag, error/warning."""
    bytes_written: int = 0
    dirs_created: bool = False
    error: Optional[str] = None
    warning: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the fields as a dict, leaving out any that are ``None``."""
        payload = {}
        for key, value in vars(self).items():
            if value is not None:
                payload[key] = value
        return payload
|
|
|
|
|
|
@dataclass
class PatchResult:
    """Outcome of a patch: success flag, diff, touched files, lint report, error."""
    success: bool = False
    diff: str = ""
    files_modified: List[str] = field(default_factory=list)
    files_created: List[str] = field(default_factory=list)
    files_deleted: List[str] = field(default_factory=list)
    lint: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def to_dict(self) -> dict:
        """Return ``success`` plus every other field whose value is truthy."""
        payload = {"success": self.success}
        # Listed in emission order; falsy values (empty string/list, None)
        # are left out of the result.
        optional_fields = (
            ("diff", self.diff),
            ("files_modified", self.files_modified),
            ("files_created", self.files_created),
            ("files_deleted", self.files_deleted),
            ("lint", self.lint),
            ("error", self.error),
        )
        for key, value in optional_fields:
            if value:
                payload[key] = value
        return payload
|
|
|
|
|
|
@dataclass
class SearchMatch:
    """A single search match."""
    # File in which the match was found.
    path: str
    # Line number of the match within that file.
    line_number: int
    # Text of the matching line.
    content: str
    mtime: float = 0.0  # Modification time for sorting
|
|
|
|
|
|
@dataclass
class SearchResult:
    """Outcome of a search: content matches, file list, per-file counts, or an error."""
    matches: List[SearchMatch] = field(default_factory=list)
    files: List[str] = field(default_factory=list)
    counts: Dict[str, int] = field(default_factory=dict)
    total_count: int = 0
    truncated: bool = False
    error: Optional[str] = None

    def to_dict(self) -> dict:
        """Return ``total_count`` plus whichever other fields are non-empty.

        Matches are flattened to ``{"path", "line", "content"}`` dicts; the
        ``mtime`` attribute of each match is not included.
        """
        payload = {"total_count": self.total_count}
        if self.matches:
            flattened = []
            for hit in self.matches:
                flattened.append(
                    {"path": hit.path, "line": hit.line_number, "content": hit.content}
                )
            payload["matches"] = flattened
        for key in ("files", "counts"):
            value = getattr(self, key)
            if value:
                payload[key] = value
        if self.truncated:
            payload["truncated"] = True
        if self.error:
            payload["error"] = self.error
        return payload
|
|
|
|
|
|
@dataclass
class LintResult:
    """Outcome of a lint run: success flag, skip marker, linter output, message."""
    success: bool = True
    skipped: bool = False
    output: str = ""
    message: str = ""

    def to_dict(self) -> dict:
        """Return a status dict.

        Skipped runs report ``status="skipped"`` with the message; all other
        runs report ``"ok"`` or ``"error"`` together with the linter output.
        """
        if not self.skipped:
            status = "ok" if self.success else "error"
            return {"status": status, "output": self.output}
        return {"status": "skipped", "message": self.message}
|
|
|
|
|
|
@dataclass
class ExecuteResult:
    """Result from executing a shell command."""
    # Output text of the command (filled from the backend's "output" key
    # by ShellFileOperations._exec).
    stdout: str = ""
    # Exit status of the command (filled from the backend's "returncode" key).
    exit_code: int = 0
|
|
|
|
|
|
# =============================================================================
|
|
# Abstract Interface
|
|
# =============================================================================
|
|
|
|
class FileOperations(ABC):
    """Abstract interface for file operations across terminal backends.

    Concrete implementations (ShellFileOperations in this module) must
    provide every method below with the same signature.
    """

    @abstractmethod
    def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
        """Read a file with pagination support.

        Args:
            path: File to read.
            offset: Line number to start from (defaults to 1).
            limit: Maximum number of lines to return (defaults to 500).
        """
        ...

    @abstractmethod
    def write_file(self, path: str, content: str) -> WriteResult:
        """Write content to a file, creating directories as needed.

        Args:
            path: File to write.
            content: Text to store in the file.
        """
        ...

    @abstractmethod
    def patch_replace(self, path: str, old_string: str, new_string: str,
                      replace_all: bool = False) -> PatchResult:
        """Replace text in a file using fuzzy matching.

        Args:
            path: File to modify.
            old_string: Text to find.
            new_string: Replacement text.
            replace_all: Replace every occurrence instead of a single one.
        """
        ...

    @abstractmethod
    def patch_v4a(self, patch_content: str) -> PatchResult:
        """Apply a V4A format patch.

        Args:
            patch_content: The patch text in V4A format.
        """
        ...

    @abstractmethod
    def search(self, pattern: str, path: str = ".", target: str = "content",
               file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
               output_mode: str = "content", context: int = 0) -> SearchResult:
        """Search for content or files.

        Args:
            pattern: What to search for.
            path: Directory or file to search (defaults to ".").
            target: "content" or "files".
            file_glob: Optional file-name filter for content searches.
            limit: Maximum number of results.
            offset: Number of leading results to skip.
            output_mode: "content", "files_only", or "count".
            context: Lines of context around each match.
        """
        ...
|
|
|
|
|
|
# =============================================================================
|
|
# Shell-based Implementation
|
|
# =============================================================================
|
|
|
|
# Binary file extensions (fast path check). Consulted by
# ShellFileOperations._is_likely_binary() before any content sampling;
# entries are lowercase and include the leading dot.
BINARY_EXTENSIONS = {
    # Images
    '.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico', '.tiff', '.tif',
    '.svg',  # SVG is text but often treated as binary
    # Audio/Video
    '.mp3', '.mp4', '.wav', '.avi', '.mov', '.mkv', '.flac', '.ogg', '.webm',
    # Archives
    '.zip', '.tar', '.gz', '.bz2', '.xz', '.7z', '.rar',
    # Documents
    '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
    # Compiled/Binary
    '.exe', '.dll', '.so', '.dylib', '.o', '.a', '.pyc', '.pyo', '.class',
    '.wasm', '.bin',
    # Fonts
    '.ttf', '.otf', '.woff', '.woff2', '.eot',
    # Other
    '.db', '.sqlite', '.sqlite3',
}

# Image extensions (subset of binary that we can return as base64).
# Consulted by ShellFileOperations._is_image().
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'}

# Linters by file extension. Each value is a shell command template;
# _check_lint() substitutes {file} with the shell-escaped path and uses the
# first word of the template to test whether the tool is installed.
LINTERS = {
    '.py': 'python -m py_compile {file} 2>&1',
    '.js': 'node --check {file} 2>&1',
    '.ts': 'npx tsc --noEmit {file} 2>&1',
    '.go': 'go vet {file} 2>&1',
    '.rs': 'rustfmt --check {file} 2>&1',
}

# Max limits for read operations
MAX_LINES = 2000        # upper bound applied to read_file's `limit`
MAX_LINE_LENGTH = 2000  # longer lines are cut by _add_line_numbers
MAX_FILE_SIZE = 50 * 1024  # 50KB
|
|
|
|
|
|
class ShellFileOperations(FileOperations):
|
|
"""
|
|
File operations implemented via shell commands.
|
|
|
|
Works with ANY terminal backend that has execute(command, cwd) method.
|
|
This includes local, docker, singularity, ssh, modal, and daytona environments.
|
|
"""
|
|
|
|
def __init__(self, terminal_env, cwd: str = None):
|
|
"""
|
|
Initialize file operations with a terminal environment.
|
|
|
|
Args:
|
|
terminal_env: Any object with execute(command, cwd) method.
|
|
Returns {"output": str, "returncode": int}
|
|
cwd: Working directory (defaults to env's cwd or current directory)
|
|
"""
|
|
self.env = terminal_env
|
|
# Determine cwd from various possible sources.
|
|
# IMPORTANT: do NOT fall back to os.getcwd() -- that's the HOST's local
|
|
# path which doesn't exist inside container/cloud backends (modal, docker).
|
|
# If nothing provides a cwd, use "/" as a safe universal default.
|
|
self.cwd = cwd or getattr(terminal_env, 'cwd', None) or \
|
|
getattr(getattr(terminal_env, 'config', None), 'cwd', None) or "/"
|
|
|
|
# Cache for command availability checks
|
|
self._command_cache: Dict[str, bool] = {}
|
|
|
|
def _exec(self, command: str, cwd: str = None, timeout: int = None,
          stdin_data: str = None) -> ExecuteResult:
    """Run ``command`` through the terminal backend and normalise the result.

    Args:
        command: Shell command line to execute.
        cwd: Working directory for this call; defaults to ``self.cwd``.
        timeout: Forwarded to the backend only when truthy.
        stdin_data: If provided, piped to the process's stdin instead of
                   embedding in the command string. Bypasses ARG_MAX.

    Returns:
        ExecuteResult built from the backend's "output" and "returncode"
        entries (defaulting to "" and 0 when absent).
    """
    # Only forward optional arguments that were actually supplied.
    optional_args = {}
    if timeout:
        optional_args['timeout'] = timeout
    if stdin_data is not None:
        optional_args['stdin_data'] = stdin_data

    working_dir = cwd or self.cwd
    raw = self.env.execute(command, cwd=working_dir, **optional_args)
    output_text = raw.get("output", "")
    return_code = raw.get("returncode", 0)
    return ExecuteResult(stdout=output_text, exit_code=return_code)
|
|
|
|
def _has_command(self, cmd: str) -> bool:
|
|
"""Check if a command exists in the environment (cached)."""
|
|
if cmd not in self._command_cache:
|
|
result = self._exec(f"command -v {cmd} >/dev/null 2>&1 && echo 'yes'")
|
|
self._command_cache[cmd] = result.stdout.strip() == 'yes'
|
|
return self._command_cache[cmd]
|
|
|
|
def _is_likely_binary(self, path: str, content_sample: str = None) -> bool:
    """
    Check if a file is likely binary.

    Uses extension check (fast) + content analysis (fallback).

    Args:
        path: File path; only its extension is inspected.
        content_sample: Optional leading chunk of the file's content.

    Returns:
        True when the extension is in BINARY_EXTENSIONS, or when more than
        30% of the first 1000 sample characters are control characters other
        than newline, carriage return and tab. False otherwise, including
        when no (or an empty) sample is supplied.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext in BINARY_EXTENSIONS:
        return True

    # Without a sample there is nothing further to analyse. (The original
    # re-checked emptiness inside the truthy branch, which was unreachable.)
    if not content_sample:
        return False

    # Content analysis: >30% non-printable chars = binary
    window = content_sample[:1000]
    non_printable = sum(1 for c in window if ord(c) < 32 and c not in '\n\r\t')
    return non_printable / len(window) > 0.30
|
|
|
|
def _is_image(self, path: str) -> bool:
    """Return True when the path's extension (case-insensitive) is in IMAGE_EXTENSIONS."""
    _, extension = os.path.splitext(path)
    return extension.lower() in IMAGE_EXTENSIONS
|
|
|
|
def _add_line_numbers(self, content: str, start_line: int = 1) -> str:
    """Prefix every line of ``content`` with its number in LINE_NUM|CONTENT format.

    Args:
        content: Text to number; it is split on "\\n".
        start_line: Number given to the first line.

    Returns:
        The numbered lines joined with newlines. Numbers are right-aligned
        in six columns, and lines longer than MAX_LINE_LENGTH are cut and
        marked "... [truncated]".
    """
    def clip(text):
        # Truncate long lines
        if len(text) > MAX_LINE_LENGTH:
            return text[:MAX_LINE_LENGTH] + "... [truncated]"
        return text

    return '\n'.join(
        f"{number:6d}|{clip(text)}"
        for number, text in enumerate(content.split('\n'), start=start_line)
    )
|
|
|
|
def _expand_path(self, path: str) -> str:
    """
    Expand shell-style paths like ~ and ~user to absolute paths.

    This must be done BEFORE shell escaping, since ~ doesn't expand
    inside single quotes.

    The home directory is looked up through the terminal environment
    (``echo $HOME`` / ``echo ~user``), not on the host.

    Args:
        path: Path as supplied by the caller; may be empty.

    Returns:
        The expanded path, or ``path`` unchanged when it is empty, does not
        start with ``~``, or none of the expansion branches below returns.
    """
    if not path:
        return path

    # Handle ~ and ~user
    if path.startswith('~'):
        # Get home directory via the terminal environment
        result = self._exec("echo $HOME")
        if result.exit_code == 0 and result.stdout.strip():
            home = result.stdout.strip()
            if path == '~':
                return home
            elif path.startswith('~/'):
                return home + path[1:]  # Replace ~ with home
            # NOTE(review): indentation was reconstructed from a
            # whitespace-stripped listing -- confirm that the ~username
            # handling below belongs inside this $HOME-success branch.
            # ~username format - extract and validate username before
            # letting shell expand it (prevent shell injection via
            # paths like "~; rm -rf /").
            rest = path[1:]  # strip leading ~
            slash_idx = rest.find('/')
            username = rest[:slash_idx] if slash_idx >= 0 else rest
            # Only letters, digits, dot, underscore and hyphen are accepted,
            # so nothing shell-significant reaches the command below.
            if username and re.fullmatch(r'[a-zA-Z0-9._-]+', username):
                # Only expand ~username (not the full path) to avoid shell
                # injection via path suffixes like "~user/$(malicious)".
                expand_result = self._exec(f"echo ~{username}")
                if expand_result.exit_code == 0 and expand_result.stdout.strip():
                    user_home = expand_result.stdout.strip()
                    suffix = path[1 + len(username):]  # e.g. "/rest/of/path"
                    return user_home + suffix

    return path
|
|
|
|
def _escape_shell_arg(self, arg: str) -> str:
|
|
"""Escape a string for safe use in shell commands."""
|
|
# Use single quotes and escape any single quotes in the string
|
|
return "'" + arg.replace("'", "'\"'\"'") + "'"
|
|
|
|
def _unified_diff(self, old_content: str, new_content: str, filename: str) -> str:
|
|
"""Generate unified diff between old and new content."""
|
|
old_lines = old_content.splitlines(keepends=True)
|
|
new_lines = new_content.splitlines(keepends=True)
|
|
diff = difflib.unified_diff(
|
|
old_lines, new_lines,
|
|
fromfile=f"a/{filename}",
|
|
tofile=f"b/{filename}"
|
|
)
|
|
return ''.join(diff)
|
|
|
|
# =========================================================================
|
|
# READ Implementation
|
|
# =========================================================================
|
|
|
|
def read_file(self, path: str, offset: int = 1, limit: int = 500) -> ReadResult:
    """
    Read a file with pagination, binary detection, and line numbers.

    Args:
        path: File path (absolute or relative to cwd)
        offset: Line number to start from (1-indexed, default 1). Values
                below 1 are treated as 1.
        limit: Maximum lines to return (default 500, max 2000). Values
               below 1 are treated as 1.

    Returns:
        ReadResult with content, metadata, or error info. Images yield a
        hint pointing at vision_analyze instead of content; other binary
        files yield an error; a missing file yields an error together with
        similar file names from the same directory.
    """
    # Expand ~ and other shell paths
    path = self._expand_path(path)
    quoted_path = self._escape_shell_arg(path)

    # Clamp pagination. The range is handed to `sed -n 'A,Bp'`: a start
    # below 1 is not a valid line address, and a non-positive limit would
    # yield an end line before the start line.
    offset = max(offset, 1)
    limit = max(1, min(limit, MAX_LINES))

    # Check if file exists and get size (wc -c is POSIX, works on Linux + macOS)
    stat_result = self._exec(f"wc -c < {quoted_path} 2>/dev/null")
    if stat_result.exit_code != 0:
        # File not found - try to suggest similar files
        return self._suggest_similar_files(path)

    try:
        file_size = int(stat_result.stdout.strip())
    except ValueError:
        file_size = 0

    # NOTE: files larger than MAX_FILE_SIZE are still read; no size cap or
    # warning is applied here (the previous size check was a no-op).

    # Images are never inlined — redirect to the vision tool
    if self._is_image(path):
        return ReadResult(
            is_image=True,
            is_binary=True,
            file_size=file_size,
            hint=(
                "Image file detected. Automatically redirected to vision_analyze tool. "
                "Use vision_analyze with this file path to inspect the image contents."
            ),
        )

    # Read a sample to check for binary content
    sample_result = self._exec(f"head -c 1000 {quoted_path} 2>/dev/null")
    if self._is_likely_binary(path, sample_result.stdout):
        return ReadResult(
            is_binary=True,
            file_size=file_size,
            error="Binary file - cannot display as text. Use appropriate tools to handle this file type."
        )

    # Read with pagination using sed
    end_line = offset + limit - 1
    read_result = self._exec(f"sed -n '{offset},{end_line}p' {quoted_path}")
    if read_result.exit_code != 0:
        return ReadResult(error=f"Failed to read file: {read_result.stdout}")

    # Get total line count
    wc_result = self._exec(f"wc -l < {quoted_path}")
    try:
        total_lines = int(wc_result.stdout.strip())
    except ValueError:
        total_lines = 0

    # Check if truncated: more lines exist beyond the page just read.
    truncated = total_lines > end_line
    hint = None
    if truncated:
        hint = f"Use offset={end_line + 1} to continue reading (showing {offset}-{end_line} of {total_lines} lines)"

    return ReadResult(
        content=self._add_line_numbers(read_result.stdout, offset),
        total_lines=total_lines,
        file_size=file_size,
        truncated=truncated,
        hint=hint
    )
|
|
|
|
# Images larger than this are too expensive to inline as base64 in the
# conversation context. Return metadata only and suggest vision_analyze.
# Read by _read_image() as self.MAX_IMAGE_BYTES.
MAX_IMAGE_BYTES = 512 * 1024  # 512 KB
|
|
|
|
def _read_image(self, path: str) -> ReadResult:
    """Load an image file as base64 together with its size, MIME type and dimensions.

    Args:
        path: Image file path (already expanded).

    Returns:
        ReadResult flagged as image/binary. Files above MAX_IMAGE_BYTES
        carry only a hint; a failing base64 command yields an error;
        otherwise the result holds the base64 text, a MIME type derived from
        the extension, and "WIDTHxHEIGHT" dimensions when the `identify`
        command is available and succeeds.
    """
    quoted_path = self._escape_shell_arg(path)

    # Get file size (wc -c is POSIX, works on Linux + macOS)
    size_probe = self._exec(f"wc -c < {quoted_path} 2>/dev/null")
    try:
        file_size = int(size_probe.stdout.strip())
    except ValueError:
        file_size = 0

    if file_size > self.MAX_IMAGE_BYTES:
        too_large_hint = (
            f"Image is too large to inline ({file_size:,} bytes). "
            "Use vision_analyze to inspect the image, or reference it by path."
        )
        return ReadResult(
            is_image=True,
            is_binary=True,
            file_size=file_size,
            hint=too_large_hint,
        )

    # Get base64 content
    encoded = self._exec(f"base64 -w 0 {quoted_path} 2>/dev/null", timeout=30)
    if encoded.exit_code != 0:
        return ReadResult(
            is_image=True,
            is_binary=True,
            file_size=file_size,
            error=f"Failed to read image: {encoded.stdout}"
        )

    # Try to get dimensions (requires ImageMagick)
    dimensions = None
    if self._has_command('identify'):
        dim_probe = self._exec(f"identify -format '%wx%h' {quoted_path} 2>/dev/null")
        if dim_probe.exit_code == 0:
            dimensions = dim_probe.stdout.strip()

    # Determine MIME type from extension
    mime_by_extension = {
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.bmp': 'image/bmp',
        '.ico': 'image/x-icon',
    }
    extension = os.path.splitext(path)[1].lower()
    mime_type = mime_by_extension.get(extension, 'application/octet-stream')

    return ReadResult(
        is_image=True,
        is_binary=True,
        file_size=file_size,
        base64_content=encoded.stdout,
        mime_type=mime_type,
        dimensions=dimensions
    )
|
|
|
|
def _suggest_similar_files(self, path: str) -> ReadResult:
    """Build the "File not found" result, with look-alike names from the same directory.

    Lists up to 20 entries of the directory that should contain ``path`` and
    keeps those whose set of (lower-cased) characters overlaps the requested
    file name by at least half of that name's length.

    Args:
        path: The path that could not be read.

    Returns:
        ReadResult carrying the error message and at most 5 suggestions.
    """
    # Get directory and filename
    dir_path = os.path.dirname(path) or "."
    filename = os.path.basename(path)

    # List files in directory
    listing = self._exec(f"ls -1 {self._escape_shell_arg(dir_path)} 2>/dev/null | head -20")

    suggestions = []
    if listing.exit_code == 0 and listing.stdout.strip():
        wanted_chars = set(filename.lower())
        threshold = len(filename) * 0.5  # 50% character overlap
        suggestions = [
            os.path.join(dir_path, entry)
            for entry in listing.stdout.strip().split('\n')
            if len(wanted_chars & set(entry.lower())) >= threshold
        ]

    return ReadResult(
        error=f"File not found: {path}",
        similar_files=suggestions[:5]  # Limit to 5 suggestions
    )
|
|
|
|
# =========================================================================
|
|
# WRITE Implementation
|
|
# =========================================================================
|
|
|
|
def write_file(self, path: str, content: str) -> WriteResult:
    """
    Write content to a file, creating parent directories as needed.

    The content is piped through stdin, so it never appears in the shell
    command string (only the file path does) and OS ARG_MAX limits on
    large files are avoided.

    Args:
        path: File path to write
        content: Content to write

    Returns:
        WriteResult with bytes written or error. ``dirs_created`` is True
        whenever the ``mkdir -p`` for the parent directory exits with 0.
    """
    # Expand ~ and other shell paths
    path = self._expand_path(path)

    # Block writes to sensitive paths
    if _is_write_denied(path):
        return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")

    # Create parent directories
    parent_dir = os.path.dirname(path)
    dirs_created = False
    if parent_dir:
        mkdir_outcome = self._exec(f"mkdir -p {self._escape_shell_arg(parent_dir)}")
        dirs_created = mkdir_outcome.exit_code == 0

    # Write via stdin pipe — content bypasses shell arg parsing entirely,
    # so there's no ARG_MAX limit regardless of file size.
    write_outcome = self._exec(f"cat > {self._escape_shell_arg(path)}", stdin_data=content)
    if write_outcome.exit_code != 0:
        return WriteResult(error=f"Failed to write file: {write_outcome.stdout}")

    # Get bytes written (wc -c is POSIX, works on Linux + macOS)
    size_probe = self._exec(f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null")
    try:
        bytes_written = int(size_probe.stdout.strip())
    except ValueError:
        # Size could not be parsed; fall back to the UTF-8 length of the input.
        bytes_written = len(content.encode('utf-8'))

    return WriteResult(bytes_written=bytes_written, dirs_created=dirs_created)
|
|
|
|
# =========================================================================
|
|
# PATCH Implementation (Replace Mode)
|
|
# =========================================================================
|
|
|
|
def patch_replace(self, path: str, old_string: str, new_string: str,
                  replace_all: bool = False) -> PatchResult:
    """
    Replace text in a file using fuzzy matching.

    The file is read with ``cat``, rewritten through write_file, and then
    syntax-checked with _check_lint.

    Args:
        path: File path to modify
        old_string: Text to find (must be unique unless replace_all=True)
        new_string: Replacement text
        replace_all: If True, replace all occurrences

    Returns:
        PatchResult with diff and lint results, or with ``error`` set when
        the path is protected, the file cannot be read, no match is found,
        or the write fails.
    """
    # Expand ~ and other shell paths
    path = self._expand_path(path)

    # Block writes to sensitive paths
    if _is_write_denied(path):
        return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.")

    # Read current content
    current = self._exec(f"cat {self._escape_shell_arg(path)} 2>/dev/null")
    if current.exit_code != 0:
        return PatchResult(error=f"Failed to read file: {path}")
    original_text = current.stdout

    # Import and use fuzzy matching
    from tools.fuzzy_match import fuzzy_find_and_replace

    patched_text, match_count, error = fuzzy_find_and_replace(
        original_text, old_string, new_string, replace_all
    )
    if error:
        return PatchResult(error=error)
    if match_count == 0:
        return PatchResult(error=f"Could not find match for old_string in {path}")

    # Write back
    write_outcome = self.write_file(path, patched_text)
    if write_outcome.error:
        return PatchResult(error=f"Failed to write changes: {write_outcome.error}")

    # Generate diff, then auto-lint the file as written.
    diff_text = self._unified_diff(original_text, patched_text, path)
    lint_result = self._check_lint(path)

    return PatchResult(
        success=True,
        diff=diff_text,
        files_modified=[path],
        lint=lint_result.to_dict() if lint_result else None
    )
|
|
|
|
def patch_v4a(self, patch_content: str) -> PatchResult:
    """
    Parse a V4A format patch and apply its operations through this instance.

    V4A format:
        *** Begin Patch
        *** Update File: path/to/file.py
        @@ context hint @@
         context line
        -removed line
        +added line
        *** End Patch

    Args:
        patch_content: V4A format patch string

    Returns:
        PatchResult with changes made, or with ``error`` set when the patch
        cannot be parsed.
    """
    # Import patch parser
    from tools.patch_parser import parse_v4a_patch, apply_v4a_operations

    operations, parse_error = parse_v4a_patch(patch_content)
    if parse_error:
        return PatchResult(error=f"Failed to parse patch: {parse_error}")

    # Apply operations
    return apply_v4a_operations(operations, self)
|
|
|
|
def _check_lint(self, path: str) -> LintResult:
    """
    Run the syntax checker registered for the file's extension, if any.

    Args:
        path: File path to lint

    Returns:
        LintResult with status and any errors. The result is marked skipped
        when no linter is registered for the extension or the linter's base
        command is not available in the environment.
    """
    extension = os.path.splitext(path)[1].lower()
    if extension not in LINTERS:
        return LintResult(skipped=True, message=f"No linter for {extension} files")

    # Check if linter command is available: the first word of the template
    # is the executable to look for.
    template = LINTERS[extension]
    executable = template.split()[0]
    if not self._has_command(executable):
        return LintResult(skipped=True, message=f"{executable} not available")

    # Run linter
    outcome = self._exec(template.format(file=self._escape_shell_arg(path)), timeout=30)
    return LintResult(
        success=outcome.exit_code == 0,
        output=outcome.stdout.strip()
    )
|
|
|
|
# =========================================================================
|
|
# SEARCH Implementation
|
|
# =========================================================================
|
|
|
|
def search(self, pattern: str, path: str = ".", target: str = "content",
           file_glob: Optional[str] = None, limit: int = 50, offset: int = 0,
           output_mode: str = "content", context: int = 0) -> SearchResult:
    """
    Search for content or files.

    Args:
        pattern: Regex (for content) or glob pattern (for files)
        path: Directory/file to search (default: cwd)
        target: "content" (grep) or "files" (glob)
        file_glob: File pattern filter for content search (e.g., "*.py")
        limit: Max results (default 50)
        offset: Skip first N results
        output_mode: "content", "files_only", or "count"
        context: Lines of context around matches

    Returns:
        SearchResult with matches or file list; an error result when
        ``path`` does not exist.
    """
    # Expand ~ and other shell paths
    path = self._expand_path(path)

    # Validate that the path exists before searching
    existence = self._exec(
        f"test -e {self._escape_shell_arg(path)} && echo exists || echo not_found"
    )
    if "not_found" in existence.stdout:
        return SearchResult(
            error=f"Path not found: {path}. Verify the path exists (use 'terminal' to check).",
            total_count=0
        )

    # Any target other than "files" is handled as a content search.
    if target != "files":
        return self._search_content(pattern, path, file_glob, limit, offset,
                                    output_mode, context)
    return self._search_files(pattern, path, limit, offset)
|
|
|
|
def _search_files(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
    """Search for files by name pattern (glob-like).

    Uses ripgrep when available and falls back to ``find`` otherwise.

    Args:
        pattern: File-name glob. Only the part after the last "/" is used.
        path: Directory to search.
        limit: Maximum number of files to return.
        offset: Number of leading results to skip.

    Returns:
        SearchResult with the matching file paths, or an error when neither
        ``rg`` nor ``find`` is available.
    """
    # Both backends match on the file name only, so reduce the pattern to
    # its final path component ("**/*.py" and "src/*.py" both become "*.py").
    search_pattern = pattern.rsplit('/', 1)[-1]

    # Prefer ripgrep: respects .gitignore, excludes hidden dirs by
    # default, and has parallel directory traversal (~200x faster than
    # find on wide trees). Mirrors _search_content which already uses rg.
    if self._has_command('rg'):
        return self._search_files_rg(search_pattern, path, limit, offset)

    # Fallback: find (slower, no .gitignore awareness)
    if not self._has_command('find'):
        return SearchResult(
            error="File search requires 'rg' (ripgrep) or 'find'. "
                  "Install ripgrep for best results: "
                  "https://github.com/BurntSushi/ripgrep#installation"
        )

    # Exclude hidden directories (matching ripgrep's default behavior).
    hidden_exclude = "-not -path '*/.*'"
    find_base = (
        f"find {self._escape_shell_arg(path)} {hidden_exclude} "
        f"-type f -name {self._escape_shell_arg(search_pattern)} "
    )

    # GNU find: prefix each path with its mtime so results sort newest first.
    # The format must reach find as a single backslash + "n" so that find
    # itself emits a newline per file. A doubled backslash is printed by
    # find as a literal backslash followed by "n", which would put every
    # result on one line.
    cmd = find_base + (
        f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn | tail -n +{offset + 1} | head -n {limit}"
    )
    result = self._exec(cmd, timeout=60)

    if not result.stdout.strip():
        # Try without -printf (BSD find compatibility -- macOS)
        cmd_simple = find_base + (
            f"2>/dev/null | head -n {limit + offset} | tail -n +{offset + 1}"
        )
        result = self._exec(cmd_simple, timeout=60)

    files = []
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        # Lines from the -printf variant look like "<mtime> <path>"; lines
        # from the plain variant are just the path.
        parts = line.split(' ', 1)
        if len(parts) == 2 and parts[0].replace('.', '').isdigit():
            files.append(parts[1])
        else:
            files.append(line)

    return SearchResult(
        files=files,
        total_count=len(files)
    )
|
|
|
|
def _search_files_rg(self, pattern: str, path: str, limit: int, offset: int) -> SearchResult:
    """Search for files by name using ripgrep's --files mode.

    rg --files respects .gitignore and excludes hidden directories by
    default, and uses parallel directory traversal for ~200x speedup
    over find on wide trees.

    Args:
        pattern: File-name glob passed to ``rg -g``.
        path: Directory to search.
        limit: Maximum number of files to return.
        offset: Number of leading results to skip.

    Returns:
        SearchResult holding the requested page. ``total_count`` is the
        number of rows fetched (at most ``limit + offset``) and
        ``truncated`` is set when that cap was reached.
    """
    # rg --files -g uses glob patterns; wrap bare names so they match
    # at any depth (equivalent to find -name).
    is_bare_name = '/' not in pattern and not pattern.startswith('*')
    glob_pattern = f"*{pattern}" if is_bare_name else pattern

    fetch_limit = limit + offset
    cmd = (
        f"rg --files -g {self._escape_shell_arg(glob_pattern)} "
        f"{self._escape_shell_arg(path)} 2>/dev/null | head -n {fetch_limit}"
    )
    listing = self._exec(cmd, timeout=60)

    fetched = []
    for entry in listing.stdout.strip().split('\n'):
        if entry:
            fetched.append(entry)

    return SearchResult(
        files=fetched[offset:offset + limit],
        total_count=len(fetched),
        truncated=len(fetched) >= fetch_limit,
    )
|
|
|
|
def _search_content(self, pattern: str, path: str, file_glob: Optional[str],
                    limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
    """Search inside files (grep-like) using the best backend available.

    ripgrep is preferred when it is installed; plain grep is the fallback.
    All arguments are forwarded unchanged to the selected backend.

    Returns:
        The backend's SearchResult, or a SearchResult carrying only an
        ``error`` message when neither ``rg`` nor ``grep`` can be found
        (for example on Windows without Git Bash).
    """
    search_args = (pattern, path, file_glob, limit, offset, output_mode, context)

    # Fast path: ripgrep.
    if self._has_command('rg'):
        return self._search_with_rg(*search_args)

    # Slower, but available almost everywhere.
    if self._has_command('grep'):
        return self._search_with_grep(*search_args)

    # No usable search tool at all.
    return SearchResult(
        error="Content search requires ripgrep (rg) or grep. "
              "Install ripgrep: https://github.com/BurntSushi/ripgrep#installation"
    )
|
|
|
|
def _search_with_rg(self, pattern: str, path: str, file_glob: Optional[str],
                    limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
    """Search file contents with ripgrep and parse its output.

    Args:
        pattern: Regular expression handed to rg as the search pattern.
        path: File or directory to search.
        file_glob: Optional glob passed via ``--glob`` to restrict the files
            that are searched.
        limit: Maximum number of results in the returned page.
        offset: Number of leading results to skip.
        output_mode: ``"files_only"`` lists matching files (``-l``),
            ``"count"`` returns per-file match counts (``-c``); any other
            value returns the matching lines themselves.
        context: Number of context lines around each match (``-C``);
            values <= 0 disable context.

    Returns:
        A SearchResult. In content mode ``matches`` holds the requested
        page, ``total_count`` the number of parsed rows (capped by the
        fetch limit) and ``truncated`` whether rows exist beyond the page.
        ``error`` is set when rg reports exit code 2 with no output.
    """
    cmd_parts = ["rg", "--line-number", "--no-heading", "--with-filename"]

    # Add context if requested
    if context > 0:
        cmd_parts.extend(["-C", str(context)])

    # Add file glob filter (must be quoted to prevent shell expansion)
    if file_glob:
        cmd_parts.extend(["--glob", self._escape_shell_arg(file_glob)])

    # Output mode handling
    if output_mode == "files_only":
        cmd_parts.append("-l")  # Files only
    elif output_mode == "count":
        cmd_parts.append("-c")  # Count per file
    content_mode = output_mode not in ("files_only", "count")

    # "--" ends option parsing, so a pattern or path that begins with "-"
    # (e.g. "-> " or "--pre=...") is treated as a positional argument
    # instead of being interpreted as an rg option.
    cmd_parts.append("--")
    cmd_parts.append(self._escape_shell_arg(pattern))
    cmd_parts.append(self._escape_shell_arg(path))

    # Decide how many output rows to keep.
    #  * context mode: rg also emits context rows and "--" separators, so
    #    grab generously and filter in Python.
    #  * content mode without context: fetch ONE sentinel row past the page
    #    so truncation can be detected (with exactly limit+offset rows the
    #    "more results" condition could never be observed).
    #  * files_only / count: the page end is enough.
    page_end = offset + limit
    uses_sentinel = content_mode and not context > 0
    if context > 0:
        fetch_limit = page_end + 200
    elif uses_sentinel:
        fetch_limit = page_end + 1
    else:
        fetch_limit = page_end
    cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])

    cmd = " ".join(cmd_parts)
    result = self._exec(cmd, timeout=60)

    # rg exit codes: 0=matches found, 1=no matches, 2=error
    # NOTE(review): the output is piped through head, so the exit code seen
    # here may be head's rather than rg's unless _exec enables pipefail --
    # confirm against the backend.
    if result.exit_code == 2 and not result.stdout.strip():
        error_msg = result.stderr.strip() if getattr(result, 'stderr', None) else "Search error"
        return SearchResult(error=f"Search failed: {error_msg}", total_count=0)

    output_lines = result.stdout.strip().split('\n')

    # Parse results based on output mode
    if output_mode == "files_only":
        all_files = [f for f in output_lines if f]
        return SearchResult(files=all_files[offset:page_end], total_count=len(all_files))

    if output_mode == "count":
        counts = {}
        for line in output_lines:
            # "file:count" -- split on the LAST colon so paths that contain
            # colons stay intact.
            file_part, sep, count_part = line.rpartition(':')
            if not sep:
                continue
            try:
                counts[file_part] = int(count_part)
            except ValueError:
                # Not a "file:count" row; ignore it.
                continue
        return SearchResult(counts=counts, total_count=sum(counts.values()))

    # Content mode: parse match lines and (optionally) context lines.
    #   rg match lines:   "file:lineno:content"  (colon separator)
    #   rg context lines: "file-lineno-content"  (dash separator)
    #   rg group seps:    "--"
    # On Windows, paths contain drive letters (e.g. C:\path), so a naive
    # split(":") breaks; the optional first group absorbs the drive prefix.
    # The path group is lazy, so the first ":digits:" / "-digits-" in the
    # line is taken as the line number; a file name containing such a
    # sequence is therefore split early (known limitation).
    _match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
    _ctx_re = re.compile(r'^([A-Za-z]:)?(.*?)-(\d+)-(.*)$')
    matches = []
    for line in output_lines:
        if not line or line == "--":
            continue

        # Try the match-line shape first; only fall back to the context
        # shape when context was requested, to avoid false positives.
        m = _match_re.match(line)
        if m is None and context > 0:
            m = _ctx_re.match(line)
        if m is None:
            continue

        matches.append(SearchMatch(
            path=(m.group(1) or '') + m.group(2),
            line_number=int(m.group(3)),
            content=m.group(4)[:500]
        ))

    truncated = len(matches) > page_end
    if uses_sentinel:
        # The sentinel row only signals "more available"; drop it so
        # total_count stays capped at offset + limit as before.
        matches = matches[:page_end]

    return SearchResult(
        matches=matches[offset:page_end],
        total_count=len(matches),
        truncated=truncated
    )
|
|
|
|
def _search_with_grep(self, pattern: str, path: str, file_glob: Optional[str],
                      limit: int, offset: int, output_mode: str, context: int) -> SearchResult:
    """Fallback content search using ``grep -rnH``.

    Args:
        pattern: Regular expression handed to grep as the search pattern.
        path: File or directory to search recursively.
        file_glob: Optional glob passed via ``--include`` to restrict the
            files that are searched.
        limit: Maximum number of results in the returned page.
        offset: Number of leading results to skip.
        output_mode: ``"files_only"`` lists matching files (``-l``),
            ``"count"`` returns per-file match counts (``-c``); any other
            value returns the matching lines themselves.
        context: Number of context lines around each match (``-C``);
            values <= 0 disable context.

    Returns:
        A SearchResult. In content mode ``matches`` holds the requested
        page, ``total_count`` the number of parsed rows (capped by the
        fetch limit) and ``truncated`` whether rows exist beyond the page.
        ``error`` is set when grep reports exit code 2 with no output.
    """
    cmd_parts = ["grep", "-rnH"]  # -H forces filename even for single-file searches

    # Exclude hidden directories (matching ripgrep's default behavior).
    # This prevents searching inside .hub/index-cache/, .git/, etc.
    cmd_parts.append("--exclude-dir='.*'")

    # Add context if requested
    if context > 0:
        cmd_parts.extend(["-C", str(context)])

    # Add file pattern filter (must be quoted to prevent shell expansion)
    if file_glob:
        cmd_parts.extend(["--include", self._escape_shell_arg(file_glob)])

    # Output mode handling
    if output_mode == "files_only":
        cmd_parts.append("-l")
    elif output_mode == "count":
        cmd_parts.append("-c")
    content_mode = output_mode not in ("files_only", "count")

    # "--" ends option parsing, so a pattern or path that begins with "-"
    # (e.g. "-> " or "--verbose") is treated as a positional argument
    # instead of being interpreted as a grep option.
    cmd_parts.append("--")
    cmd_parts.append(self._escape_shell_arg(pattern))
    cmd_parts.append(self._escape_shell_arg(path))

    # Decide how many output rows to keep.
    #  * context mode: grep also emits context rows and "--" separators, so
    #    grab generously and filter in Python.
    #  * content mode without context: fetch ONE sentinel row past the page
    #    so truncation can be detected (with exactly limit+offset rows the
    #    "more results" condition could never be observed).
    #  * files_only / count: the page end is enough.
    page_end = offset + limit
    uses_sentinel = content_mode and not context > 0
    if context > 0:
        fetch_limit = page_end + 200
    elif uses_sentinel:
        fetch_limit = page_end + 1
    else:
        fetch_limit = page_end
    cmd_parts.extend(["|", "head", "-n", str(fetch_limit)])

    cmd = " ".join(cmd_parts)
    result = self._exec(cmd, timeout=60)

    # grep exit codes: 0=matches found, 1=no matches, 2=error
    # NOTE(review): the output is piped through head, so the exit code seen
    # here may be head's rather than grep's unless _exec enables pipefail --
    # confirm against the backend.
    if result.exit_code == 2 and not result.stdout.strip():
        error_msg = result.stderr.strip() if getattr(result, 'stderr', None) else "Search error"
        return SearchResult(error=f"Search failed: {error_msg}", total_count=0)

    output_lines = result.stdout.strip().split('\n')

    if output_mode == "files_only":
        all_files = [f for f in output_lines if f]
        return SearchResult(files=all_files[offset:page_end], total_count=len(all_files))

    if output_mode == "count":
        counts = {}
        for line in output_lines:
            # "file:count" -- split on the LAST colon so paths that contain
            # colons stay intact.
            file_part, sep, count_part = line.rpartition(':')
            if not sep:
                continue
            try:
                counts[file_part] = int(count_part)
            except ValueError:
                # Not a "file:count" row; ignore it.
                continue
        return SearchResult(counts=counts, total_count=sum(counts.values()))

    # Content mode: parse match lines and (optionally) context lines.
    #   grep match lines:   "file:lineno:content"  (colon)
    #   grep context lines: "file-lineno-content"  (dash)
    #   grep group seps:    "--"
    # On Windows, paths contain drive letters (e.g. C:\path), so a naive
    # split(":") breaks; the optional first group absorbs the drive prefix.
    # The path group is lazy, so the first ":digits:" / "-digits-" in the
    # line is taken as the line number; a file name containing such a
    # sequence is therefore split early (known limitation).
    _match_re = re.compile(r'^([A-Za-z]:)?(.*?):(\d+):(.*)$')
    _ctx_re = re.compile(r'^([A-Za-z]:)?(.*?)-(\d+)-(.*)$')
    matches = []
    for line in output_lines:
        if not line or line == "--":
            continue

        # Try the match-line shape first; only fall back to the context
        # shape when context was requested, to avoid false positives.
        m = _match_re.match(line)
        if m is None and context > 0:
            m = _ctx_re.match(line)
        if m is None:
            continue

        matches.append(SearchMatch(
            path=(m.group(1) or '') + m.group(2),
            line_number=int(m.group(3)),
            content=m.group(4)[:500]
        ))

    truncated = len(matches) > page_end
    if uses_sentinel:
        # The sentinel row only signals "more available"; drop it so
        # total_count stays capped at offset + limit as before.
        matches = matches[:page_end]

    return SearchResult(
        matches=matches[offset:page_end],
        total_count=len(matches),
        truncated=truncated
    )
|