"""Tests for infrastructure.error_capture module.""" import sqlite3 from datetime import UTC, datetime, timedelta from unittest.mock import patch from infrastructure.error_capture import ( _dedup_cache, _get_git_context, _is_duplicate, _stack_hash, capture_error, ) def _make_exception(): """Helper that always raises from the same line for stable hashing.""" raise ValueError("test error") class TestStackHash: """Test _stack_hash produces stable hashes.""" def test_hash_is_deterministic_for_same_exception(self): """Same exception object always produces the same hash.""" try: _make_exception() except ValueError as e: hash1 = _stack_hash(e) hash2 = _stack_hash(e) assert hash1 == hash2 def test_different_exception_types_differ(self): try: raise ValueError("x") except ValueError as e1: hash1 = _stack_hash(e1) try: raise TypeError("x") except TypeError as e2: hash2 = _stack_hash(e2) assert hash1 != hash2 def test_hash_is_hex_string(self): try: raise RuntimeError("test") except RuntimeError as e: h = _stack_hash(e) assert len(h) == 16 assert all(c in "0123456789abcdef" for c in h) class TestIsDuplicate: """Test deduplication logic.""" def setup_method(self): _dedup_cache.clear() def test_first_occurrence_not_duplicate(self): assert _is_duplicate("hash_abc") is False def test_second_occurrence_is_duplicate(self): _is_duplicate("hash_dup") assert _is_duplicate("hash_dup") is True def test_different_hashes_not_duplicates(self): _is_duplicate("hash_1") assert _is_duplicate("hash_2") is False def test_stale_entries_pruned(self): """Old entries beyond 2x the dedup window should be pruned.""" from config import settings window = settings.error_dedup_window_seconds # Seed a stale entry far in the past stale_time = datetime.now(UTC) - timedelta(seconds=window * 3) _dedup_cache["stale_hash"] = stale_time # Calling _is_duplicate on a new hash triggers pruning _is_duplicate("fresh_hash") assert "stale_hash" not in _dedup_cache assert "fresh_hash" in _dedup_cache def teardown_method(self): _dedup_cache.clear() class TestGetGitContext: """Test _get_git_context.""" def test_returns_dict_with_branch_and_commit(self): """Git context always returns a dict with branch and commit keys.""" ctx = _get_git_context() assert "branch" in ctx assert "commit" in ctx assert isinstance(ctx["branch"], str) assert isinstance(ctx["commit"], str) def test_git_context_fallback_on_failure(self): """When subprocess.run fails, returns 'unknown' for both fields.""" with patch("subprocess.run", side_effect=OSError("git not found")): ctx = _get_git_context() assert ctx == {"branch": "unknown", "commit": "unknown"} class TestCaptureError: """Test the main capture_error function.""" def setup_method(self): _dedup_cache.clear() def test_duplicate_returns_none(self): """Second call with same exception is deduplicated.""" try: _make_exception() except ValueError as e: # First call capture_error(e, source="test") # Second call — same hash, within dedup window result = capture_error(e, source="test") assert result is None def test_returns_none_when_feedback_disabled(self): """capture_error returns None immediately when error_feedback_enabled is False.""" _dedup_cache.clear() import config original = config.settings.error_feedback_enabled try: config.settings.error_feedback_enabled = False try: raise RuntimeError("disabled test") except RuntimeError as e: result = capture_error(e, source="test") assert result is None finally: config.settings.error_feedback_enabled = original def test_capture_does_not_crash_on_missing_deps(self): """capture_error should never crash even if optional deps are missing.""" _dedup_cache.clear() try: raise OSError("graceful test") except OSError as e: # Should not raise even though swarm/event_log don't exist capture_error(e, source="graceful") def test_capture_with_context_does_not_crash(self): """capture_error with context dict should not crash.""" _dedup_cache.clear() try: raise RuntimeError("context test") except RuntimeError as e: capture_error(e, source="test_module", context={"path": "/api/foo"}) def test_capture_creates_task_in_temp_db(self): """capture_error should write a task to the isolated temp DB, not production. This validates that conftest.clean_database properly redirects swarm.task_queue.models.DB_PATH to the per-test temp directory. """ _dedup_cache.clear() try: raise ValueError("Test error for DB isolation check") except ValueError as e: task_id = capture_error(e, source="test") # task_id can be None if error_feedback_enabled is False — that's fine, # but if it was created, verify it landed in the temp DB if task_id: import swarm.task_queue.models as tq_models db_path = tq_models.DB_PATH conn = sqlite3.connect(str(db_path)) try: row = conn.execute( "SELECT id, title FROM tasks WHERE id = ?", (task_id,) ).fetchone() assert row is not None, ( f"Task {task_id} not found in DB at {db_path} — " "check conftest.clean_database patches swarm.task_queue.models.DB_PATH" ) assert "[BUG]" in row[1] finally: conn.close() def teardown_method(self): _dedup_cache.clear()