forked from Rockachopa/Timmy-time-dashboard
Test data was bleeding into production tasks.db because swarm.task_queue.models.DB_PATH (relative path) was never patched in conftest.clean_database. Fixed by switching to absolute paths via settings.repo_root and adding the missing module to the patching list. Discord bot could leak orphaned clients on retry after ERROR state. Added _cleanup_stale() to close stale client/task before each start() attempt, with improved logging in the token watcher. Rewrote test_paperclip_client.py to use httpx.MockTransport instead of patching _get/_post/_delete — tests now exercise real HTTP status codes, error handling, and JSON parsing. Added end-to-end test for capture_error → create_task DB isolation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
156 lines
4.7 KiB
Python
156 lines
4.7 KiB
Python
"""Tests for infrastructure.error_capture module."""
|
|
|
|
import sqlite3
|
|
|
|
from infrastructure.error_capture import (
|
|
_dedup_cache,
|
|
_get_git_context,
|
|
_is_duplicate,
|
|
_stack_hash,
|
|
capture_error,
|
|
)
|
|
|
|
|
|
def _make_exception():
|
|
"""Helper that always raises from the same line for stable hashing."""
|
|
raise ValueError("test error")
|
|
|
|
|
|
class TestStackHash:
    """Test _stack_hash produces stable hashes."""

    def test_hash_is_deterministic_for_same_exception(self):
        """Same exception object always produces the same hash."""
        try:
            _make_exception()
        except ValueError as exc:
            first = _stack_hash(exc)
            second = _stack_hash(exc)
            assert first == second

    def test_different_exception_types_differ(self):
        """Distinct exception types/raise sites hash to distinct values."""
        hashes = []
        try:
            raise ValueError("x")
        except ValueError as caught:
            hashes.append(_stack_hash(caught))
        try:
            raise TypeError("x")
        except TypeError as caught:
            hashes.append(_stack_hash(caught))
        assert hashes[0] != hashes[1]

    def test_hash_is_hex_string(self):
        """The hash is a 16-character lowercase hex digest."""
        try:
            raise RuntimeError("test")
        except RuntimeError as caught:
            digest = _stack_hash(caught)
            assert len(digest) == 16
            # every character must be a lowercase hex digit
            assert set(digest) <= set("0123456789abcdef")
|
|
|
|
|
|
class TestIsDuplicate:
    """Test deduplication logic."""

    def setup_method(self):
        # Start each test with an empty dedup cache so tests don't interact.
        _dedup_cache.clear()

    def teardown_method(self):
        # Leave no residue for other test modules sharing the cache.
        _dedup_cache.clear()

    def test_first_occurrence_not_duplicate(self):
        """A hash never seen before is reported as new."""
        result = _is_duplicate("hash_abc")
        assert result is False

    def test_second_occurrence_is_duplicate(self):
        """The same hash seen twice is flagged the second time."""
        _is_duplicate("hash_dup")
        result = _is_duplicate("hash_dup")
        assert result is True

    def test_different_hashes_not_duplicates(self):
        """Seeing one hash does not mark a different hash as duplicate."""
        _is_duplicate("hash_1")
        result = _is_duplicate("hash_2")
        assert result is False
|
|
|
|
|
|
class TestGetGitContext:
    """Test _get_git_context."""

    def test_returns_dict_with_branch_and_commit(self):
        """Git context always returns a dict with branch and commit keys."""
        ctx = _get_git_context()
        # Both keys must be present and hold string values, even outside a repo.
        for key in ("branch", "commit"):
            assert key in ctx
            assert isinstance(ctx[key], str)
|
|
|
|
|
|
class TestCaptureError:
    """Test the main capture_error function."""

    def setup_method(self):
        # Fresh dedup state before every test in this class.
        _dedup_cache.clear()

    def teardown_method(self):
        # And after, so later tests never see our hashes.
        _dedup_cache.clear()

    def test_duplicate_returns_none(self):
        """Second call with same exception is deduplicated."""
        try:
            _make_exception()
        except ValueError as exc:
            # First call seeds the dedup cache.
            capture_error(exc, source="test")
            # Second call — same hash, within dedup window
            result = capture_error(exc, source="test")
            assert result is None

    def test_capture_does_not_crash_on_missing_deps(self):
        """capture_error should never crash even if optional deps are missing."""
        _dedup_cache.clear()
        try:
            raise OSError("graceful test")
        except OSError as exc:
            # Should not raise even though swarm/event_log don't exist
            capture_error(exc, source="graceful")

    def test_capture_with_context_does_not_crash(self):
        """capture_error with context dict should not crash."""
        _dedup_cache.clear()
        try:
            raise RuntimeError("context test")
        except RuntimeError as exc:
            capture_error(exc, source="test_module", context={"path": "/api/foo"})

    def test_capture_creates_task_in_temp_db(self):
        """capture_error should write a task to the isolated temp DB, not production.

        This validates that conftest.clean_database properly redirects
        swarm.task_queue.models.DB_PATH to the per-test temp directory.
        """
        _dedup_cache.clear()

        try:
            raise ValueError("Test error for DB isolation check")
        except ValueError as exc:
            task_id = capture_error(exc, source="test")

        # task_id can be None if error_feedback_enabled is False — that's fine,
        # but if it was created, verify it landed in the temp DB
        if not task_id:
            return

        import swarm.task_queue.models as tq_models

        db_path = tq_models.DB_PATH
        conn = sqlite3.connect(str(db_path))
        try:
            row = conn.execute(
                "SELECT id, title FROM tasks WHERE id = ?", (task_id,)
            ).fetchone()
            assert row is not None, (
                f"Task {task_id} not found in DB at {db_path} — "
                "check conftest.clean_database patches swarm.task_queue.models.DB_PATH"
            )
            assert "[BUG]" in row[1]
        finally:
            conn.close()