diff --git a/tests/tools/test_file_tools_live.py b/tests/tools/test_file_tools_live.py index 99627b91a..426b3543b 100644 --- a/tests/tools/test_file_tools_live.py +++ b/tests/tools/test_file_tools_live.py @@ -17,7 +17,13 @@ import pytest sys.path.insert(0, str(Path(__file__).resolve().parents[2])) -from tools.environments.local import LocalEnvironment, _clean_shell_noise, _SHELL_NOISE_SUBSTRINGS +from tools.environments.local import ( + LocalEnvironment, + _clean_shell_noise, + _extract_fenced_output, + _OUTPUT_FENCE, + _SHELL_NOISE_SUBSTRINGS, +) from tools.file_operations import ShellFileOperations @@ -28,6 +34,7 @@ from tools.file_operations import ShellFileOperations _ALL_NOISE_PATTERNS = list(_SHELL_NOISE_SUBSTRINGS) + [ "bash: ", "Inappropriate ioctl", + "Auto-suggestions:", ] @@ -125,11 +132,84 @@ class TestCleanShellNoise: _assert_clean(result) def test_noise_in_middle_not_stripped(self): - """Only LEADING noise is stripped -- noise in the middle is real output.""" + """Noise in the middle is real output and should be preserved.""" output = "real\nbash: no job control in this shell\nmore real\n" result = _clean_shell_noise(output) assert result == output + def test_zsh_restored_session(self): + output = "Restored session: Mon Mar 2 22:16:54 +03 2026\nhello\n" + result = _clean_shell_noise(output) + assert result == "hello\n" + + def test_zsh_saving_session_trailing(self): + output = "hello\nSaving session...completed.\n" + result = _clean_shell_noise(output) + assert result == "hello\n" + + def test_zsh_oh_my_zsh_banner(self): + output = "Oh My Zsh on! 
class TestExtractFencedOutput:
    """Unit tests for ``_extract_fenced_output``.

    Each test builds a raw shell transcript around ``_OUTPUT_FENCE`` and
    checks which part of it the extractor returns.
    """

    def test_normal_fenced_output(self):
        """Text before the first fence and after the last one is dropped."""
        raw = f"noise\n{_OUTPUT_FENCE}hello world\n{_OUTPUT_FENCE}more noise\n"
        assert _extract_fenced_output(raw) == "hello world\n"

    def test_no_trailing_newline(self):
        """printf output with no trailing newline is preserved."""
        raw = f"noise{_OUTPUT_FENCE}exact{_OUTPUT_FENCE}noise"
        assert _extract_fenced_output(raw) == "exact"

    def test_no_fences_falls_back(self):
        """Without fences, falls back to pattern-based cleaning."""
        raw = "bash: no job control in this shell\nhello\n"
        result = _extract_fenced_output(raw)
        assert result == "hello\n"

    def test_only_start_fence(self):
        """Only start fence (e.g. user command called exit)."""
        raw = f"noise{_OUTPUT_FENCE}hello\nSaving session...\n"
        result = _extract_fenced_output(raw)
        assert result == "hello\n"

    def test_user_outputs_fence_string(self):
        """If user command outputs the fence marker, it is preserved."""
        raw = f"noise{_OUTPUT_FENCE}{_OUTPUT_FENCE}real\n{_OUTPUT_FENCE}noise"
        result = _extract_fenced_output(raw)
        # first fence -> last fence captures the middle including the user's
        # fence.  Pin the exact slice: a membership check alone would still
        # pass if surrounding noise leaked into the result.
        assert result == f"{_OUTPUT_FENCE}real\n"

    def test_empty_command_output(self):
        """Two adjacent fences mean the command printed nothing."""
        raw = f"noise{_OUTPUT_FENCE}{_OUTPUT_FENCE}noise"
        assert _extract_fenced_output(raw) == ""

    def test_multiline_output(self):
        """Every line between the fences is returned unchanged."""
        raw = f"noise\n{_OUTPUT_FENCE}line1\nline2\nline3\n{_OUTPUT_FENCE}noise\n"
        assert _extract_fenced_output(raw) == "line1\nline2\nline3\n"
+import sys +import tools.terminal_tool # noqa: F401 -- ensure module is loaded +_tt_mod = sys.modules["tools.terminal_tool"] from tools.terminal_tool import get_active_environments_info # 1 MiB of data so the rounded MB value is clearly distinguishable @@ -34,10 +40,8 @@ class TestDiskUsageGlob: "aaaaaaaa-1111-2222-3333-444444444444": MagicMock(), } - with ( - patch("tools.terminal_tool._active_environments", fake_envs), - patch("tools.terminal_tool._get_scratch_dir", return_value=fake_scratch), - ): + with patch.object(_tt_mod, "_active_environments", fake_envs), \ + patch.object(_tt_mod, "_get_scratch_dir", return_value=fake_scratch): info = get_active_environments_info() # Task A only: ~1.0 MB. With the bug (hardcoded hermes-*), @@ -51,10 +55,8 @@ class TestDiskUsageGlob: "bbbbbbbb-5555-6666-7777-888888888888": MagicMock(), } - with ( - patch("tools.terminal_tool._active_environments", fake_envs), - patch("tools.terminal_tool._get_scratch_dir", return_value=fake_scratch), - ): + with patch.object(_tt_mod, "_active_environments", fake_envs), \ + patch.object(_tt_mod, "_get_scratch_dir", return_value=fake_scratch): info = get_active_environments_info() # Should be ~2.0 MB total (1 MB per task). diff --git a/tools/environments/local.py b/tools/environments/local.py index 5b70a2707..fccf5909b 100644 --- a/tools/environments/local.py +++ b/tools/environments/local.py @@ -9,28 +9,84 @@ import time from tools.environments.base import BaseEnvironment +# Unique marker to isolate real command output from shell init/exit noise. +# printf (no trailing newline) keeps the boundaries clean for splitting. +_OUTPUT_FENCE = "__HERMES_FENCE_a9f7b3__" + # Noise lines emitted by interactive shells when stdin is not a terminal. -# Filtered from output to keep tool results clean. +# Used as a fallback when output fence markers are missing. 
_SHELL_NOISE_SUBSTRINGS = ( + # bash "bash: cannot set terminal process group", "bash: no job control in this shell", "no job control in this shell", "cannot set terminal process group", "tcsetattr: Inappropriate ioctl for device", + # zsh / oh-my-zsh / macOS terminal session + "Restored session:", + "Saving session...", + "Last login:", + "command not found:", + "Oh My Zsh", + "compinit:", ) def _clean_shell_noise(output: str) -> str: - """Strip shell startup warnings that leak when using -i without a TTY. + """Strip shell startup/exit warnings that leak when using -i without a TTY. - Removes all leading lines that match known noise patterns, not just the first. - Some environments emit multiple noise lines (e.g. Docker, non-TTY sessions). + Removes lines matching known noise patterns from both the beginning + and end of the output. Lines in the middle are left untouched. """ + + def _is_noise(line: str) -> bool: + return any(noise in line for noise in _SHELL_NOISE_SUBSTRINGS) + lines = output.split("\n") - # Strip all leading noise lines - while lines and any(noise in lines[0] for noise in _SHELL_NOISE_SUBSTRINGS): + + # Strip leading noise + while lines and _is_noise(lines[0]): lines.pop(0) - return "\n".join(lines) + + # Strip trailing noise (walk backwards, skip empty lines from split) + end = len(lines) - 1 + while end >= 0 and (not lines[end] or _is_noise(lines[end])): + end -= 1 + + if end < 0: + return "" + + cleaned = lines[: end + 1] + result = "\n".join(cleaned) + + # Preserve trailing newline if original had one + if output.endswith("\n") and result and not result.endswith("\n"): + result += "\n" + return result + + +def _extract_fenced_output(raw: str) -> str: + """Extract real command output from between fence markers. + + The execute() method wraps each command with printf(FENCE) markers. + This function finds the first and last fence and returns only the + content between them, which is the actual command output free of + any shell init/exit noise. 
+ + Falls back to pattern-based _clean_shell_noise if fences are missing. + """ + first = raw.find(_OUTPUT_FENCE) + if first == -1: + return _clean_shell_noise(raw) + + start = first + len(_OUTPUT_FENCE) + last = raw.rfind(_OUTPUT_FENCE) + + if last <= first: + # Only start fence found (e.g. user command called `exit`) + return _clean_shell_noise(raw[start:]) + + return raw[start:last] class LocalEnvironment(BaseEnvironment): @@ -64,8 +120,17 @@ class LocalEnvironment(BaseEnvironment): # -l alone isn't enough: .profile sources .bashrc, but the guard # returns early because the shell isn't interactive. user_shell = os.environ.get("SHELL") or shutil.which("bash") or "/bin/bash" + # Wrap with output fences so we can later extract the real + # command output and discard shell init/exit noise. + fenced_cmd = ( + f"printf '{_OUTPUT_FENCE}';" + f" {exec_command};" + f" __hermes_rc=$?;" + f" printf '{_OUTPUT_FENCE}';" + f" exit $__hermes_rc" + ) proc = subprocess.Popen( - [user_shell, "-lic", exec_command], + [user_shell, "-lic", fenced_cmd], text=True, cwd=work_dir, env=os.environ | self.env, @@ -130,7 +195,7 @@ class LocalEnvironment(BaseEnvironment): time.sleep(0.2) reader.join(timeout=5) - output = _clean_shell_noise("".join(_output_chunks)) + output = _extract_fenced_output("".join(_output_chunks)) return {"output": output, "returncode": proc.returncode} except Exception as e: diff --git a/tools/file_operations.py b/tools/file_operations.py index 3649b9ef0..53ebe6346 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -848,8 +848,8 @@ class ShellFileOperations(FileOperations): result = self._exec(cmd, timeout=60) - if result.exit_code != 0 and not result.stdout.strip(): - # Try without -printf (BSD find compatibility) + if not result.stdout.strip(): + # Try without -printf (BSD find compatibility -- macOS) cmd_simple = f"find {self._escape_shell_arg(path)} -type f -name {self._escape_shell_arg(search_pattern)} " \ f"2>/dev/null | head -n {limit + 
offset} | tail -n +{offset + 1}" result = self._exec(cmd_simple, timeout=60)