diff --git a/tests/tools/test_file_staleness.py b/tests/tools/test_file_staleness.py index 46e7aac9f..230493e33 100644 --- a/tests/tools/test_file_staleness.py +++ b/tests/tools/test_file_staleness.py @@ -221,7 +221,7 @@ class TestCheckFileStalenessHelper(unittest.TestCase): _read_tracker["t1"] = { "last_key": None, "consecutive": 0, "read_history": set(), "dedup": {}, - "file_mtimes": {"/tmp/other.py": 12345.0}, + "read_timestamps": {"/tmp/other.py": 12345.0}, } self.assertIsNone(_check_file_staleness("/tmp/x.py", "t1")) @@ -231,7 +231,7 @@ class TestCheckFileStalenessHelper(unittest.TestCase): _read_tracker["t1"] = { "last_key": None, "consecutive": 0, "read_history": set(), "dedup": {}, - "file_mtimes": {"/nonexistent/path": 99999.0}, + "read_timestamps": {"/nonexistent/path": 99999.0}, } # File doesn't exist → stat fails → returns None (let write handle it) self.assertIsNone(_check_file_staleness("/nonexistent/path", "t1")) diff --git a/tools/file_tools.py b/tools/file_tools.py index 07fb86d1a..79a111cb7 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -136,9 +136,12 @@ _file_ops_cache: dict = {} # Used to skip re-reads of unchanged files. Reset on # context compression (the original content is summarised # away so the model needs the full content again). -# "file_mtimes": dict mapping resolved_path → mtime float at last read. -# Used by write_file and patch to detect when a file was -# modified externally between the agent's read and write. +# "read_timestamps": dict mapping resolved_path → modification-time float +# recorded when the file was last read (or written) by +# this task. Used by write_file and patch to detect +# external changes between the agent's read and write. +# Updated after successful writes so consecutive edits +# by the same task don't trigger false warnings. _read_tracker_lock = threading.Lock() _read_tracker: dict = {} @@ -401,7 +404,7 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = try: _mtime_now = os.path.getmtime(resolved_str) task_data["dedup"][dedup_key] = _mtime_now - task_data.setdefault("file_mtimes", {})[resolved_str] = _mtime_now + task_data.setdefault("read_timestamps", {})[resolved_str] = _mtime_now except OSError: pass # Can't stat — skip tracking for this entry @@ -500,6 +503,24 @@ def notify_other_tool_call(task_id: str = "default"): task_data["consecutive"] = 0 +def _update_read_timestamp(filepath: str, task_id: str) -> None: + """Record the file's current modification time after a successful write. + + Called after write_file and patch so that consecutive edits by the + same task don't trigger false staleness warnings — each write + refreshes the stored timestamp to match the file's new state. + """ + try: + resolved = str(Path(filepath).expanduser().resolve()) + current_mtime = os.path.getmtime(resolved) + except (OSError, ValueError): + return + with _read_tracker_lock: + task_data = _read_tracker.get(task_id) + if task_data is not None: + task_data.setdefault("read_timestamps", {})[resolved] = current_mtime + + def _check_file_staleness(filepath: str, task_id: str) -> str | None: """Check whether a file was modified since the agent last read it. @@ -515,7 +536,7 @@ def _check_file_staleness(filepath: str, task_id: str) -> str | None: task_data = _read_tracker.get(task_id) if not task_data: return None - read_mtime = task_data.get("file_mtimes", {}).get(resolved) + read_mtime = task_data.get("read_timestamps", {}).get(resolved) if read_mtime is None: return None # File was never read — nothing to compare against try: @@ -543,6 +564,9 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str: result_dict = result.to_dict() if stale_warning: result_dict["_warning"] = stale_warning + # Refresh the stored timestamp so consecutive writes by this + # task don't trigger false staleness warnings. + _update_read_timestamp(path, task_id) return json.dumps(result_dict, ensure_ascii=False) except Exception as e: if _is_expected_write_exception(e): @@ -594,6 +618,11 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, result_dict = result.to_dict() if stale_warnings: result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings) + # Refresh stored timestamps for all successfully-patched paths so + # consecutive edits by this task don't trigger false warnings. + if not result_dict.get("error"): + for _p in _paths_to_check: + _update_read_timestamp(_p, task_id) result_json = json.dumps(result_dict, ensure_ascii=False) # Hint when old_string not found — saves iterations where the agent # retries with stale content instead of re-reading the file.