diff --git a/.gitignore b/.gitignore
index 9d4ae250..09354cfc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,6 +40,9 @@ reports/
# Self-modify reports (auto-generated)
data/self_modify_reports/
+
+# Error logs (auto-generated)
+logs/
src/data/
# Handoff context (session-scoped)
diff --git a/src/config.py b/src/config.py
index d1aff094..92b9af9f 100644
--- a/src/config.py
+++ b/src/config.py
@@ -141,6 +141,14 @@ class Settings(BaseSettings):
thinking_enabled: bool = True
thinking_interval_seconds: int = 300 # 5 minutes between thoughts
+ # ── Error Logging ─────────────────────────────────────────────────
+ error_log_enabled: bool = True
+ error_log_dir: str = "logs"
+ error_log_max_bytes: int = 5_242_880 # 5 MB
+ error_log_backup_count: int = 5
+ error_feedback_enabled: bool = True # Auto-create bug report tasks
+ error_dedup_window_seconds: int = 300 # 5-min dedup window
+
# ── Scripture / Biblical Integration ──────────────────────────────
# Enable the sovereign biblical text module. When enabled, Timmy
# loads the local ESV text corpus and runs meditation workflows.
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index fcba5040..5d63ad3f 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -40,13 +40,51 @@ from dashboard.routes.models import router as models_router
from dashboard.routes.models import api_router as models_api_router
from dashboard.routes.chat_api import router as chat_api_router
from dashboard.routes.thinking import router as thinking_router
+from dashboard.routes.bugs import router as bugs_router
from infrastructure.router.api import router as cascade_router
-logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s %(levelname)-8s %(name)s — %(message)s",
- datefmt="%H:%M:%S",
-)
+def _configure_logging() -> None:
+ """Configure logging with console and optional rotating file handler."""
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.INFO)
+    root_logger.handlers.clear()  # idempotent across re-imports / uvicorn reloads
+    # Console handler (existing behavior)
+    console = logging.StreamHandler()
+ console.setLevel(logging.INFO)
+ console.setFormatter(
+ logging.Formatter(
+ "%(asctime)s %(levelname)-8s %(name)s — %(message)s",
+ datefmt="%H:%M:%S",
+ )
+ )
+ root_logger.addHandler(console)
+
+ # Rotating file handler for errors
+ if settings.error_log_enabled:
+ from logging.handlers import RotatingFileHandler
+
+ log_dir = Path(settings.repo_root) / settings.error_log_dir
+ log_dir.mkdir(parents=True, exist_ok=True)
+ error_file = log_dir / "errors.log"
+
+ file_handler = RotatingFileHandler(
+            error_file, encoding="utf-8",
+ maxBytes=settings.error_log_max_bytes,
+ backupCount=settings.error_log_backup_count,
+ )
+ file_handler.setLevel(logging.ERROR)
+ file_handler.setFormatter(
+ logging.Formatter(
+ "%(asctime)s %(levelname)-8s %(name)s — %(message)s\n"
+ " File: %(pathname)s:%(lineno)d\n"
+ " Function: %(funcName)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+ )
+ )
+ root_logger.addHandler(file_handler)
+
+
+_configure_logging()
logger = logging.getLogger(__name__)
BASE_DIR = Path(__file__).parent
@@ -77,6 +115,11 @@ async def _briefing_scheduler() -> None:
logger.info("Briefing is fresh; skipping generation.")
except Exception as exc:
logger.error("Briefing scheduler error: %s", exc)
+ try:
+ from infrastructure.error_capture import capture_error
+ capture_error(exc, source="briefing_scheduler")
+ except Exception:
+ pass
await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600)
@@ -110,6 +153,11 @@ async def _thinking_loop() -> None:
logger.debug("Created thought task in queue")
except Exception as exc:
logger.error("Thinking loop error: %s", exc)
+ try:
+ from infrastructure.error_capture import capture_error
+ capture_error(exc, source="thinking_loop")
+ except Exception:
+ pass
await asyncio.sleep(settings.thinking_interval_seconds)
@@ -156,6 +204,11 @@ async def _task_processor_loop() -> None:
return response
except Exception as e:
logger.error("Chat response failed: %s", e)
+ try:
+ from infrastructure.error_capture import capture_error
+ capture_error(e, source="chat_response_handler")
+ except Exception:
+ pass
return f"Error: {str(e)}"
def handle_thought(task):
@@ -167,12 +220,22 @@ async def _task_processor_loop() -> None:
return str(result) if result else "Thought completed"
except Exception as e:
logger.error("Thought processing failed: %s", e)
+ try:
+ from infrastructure.error_capture import capture_error
+ capture_error(e, source="thought_handler")
+ except Exception:
+ pass
return f"Error: {str(e)}"
+ def handle_bug_report(task):
+ """Handler for bug_report tasks - acknowledge and mark completed."""
+ return f"Bug report acknowledged: {task.title}"
+
# Register handlers
task_processor.register_handler("chat_response", handle_chat_response)
task_processor.register_handler("thought", handle_thought)
task_processor.register_handler("internal", handle_thought)
+ task_processor.register_handler("bug_report", handle_bug_report)
# ── Startup drain: iterate through all pending tasks immediately ──
logger.info("Draining task queue on startup…")
@@ -204,6 +267,11 @@ async def _task_processor_loop() -> None:
pass
except Exception as exc:
logger.error("Startup drain failed: %s", exc)
+ try:
+ from infrastructure.error_capture import capture_error
+ capture_error(exc, source="task_processor_startup")
+ except Exception:
+ pass
# ── Steady-state: poll for new tasks ──
logger.info("Task processor entering steady-state loop")
@@ -388,6 +456,55 @@ app.include_router(models_api_router)
app.include_router(chat_api_router)
app.include_router(thinking_router)
app.include_router(cascade_router)
+app.include_router(bugs_router)
+
+
+# ── Error capture middleware ──────────────────────────────────────────────
+from starlette.middleware.base import BaseHTTPMiddleware
+from starlette.requests import Request as StarletteRequest
+from fastapi.responses import JSONResponse
+
+
+class ErrorCaptureMiddleware(BaseHTTPMiddleware):
+ """Catch unhandled exceptions and feed them into the error feedback loop."""
+
+ async def dispatch(self, request: StarletteRequest, call_next):
+ try:
+ return await call_next(request)
+ except Exception as exc:
+ logger.error(
+ "Unhandled exception on %s %s: %s",
+ request.method, request.url.path, exc,
+ )
+ try:
+ from infrastructure.error_capture import capture_error
+ capture_error(
+ exc,
+ source="http_middleware",
+ context={
+ "method": request.method,
+ "path": request.url.path,
+ "query": str(request.query_params),
+ },
+ )
+ except Exception:
+ pass # Never crash the middleware itself
+ raise # Re-raise so FastAPI's default handler returns 500
+
+
+app.add_middleware(ErrorCaptureMiddleware)
+
+
+@app.exception_handler(Exception)
+async def global_exception_handler(request: Request, exc: Exception):
+ """Safety net for uncaught exceptions."""
+ logger.error("Unhandled exception: %s", exc, exc_info=True)
+ try:
+ from infrastructure.error_capture import capture_error
+ capture_error(exc, source="exception_handler", context={"path": str(request.url)})
+ except Exception:
+ pass
+ return JSONResponse(status_code=500, content={"detail": "Internal server error"})
@app.get("/", response_class=HTMLResponse)
diff --git a/src/dashboard/routes/bugs.py b/src/dashboard/routes/bugs.py
new file mode 100644
index 00000000..1bd02371
--- /dev/null
+++ b/src/dashboard/routes/bugs.py
@@ -0,0 +1,86 @@
+"""Bug Report routes -- error feedback loop dashboard.
+
+GET /bugs -- Bug reports dashboard page
+GET /api/bugs -- List bug reports (JSON)
+GET /api/bugs/stats -- Bug report statistics
+"""
+
+import logging
+from pathlib import Path
+from typing import Optional
+
+from fastapi import APIRouter, Request
+from fastapi.responses import HTMLResponse, JSONResponse
+from fastapi.templating import Jinja2Templates
+
+from swarm.task_queue.models import list_tasks
+
+logger = logging.getLogger(__name__)
+router = APIRouter(tags=["bugs"])
+templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates"))
+
+
+def _get_bug_reports(status: Optional[str] = None, limit: int = 50) -> list:
+    """Get bug report tasks from the task queue (filter first, then slice)."""
+    all_tasks = list_tasks(limit=max(limit * 4, 500))
+    bugs = [t for t in all_tasks if t.task_type == "bug_report"]
+    if status:
+        bugs = [t for t in bugs if t.status.value == status]
+    return bugs[:limit]
+
+
+@router.get("/bugs", response_class=HTMLResponse)
+async def bugs_page(request: Request, status: Optional[str] = None):
+ """Bug reports dashboard page."""
+ bugs = _get_bug_reports(status=status, limit=200)
+
+ # Count by status
+ all_bugs = _get_bug_reports(limit=500)
+ stats: dict[str, int] = {}
+ for bug in all_bugs:
+ s = bug.status.value
+ stats[s] = stats.get(s, 0) + 1
+
+ return templates.TemplateResponse(
+ request,
+ "bugs.html",
+ {
+ "page_title": "Bug Reports",
+ "bugs": bugs,
+ "stats": stats,
+ "total": len(all_bugs),
+ "filter_status": status,
+ },
+ )
+
+
+@router.get("/api/bugs", response_class=JSONResponse)
+async def api_list_bugs(status: Optional[str] = None, limit: int = 50):
+ """List bug reports as JSON."""
+ bugs = _get_bug_reports(status=status, limit=limit)
+ return {
+ "bugs": [
+ {
+ "id": b.id,
+ "title": b.title,
+ "description": b.description,
+ "status": b.status.value,
+ "priority": b.priority.value,
+ "created_at": b.created_at,
+ "result": b.result,
+ }
+ for b in bugs
+ ],
+ "count": len(bugs),
+ }
+
+
+@router.get("/api/bugs/stats", response_class=JSONResponse)
+async def api_bug_stats():
+ """Bug report statistics."""
+ all_bugs = _get_bug_reports(limit=500)
+ stats: dict[str, int] = {}
+ for bug in all_bugs:
+ s = bug.status.value
+ stats[s] = stats.get(s, 0) + 1
+ return {"stats": stats, "total": len(all_bugs)}
diff --git a/src/dashboard/templates/base.html b/src/dashboard/templates/base.html
index 455dbc50..0b563bb2 100644
--- a/src/dashboard/templates/base.html
+++ b/src/dashboard/templates/base.html
@@ -37,6 +37,7 @@
MARKET
TOOLS
EVENTS
+ BUGS
LEDGER
MEMORY
ROUTER
@@ -73,6 +74,7 @@
MARKET
TOOLS
EVENTS
+ BUGS
LEDGER
MEMORY
WORK ORDERS
diff --git a/src/dashboard/templates/bugs.html b/src/dashboard/templates/bugs.html
new file mode 100644
index 00000000..cf65503d
--- /dev/null
+++ b/src/dashboard/templates/bugs.html
@@ -0,0 +1,67 @@
+{% extends "base.html" %}
+
+{% block title %}Bug Reports — Timmy Time{% endblock %}
+
+{% block content %}
+
+
+
+
+
+
+ {% for status_name, count in stats.items() %}
+
+
{{ count }}
+
{{ status_name | replace("_", " ") | upper }}
+
+ {% endfor %}
+
+
+
+
+
+
+
+
+ {% if bugs %}
+ {% for bug in bugs %}
+
+
+
{{ bug.title | e }}
+
+ {{ bug.status.value | replace("_"," ") | upper }}
+ {{ bug.priority.value | upper }}
+
+
+ {% if bug.description %}
+
+ Stack trace & details
+ {{ bug.description | e }}
+
+ {% endif %}
+
{{ bug.created_at[:19].replace("T", " ") }} UTC
+
+ {% endfor %}
+ {% else %}
+
+
No bug reports found.
+
The system is running clean.
+
+ {% endif %}
+
+{% endblock %}
diff --git a/src/infrastructure/error_capture.py b/src/infrastructure/error_capture.py
new file mode 100644
index 00000000..eb761d00
--- /dev/null
+++ b/src/infrastructure/error_capture.py
@@ -0,0 +1,235 @@
+"""Centralized error capture with automatic bug report creation.
+
+Catches errors from anywhere in the system, deduplicates them, logs them
+to the event log, and creates bug report tasks in the task queue.
+
+Usage:
+ from infrastructure.error_capture import capture_error
+
+ try:
+ risky_operation()
+ except Exception as exc:
+ capture_error(exc, source="my_module", context={"request": "/api/foo"})
+"""
+
+import hashlib
+import logging
+import traceback
+from datetime import datetime, timedelta, timezone
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+# In-memory dedup cache: hash -> last_seen timestamp
+_dedup_cache: dict[str, datetime] = {}
+
+
+def _stack_hash(exc: Exception) -> str:
+ """Create a stable hash of the exception type + traceback locations.
+
+ Only hashes the file/line/function info from the traceback, not
+ variable values, so the same bug produces the same hash even if
+ runtime data differs.
+ """
+ tb_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
+ # Extract only "File ..., line ..., in ..." lines for stable hashing
+ stable_parts = [type(exc).__name__]
+ for line in tb_lines:
+ stripped = line.strip()
+ if stripped.startswith("File "):
+ stable_parts.append(stripped)
+ return hashlib.sha256("\n".join(stable_parts).encode()).hexdigest()[:16]
+
+
+def _is_duplicate(error_hash: str) -> bool:
+ """Check if this error was seen recently (within dedup window)."""
+ from config import settings
+
+ now = datetime.now(timezone.utc)
+ window = timedelta(seconds=settings.error_dedup_window_seconds)
+
+ if error_hash in _dedup_cache:
+ last_seen = _dedup_cache[error_hash]
+ if now - last_seen < window:
+ return True
+
+ _dedup_cache[error_hash] = now
+
+ # Prune old entries
+ cutoff = now - window * 2
+ expired = [k for k, v in _dedup_cache.items() if v < cutoff]
+ for k in expired:
+ del _dedup_cache[k]
+
+ return False
+
+
+def _get_git_context() -> dict:
+ """Get current git branch and commit for the bug report."""
+ try:
+ import subprocess
+
+ from config import settings
+
+ branch = subprocess.run(
+ ["git", "branch", "--show-current"],
+ capture_output=True,
+ text=True,
+ timeout=5,
+ cwd=settings.repo_root,
+ ).stdout.strip()
+
+ commit = subprocess.run(
+ ["git", "rev-parse", "--short", "HEAD"],
+ capture_output=True,
+ text=True,
+ timeout=5,
+ cwd=settings.repo_root,
+ ).stdout.strip()
+
+ return {"branch": branch, "commit": commit}
+ except Exception:
+ return {"branch": "unknown", "commit": "unknown"}
+
+
+def capture_error(
+ exc: Exception,
+ source: str = "unknown",
+ context: Optional[dict] = None,
+) -> Optional[str]:
+ """Capture an error and optionally create a bug report.
+
+ Args:
+ exc: The exception to capture
+ source: Module/component where the error occurred
+ context: Optional dict of extra context (request path, etc.)
+
+ Returns:
+ Task ID of the created bug report, or None if deduplicated/disabled
+ """
+ from config import settings
+
+ if not settings.error_feedback_enabled:
+ return None
+
+ error_hash = _stack_hash(exc)
+
+ if _is_duplicate(error_hash):
+ logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
+ return None
+
+ # Format the stack trace
+ tb_str = "".join(
+ traceback.format_exception(type(exc), exc, exc.__traceback__)
+ )
+
+ # Extract file/line from traceback
+ tb_obj = exc.__traceback__
+ affected_file = "unknown"
+ affected_line = 0
+ while tb_obj and tb_obj.tb_next:
+ tb_obj = tb_obj.tb_next
+ if tb_obj:
+ affected_file = tb_obj.tb_frame.f_code.co_filename
+ affected_line = tb_obj.tb_lineno
+
+ git_ctx = _get_git_context()
+
+ # 1. Log to event_log
+ try:
+ from swarm.event_log import EventType, log_event
+
+ log_event(
+ EventType.ERROR_CAPTURED,
+ source=source,
+ data={
+ "error_type": type(exc).__name__,
+ "message": str(exc)[:500],
+ "hash": error_hash,
+ "file": affected_file,
+ "line": affected_line,
+ "git_branch": git_ctx.get("branch", ""),
+ "git_commit": git_ctx.get("commit", ""),
+ },
+ )
+ except Exception as log_exc:
+ logger.debug("Failed to log error event: %s", log_exc)
+
+ # 2. Create bug report task
+ task_id = None
+ try:
+ from swarm.task_queue.models import create_task
+
+ title = f"[BUG] {type(exc).__name__}: {str(exc)[:80]}"
+
+ description_parts = [
+ f"**Error:** {type(exc).__name__}: {str(exc)}",
+ f"**Source:** {source}",
+ f"**File:** {affected_file}:{affected_line}",
+ f"**Git:** {git_ctx.get('branch', '?')} @ {git_ctx.get('commit', '?')}",
+ f"**Time:** {datetime.now(timezone.utc).isoformat()}",
+ f"**Hash:** {error_hash}",
+ ]
+
+ if context:
+ ctx_str = ", ".join(f"{k}={v}" for k, v in context.items())
+ description_parts.append(f"**Context:** {ctx_str}")
+
+ description_parts.append(f"\n**Stack Trace:**\n```\n{tb_str[:2000]}\n```")
+
+ task = create_task(
+ title=title,
+ description="\n".join(description_parts),
+ assigned_to="timmy",
+ created_by="system",
+ priority="normal",
+ requires_approval=False,
+ auto_approve=True,
+ task_type="bug_report",
+ )
+ task_id = task.id
+
+ # Log the creation event
+ try:
+ from swarm.event_log import EventType, log_event
+
+ log_event(
+ EventType.BUG_REPORT_CREATED,
+ source=source,
+ task_id=task_id,
+ data={
+ "error_hash": error_hash,
+ "title": title[:100],
+ },
+ )
+ except Exception:
+ pass
+
+ except Exception as task_exc:
+ logger.debug("Failed to create bug report task: %s", task_exc)
+
+ # 3. Send notification
+ try:
+ from infrastructure.notifications.push import notifier
+
+ notifier.notify(
+ title="Bug Report Filed",
+ message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
+ category="system",
+ )
+ except Exception:
+ pass
+
+ # 4. Record in session logger
+ try:
+ from timmy.session_logger import get_session_logger
+
+ session_logger = get_session_logger()
+ session_logger.record_error(
+ error=f"{type(exc).__name__}: {str(exc)}",
+ context=source,
+ )
+ except Exception:
+ pass
+
+ return task_id
diff --git a/src/infrastructure/events/broadcaster.py b/src/infrastructure/events/broadcaster.py
index c7ba26ad..b7b0cba9 100644
--- a/src/infrastructure/events/broadcaster.py
+++ b/src/infrastructure/events/broadcaster.py
@@ -109,6 +109,8 @@ EVENT_ICONS = {
"system.error": "⚠️",
"system.warning": "🔶",
"system.info": "ℹ️",
+ "error.captured": "🐛",
+ "bug_report.created": "📋",
}
EVENT_LABELS = {
@@ -129,6 +131,8 @@ EVENT_LABELS = {
"system.error": "Error",
"system.warning": "Warning",
"system.info": "Info",
+ "error.captured": "Error captured",
+ "bug_report.created": "Bug report filed",
}
diff --git a/src/swarm/event_log.py b/src/swarm/event_log.py
index 6fda588c..2812d4d0 100644
--- a/src/swarm/event_log.py
+++ b/src/swarm/event_log.py
@@ -47,6 +47,10 @@ class EventType(str, Enum):
SYSTEM_WARNING = "system.warning"
SYSTEM_INFO = "system.info"
+ # Error feedback loop
+ ERROR_CAPTURED = "error.captured"
+ BUG_REPORT_CREATED = "bug_report.created"
+
@dataclass
class EventLogEntry:
diff --git a/src/swarm/task_queue/models.py b/src/swarm/task_queue/models.py
index bc413882..86556a96 100644
--- a/src/swarm/task_queue/models.py
+++ b/src/swarm/task_queue/models.py
@@ -74,8 +74,11 @@ class QueueTask:
AUTO_APPROVE_RULES = [
{"assigned_to": "timmy", "type": "chat_response"},
+ {"assigned_to": "timmy", "type": "thought"},
+ {"assigned_to": "timmy", "type": "internal"},
{"assigned_to": "forge", "type": "run_tests"},
{"priority": "urgent", "created_by": "timmy"},
+ {"type": "bug_report", "created_by": "system"},
]
@@ -87,7 +90,10 @@ def should_auto_approve(task: QueueTask) -> bool:
match = True
for key, val in rule.items():
if key == "type":
- continue # type matching is informational for now
+ if task.task_type != val:
+ match = False
+ break
+ continue
task_val = getattr(task, key, None)
if isinstance(task_val, Enum):
task_val = task_val.value
diff --git a/tests/dashboard/test_bugs_route.py b/tests/dashboard/test_bugs_route.py
new file mode 100644
index 00000000..13186c31
--- /dev/null
+++ b/tests/dashboard/test_bugs_route.py
@@ -0,0 +1,47 @@
+"""Tests for bug reports dashboard route."""
+
+import pytest
+from fastapi.testclient import TestClient
+
+
+@pytest.fixture(autouse=True)
+def _isolate_db(tmp_path, monkeypatch):
+ """Point task_queue and event_log SQLite to a temp directory."""
+ db = tmp_path / "swarm.db"
+ monkeypatch.setattr("swarm.task_queue.models.DB_PATH", db)
+ monkeypatch.setattr("swarm.event_log.DB_PATH", db)
+
+
+@pytest.fixture
+def client():
+ from dashboard.app import app
+
+ with TestClient(app) as c:
+ yield c
+
+
+def test_bugs_page_loads(client):
+ resp = client.get("/bugs")
+ assert resp.status_code == 200
+ assert "BUG REPORTS" in resp.text
+
+
+def test_api_list_bugs(client):
+ resp = client.get("/api/bugs")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "bugs" in data
+ assert "count" in data
+
+
+def test_api_bug_stats(client):
+ resp = client.get("/api/bugs/stats")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "stats" in data
+ assert "total" in data
+
+
+def test_bugs_page_with_status_filter(client):
+ resp = client.get("/bugs?status=approved")
+ assert resp.status_code == 200
diff --git a/tests/infrastructure/test_error_capture.py b/tests/infrastructure/test_error_capture.py
new file mode 100644
index 00000000..98ee1c8e
--- /dev/null
+++ b/tests/infrastructure/test_error_capture.py
@@ -0,0 +1,180 @@
+"""Tests for the error capture and bug report feedback loop."""
+
+from unittest.mock import patch
+
+import pytest
+
+
+@pytest.fixture(autouse=True)
+def _isolate_db(tmp_path, monkeypatch):
+ """Point task_queue and event_log SQLite to a temp directory."""
+ db = tmp_path / "swarm.db"
+ monkeypatch.setattr("swarm.task_queue.models.DB_PATH", db)
+ monkeypatch.setattr("swarm.event_log.DB_PATH", db)
+
+
+@pytest.fixture(autouse=True)
+def _clear_dedup():
+ """Clear the dedup cache between tests."""
+ from infrastructure.error_capture import _dedup_cache
+
+ _dedup_cache.clear()
+ yield
+ _dedup_cache.clear()
+
+
+def _raise_value_error():
+ """Helper — always raises from the same file:line so hash is stable."""
+ raise ValueError("test error")
+
+
+def _raise_type_error():
+ """Helper — always raises from the same file:line so hash is stable."""
+ raise TypeError("type error")
+
+
+class TestStackHash:
+ def test_same_exception_deterministic(self):
+ """Hash is deterministic for the same exception object."""
+ from infrastructure.error_capture import _stack_hash
+
+ try:
+ _raise_value_error()
+ except ValueError as exc:
+ hash1 = _stack_hash(exc)
+ hash2 = _stack_hash(exc)
+
+ assert hash1 == hash2
+
+ def test_different_exception_types_different_hash(self):
+ from infrastructure.error_capture import _stack_hash
+
+ try:
+ _raise_value_error()
+ except ValueError as exc1:
+ hash1 = _stack_hash(exc1)
+
+ try:
+ _raise_type_error()
+ except TypeError as exc2:
+ hash2 = _stack_hash(exc2)
+
+ assert hash1 != hash2
+
+ def test_hash_is_16_chars(self):
+ from infrastructure.error_capture import _stack_hash
+
+ try:
+ raise RuntimeError("hash length test")
+ except RuntimeError as exc:
+ h = _stack_hash(exc)
+
+ assert len(h) == 16
+
+
+class TestDeduplication:
+ def test_first_error_not_duplicate(self):
+ from infrastructure.error_capture import _is_duplicate
+
+ assert _is_duplicate("test-hash-001") is False
+
+ def test_same_hash_is_duplicate(self):
+ from infrastructure.error_capture import _is_duplicate
+
+ _is_duplicate("test-hash-002") # First time
+ assert _is_duplicate("test-hash-002") is True
+
+ def test_different_hashes_not_duplicate(self):
+ from infrastructure.error_capture import _is_duplicate
+
+ _is_duplicate("hash-aaa")
+ assert _is_duplicate("hash-bbb") is False
+
+
+class TestCaptureError:
+ def test_capture_creates_bug_report_task(self):
+ from infrastructure.error_capture import capture_error
+
+ try:
+ raise RuntimeError("test capture error")
+ except RuntimeError as exc:
+ task_id = capture_error(exc, source="test_module")
+
+ assert task_id is not None
+
+ from swarm.task_queue.models import get_task
+
+ task = get_task(task_id)
+ assert task is not None
+ assert task.task_type == "bug_report"
+ assert "RuntimeError" in task.title
+ assert task.created_by == "system"
+
+ def test_capture_deduplicates(self):
+ """Capturing the same exception twice suppresses the second report."""
+ from infrastructure.error_capture import capture_error, _dedup_cache, _stack_hash
+
+ try:
+ _raise_value_error()
+ except ValueError as exc:
+ # Capture first time
+ id1 = capture_error(exc, source="test")
+            # The first capture records the stack hash in the dedup cache, so
+            # a second capture of the same exception within the window is suppressed.
+ id2 = capture_error(exc, source="test")
+
+ assert id1 is not None
+ assert id2 is None # Deduplicated
+
+ def test_capture_disabled(self, monkeypatch):
+ monkeypatch.setattr("config.settings.error_feedback_enabled", False)
+ from infrastructure.error_capture import capture_error
+
+ try:
+ raise RuntimeError("disabled test")
+ except RuntimeError as exc:
+ result = capture_error(exc, source="test")
+
+ assert result is None
+
+ def test_capture_includes_context(self):
+ from infrastructure.error_capture import capture_error
+
+ try:
+            raise OSError("context test")
+        except OSError as exc:
+ task_id = capture_error(
+ exc, source="http", context={"path": "/api/test"}
+ )
+
+ from swarm.task_queue.models import get_task
+
+ task = get_task(task_id)
+ assert "/api/test" in task.description
+
+ def test_capture_includes_stack_trace(self):
+ from infrastructure.error_capture import capture_error
+
+ try:
+ raise KeyError("stack trace test")
+ except KeyError as exc:
+ task_id = capture_error(exc, source="test")
+
+ from swarm.task_queue.models import get_task
+
+ task = get_task(task_id)
+ assert "Stack Trace" in task.description
+ assert "KeyError" in task.description
+
+ def test_bug_report_is_auto_approved(self):
+ from infrastructure.error_capture import capture_error
+
+ try:
+ raise RuntimeError("auto-approve test")
+ except RuntimeError as exc:
+ task_id = capture_error(exc, source="test")
+
+ from swarm.task_queue.models import get_task
+
+ task = get_task(task_id)
+ assert task.status.value == "approved"