This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/tests/infrastructure/test_error_capture.py
Kimi Agent 9535dd86de test: push event system coverage to ≥80% on all three modules
Add 3 targeted tests for infrastructure/error_capture.py:
- test_stale_entries_pruned: exercises dedup cache pruning (line 61)
- test_git_context_fallback_on_failure: exercises exception path (lines 90-91)
- test_returns_none_when_feedback_disabled: exercises early return (line 112)

Coverage results (63 tests, all passing):
- error_capture.py: 75.6% → 80.0%
- broadcaster.py: 93.9% (unchanged)
- bus.py: 92.9% (unchanged)
- Total: 88.1% → 89.4%

Closes #45
2026-03-14 16:01:05 -04:00

196 lines
6.2 KiB
Python

"""Tests for infrastructure.error_capture module."""
import sqlite3
from datetime import UTC, datetime, timedelta
from unittest.mock import patch
from infrastructure.error_capture import (
_dedup_cache,
_get_git_context,
_is_duplicate,
_stack_hash,
capture_error,
)
def _make_exception():
"""Helper that always raises from the same line for stable hashing."""
raise ValueError("test error")
class TestStackHash:
"""Test _stack_hash produces stable hashes."""
def test_hash_is_deterministic_for_same_exception(self):
"""Same exception object always produces the same hash."""
try:
_make_exception()
except ValueError as e:
hash1 = _stack_hash(e)
hash2 = _stack_hash(e)
assert hash1 == hash2
def test_different_exception_types_differ(self):
try:
raise ValueError("x")
except ValueError as e1:
hash1 = _stack_hash(e1)
try:
raise TypeError("x")
except TypeError as e2:
hash2 = _stack_hash(e2)
assert hash1 != hash2
def test_hash_is_hex_string(self):
try:
raise RuntimeError("test")
except RuntimeError as e:
h = _stack_hash(e)
assert len(h) == 16
assert all(c in "0123456789abcdef" for c in h)
class TestIsDuplicate:
"""Test deduplication logic."""
def setup_method(self):
_dedup_cache.clear()
def test_first_occurrence_not_duplicate(self):
assert _is_duplicate("hash_abc") is False
def test_second_occurrence_is_duplicate(self):
_is_duplicate("hash_dup")
assert _is_duplicate("hash_dup") is True
def test_different_hashes_not_duplicates(self):
_is_duplicate("hash_1")
assert _is_duplicate("hash_2") is False
def test_stale_entries_pruned(self):
"""Old entries beyond 2x the dedup window should be pruned."""
from config import settings
window = settings.error_dedup_window_seconds
# Seed a stale entry far in the past
stale_time = datetime.now(UTC) - timedelta(seconds=window * 3)
_dedup_cache["stale_hash"] = stale_time
# Calling _is_duplicate on a new hash triggers pruning
_is_duplicate("fresh_hash")
assert "stale_hash" not in _dedup_cache
assert "fresh_hash" in _dedup_cache
def teardown_method(self):
_dedup_cache.clear()
class TestGetGitContext:
"""Test _get_git_context."""
def test_returns_dict_with_branch_and_commit(self):
"""Git context always returns a dict with branch and commit keys."""
ctx = _get_git_context()
assert "branch" in ctx
assert "commit" in ctx
assert isinstance(ctx["branch"], str)
assert isinstance(ctx["commit"], str)
def test_git_context_fallback_on_failure(self):
"""When subprocess.run fails, returns 'unknown' for both fields."""
with patch("subprocess.run", side_effect=OSError("git not found")):
ctx = _get_git_context()
assert ctx == {"branch": "unknown", "commit": "unknown"}
class TestCaptureError:
"""Test the main capture_error function."""
def setup_method(self):
_dedup_cache.clear()
def test_duplicate_returns_none(self):
"""Second call with same exception is deduplicated."""
try:
_make_exception()
except ValueError as e:
# First call
capture_error(e, source="test")
# Second call — same hash, within dedup window
result = capture_error(e, source="test")
assert result is None
def test_returns_none_when_feedback_disabled(self):
"""capture_error returns None immediately when error_feedback_enabled is False."""
_dedup_cache.clear()
import config
original = config.settings.error_feedback_enabled
try:
config.settings.error_feedback_enabled = False
try:
raise RuntimeError("disabled test")
except RuntimeError as e:
result = capture_error(e, source="test")
assert result is None
finally:
config.settings.error_feedback_enabled = original
def test_capture_does_not_crash_on_missing_deps(self):
"""capture_error should never crash even if optional deps are missing."""
_dedup_cache.clear()
try:
raise OSError("graceful test")
except OSError as e:
# Should not raise even though swarm/event_log don't exist
capture_error(e, source="graceful")
def test_capture_with_context_does_not_crash(self):
"""capture_error with context dict should not crash."""
_dedup_cache.clear()
try:
raise RuntimeError("context test")
except RuntimeError as e:
capture_error(e, source="test_module", context={"path": "/api/foo"})
def test_capture_creates_task_in_temp_db(self):
"""capture_error should write a task to the isolated temp DB, not production.
This validates that conftest.clean_database properly redirects
swarm.task_queue.models.DB_PATH to the per-test temp directory.
"""
_dedup_cache.clear()
try:
raise ValueError("Test error for DB isolation check")
except ValueError as e:
task_id = capture_error(e, source="test")
# task_id can be None if error_feedback_enabled is False — that's fine,
# but if it was created, verify it landed in the temp DB
if task_id:
import swarm.task_queue.models as tq_models
db_path = tq_models.DB_PATH
conn = sqlite3.connect(str(db_path))
try:
row = conn.execute(
"SELECT id, title FROM tasks WHERE id = ?", (task_id,)
).fetchone()
assert row is not None, (
f"Task {task_id} not found in DB at {db_path}"
"check conftest.clean_database patches swarm.task_queue.models.DB_PATH"
)
assert "[BUG]" in row[1]
finally:
conn.close()
def teardown_method(self):
_dedup_cache.clear()