This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/tests/infrastructure/test_error_capture.py
Trip T ea2dbdb4b5 fix: test DB isolation, Discord recovery, and over-mocked tests
Test data was bleeding into production tasks.db because
swarm.task_queue.models.DB_PATH (relative path) was never patched in
conftest.clean_database. Fixed by switching to absolute paths via
settings.repo_root and adding the missing module to the patching list.

Discord bot could leak orphaned clients on retry after ERROR state.
Added _cleanup_stale() to close stale client/task before each start()
attempt, with improved logging in the token watcher.

Rewrote test_paperclip_client.py to use httpx.MockTransport instead of
patching _get/_post/_delete — tests now exercise real HTTP status codes,
error handling, and JSON parsing. Added end-to-end test for
capture_error → create_task DB isolation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 20:33:59 -04:00

156 lines
4.7 KiB
Python

"""Tests for infrastructure.error_capture module."""
import sqlite3
from infrastructure.error_capture import (
_dedup_cache,
_get_git_context,
_is_duplicate,
_stack_hash,
capture_error,
)
def _make_exception():
"""Helper that always raises from the same line for stable hashing."""
raise ValueError("test error")
class TestStackHash:
"""Test _stack_hash produces stable hashes."""
def test_hash_is_deterministic_for_same_exception(self):
"""Same exception object always produces the same hash."""
try:
_make_exception()
except ValueError as e:
hash1 = _stack_hash(e)
hash2 = _stack_hash(e)
assert hash1 == hash2
def test_different_exception_types_differ(self):
try:
raise ValueError("x")
except ValueError as e1:
hash1 = _stack_hash(e1)
try:
raise TypeError("x")
except TypeError as e2:
hash2 = _stack_hash(e2)
assert hash1 != hash2
def test_hash_is_hex_string(self):
try:
raise RuntimeError("test")
except RuntimeError as e:
h = _stack_hash(e)
assert len(h) == 16
assert all(c in "0123456789abcdef" for c in h)
class TestIsDuplicate:
"""Test deduplication logic."""
def setup_method(self):
_dedup_cache.clear()
def test_first_occurrence_not_duplicate(self):
assert _is_duplicate("hash_abc") is False
def test_second_occurrence_is_duplicate(self):
_is_duplicate("hash_dup")
assert _is_duplicate("hash_dup") is True
def test_different_hashes_not_duplicates(self):
_is_duplicate("hash_1")
assert _is_duplicate("hash_2") is False
def teardown_method(self):
_dedup_cache.clear()
class TestGetGitContext:
"""Test _get_git_context."""
def test_returns_dict_with_branch_and_commit(self):
"""Git context always returns a dict with branch and commit keys."""
ctx = _get_git_context()
assert "branch" in ctx
assert "commit" in ctx
assert isinstance(ctx["branch"], str)
assert isinstance(ctx["commit"], str)
class TestCaptureError:
"""Test the main capture_error function."""
def setup_method(self):
_dedup_cache.clear()
def test_duplicate_returns_none(self):
"""Second call with same exception is deduplicated."""
try:
_make_exception()
except ValueError as e:
# First call
capture_error(e, source="test")
# Second call — same hash, within dedup window
result = capture_error(e, source="test")
assert result is None
def test_capture_does_not_crash_on_missing_deps(self):
"""capture_error should never crash even if optional deps are missing."""
_dedup_cache.clear()
try:
raise OSError("graceful test")
except OSError as e:
# Should not raise even though swarm/event_log don't exist
capture_error(e, source="graceful")
def test_capture_with_context_does_not_crash(self):
"""capture_error with context dict should not crash."""
_dedup_cache.clear()
try:
raise RuntimeError("context test")
except RuntimeError as e:
capture_error(e, source="test_module", context={"path": "/api/foo"})
def test_capture_creates_task_in_temp_db(self):
"""capture_error should write a task to the isolated temp DB, not production.
This validates that conftest.clean_database properly redirects
swarm.task_queue.models.DB_PATH to the per-test temp directory.
"""
_dedup_cache.clear()
try:
raise ValueError("Test error for DB isolation check")
except ValueError as e:
task_id = capture_error(e, source="test")
# task_id can be None if error_feedback_enabled is False — that's fine,
# but if it was created, verify it landed in the temp DB
if task_id:
import swarm.task_queue.models as tq_models
db_path = tq_models.DB_PATH
conn = sqlite3.connect(str(db_path))
try:
row = conn.execute(
"SELECT id, title FROM tasks WHERE id = ?", (task_id,)
).fetchone()
assert row is not None, (
f"Task {task_id} not found in DB at {db_path}"
"check conftest.clean_database patches swarm.task_queue.models.DB_PATH"
)
assert "[BUG]" in row[1]
finally:
conn.close()
def teardown_method(self):
_dedup_cache.clear()