forked from Rockachopa/Timmy-time-dashboard
Extract 5 focused helpers from the 138-line capture_error(): - _extract_origin(): walk traceback for file/line - _log_error_event(): log to event log (best-effort) - _create_bug_report(): create task and log creation event - _send_error_notification(): push notification - _record_to_session(): forward to session recorder capture_error() now orchestrates the helpers in ~25 lines. Added tests for each new helper. Fixes #506 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
285 lines
8.9 KiB
Python
285 lines
8.9 KiB
Python
"""Tests for infrastructure.error_capture module."""
|
|
|
|
import sqlite3
|
|
from datetime import UTC, datetime, timedelta
|
|
from unittest.mock import patch
|
|
|
|
from infrastructure.error_capture import (
|
|
_create_bug_report,
|
|
_dedup_cache,
|
|
_extract_origin,
|
|
_get_git_context,
|
|
_is_duplicate,
|
|
_log_error_event,
|
|
_record_to_session,
|
|
_send_error_notification,
|
|
_stack_hash,
|
|
capture_error,
|
|
)
|
|
|
|
|
|
def _make_exception():
|
|
"""Helper that always raises from the same line for stable hashing."""
|
|
raise ValueError("test error")
|
|
|
|
|
|
class TestStackHash:
    """Verify _stack_hash yields stable, type-sensitive hex digests."""

    def test_hash_is_deterministic_for_same_exception(self):
        """Hashing one exception object twice gives identical digests."""
        try:
            _make_exception()
        except ValueError as exc:
            first = _stack_hash(exc)
            second = _stack_hash(exc)
            assert first == second

    def test_different_exception_types_differ(self):
        """Exceptions of different types hash to different values."""
        try:
            raise ValueError("x")
        except ValueError as value_exc:
            value_hash = _stack_hash(value_exc)

        try:
            raise TypeError("x")
        except TypeError as type_exc:
            type_hash = _stack_hash(type_exc)

        assert value_hash != type_hash

    def test_hash_is_hex_string(self):
        """The digest is a 16-character lowercase hex string."""
        try:
            raise RuntimeError("test")
        except RuntimeError as exc:
            digest = _stack_hash(exc)
            assert len(digest) == 16
            # Subset check is equivalent to per-character membership.
            assert set(digest) <= set("0123456789abcdef")
|
|
|
|
|
|
class TestIsDuplicate:
    """Exercise the time-windowed deduplication cache."""

    def setup_method(self):
        # Start each test from an empty cache.
        _dedup_cache.clear()

    def teardown_method(self):
        # Leave no entries behind for other test modules.
        _dedup_cache.clear()

    def test_first_occurrence_not_duplicate(self):
        """A hash never seen before is not a duplicate."""
        assert _is_duplicate("hash_abc") is False

    def test_second_occurrence_is_duplicate(self):
        """The same hash seen twice within the window is a duplicate."""
        _is_duplicate("hash_dup")
        assert _is_duplicate("hash_dup") is True

    def test_different_hashes_not_duplicates(self):
        """Distinct hashes never mark each other as duplicates."""
        _is_duplicate("hash_1")
        assert _is_duplicate("hash_2") is False

    def test_stale_entries_pruned(self):
        """Entries older than 2x the dedup window are evicted on access."""
        from config import settings

        window = settings.error_dedup_window_seconds
        # An entry three windows old is well past the pruning threshold.
        _dedup_cache["stale_hash"] = datetime.now(UTC) - timedelta(seconds=window * 3)

        # Any lookup of a new hash triggers pruning of stale entries.
        _is_duplicate("fresh_hash")

        assert "stale_hash" not in _dedup_cache
        assert "fresh_hash" in _dedup_cache
|
|
|
|
|
|
class TestGetGitContext:
    """Validate _get_git_context output shape and failure fallback."""

    def test_returns_dict_with_branch_and_commit(self):
        """The context dict always carries string branch and commit fields."""
        ctx = _get_git_context()
        for key in ("branch", "commit"):
            assert key in ctx
            assert isinstance(ctx[key], str)

    def test_git_context_fallback_on_failure(self):
        """An OSError from subprocess.run degrades both fields to 'unknown'."""
        with patch("subprocess.run", side_effect=OSError("git not found")):
            assert _get_git_context() == {"branch": "unknown", "commit": "unknown"}
|
|
|
|
|
|
class TestCaptureError:
    """End-to-end tests for the capture_error orchestrator."""

    def setup_method(self):
        # Fresh dedup state so captures are not suppressed across tests.
        _dedup_cache.clear()

    def teardown_method(self):
        _dedup_cache.clear()

    def test_duplicate_returns_none(self):
        """A repeat capture of the same exception hash is suppressed."""
        try:
            _make_exception()
        except ValueError as exc:
            capture_error(exc, source="test")
            # Same stack hash, still inside the dedup window.
            assert capture_error(exc, source="test") is None

    def test_returns_none_when_feedback_disabled(self):
        """capture_error short-circuits when error_feedback_enabled is False."""
        _dedup_cache.clear()

        import config

        # patch.object restores the original setting even on failure.
        with patch.object(config.settings, "error_feedback_enabled", False):
            try:
                raise RuntimeError("disabled test")
            except RuntimeError as exc:
                assert capture_error(exc, source="test") is None

    def test_capture_does_not_crash_on_missing_deps(self):
        """capture_error must swallow failures from unavailable optional deps."""
        _dedup_cache.clear()

        try:
            raise OSError("graceful test")
        except OSError as exc:
            # Must not propagate even if swarm/event_log are absent.
            capture_error(exc, source="graceful")

    def test_capture_with_context_does_not_crash(self):
        """Passing a context dict must not raise."""
        _dedup_cache.clear()

        try:
            raise RuntimeError("context test")
        except RuntimeError as exc:
            capture_error(exc, source="test_module", context={"path": "/api/foo"})

    def test_capture_creates_task_in_temp_db(self):
        """A created bug task must land in the isolated temp DB, not production.

        Confirms conftest.clean_database redirects
        swarm.task_queue.models.DB_PATH into the per-test temp directory.
        """
        _dedup_cache.clear()

        try:
            raise ValueError("Test error for DB isolation check")
        except ValueError as exc:
            task_id = capture_error(exc, source="test")

        # task_id may be None when error_feedback_enabled is False — that's
        # acceptable; only verify DB placement when a task was created.
        if not task_id:
            return

        import swarm.task_queue.models as tq_models

        db_path = tq_models.DB_PATH
        conn = sqlite3.connect(str(db_path))
        try:
            row = conn.execute(
                "SELECT id, title FROM tasks WHERE id = ?", (task_id,)
            ).fetchone()
            assert row is not None, (
                f"Task {task_id} not found in DB at {db_path} — "
                "check conftest.clean_database patches swarm.task_queue.models.DB_PATH"
            )
            assert "[BUG]" in row[1]
        finally:
            conn.close()
|
|
|
|
|
|
class TestExtractOrigin:
    """Tests for the _extract_origin traceback walker."""

    def test_returns_file_and_line(self):
        """Origin points at this test file with a positive line number."""
        try:
            _make_exception()
        except ValueError as exc:
            origin_file, origin_line = _extract_origin(exc)
            assert origin_file.endswith("test_error_capture.py")
            assert origin_line > 0

    def test_no_traceback_returns_defaults(self):
        """An exception with no traceback maps to ('unknown', 0)."""
        bare = ValueError("no tb")
        bare.__traceback__ = None
        assert _extract_origin(bare) == ("unknown", 0)
|
|
|
|
|
|
class TestLogErrorEvent:
    """Tests for the best-effort _log_error_event helper."""

    def test_does_not_crash_when_event_log_missing(self):
        """Logging must never raise, even when the event log is unavailable."""
        try:
            raise RuntimeError("log test")
        except RuntimeError as exc:
            _log_error_event(exc, "test", "abc123", "file.py", 42, {})
|
|
|
|
|
|
class TestCreateBugReport:
    """Tests for the _create_bug_report helper."""

    def test_returns_none_on_import_failure(self):
        """The helper degrades gracefully: result is a task id string or None."""
        try:
            raise RuntimeError("report test")
        except RuntimeError as exc:
            with patch("infrastructure.error_capture.logger"):
                task_id = _create_bug_report(
                    exc, "test", "abc", "f.py", 1, {}, "tb", None
                )
            # None when swarm is unavailable, otherwise the new task's id.
            assert task_id is None or isinstance(task_id, str)
|
|
|
|
|
|
class TestSendErrorNotification:
    """Tests for the _send_error_notification helper."""

    def test_does_not_crash_on_notifier_failure(self):
        """Notification failures are swallowed rather than propagated."""
        try:
            raise RuntimeError("notify test")
        except RuntimeError as exc:
            _send_error_notification(exc, "test")
|
|
|
|
|
|
class TestRecordToSession:
    """Tests for forwarding captured errors to the session recorder."""

    def test_noop_when_no_recorder(self):
        """With no recorder registered, _record_to_session is a silent no-op."""
        import infrastructure.error_capture as ec

        # patch.object restores the original recorder automatically.
        with patch.object(ec, "_error_recorder", None):
            try:
                raise RuntimeError("session test")
            except RuntimeError as exc:
                _record_to_session(exc, "test")  # must not raise

    def test_calls_registered_recorder(self):
        """A registered recorder receives the formatted error and context."""
        import infrastructure.error_capture as ec

        received = []
        with patch.object(
            ec, "_error_recorder", lambda **kwargs: received.append(kwargs)
        ):
            try:
                raise RuntimeError("recorded")
            except RuntimeError as exc:
                _record_to_session(exc, "src")
        assert len(received) == 1
        assert "RuntimeError: recorded" in received[0]["error"]
        assert received[0]["context"] == "src"
|