#!/usr/bin/env python3
"""
Tests for the read-loop detection mechanism in file_tools.

Verifies that:
1. Re-reading the same file region produces a warning
2. Different regions/files don't trigger false warnings
3. Task isolation works (different tasks have separate trackers)
4. get_read_files_summary returns accurate history
5. clear_read_tracker resets state
6. Context compression injects file-read history

Run with: python -m pytest tests/tools/test_read_loop_detection.py -v
"""

import json
import unittest
from unittest.mock import patch, MagicMock

from tools.file_tools import (
    read_file_tool,
    get_read_files_summary,
    clear_read_tracker,
    _read_tracker,
)


class _FakeReadResult:
    """Minimal stand-in for FileOperations.read_file return value."""

    def __init__(self, content="line1\nline2\n", total_lines=2):
        self._content = content
        self._total_lines = total_lines

    def to_dict(self):
        return {"content": self._content, "total_lines": self._total_lines}


def _fake_read_file(path, offset=1, limit=500):
    """Fake FileOperations.read_file: echoes the path so tests can assert on it."""
    return _FakeReadResult(content=f"content of {path}", total_lines=10)


def _make_fake_file_ops():
    """Build a MagicMock file-ops object whose read_file is the fake above."""
    fake = MagicMock()
    fake.read_file = _fake_read_file
    return fake


class TestReadLoopDetection(unittest.TestCase):
    """Verify that read_file_tool detects and warns on re-reads."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_first_read_has_no_warning(self, _mock_ops):
        result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
        self.assertNotIn("_warning", result)
        self.assertIn("content", result)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_second_read_same_region_has_warning(self, _mock_ops):
        read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
        result = json.loads(
            read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
        )
        self.assertIn("_warning", result)
        self.assertIn("already read", result["_warning"])
        self.assertIn("2 times", result["_warning"])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_third_read_increments_count(self, _mock_ops):
        for _ in range(2):
            read_file_tool("/tmp/test.py", task_id="t1")
        result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
        self.assertIn("3 times", result["_warning"])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_region_no_warning(self, _mock_ops):
        read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
        result = json.loads(
            read_file_tool("/tmp/test.py", offset=501, limit=500, task_id="t1")
        )
        self.assertNotIn("_warning", result)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_file_no_warning(self, _mock_ops):
        read_file_tool("/tmp/a.py", task_id="t1")
        result = json.loads(read_file_tool("/tmp/b.py", task_id="t1"))
        self.assertNotIn("_warning", result)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_tasks_isolated(self, _mock_ops):
        read_file_tool("/tmp/test.py", task_id="task_a")
        result = json.loads(
            read_file_tool("/tmp/test.py", task_id="task_b")
        )
        self.assertNotIn("_warning", result)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_warning_still_returns_content(self, _mock_ops):
        """Even with a warning, the file content is still returned."""
        read_file_tool("/tmp/test.py", task_id="t1")
        result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
        self.assertIn("_warning", result)
        self.assertIn("content", result)
        self.assertIn("content of /tmp/test.py", result["content"])


class TestReadFilesSummary(unittest.TestCase):
    """Verify get_read_files_summary returns accurate file-read history."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_empty_when_no_reads(self, _mock_ops):
        summary = get_read_files_summary("t1")
        self.assertEqual(summary, [])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_single_file_single_region(self, _mock_ops):
        read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 1)
        self.assertEqual(summary[0]["path"], "/tmp/test.py")
        self.assertIn("lines 1-500", summary[0]["regions"])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_single_file_multiple_regions(self, _mock_ops):
        read_file_tool("/tmp/test.py", offset=1, limit=500, task_id="t1")
        read_file_tool("/tmp/test.py", offset=501, limit=500, task_id="t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 1)
        self.assertEqual(len(summary[0]["regions"]), 2)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_multiple_files(self, _mock_ops):
        read_file_tool("/tmp/a.py", task_id="t1")
        read_file_tool("/tmp/b.py", task_id="t1")
        summary = get_read_files_summary("t1")
        self.assertEqual(len(summary), 2)
        paths = [s["path"] for s in summary]
        self.assertIn("/tmp/a.py", paths)
        self.assertIn("/tmp/b.py", paths)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_task_has_separate_summary(self, _mock_ops):
        read_file_tool("/tmp/a.py", task_id="task_a")
        read_file_tool("/tmp/b.py", task_id="task_b")
        summary_a = get_read_files_summary("task_a")
        summary_b = get_read_files_summary("task_b")
        self.assertEqual(len(summary_a), 1)
        self.assertEqual(summary_a[0]["path"], "/tmp/a.py")
        self.assertEqual(len(summary_b), 1)
        self.assertEqual(summary_b[0]["path"], "/tmp/b.py")


class TestClearReadTracker(unittest.TestCase):
    """Verify clear_read_tracker resets state properly."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_clear_specific_task(self, _mock_ops):
        read_file_tool("/tmp/test.py", task_id="t1")
        read_file_tool("/tmp/test.py", task_id="t2")
        clear_read_tracker("t1")
        self.assertEqual(get_read_files_summary("t1"), [])
        self.assertEqual(len(get_read_files_summary("t2")), 1)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_clear_all(self, _mock_ops):
        read_file_tool("/tmp/test.py", task_id="t1")
        read_file_tool("/tmp/test.py", task_id="t2")
        clear_read_tracker()
        self.assertEqual(get_read_files_summary("t1"), [])
        self.assertEqual(get_read_files_summary("t2"), [])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_clear_then_reread_no_warning(self, _mock_ops):
        read_file_tool("/tmp/test.py", task_id="t1")
        clear_read_tracker("t1")
        result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
        self.assertNotIn("_warning", result)


class TestCompressionFileHistory(unittest.TestCase):
    """Verify that _compress_context injects file-read history."""

    def setUp(self):
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_compress_context_includes_read_files(self, _mock_ops):
        """After reading files, _compress_context should inject a message
        listing which files were already read."""
        # Simulate reads
        read_file_tool("/tmp/foo.py", offset=1, limit=100, task_id="compress_test")
        read_file_tool("/tmp/bar.py", offset=1, limit=200, task_id="compress_test")

        # Build minimal messages for compression (need enough messages)
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Analyze the codebase."},
            {"role": "assistant", "content": "I'll read the files."},
            {"role": "user", "content": "Continue."},
            {"role": "assistant", "content": "Reading more files."},
            {"role": "user", "content": "What did you find?"},
            {"role": "assistant", "content": "Here are my findings."},
            {"role": "user", "content": "Great, write the fix."},
            {"role": "assistant", "content": "Working on it."},
            {"role": "user", "content": "Status?"},
        ]

        # Mock the compressor to return a simple compression
        mock_compressor = MagicMock()
        mock_compressor.compress.return_value = [
            messages[0],  # system
            messages[1],  # first user
            {"role": "user", "content": "[CONTEXT SUMMARY]: Files were analyzed."},
            messages[-1],  # last user
        ]
        mock_compressor.last_prompt_tokens = 5000

        # Mock the agent's _compress_context dependencies
        mock_agent = MagicMock()
        mock_agent.context_compressor = mock_compressor
        mock_agent._todo_store.format_for_injection.return_value = None
        mock_agent._session_db = None
        mock_agent.quiet_mode = True
        mock_agent._invalidate_system_prompt = MagicMock()
        mock_agent._build_system_prompt = MagicMock(return_value="system prompt")
        mock_agent._cached_system_prompt = None

        # Call the real _compress_context (unbound, with the mock as self)
        from run_agent import AIAgent

        result, _ = AIAgent._compress_context(
            mock_agent,
            messages,
            "system prompt",
            approx_tokens=5000,
            task_id="compress_test",
        )

        # Find the injected file-read history message
        file_history_msgs = [
            m
            for m in result
            if isinstance(m.get("content"), str)
            and "already read" in m.get("content", "").lower()
        ]
        self.assertEqual(
            len(file_history_msgs),
            1,
            "Should inject exactly one file-read history message",
        )
        history_content = file_history_msgs[0]["content"]
        self.assertIn("/tmp/foo.py", history_content)
        self.assertIn("/tmp/bar.py", history_content)
        self.assertIn("do NOT re-read", history_content)


if __name__ == "__main__":
    unittest.main()