fix: restore task processor pipeline and eliminate /ws 403 spam (#91)

The microservices refactoring (PR #88) accidentally dropped handler
registration, zombie reconciliation, and startup drain from app.py.
Every task entering the queue was immediately backlogged with
"No handler for task type" because self._handlers stayed empty.

Restores the three critical blocks from app_backup.py, along with a
re-approval pass for tasks stranded in the backlog:
- Register handlers for chat_response, thought, internal, bug_report,
  task_request
- Reconcile zombie RUNNING tasks from previous crashes
- Drain all pending tasks on startup before entering steady-state loop
- Re-approve tasks that were backlogged due to missing handlers

Also adds a /ws WebSocket catch-all that accepts stale connections and
closes with code 1008 instead of spamming 403 on every retry, and a
`make fresh` target for clean container rebuilds with no cached state.

Co-authored-by: Alexander Payne <apayne@MM.local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-02-28 12:18:18 -05:00
committed by GitHub
parent 79e8a6894a
commit 7b967d84b2
2 changed files with 88 additions and 3 deletions

View File

@@ -14,7 +14,7 @@ import os
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi import FastAPI, Request, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
@@ -163,7 +163,7 @@ async def _thinking_loop() -> None:
async def _task_processor_loop() -> None:
"""Background task: Timmy's task queue processor."""
from swarm.task_processor import task_processor
from swarm.task_queue.models import update_task_status, TaskStatus
from swarm.task_queue.models import update_task_status, list_tasks, TaskStatus
from timmy.session import chat as timmy_chat
from datetime import datetime
import json
@@ -255,6 +255,65 @@ async def _task_processor_loop() -> None:
pass
return f"Error: {str(e)}"
# Register handlers
task_processor.register_handler("chat_response", handle_chat_response)
task_processor.register_handler("thought", handle_thought)
task_processor.register_handler("internal", handle_thought)
task_processor.register_handler("bug_report", handle_bug_report)
task_processor.register_handler("task_request", handle_task_request)
# ── Reconcile zombie tasks from previous crash ──
zombie_count = task_processor.reconcile_zombie_tasks()
if zombie_count:
logger.info("Recycled %d zombie task(s) back to approved", zombie_count)
# ── Re-approve tasks backlogged due to missing handlers ──
stale = list_tasks(status=TaskStatus.BACKLOGGED, assigned_to="timmy")
requeued = 0
for t in stale:
if t.backlog_reason and "No handler for task type" in t.backlog_reason:
update_task_status(t.id, TaskStatus.APPROVED, result=None)
requeued += 1
if requeued:
logger.info("Re-queued %d task(s) that were backlogged due to missing handlers", requeued)
# ── Startup drain: iterate through all pending tasks immediately ──
logger.info("Draining task queue on startup...")
try:
summary = await task_processor.drain_queue()
if summary["processed"] or summary["backlogged"]:
logger.info(
"Startup drain: %d processed, %d backlogged, %d skipped, %d failed",
summary["processed"],
summary["backlogged"],
summary["skipped"],
summary["failed"],
)
# Notify via WebSocket so the dashboard updates
try:
from infrastructure.ws_manager.handler import ws_manager
asyncio.create_task(
ws_manager.broadcast_json(
{
"type": "task_event",
"event": "startup_drain_complete",
"summary": summary,
}
)
)
except Exception:
pass
except Exception as exc:
logger.error("Startup drain failed: %s", exc)
try:
from infrastructure.error_capture import capture_error
capture_error(exc, source="task_processor_startup")
except Exception:
pass
# ── Steady-state: poll for new tasks ──
logger.info("Task processor entering steady-state loop")
await task_processor.run_loop(interval_seconds=3.0)
@@ -465,6 +524,19 @@ app.include_router(bugs_router)
app.include_router(cascade_router)
@app.websocket("/ws")
async def ws_redirect(websocket: WebSocket):
    """Accept a legacy ``/ws`` connection and close it straight away.

    Frontend code from before PR #82 dialed /ws, a path that was never a
    real endpoint. Browser tabs still running that code keep retrying and
    each attempt surfaces as a 403. Completing the handshake and then
    closing with a policy-violation code is meant to make those clients
    stop retrying; the close reason names the endpoint to use instead.
    """
    close_code = 1008  # WebSocket "policy violation" close code
    close_reason = "Use /swarm/live instead"

    # The handshake must be accepted first so the client receives a proper
    # close frame rather than a rejected upgrade.
    await websocket.accept()
    await websocket.close(code=close_code, reason=close_reason)
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Serve the main dashboard page."""