From 934fbe3c06174d5b71b02c48c9f099e83c3f1e8e Mon Sep 17 00:00:00 2001 From: Teknium Date: Mon, 23 Mar 2026 06:50:39 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20strip=20ANSI=20at=20the=20source=20?= =?UTF-8?q?=E2=80=94=20clean=20terminal=20output=20before=20it=20reaches?= =?UTF-8?q?=20the=20model?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: terminal_tool, execute_code, and process_registry returned raw subprocess output with ANSI escape sequences intact. The model saw these in tool results and copied them into file writes. Previous fix (PR #2532) stripped ANSI at the write point in file_tools.py, but this was a band-aid — regex on file content risks corrupting legitimate content, and doesn't prevent ANSI from wasting tokens in the model context. Source-level fix: - New tools/ansi_strip.py with comprehensive ECMA-48 regex covering CSI (incl. private-mode, colon-separated, intermediate bytes), OSC (both terminators), DCS/SOS/PM/APC strings, Fp/Fe/Fs/nF escapes, 8-bit C1 - terminal_tool.py: strip output before returning to model - code_execution_tool.py: strip stdout/stderr before returning - process_registry.py: strip output in poll/read_log/wait - file_tools.py: remove _strip_ansi band-aid (no longer needed) Verified: `ls --color=always` output returned as clean text to model, file written from that output contains zero ESC bytes. --- tests/tools/test_ansi_strip.py | 168 +++++++++++++++++++++++++++++++++ tests/tools/test_file_tools.py | 3 + tools/ansi_strip.py | 44 +++++++++ tools/code_execution_tool.py | 6 ++ tools/file_tools.py | 16 ---- tools/process_registry.py | 15 ++- tools/terminal_tool.py | 5 + 7 files changed, 236 insertions(+), 21 deletions(-) create mode 100644 tests/tools/test_ansi_strip.py create mode 100644 tools/ansi_strip.py diff --git a/tests/tools/test_ansi_strip.py b/tests/tools/test_ansi_strip.py new file mode 100644 index 000000000..d1585c92b --- /dev/null +++ b/tests/tools/test_ansi_strip.py @@ -0,0 +1,168 @@ +"""Comprehensive tests for ANSI escape sequence stripping (ECMA-48). + +The strip_ansi function in tools/ansi_strip.py is the source-level fix for +ANSI codes leaking into the model's context via terminal/execute_code output. +It must strip ALL terminal escape sequences while preserving legitimate text. +""" + +from tools.ansi_strip import strip_ansi + + +class TestStripAnsiBasicSGR: + """Select Graphic Rendition — the most common ANSI sequences.""" + + def test_reset(self): + assert strip_ansi("\x1b[0m") == "" + + def test_color(self): + assert strip_ansi("\x1b[31;1m") == "" + + def test_truecolor_semicolon(self): + assert strip_ansi("\x1b[38;2;255;0;0m") == "" + + def test_truecolor_colon_separated(self): + """Modern terminals use colon-separated SGR params.""" + assert strip_ansi("\x1b[38:2:255:0:0m") == "" + assert strip_ansi("\x1b[48:2:0:255:0m") == "" + + +class TestStripAnsiCSIPrivateMode: + """CSI sequences with ? prefix (DEC private modes).""" + + def test_cursor_show_hide(self): + assert strip_ansi("\x1b[?25h") == "" + assert strip_ansi("\x1b[?25l") == "" + + def test_alt_screen(self): + assert strip_ansi("\x1b[?1049h") == "" + assert strip_ansi("\x1b[?1049l") == "" + + def test_bracketed_paste(self): + assert strip_ansi("\x1b[?2004h") == "" + + +class TestStripAnsiCSIIntermediate: + """CSI sequences with intermediate bytes (space, etc.).""" + + def test_cursor_shape(self): + assert strip_ansi("\x1b[0 q") == "" + assert strip_ansi("\x1b[2 q") == "" + assert strip_ansi("\x1b[6 q") == "" + + +class TestStripAnsiOSC: + """Operating System Command sequences.""" + + def test_bel_terminator(self): + assert strip_ansi("\x1b]0;title\x07") == "" + + def test_st_terminator(self): + assert strip_ansi("\x1b]0;title\x1b\\") == "" + + def test_hyperlink_preserves_text(self): + assert strip_ansi( + "\x1b]8;;https://example.com\x1b\\click\x1b]8;;\x1b\\" + ) == "click" + + +class TestStripAnsiDECPrivate: + """DEC private / Fp escape sequences.""" + + def test_save_restore_cursor(self): + assert strip_ansi("\x1b7") == "" + assert strip_ansi("\x1b8") == "" + + def test_keypad_modes(self): + assert strip_ansi("\x1b=") == "" + assert strip_ansi("\x1b>") == "" + + +class TestStripAnsiFe: + """Fe (C1 as 7-bit) escape sequences.""" + + def test_reverse_index(self): + assert strip_ansi("\x1bM") == "" + + def test_reset_terminal(self): + assert strip_ansi("\x1bc") == "" + + def test_index_and_newline(self): + assert strip_ansi("\x1bD") == "" + assert strip_ansi("\x1bE") == "" + + +class TestStripAnsiNF: + """nF (character set selection) sequences.""" + + def test_charset_selection(self): + assert strip_ansi("\x1b(A") == "" + assert strip_ansi("\x1b(B") == "" + assert strip_ansi("\x1b(0") == "" + + +class TestStripAnsiDCS: + """Device Control String sequences.""" + + def test_dcs(self): + assert strip_ansi("\x1bP+q\x1b\\") == "" + + +class TestStripAnsi8BitC1: + """8-bit C1 control characters.""" + + def test_8bit_csi(self): + assert strip_ansi("\x9b31m") == "" + assert strip_ansi("\x9b38;2;255;0;0m") == "" + + def test_8bit_standalone(self): + assert strip_ansi("\x9c") == "" + assert strip_ansi("\x9d") == "" + assert strip_ansi("\x90") == "" + + +class TestStripAnsiRealWorld: + """Real-world contamination scenarios from bug reports.""" + + def test_colored_shebang(self): + """The original reported bug: shebang corrupted by color codes.""" + assert strip_ansi( + "\x1b[32m#!/usr/bin/env python3\x1b[0m\nprint('hello')" + ) == "#!/usr/bin/env python3\nprint('hello')" + + def test_stacked_sgr(self): + assert strip_ansi( + "\x1b[1m\x1b[31m\x1b[42mhello\x1b[0m" + ) == "hello" + + def test_ansi_mid_code(self): + assert strip_ansi( + "def foo(\x1b[33m):\x1b[0m\n return 42" + ) == "def foo():\n return 42" + + +class TestStripAnsiPassthrough: + """Clean content must pass through unmodified.""" + + def test_plain_text(self): + assert strip_ansi("normal text") == "normal text" + + def test_empty(self): + assert strip_ansi("") == "" + + def test_none(self): + assert strip_ansi(None) is None + + def test_whitespace_preserved(self): + assert strip_ansi("line1\nline2\ttab") == "line1\nline2\ttab" + + def test_unicode_safe(self): + assert strip_ansi("emoji 🎉 and ñ café") == "emoji 🎉 and ñ café" + + def test_backslash_in_code(self): + code = "path = 'C:\\\\Users\\\\test'" + assert strip_ansi(code) == code + + def test_square_brackets_in_code(self): + """Array indexing must not be confused with CSI.""" + code = "arr[0] = arr[31]" + assert strip_ansi(code) == code diff --git a/tests/tools/test_file_tools.py b/tests/tools/test_file_tools.py index 2bf141580..067393273 100644 --- a/tests/tools/test_file_tools.py +++ b/tests/tools/test_file_tools.py @@ -309,3 +309,6 @@ class TestSearchHints: raw = search_tool(pattern="foo", offset=50, limit=50) assert "[Hint:" in raw assert "offset=100" in raw + + + diff --git a/tools/ansi_strip.py b/tools/ansi_strip.py new file mode 100644 index 000000000..b1cfb8ece --- /dev/null +++ b/tools/ansi_strip.py @@ -0,0 +1,44 @@ +"""Strip ANSI escape sequences from subprocess output. + +Used by terminal_tool, code_execution_tool, and process_registry to clean +command output before returning it to the model. This prevents ANSI codes +from entering the model's context — which is the root cause of models +copying escape sequences into file writes. + +Covers the full ECMA-48 spec: CSI (including private-mode ``?`` prefix, +colon-separated params, intermediate bytes), OSC (BEL and ST terminators), +DCS/SOS/PM/APC string sequences, nF multi-byte escapes, Fp/Fe/Fs +single-byte escapes, and 8-bit C1 control characters. +""" + +import re + +_ANSI_ESCAPE_RE = re.compile( + r"\x1b" + r"(?:" + r"\[[\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e]" # CSI sequence + r"|\][\s\S]*?(?:\x07|\x1b\\)" # OSC (BEL or ST terminator) + r"|[PX^_][\s\S]*?(?:\x1b\\)" # DCS/SOS/PM/APC strings + r"|[\x20-\x2f]+[\x30-\x7e]" # nF escape sequences + r"|[\x30-\x7e]" # Fp/Fe/Fs single-byte + r")" + r"|\x9b[\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e]" # 8-bit CSI + r"|\x9d[\s\S]*?(?:\x07|\x9c)" # 8-bit OSC + r"|[\x80-\x9f]", # Other 8-bit C1 controls + re.DOTALL, +) + +# Fast-path check — skip full regex when no escape-like bytes are present. +_HAS_ESCAPE = re.compile(r"[\x1b\x80-\x9f]") + + +def strip_ansi(text: str) -> str: + """Remove ANSI escape sequences from text. + + Returns the input unchanged (fast path) when no ESC or C1 bytes are + present. Safe to call on any string — clean text passes through + with negligible overhead. + """ + if not text or not _HAS_ESCAPE.search(text): + return text + return _ANSI_ESCAPE_RE.sub("", text) diff --git a/tools/code_execution_tool.py b/tools/code_execution_tool.py index 5d3a80ae7..8ee88ae32 100644 --- a/tools/code_execution_tool.py +++ b/tools/code_execution_tool.py @@ -577,6 +577,12 @@ def execute_code( server_sock = None # prevent double close in finally rpc_thread.join(timeout=3) + # Strip ANSI escape sequences so the model never sees terminal + # formatting — prevents it from copying escapes into file writes. + from tools.ansi_strip import strip_ansi + stdout_text = strip_ansi(stdout_text) + stderr_text = strip_ansi(stderr_text) + # Build response result: Dict[str, Any] = { "status": status, diff --git a/tools/file_tools.py b/tools/file_tools.py index b192c2265..3f3c6812d 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -5,7 +5,6 @@ import errno import json import logging import os -import re import threading from typing import Optional from tools.file_operations import ShellFileOperations @@ -13,17 +12,6 @@ from agent.redact import redact_sensitive_text logger = logging.getLogger(__name__) -# Regex to match ANSI escape sequences (CSI codes, OSC codes, simple escapes). -# Models occasionally copy these from terminal output into file content. -_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]|\x1b\][^\x07]*\x07|\x1b[()][A-B012]|\x1b[=>]") - - -def _strip_ansi(text: str) -> str: - """Remove ANSI escape sequences from text destined for file writes.""" - if not text or "\x1b" not in text: - return text - return _ANSI_ESCAPE_RE.sub("", text) - _EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS} @@ -301,7 +289,6 @@ def notify_other_tool_call(task_id: str = "default"): def write_file_tool(path: str, content: str, task_id: str = "default") -> str: """Write content to a file.""" try: - content = _strip_ansi(content) file_ops = _get_file_ops(task_id) result = file_ops.write_file(path, content) return json.dumps(result.to_dict(), ensure_ascii=False) @@ -325,13 +312,10 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, return json.dumps({"error": "path required"}) if old_string is None or new_string is None: return json.dumps({"error": "old_string and new_string required"}) - old_string = _strip_ansi(old_string) - new_string = _strip_ansi(new_string) result = file_ops.patch_replace(path, old_string, new_string, replace_all) elif mode == "patch": if not patch: return json.dumps({"error": "patch content required"}) - patch = _strip_ansi(patch) result = file_ops.patch_v4a(patch) else: return json.dumps({"error": f"Unknown mode: {mode}"}) diff --git a/tools/process_registry.py b/tools/process_registry.py index c6ee9ceb6..759a940f7 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -426,12 +426,14 @@ class ProcessRegistry: def poll(self, session_id: str) -> dict: """Check status and get new output for a background process.""" + from tools.ansi_strip import strip_ansi + session = self.get(session_id) if session is None: return {"status": "not_found", "error": f"No process with ID {session_id}"} with session._lock: - output_preview = session.output_buffer[-1000:] if session.output_buffer else "" + output_preview = strip_ansi(session.output_buffer[-1000:]) if session.output_buffer else "" result = { "session_id": session.id, @@ -450,12 +452,14 @@ class ProcessRegistry: def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict: """Read the full output log with optional pagination by lines.""" + from tools.ansi_strip import strip_ansi + session = self.get(session_id) if session is None: return {"status": "not_found", "error": f"No process with ID {session_id}"} with session._lock: - full_output = session.output_buffer + full_output = strip_ansi(session.output_buffer) lines = full_output.splitlines() total_lines = len(lines) @@ -486,6 +490,7 @@ class ProcessRegistry: dict with status ("exited", "timeout", "interrupted", "not_found") and output snapshot. """ + from tools.ansi_strip import strip_ansi from tools.terminal_tool import _interrupt_event default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180")) @@ -513,7 +518,7 @@ class ProcessRegistry: result = { "status": "exited", "exit_code": session.exit_code, - "output": session.output_buffer[-2000:], + "output": strip_ansi(session.output_buffer[-2000:]), } if timeout_note: result["timeout_note"] = timeout_note @@ -522,7 +527,7 @@ class ProcessRegistry: if _interrupt_event.is_set(): result = { "status": "interrupted", - "output": session.output_buffer[-1000:], + "output": strip_ansi(session.output_buffer[-1000:]), "note": "User sent a new message -- wait interrupted", } if timeout_note: @@ -533,7 +538,7 @@ class ProcessRegistry: result = { "status": "timeout", - "output": session.output_buffer[-1000:], + "output": strip_ansi(session.output_buffer[-1000:]), } if timeout_note: result["timeout_note"] = timeout_note diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 0076b6d0a..6d1dcebea 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -1163,6 +1163,11 @@ def terminal_tool( ) output = output[:head_chars] + truncated_notice + output[-tail_chars:] + # Strip ANSI escape sequences so the model never sees terminal + # formatting — prevents it from copying escapes into file writes. + from tools.ansi_strip import strip_ansi + output = strip_ansi(output) + # Redact secrets from command output (catches env/printenv leaking keys) from agent.redact import redact_sensitive_text output = redact_sensitive_text(output.strip()) if output else ""