# Co-authored-by: Kimi Agent <kimi@timmy.local>
# Co-committed-by: Kimi Agent <kimi@timmy.local>
# File viewer metadata: 289 lines, 9.1 KiB, Python
"""Tests for infrastructure.error_capture module."""
|
|
|
|
import sqlite3
|
|
from datetime import UTC, datetime, timedelta
|
|
from unittest.mock import patch
|
|
|
|
from infrastructure.error_capture import (
|
|
_create_bug_report,
|
|
_dedup_cache,
|
|
_extract_traceback_info,
|
|
_get_git_context,
|
|
_is_duplicate,
|
|
_log_error_event,
|
|
_notify_bug_report,
|
|
_record_to_session,
|
|
_stack_hash,
|
|
capture_error,
|
|
)
|
|
|
|
|
|
def _make_exception():
|
|
"""Helper that always raises from the same line for stable hashing."""
|
|
raise ValueError("test error")
|
|
|
|
|
|
class TestStackHash:
    """Checks on the hash string returned by ``_stack_hash``."""

    def test_hash_is_deterministic_for_same_exception(self):
        """Hashing one exception object twice yields equal values."""
        try:
            _make_exception()
        except ValueError as caught:
            first, second = _stack_hash(caught), _stack_hash(caught)
            assert first == second

    def test_different_exception_types_differ(self):
        """A ValueError and a TypeError raised in this test hash differently."""
        try:
            raise ValueError("x")
        except ValueError as value_err:
            value_hash = _stack_hash(value_err)

        try:
            raise TypeError("x")
        except TypeError as type_err:
            type_hash = _stack_hash(type_err)

        assert value_hash != type_hash

    def test_hash_is_hex_string(self):
        """The hash is 16 characters long and uses only lowercase hex digits."""
        try:
            raise RuntimeError("test")
        except RuntimeError as caught:
            digest = _stack_hash(caught)
            assert len(digest) == 16
            assert set(digest) <= set("0123456789abcdef")
|
|
|
|
|
|
class TestIsDuplicate:
    """Behaviour of ``_is_duplicate`` against the shared ``_dedup_cache``."""

    def setup_method(self):
        # Every test starts from an empty cache.
        _dedup_cache.clear()

    def teardown_method(self):
        # Leave nothing behind for tests in other classes.
        _dedup_cache.clear()

    def test_first_occurrence_not_duplicate(self):
        """A hash seen for the first time is reported as not a duplicate."""
        seen_before = _is_duplicate("hash_abc")
        assert seen_before is False

    def test_second_occurrence_is_duplicate(self):
        """Checking the same hash a second time reports a duplicate."""
        _is_duplicate("hash_dup")
        seen_before = _is_duplicate("hash_dup")
        assert seen_before is True

    def test_different_hashes_not_duplicates(self):
        """Recording one hash does not mark a different hash as duplicate."""
        _is_duplicate("hash_1")
        seen_before = _is_duplicate("hash_2")
        assert seen_before is False

    def test_stale_entries_pruned(self):
        """Old entries beyond 2x the dedup window should be pruned."""
        from config import settings

        # Plant an entry three dedup windows in the past.
        age = timedelta(seconds=settings.error_dedup_window_seconds * 3)
        _dedup_cache["stale_hash"] = datetime.now(UTC) - age

        # Checking an unrelated hash is what triggers the pruning pass.
        _is_duplicate("fresh_hash")

        assert "stale_hash" not in _dedup_cache
        assert "fresh_hash" in _dedup_cache
|
|
|
|
|
|
class TestGetGitContext:
    """Checks on the dict returned by ``_get_git_context``."""

    def test_returns_dict_with_branch_and_commit(self):
        """Both ``branch`` and ``commit`` are present and hold strings."""
        git_info = _get_git_context()
        expected_keys = ("branch", "commit")
        for key in expected_keys:
            assert key in git_info
        for key in expected_keys:
            assert isinstance(git_info[key], str)

    def test_git_context_fallback_on_failure(self):
        """If ``subprocess.run`` raises OSError, both fields are 'unknown'."""
        failing_run = patch("subprocess.run", side_effect=OSError("git not found"))
        with failing_run:
            git_info = _get_git_context()
        assert git_info == {"branch": "unknown", "commit": "unknown"}
|
|
|
|
|
|
class TestCaptureError:
    """End-to-end checks on the public ``capture_error`` entry point."""

    def setup_method(self):
        # Start from an empty dedup cache so earlier tests cannot suppress captures.
        _dedup_cache.clear()

    def teardown_method(self):
        _dedup_cache.clear()

    def test_duplicate_returns_none(self):
        """Capturing the same exception twice returns None the second time."""
        try:
            _make_exception()
        except ValueError as caught:
            # First sighting.
            capture_error(caught, source="test")
            # Same hash again, still inside the dedup window.
            repeat = capture_error(caught, source="test")
            assert repeat is None

    def test_returns_none_when_feedback_disabled(self):
        """With ``error_feedback_enabled`` set to False, capture_error returns None."""
        _dedup_cache.clear()

        import config

        saved_flag = config.settings.error_feedback_enabled
        try:
            config.settings.error_feedback_enabled = False
            try:
                raise RuntimeError("disabled test")
            except RuntimeError as caught:
                outcome = capture_error(caught, source="test")
            assert outcome is None
        finally:
            # Restore the setting even when the assertion fails.
            config.settings.error_feedback_enabled = saved_flag

    def test_capture_does_not_crash_on_missing_deps(self):
        """Calling capture_error must not raise; the return value is not checked."""
        _dedup_cache.clear()

        try:
            raise OSError("graceful test")
        except OSError as caught:
            # Should not raise even though swarm/event_log don't exist
            capture_error(caught, source="graceful")

    def test_capture_with_context_does_not_crash(self):
        """Passing a ``context`` dict to capture_error must not raise."""
        _dedup_cache.clear()

        try:
            raise RuntimeError("context test")
        except RuntimeError as caught:
            capture_error(caught, source="test_module", context={"path": "/api/foo"})

    def test_capture_creates_task_in_temp_db(self):
        """A created task must be found in the DB at ``tq_models.DB_PATH``.

        The intent is to confirm that conftest.clean_database redirects
        swarm.task_queue.models.DB_PATH to the per-test temp directory, so
        the task does not land in the production database.
        """
        _dedup_cache.clear()

        try:
            raise ValueError("Test error for DB isolation check")
        except ValueError as caught:
            task_id = capture_error(caught, source="test")

        # A falsy task_id (e.g. None when error_feedback_enabled is False) is
        # acceptable; there is then nothing to look up.
        if not task_id:
            return

        import swarm.task_queue.models as tq_models

        db_path = tq_models.DB_PATH
        conn = sqlite3.connect(str(db_path))
        try:
            cursor = conn.execute(
                "SELECT id, title FROM tasks WHERE id = ?", (task_id,)
            )
            row = cursor.fetchone()
            assert row is not None, (
                f"Task {task_id} not found in DB at {db_path} — "
                "check conftest.clean_database patches swarm.task_queue.models.DB_PATH"
            )
            title = row[1]
            assert "[BUG]" in title
        finally:
            conn.close()
|
|
|
|
|
|
class TestExtractTracebackInfo:
    """Checks on the tuple returned by ``_extract_traceback_info``."""

    def test_returns_three_tuple(self):
        """The result unpacks into traceback text, a .py file and a positive line."""
        try:
            raise ValueError("extract test")
        except ValueError as caught:
            info = _extract_traceback_info(caught)

        tb_str, affected_file, affected_line = info
        for fragment in ("ValueError", "extract test"):
            assert fragment in tb_str
        assert affected_file.endswith(".py")
        assert affected_line > 0

    def test_file_points_to_raise_site(self):
        """The reported file name contains this test module's name."""
        try:
            _make_exception()
        except ValueError as caught:
            _tb, raise_file, _line = _extract_traceback_info(caught)

        assert "test_error_capture" in raise_file
|
|
|
|
|
|
class TestLogErrorEvent:
    """Smoke test for the ``_log_error_event`` helper."""

    def test_does_not_crash_on_missing_deps(self):
        """The call must complete without raising; nothing else is asserted."""
        git_ctx = {"branch": "main"}
        try:
            raise RuntimeError("log test")
        except RuntimeError as caught:
            _log_error_event(caught, "test", "abc123", "file.py", 42, git_ctx)
|
|
|
|
|
|
class TestCreateBugReport:
    """Checks on the return value of the ``_create_bug_report`` helper."""

    def test_does_not_crash_on_missing_deps(self):
        """Without a context dict the helper returns either None or a string."""
        try:
            raise RuntimeError("report test")
        except RuntimeError as caught:
            report_id = _create_bug_report(
                caught, "test", None, "abc123", "traceback...", "file.py", 42, {}
            )

        # May return None if swarm deps unavailable — that's fine
        assert report_id is None or isinstance(report_id, str)

    def test_with_context(self):
        """With a context dict the helper still returns either None or a string."""
        request_ctx = {"path": "/api"}
        try:
            raise RuntimeError("ctx test")
        except RuntimeError as caught:
            report_id = _create_bug_report(
                caught, "test", request_ctx, "abc", "tb", "f.py", 1, {}
            )

        assert report_id is None or isinstance(report_id, str)
|
|
|
|
|
|
class TestNotifyBugReport:
    """Smoke test for the ``_notify_bug_report`` helper."""

    def test_does_not_crash(self):
        """The call must complete without raising; nothing else is asserted."""
        try:
            raise RuntimeError("notify test")
        except RuntimeError as caught:
            _notify_bug_report(caught, "test")
|
|
|
|
|
|
class TestRecordToSession:
    """Checks on ``_record_to_session`` with and without a registered recorder."""

    def test_does_not_crash_without_recorder(self):
        """The call must complete without raising when this test registers no recorder."""
        try:
            raise RuntimeError("session test")
        except RuntimeError as caught:
            _record_to_session(caught, "test")

    def test_calls_registered_recorder(self):
        """A registered recorder is called once with ``error`` and ``context`` kwargs."""
        from infrastructure.error_capture import register_error_recorder

        received = []

        def _recorder(**kwargs):
            received.append(kwargs)

        register_error_recorder(_recorder)
        try:
            try:
                raise RuntimeError("callback test")
            except RuntimeError as caught:
                _record_to_session(caught, "test_source")

            assert len(received) == 1
            only_call = received[0]
            assert "RuntimeError" in only_call["error"]
            assert only_call["context"] == "test_source"
        finally:
            # Unregister so the recorder does not leak into other tests.
            register_error_recorder(None)
|