"""Centralized error capture with automatic bug report creation. Catches errors from anywhere in the system, deduplicates them, logs them to the event log, and creates bug report tasks in the task queue. Usage: from infrastructure.error_capture import capture_error try: risky_operation() except Exception as exc: capture_error(exc, source="my_module", context={"request": "/api/foo"}) """ import hashlib import logging import traceback from datetime import UTC, datetime, timedelta logger = logging.getLogger(__name__) # In-memory dedup cache: hash -> last_seen timestamp _dedup_cache: dict[str, datetime] = {} _error_recorder = None def register_error_recorder(fn): """Register a callback for recording errors to session log.""" global _error_recorder _error_recorder = fn def _stack_hash(exc: Exception) -> str: """Create a stable hash of the exception type + traceback locations. Only hashes the file/line/function info from the traceback, not variable values, so the same bug produces the same hash even if runtime data differs. """ tb_lines = traceback.format_exception(type(exc), exc, exc.__traceback__) # Extract only "File ..., line ..., in ..." lines for stable hashing stable_parts = [type(exc).__name__] for line in tb_lines: stripped = line.strip() if stripped.startswith("File "): stable_parts.append(stripped) return hashlib.sha256("\n".join(stable_parts).encode()).hexdigest()[:16] def _is_duplicate(error_hash: str) -> bool: """Check if this error was seen recently (within dedup window).""" from config import settings now = datetime.now(UTC) window = timedelta(seconds=settings.error_dedup_window_seconds) if error_hash in _dedup_cache: last_seen = _dedup_cache[error_hash] if now - last_seen < window: return True _dedup_cache[error_hash] = now # Prune old entries cutoff = now - window * 2 expired = [k for k, v in _dedup_cache.items() if v < cutoff] for k in expired: del _dedup_cache[k] return False def _get_git_context() -> dict: """Get current git branch and commit for the bug report.""" try: import subprocess from config import settings branch = subprocess.run( ["git", "branch", "--show-current"], capture_output=True, text=True, timeout=5, cwd=settings.repo_root, ).stdout.strip() commit = subprocess.run( ["git", "rev-parse", "--short", "HEAD"], capture_output=True, text=True, timeout=5, cwd=settings.repo_root, ).stdout.strip() return {"branch": branch, "commit": commit} except Exception as exc: logger.warning("Git info capture error: %s", exc) return {"branch": "unknown", "commit": "unknown"} def _extract_traceback_info(exc: Exception) -> tuple[str, str, int]: """Extract formatted traceback, affected file, and line number. Returns: Tuple of (traceback_string, affected_file, affected_line). """ tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) tb_obj = exc.__traceback__ affected_file = "unknown" affected_line = 0 while tb_obj and tb_obj.tb_next: tb_obj = tb_obj.tb_next if tb_obj: affected_file = tb_obj.tb_frame.f_code.co_filename affected_line = tb_obj.tb_lineno return tb_str, affected_file, affected_line def _log_error_event( exc: Exception, source: str, error_hash: str, affected_file: str, affected_line: int, git_ctx: dict, ) -> None: """Log the captured error to the event log.""" try: from swarm.event_log import EventType, log_event log_event( EventType.ERROR_CAPTURED, source=source, data={ "error_type": type(exc).__name__, "message": str(exc)[:500], "hash": error_hash, "file": affected_file, "line": affected_line, "git_branch": git_ctx.get("branch", ""), "git_commit": git_ctx.get("commit", ""), }, ) except Exception as log_exc: logger.debug("Failed to log error event: %s", log_exc) def _build_report_description( exc: Exception, source: str, context: dict | None, error_hash: str, tb_str: str, affected_file: str, affected_line: int, git_ctx: dict, ) -> str: """Build the markdown description for a bug report task.""" parts = [ f"**Error:** {type(exc).__name__}: {str(exc)}", f"**Source:** {source}", f"**File:** {affected_file}:{affected_line}", f"**Git:** {git_ctx.get('branch', '?')} @ {git_ctx.get('commit', '?')}", f"**Time:** {datetime.now(UTC).isoformat()}", f"**Hash:** {error_hash}", ] if context: ctx_str = ", ".join(f"{k}={v}" for k, v in context.items()) parts.append(f"**Context:** {ctx_str}") parts.append(f"\n**Stack Trace:**\n```\n{tb_str[:2000]}\n```") return "\n".join(parts) def _log_bug_report_created(source: str, task_id: str, error_hash: str, title: str) -> None: """Log a BUG_REPORT_CREATED event (best-effort).""" try: from swarm.event_log import EventType, log_event log_event( EventType.BUG_REPORT_CREATED, source=source, task_id=task_id, data={ "error_hash": error_hash, "title": title[:100], }, ) except Exception as exc: logger.warning("Bug report event log error: %s", exc) def _create_bug_report( exc: Exception, source: str, context: dict | None, error_hash: str, tb_str: str, affected_file: str, affected_line: int, git_ctx: dict, ) -> str | None: """Create a bug report task and return the task ID (or None on failure).""" try: from swarm.task_queue.models import create_task title = f"[BUG] {type(exc).__name__}: {str(exc)[:80]}" description = _build_report_description( exc, source, context, error_hash, tb_str, affected_file, affected_line, git_ctx, ) task = create_task( title=title, description=description, assigned_to="default", created_by="system", priority="normal", requires_approval=False, auto_approve=True, task_type="bug_report", ) _log_bug_report_created(source, task.id, error_hash, title) return task.id except Exception as task_exc: logger.debug("Failed to create bug report task: %s", task_exc) return None def _notify_bug_report(exc: Exception, source: str) -> None: """Send a push notification about the captured error.""" try: from infrastructure.notifications.push import notifier notifier.notify( title="Bug Report Filed", message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}", category="system", ) except Exception as notify_exc: logger.warning("Bug report notification error: %s", notify_exc) def _record_to_session(exc: Exception, source: str) -> None: """Record the error via the registered session callback.""" if _error_recorder is not None: try: _error_recorder( error=f"{type(exc).__name__}: {str(exc)}", context=source, ) except Exception as log_exc: logger.warning("Bug report session logging error: %s", log_exc) def capture_error( exc: Exception, source: str = "unknown", context: dict | None = None, ) -> str | None: """Capture an error and optionally create a bug report. Args: exc: The exception to capture source: Module/component where the error occurred context: Optional dict of extra context (request path, etc.) Returns: Task ID of the created bug report, or None if deduplicated/disabled """ from config import settings if not settings.error_feedback_enabled: return None error_hash = _stack_hash(exc) if _is_duplicate(error_hash): logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash) return None tb_str, affected_file, affected_line = _extract_traceback_info(exc) git_ctx = _get_git_context() _log_error_event(exc, source, error_hash, affected_file, affected_line, git_ctx) task_id = _create_bug_report( exc, source, context, error_hash, tb_str, affected_file, affected_line, git_ctx, ) _notify_bug_report(exc, source) _record_to_session(exc, source) return task_id