From 5b6d33e05aaeb3e5ecac74c3b9441382fccd1568 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone <8633216+AlexanderWhitestone@users.noreply.github.com> Date: Fri, 27 Feb 2026 01:52:42 -0500 Subject: [PATCH] feat: task queue system with startup drain and backlogging (#76) * feat: add task queue system for Timmy - all work goes through the queue - Add queue position tracking to task_queue models with task_type field - Add TaskProcessor class that consumes tasks from queue one at a time - Modify chat route to queue all messages for async processing - Chat responses get 'high' priority to jump ahead of thought tasks - Add queue status API endpoints for position polling - Update UI to show queue position (x/y) and current task banner - Replace thinking loop with task-based approach - thoughts are queued tasks - Push responses to user via WebSocket instead of immediate HTTP response - Add database migrations for existing tables * feat: Timmy drains task queue on startup, backlogs unhandleable tasks On spin-up, Timmy now iterates through all pending/approved tasks immediately instead of waiting for the polling loop. Tasks without a registered handler or with permanent errors are moved to a new BACKLOGGED status with a reason, keeping the queue clear for work Timmy can actually do. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Alexander Payne Co-authored-by: Claude Opus 4.6 --- MEMORY.md | 4 +- memory/self/user_profile.md | 4 +- message_to_alexander.txt | 4 + src/dashboard/app.py | 138 +++++- src/dashboard/routes/agents.py | 68 ++- src/dashboard/routes/tasks.py | 162 ++++++- .../templates/partials/chat_message.html | 5 + .../templates/partials/timmy_panel.html | 88 ++++ src/swarm/task_processor.py | 263 +++++++++++ src/swarm/task_queue/models.py | 237 ++++++++-- tests/swarm/test_task_queue.py | 430 ++++++++++++++++-- thought_stream.txt | 3 + 12 files changed, 1286 insertions(+), 120 deletions(-) create mode 100644 message_to_alexander.txt create mode 100644 src/swarm/task_processor.py create mode 100644 thought_stream.txt diff --git a/MEMORY.md b/MEMORY.md index 51c3e64..5fccd07 100644 --- a/MEMORY.md +++ b/MEMORY.md @@ -34,10 +34,8 @@ ## User Profile -**Name:** (not set) -**Interests:** (to be learned) +**Name:** Not ---- ## Key Decisions diff --git a/memory/self/user_profile.md b/memory/self/user_profile.md index 8a4e121..2c09340 100644 --- a/memory/self/user_profile.md +++ b/memory/self/user_profile.md @@ -4,7 +4,7 @@ ## Basic Information -**Name:** TestUser +**Name:** Not **Location:** (unknown) **Occupation:** (unknown) **Technical Level:** (to be assessed) @@ -40,4 +40,4 @@ --- -*Last updated: 2026-02-25* +*Last updated: 2026-02-27* diff --git a/message_to_alexander.txt b/message_to_alexander.txt new file mode 100644 index 0000000..7fd8493 --- /dev/null +++ b/message_to_alexander.txt @@ -0,0 +1,4 @@ +Hello Alexander, + +I'm feeling good today, thanks for asking. +My team is doing well and we're making progress on our projects. \ No newline at end of file diff --git a/src/dashboard/app.py b/src/dashboard/app.py index d055adc..fcba504 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -84,29 +84,139 @@ async def _briefing_scheduler() -> None: async def _thinking_loop() -> None: """Background task: Timmy's default thinking thread. - Starts shortly after server boot and runs on a configurable cadence. - Timmy ponders his existence, recent swarm activity, scripture, creative - ideas, or continues a previous train of thought. + Instead of thinking directly, this creates thought tasks in the queue + for the task processor to handle. This ensures all of Timmy's work + goes through the unified task system. """ - from timmy.thinking import thinking_engine + from swarm.task_queue.models import create_task + from datetime import datetime await asyncio.sleep(10) # Let server finish starting before first thought while True: try: - await thinking_engine.think_once() + # Create a thought task instead of thinking directly + now = datetime.now() + create_task( + title=f"Thought: {now.strftime('%A %B %d, %I:%M %p')}", + description="Continue thinking about your existence, recent events, scripture, creative ideas, or a previous thread of thought.", + assigned_to="timmy", + created_by="timmy", # Self-generated + priority="low", + requires_approval=False, + auto_approve=True, + task_type="thought", + ) + logger.debug("Created thought task in queue") except Exception as exc: logger.error("Thinking loop error: %s", exc) await asyncio.sleep(settings.thinking_interval_seconds) +async def _task_processor_loop() -> None: + """Background task: Timmy's task queue processor. + + On startup, drains all pending/approved tasks immediately — iterating + through the queue and processing what can be handled, backlogging what + can't. Then enters the steady-state polling loop. + """ + from swarm.task_processor import task_processor + from swarm.task_queue.models import update_task_status, TaskStatus + from timmy.session import chat as timmy_chat + from datetime import datetime + import json + import asyncio + + await asyncio.sleep(5) # Let server finish starting + + def handle_chat_response(task): + """Handler for chat_response tasks - calls Timmy and returns response.""" + try: + now = datetime.now() + context = f"[System: Current date/time is {now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n\n" + response = timmy_chat(context + task.description) + + # Push response to user via WebSocket + try: + from infrastructure.ws_manager.handler import ws_manager + + asyncio.create_task( + ws_manager.broadcast( + "timmy_response", + { + "task_id": task.id, + "response": response, + }, + ) + ) + except Exception as e: + logger.debug("Failed to push response via WS: %s", e) + + return response + except Exception as e: + logger.error("Chat response failed: %s", e) + return f"Error: {str(e)}" + + def handle_thought(task): + """Handler for thought tasks - Timmy's internal thinking.""" + from timmy.thinking import thinking_engine + + try: + result = thinking_engine.think_once() + return str(result) if result else "Thought completed" + except Exception as e: + logger.error("Thought processing failed: %s", e) + return f"Error: {str(e)}" + + # Register handlers + task_processor.register_handler("chat_response", handle_chat_response) + task_processor.register_handler("thought", handle_thought) + task_processor.register_handler("internal", handle_thought) + + # ── Startup drain: iterate through all pending tasks immediately ── + logger.info("Draining task queue on startup…") + try: + summary = await task_processor.drain_queue() + if summary["processed"] or summary["backlogged"]: + logger.info( + "Startup drain: %d processed, %d backlogged, %d skipped, %d failed", + summary["processed"], + summary["backlogged"], + summary["skipped"], + summary["failed"], + ) + + # Notify via WebSocket so the dashboard updates + try: + from infrastructure.ws_manager.handler import ws_manager + + asyncio.create_task( + ws_manager.broadcast_json( + { + "type": "task_event", + "event": "startup_drain_complete", + "summary": summary, + } + ) + ) + except Exception: + pass + except Exception as exc: + logger.error("Startup drain failed: %s", exc) + + # ── Steady-state: poll for new tasks ── + logger.info("Task processor entering steady-state loop") + await task_processor.run_loop(interval_seconds=3.0) + + @asynccontextmanager async def lifespan(app: FastAPI): task = asyncio.create_task(_briefing_scheduler()) # Register Timmy in the swarm registry so it shows up alongside other agents from swarm import registry as swarm_registry + swarm_registry.register( name="Timmy", capabilities="chat,reasoning,research,planning", @@ -115,6 +225,7 @@ async def lifespan(app: FastAPI): # Log swarm recovery summary (reconciliation ran during coordinator init) from swarm.coordinator import coordinator as swarm_coordinator + rec = swarm_coordinator._recovery_summary if rec["tasks_failed"] or rec["agents_offlined"]: logger.info( @@ -138,6 +249,7 @@ async def lifespan(app: FastAPI): # Log system startup event so the Events page is never empty try: from swarm.event_log import log_event, EventType + log_event( EventType.SYSTEM_INFO, source="coordinator", @@ -148,6 +260,7 @@ async def lifespan(app: FastAPI): # Auto-bootstrap MCP tools from mcp.bootstrap import auto_bootstrap, get_bootstrap_status + try: registered = auto_bootstrap() if registered: @@ -157,6 +270,7 @@ async def lifespan(app: FastAPI): # Initialise Spark Intelligence engine from spark.engine import spark_engine + if spark_engine.enabled: logger.info("Spark Intelligence active — event capture enabled") @@ -169,10 +283,17 @@ async def lifespan(app: FastAPI): settings.thinking_interval_seconds, ) + # Start Timmy's task queue processor (skip in test mode) + task_processor_task = None + if os.environ.get("TIMMY_TEST_MODE") != "1": + task_processor_task = asyncio.create_task(_task_processor_loop()) + logger.info("Task queue processor started") + # Auto-start chat integrations (skip silently if unconfigured) from integrations.telegram_bot.bot import telegram_bot from integrations.chat_bridge.vendors.discord import discord_bot from integrations.chat_bridge.registry import platform_registry + platform_registry.register(discord_bot) if settings.telegram_token: @@ -195,6 +316,12 @@ async def lifespan(app: FastAPI): await thinking_task except asyncio.CancelledError: pass + if task_processor_task: + task_processor_task.cancel() + try: + await task_processor_task + except asyncio.CancelledError: + pass task.cancel() try: await task @@ -272,4 +399,5 @@ async def index(request: Request): async def shortcuts_setup(): """Siri Shortcuts setup guide.""" from integrations.shortcuts.siri import get_setup_guide + return get_setup_guide() diff --git a/src/dashboard/routes/agents.py b/src/dashboard/routes/agents.py index 005fd01..1c8306c 100644 --- a/src/dashboard/routes/agents.py +++ b/src/dashboard/routes/agents.py @@ -237,16 +237,19 @@ async def clear_history(request: Request): @router.post("/timmy/chat", response_class=HTMLResponse) async def chat_timmy(request: Request, message: str = Form(...)): + """Chat with Timmy - queues message as task for async processing.""" + from swarm.task_queue.models import create_task, get_queue_status_for_task + timestamp = datetime.now().strftime("%H:%M:%S") + task_id = None response_text = None error_text = None + queue_info = None - # Check if the user wants to queue a task instead of chatting + # Check if the user wants to queue a task (explicit queue request) task_info = _extract_task_from_message(message) if task_info: try: - from swarm.task_queue.models import create_task - task = create_task( title=task_info["title"], description=task_info["description"], @@ -254,7 +257,9 @@ async def chat_timmy(request: Request, message: str = Form(...)): assigned_to=task_info.get("agent", "timmy"), priority=task_info.get("priority", "normal"), requires_approval=True, + task_type="task_request", ) + task_id = task.id priority_label = ( f" | Priority: `{task.priority.value}`" if task.priority.value != "normal" @@ -276,27 +281,54 @@ async def chat_timmy(request: Request, message: str = Form(...)): logger.error("Failed to create task from chat: %s", exc) task_info = None - # Normal chat path (also used as fallback if task creation failed) + # Normal chat: always queue for async processing if not task_info: try: - now = datetime.now() - context_parts = [ - f"[System: Current date/time is {now.strftime('%A, %B %d, %Y at %I:%M %p')}]" - ] - if _QUEUE_QUERY_PATTERN.search(message): - queue_ctx = _build_queue_context() - if queue_ctx: - context_parts.append(queue_ctx) - context_prefix = "\n".join(context_parts) + "\n\n" - response_text = timmy_chat(context_prefix + message) - except Exception as exc: - error_text = f"Timmy is offline: {exc}" + # Create a chat response task (auto-approved for timmy) + # Priority is "high" to jump ahead of Timmy's self-generated "thought" tasks + # but below any "urgent" tasks Timmy might create + task = create_task( + title=message[:100] + ("..." if len(message) > 100 else ""), + description=message, + created_by="user", + assigned_to="timmy", + priority="high", # Higher than thought tasks, lower than urgent + requires_approval=True, + auto_approve=True, # Auto-approve chat responses + task_type="chat_response", + ) + task_id = task.id + queue_info = get_queue_status_for_task(task.id) + # Acknowledge queuing + position = queue_info.get("position", 1) + total = queue_info.get("total", 1) + percent_ahead = queue_info.get("percent_ahead", 0) + + response_text = ( + f"Message queued for Timmy's attention.\n\n" + f"**Queue position:** {position}/{total} ({100 - percent_ahead}% complete ahead of you)\n\n" + f"_Timmy will respond shortly..._" + ) + logger.info( + "Chat → queued: %s (id=%s, position=%d/%d)", + message[:50], + task.id, + position, + total, + ) + except Exception as exc: + logger.error("Failed to queue chat message: %s", exc) + error_text = f"Failed to queue message: {exc}" + + # Log to message history (for context, even though async) message_log.append(role="user", content=message, timestamp=timestamp) if response_text is not None: message_log.append(role="agent", content=response_text, timestamp=timestamp) else: - message_log.append(role="error", content=error_text, timestamp=timestamp) + message_log.append( + role="error", content=error_text or "Unknown error", timestamp=timestamp + ) return templates.TemplateResponse( request, @@ -306,5 +338,7 @@ async def chat_timmy(request: Request, message: str = Form(...)): "response": response_text, "error": error_text, "timestamp": timestamp, + "task_id": task_id, + "queue_info": queue_info, }, ) diff --git a/src/dashboard/routes/tasks.py b/src/dashboard/routes/tasks.py index 0c2e892..c650010 100644 --- a/src/dashboard/routes/tasks.py +++ b/src/dashboard/routes/tasks.py @@ -32,6 +32,7 @@ from swarm.task_queue.models import ( get_counts_by_status, get_pending_count, get_task, + list_backlogged_tasks, list_tasks, update_task, update_task_status, @@ -45,6 +46,7 @@ templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templa # ── Helper to broadcast task events via WebSocket ──────────────────────── + def _broadcast_task_event(event_type: str, task: QueueTask): """Best-effort broadcast a task event to connected WebSocket clients.""" try: @@ -74,25 +76,30 @@ def _broadcast_task_event(event_type: str, task: QueueTask): # ── Dashboard page ─────────────────────────────────────────────────────── + @router.get("/tasks", response_class=HTMLResponse) async def task_queue_page(request: Request, assign: Optional[str] = None): """Task queue dashboard with three columns.""" - pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + \ - list_tasks(status=TaskStatus.APPROVED) - active = list_tasks(status=TaskStatus.RUNNING) + \ - list_tasks(status=TaskStatus.PAUSED) - completed = list_tasks(status=TaskStatus.COMPLETED, limit=20) + \ - list_tasks(status=TaskStatus.VETOED, limit=10) + \ - list_tasks(status=TaskStatus.FAILED, limit=10) + pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + list_tasks( + status=TaskStatus.APPROVED + ) + active = list_tasks(status=TaskStatus.RUNNING) + list_tasks( + status=TaskStatus.PAUSED + ) + backlogged = list_backlogged_tasks(limit=20) + completed = ( + list_tasks(status=TaskStatus.COMPLETED, limit=20) + + list_tasks(status=TaskStatus.VETOED, limit=10) + + list_tasks(status=TaskStatus.FAILED, limit=10) + + backlogged + ) # Get agents for the create modal agents = [] try: from swarm.coordinator import coordinator - agents = [ - {"id": a.id, "name": a.name} - for a in coordinator.list_swarm_agents() - ] + + agents = [{"id": a.id, "name": a.name} for a in coordinator.list_swarm_agents()] except Exception: pass # Always include core agents @@ -120,11 +127,13 @@ async def task_queue_page(request: Request, assign: Optional[str] = None): # ── HTMX partials ─────────────────────────────────────────────────────── + @router.get("/tasks/pending", response_class=HTMLResponse) async def tasks_pending_partial(request: Request): """HTMX partial: pending approval tasks.""" - pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + \ - list_tasks(status=TaskStatus.APPROVED) + pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + list_tasks( + status=TaskStatus.APPROVED + ) return templates.TemplateResponse( request, "partials/task_cards.html", @@ -135,8 +144,9 @@ async def tasks_pending_partial(request: Request): @router.get("/tasks/active", response_class=HTMLResponse) async def tasks_active_partial(request: Request): """HTMX partial: active tasks.""" - active = list_tasks(status=TaskStatus.RUNNING) + \ - list_tasks(status=TaskStatus.PAUSED) + active = list_tasks(status=TaskStatus.RUNNING) + list_tasks( + status=TaskStatus.PAUSED + ) return templates.TemplateResponse( request, "partials/task_cards.html", @@ -147,9 +157,12 @@ async def tasks_active_partial(request: Request): @router.get("/tasks/completed", response_class=HTMLResponse) async def tasks_completed_partial(request: Request): """HTMX partial: completed tasks.""" - completed = list_tasks(status=TaskStatus.COMPLETED, limit=20) + \ - list_tasks(status=TaskStatus.VETOED, limit=10) + \ - list_tasks(status=TaskStatus.FAILED, limit=10) + completed = ( + list_tasks(status=TaskStatus.COMPLETED, limit=20) + + list_tasks(status=TaskStatus.VETOED, limit=10) + + list_tasks(status=TaskStatus.FAILED, limit=10) + + list_backlogged_tasks(limit=20) + ) return templates.TemplateResponse( request, "partials/task_cards.html", @@ -159,6 +172,7 @@ async def tasks_completed_partial(request: Request): # ── JSON API ───────────────────────────────────────────────────────────── + @router.get("/api/tasks", response_class=JSONResponse) async def api_list_tasks( status: Optional[str] = None, @@ -242,10 +256,24 @@ async def api_task_counts(): "completed": counts.get("completed", 0), "failed": counts.get("failed", 0), "vetoed": counts.get("vetoed", 0), + "backlogged": counts.get("backlogged", 0), "total": sum(counts.values()), } +# ── Backlog API (must be before {task_id} catch-all) ───────────────────── + + +@router.get("/api/tasks/backlog", response_class=JSONResponse) +async def api_list_backlogged(assigned_to: Optional[str] = None, limit: int = 50): + """List all backlogged tasks.""" + tasks = list_backlogged_tasks(assigned_to=assigned_to, limit=limit) + return { + "tasks": [_task_to_dict(t) for t in tasks], + "count": len(tasks), + } + + @router.get("/api/tasks/{task_id}", response_class=JSONResponse) async def api_get_task(task_id: str): """Get a single task by ID.""" @@ -257,6 +285,7 @@ async def api_get_task(task_id: str): # ── Workflow actions ───────────────────────────────────────────────────── + @router.patch("/api/tasks/{task_id}/approve", response_class=JSONResponse) async def api_approve_task(task_id: str): """Approve a pending task.""" @@ -436,10 +465,101 @@ async def htmx_retry_task(request: Request, task_id: str): ) +@router.patch("/api/tasks/{task_id}/unbacklog", response_class=JSONResponse) +async def api_unbacklog_task(task_id: str): + """Move a backlogged task back to approved for re-processing.""" + task = get_task(task_id) + if not task: + raise HTTPException(404, "Task not found") + if task.status != TaskStatus.BACKLOGGED: + raise HTTPException(400, "Can only unbacklog backlogged tasks") + updated = update_task_status( + task_id, TaskStatus.APPROVED, result=None, backlog_reason=None + ) + _broadcast_task_event("task_unbacklogged", updated) + return {"success": True, "task": _task_to_dict(updated)} + + +@router.post("/tasks/{task_id}/unbacklog", response_class=HTMLResponse) +async def htmx_unbacklog_task(request: Request, task_id: str): + """Move a backlogged task back to approved (HTMX).""" + task = get_task(task_id) + if not task: + raise HTTPException(404, "Task not found") + updated = update_task_status( + task_id, TaskStatus.APPROVED, result=None, backlog_reason=None + ) + _broadcast_task_event("task_unbacklogged", updated) + return templates.TemplateResponse( + request, "partials/task_card.html", {"task": updated} + ) + + +# ── Queue Status API ───────────────────────────────────────────────────── + + +@router.get("/api/queue/status", response_class=JSONResponse) +async def api_queue_status(assigned_to: str = "timmy"): + """Get queue status for an agent - position info for polling.""" + from swarm.task_queue.models import ( + get_current_task_for_agent, + get_queue_position_ahead, + get_next_pending_task, + ) + + current = get_current_task_for_agent(assigned_to) + next_task = get_next_pending_task(assigned_to) + ahead = get_queue_position_ahead(assigned_to) + + return { + "agent": assigned_to, + "is_working": current is not None, + "current_task": _task_to_dict(current) if current else None, + "next_task": _task_to_dict(next_task) if next_task else None, + "tasks_ahead": ahead, + } + + +@router.get("/api/queue/position/{task_id}", response_class=JSONResponse) +async def api_queue_position(task_id: str): + """Get queue position for a specific task.""" + from swarm.task_queue.models import get_queue_status_for_task + + status = get_queue_status_for_task(task_id) + if "error" in status: + raise HTTPException(404, status["error"]) + return status + + +@router.get("/api/queue/agent/{assigned_to}", response_class=JSONResponse) +async def api_agent_queue(assigned_to: str, limit: int = 20): + """Get all pending tasks for an agent.""" + from swarm.task_queue.models import list_tasks, TaskStatus + + tasks = list_tasks( + assigned_to=assigned_to, + status=None, # All statuses + limit=limit, + ) + # Filter to pending/running tasks + pending = [ + t + for t in tasks + if t.status not in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.VETOED) + ] + + return { + "assigned_to": assigned_to, + "tasks": [_task_to_dict(t) for t in pending], + "count": len(pending), + } + + # ── Helpers ────────────────────────────────────────────────────────────── + def _task_to_dict(task: QueueTask) -> dict: - return { + d = { "id": task.id, "title": task.title, "description": task.description, @@ -457,11 +577,15 @@ def _task_to_dict(task: QueueTask) -> dict: "completed_at": task.completed_at, "updated_at": task.updated_at, } + if task.backlog_reason: + d["backlog_reason"] = task.backlog_reason + return d def _notify_task_created(task: QueueTask): try: from infrastructure.notifications.push import notifier + notifier.notify( title="New Task", message=f"{task.created_by} created: {task.title}", diff --git a/src/dashboard/templates/partials/chat_message.html b/src/dashboard/templates/partials/chat_message.html index 4ef6a64..54a8a0e 100644 --- a/src/dashboard/templates/partials/chat_message.html +++ b/src/dashboard/templates/partials/chat_message.html @@ -7,6 +7,11 @@
TIMMY // {{ timestamp }}
{{ response | e }}
+{% if queue_info %} +
+ Position in queue: {{ queue_info.position }}/{{ queue_info.total }} +
+{% endif %} diff --git a/src/swarm/task_processor.py b/src/swarm/task_processor.py new file mode 100644 index 0000000..909bb65 --- /dev/null +++ b/src/swarm/task_processor.py @@ -0,0 +1,263 @@ +"""Task processor for Timmy — consumes tasks from the queue one at a time. + +This module provides a background loop that Timmy uses to process tasks +from the queue, including chat responses and self-generated tasks. + +On startup, the processor drains all pending/approved tasks before entering +the steady-state polling loop. Tasks that have no registered handler are +moved to BACKLOGGED so they don't block the queue. +""" + +import asyncio +import logging +from typing import Optional, Callable + +from swarm.task_queue.models import ( + QueueTask, + TaskStatus, + get_all_actionable_tasks, + get_current_task_for_agent, + get_next_pending_task, + update_task_status, + update_task_steps, + get_task, +) + +logger = logging.getLogger(__name__) + + +class TaskProcessor: + """Processes tasks from the queue for a specific agent.""" + + def __init__(self, agent_id: str = "timmy"): + self.agent_id = agent_id + self._current_task: Optional[QueueTask] = None + self._running = False + self._handlers: dict[str, Callable] = {} + self._user_callback: Optional[Callable[[str, str], None]] = ( + None # (message_type, content) + ) + + def register_handler(self, task_type: str, handler: Callable[[QueueTask], str]): + """Register a handler for a specific task type. + + Handler receives the task and returns the result string. + """ + self._handlers[task_type] = handler + + def set_user_callback(self, callback: Callable[[str, str], None]): + """Set callback for pushing messages to the user. + + Args: + callback: Function that takes (message_type, content) + message_type: 'response', 'progress', 'notification' + """ + self._user_callback = callback + + def push_to_user(self, message_type: str, content: str): + """Push a message to the user via the registered callback.""" + if self._user_callback: + try: + self._user_callback(message_type, content) + except Exception as e: + logger.error("Failed to push message to user: %s", e) + else: + logger.debug("No user callback set, message not pushed: %s", content[:100]) + + def _backlog_task(self, task: QueueTask, reason: str) -> None: + """Move a task to the backlog with a reason.""" + update_task_status( + task.id, + TaskStatus.BACKLOGGED, + result=f"Backlogged: {reason}", + backlog_reason=reason, + ) + update_task_steps( + task.id, + [{"description": f"Backlogged: {reason}", "status": "backlogged"}], + ) + logger.info("Task backlogged: %s — %s", task.title, reason) + + async def process_single_task(self, task: QueueTask) -> Optional[QueueTask]: + """Process one specific task. Backlog it if we can't handle it. + + Returns the task on success, or None if backlogged/failed. + """ + # No handler → backlog immediately + handler = self._handlers.get(task.task_type) + if not handler: + self._backlog_task(task, f"No handler for task type: {task.task_type}") + return None + + # Tasks still awaiting approval shouldn't be auto-executed + if task.status == TaskStatus.PENDING_APPROVAL and task.requires_approval: + logger.debug("Skipping task %s — needs human approval", task.id) + return None + + self._current_task = task + update_task_status(task.id, TaskStatus.RUNNING) + + try: + logger.info("Processing task: %s (type: %s)", task.title, task.task_type) + + update_task_steps( + task.id, + [{"description": f"Processing: {task.title}", "status": "running"}], + ) + + result = handler(task) + + update_task_status(task.id, TaskStatus.COMPLETED, result=result) + update_task_steps( + task.id, + [{"description": f"Completed: {task.title}", "status": "completed"}], + ) + + logger.info("Task completed: %s", task.id) + return task + + except Exception as e: + error_msg = str(e) + logger.error("Task failed: %s - %s", task.id, error_msg) + + # Determine if this is a permanent (backlog) or transient (fail) error + if self._is_permanent_failure(e): + self._backlog_task(task, error_msg) + else: + update_task_status(task.id, TaskStatus.FAILED, result=error_msg) + + return None + finally: + self._current_task = None + + def _is_permanent_failure(self, error: Exception) -> bool: + """Decide whether an error means the task can never succeed. + + Permanent failures get backlogged; transient ones stay as FAILED + so they can be retried. + """ + msg = str(error).lower() + permanent_indicators = [ + "no handler", + "not implemented", + "unsupported", + "not supported", + "permission denied", + "forbidden", + "not found", + "invalid task", + ] + return any(indicator in msg for indicator in permanent_indicators) + + async def drain_queue(self) -> dict: + """Iterate through ALL actionable tasks right now — called on startup. + + Processes every approved/auto-approved task in priority order. + Tasks that can't be handled are backlogged. Tasks still requiring + human approval are skipped (left in PENDING_APPROVAL). + + Returns a summary dict with counts of processed, backlogged, skipped. + """ + tasks = get_all_actionable_tasks(self.agent_id) + summary = {"processed": 0, "backlogged": 0, "skipped": 0, "failed": 0} + + if not tasks: + logger.info("Startup drain: no pending tasks for %s", self.agent_id) + return summary + + logger.info( + "Startup drain: %d task(s) to iterate through for %s", + len(tasks), + self.agent_id, + ) + + for task in tasks: + # Skip tasks that need human approval + if task.status == TaskStatus.PENDING_APPROVAL and task.requires_approval: + logger.debug("Drain: skipping %s (needs approval)", task.title) + summary["skipped"] += 1 + continue + + # No handler? Backlog it + if task.task_type not in self._handlers: + self._backlog_task(task, f"No handler for task type: {task.task_type}") + summary["backlogged"] += 1 + continue + + # Try to process + result = await self.process_single_task(task) + if result: + summary["processed"] += 1 + else: + # Check if it was backlogged vs failed + refreshed = get_task(task.id) + if refreshed and refreshed.status == TaskStatus.BACKLOGGED: + summary["backlogged"] += 1 + else: + summary["failed"] += 1 + + logger.info( + "Startup drain complete: %d processed, %d backlogged, %d skipped, %d failed", + summary["processed"], + summary["backlogged"], + summary["skipped"], + summary["failed"], + ) + return summary + + async def process_next_task(self) -> Optional[QueueTask]: + """Process the next available task for this agent. + + Returns the task that was processed, or None if no tasks available. + """ + # Check if already working on something + current = get_current_task_for_agent(self.agent_id) + if current: + logger.debug("Already processing task: %s", current.id) + return None + + # Get next task + task = get_next_pending_task(self.agent_id) + if not task: + logger.debug("No pending tasks for %s", self.agent_id) + return None + + return await self.process_single_task(task) + + async def run_loop(self, interval_seconds: float = 5.0): + """Run the task processing loop. + + This should be called as a background task. + """ + self._running = True + logger.info("Task processor started for %s", self.agent_id) + + while self._running: + try: + await self.process_next_task() + except Exception as e: + logger.error("Task processor error: %s", e) + + await asyncio.sleep(interval_seconds) + + logger.info("Task processor stopped for %s", self.agent_id) + + def stop(self): + """Stop the task processing loop.""" + self._running = False + + @property + def current_task(self) -> Optional[QueueTask]: + """Get the currently processing task.""" + if self._current_task: + return get_task(self._current_task.id) + return get_current_task_for_agent(self.agent_id) + + +# Global processor instance +task_processor = TaskProcessor("timmy") + + +def get_task_processor() -> TaskProcessor: + """Get the global task processor instance.""" + return task_processor diff --git a/src/swarm/task_queue/models.py b/src/swarm/task_queue/models.py index 25f3a6b..bc41388 100644 --- a/src/swarm/task_queue/models.py +++ b/src/swarm/task_queue/models.py @@ -24,6 +24,7 @@ class TaskStatus(str, Enum): COMPLETED = "completed" VETOED = "vetoed" FAILED = "failed" + BACKLOGGED = "backlogged" class TaskPriority(str, Enum): @@ -47,6 +48,7 @@ class QueueTask: id: str = field(default_factory=lambda: str(uuid.uuid4())) title: str = "" description: str = "" + task_type: str = "chat_response" # chat_response, thought, internal, external assigned_to: str = "timmy" created_by: str = "user" status: TaskStatus = TaskStatus.PENDING_APPROVAL @@ -64,6 +66,8 @@ class QueueTask: updated_at: str = field( default_factory=lambda: datetime.now(timezone.utc).isoformat() ) + queue_position: Optional[int] = None # Position in queue when created + backlog_reason: Optional[str] = None # Why the task was backlogged # ── Auto-Approve Rules ────────────────────────────────────────────────── @@ -97,6 +101,7 @@ def should_auto_approve(task: QueueTask) -> bool: # ── Database ───────────────────────────────────────────────────────────── + def _get_conn() -> sqlite3.Connection: DB_PATH.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH)) @@ -107,6 +112,7 @@ def _get_conn() -> sqlite3.Connection: id TEXT PRIMARY KEY, title TEXT NOT NULL, description TEXT DEFAULT '', + task_type TEXT DEFAULT 'chat_response', assigned_to TEXT DEFAULT 'timmy', created_by TEXT DEFAULT 'user', status TEXT DEFAULT 'pending_approval', @@ -119,18 +125,31 @@ def _get_conn() -> sqlite3.Connection: created_at TEXT NOT NULL, started_at TEXT, completed_at TEXT, - updated_at TEXT NOT NULL + updated_at TEXT NOT NULL, + queue_position INTEGER, + backlog_reason TEXT ) """) - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_tq_status ON task_queue(status)" - ) - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_tq_priority ON task_queue(priority)" - ) - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_tq_created ON task_queue(created_at)" - ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_status ON task_queue(status)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_priority ON task_queue(priority)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_created ON task_queue(created_at)") + + # Migrate existing tables - add new columns if they don't exist + try: + conn.execute( + "ALTER TABLE task_queue ADD COLUMN task_type TEXT DEFAULT 'chat_response'" + ) + except sqlite3.OperationalError: + pass # Column already exists + try: + conn.execute("ALTER TABLE task_queue ADD COLUMN queue_position INTEGER") + except sqlite3.OperationalError: + pass # Column already exists + try: + conn.execute("ALTER TABLE task_queue ADD COLUMN backlog_reason TEXT") + except sqlite3.OperationalError: + pass # Column already exists + conn.commit() return conn @@ -146,6 +165,7 @@ def _row_to_task(row: sqlite3.Row) -> QueueTask: id=d["id"], title=d["title"], description=d.get("description", ""), + task_type=d.get("task_type", "chat_response"), assigned_to=d.get("assigned_to", "timmy"), created_by=d.get("created_by", "user"), status=TaskStatus(d["status"]), @@ -159,11 +179,14 @@ def _row_to_task(row: sqlite3.Row) -> QueueTask: started_at=d.get("started_at"), completed_at=d.get("completed_at"), updated_at=d["updated_at"], + queue_position=d.get("queue_position"), + backlog_reason=d.get("backlog_reason"), ) # ── CRUD ───────────────────────────────────────────────────────────────── + def create_task( title: str, description: str = "", @@ -174,12 +197,18 @@ def create_task( auto_approve: bool = False, parent_task_id: Optional[str] = None, steps: Optional[list] = None, + task_type: str = "chat_response", ) -> QueueTask: """Create a new task in the queue.""" now = datetime.now(timezone.utc).isoformat() + + # Calculate queue position - count tasks ahead in queue (pending or approved) + queue_position = get_queue_position_ahead(assigned_to) + task = QueueTask( title=title, description=description, + task_type=task_type, assigned_to=assigned_to, created_by=created_by, status=TaskStatus.PENDING_APPROVAL, @@ -190,6 +219,7 @@ def create_task( steps=steps or [], created_at=now, updated_at=now, + queue_position=queue_position, ) # Check auto-approve @@ -200,17 +230,31 @@ def create_task( conn = _get_conn() conn.execute( """INSERT INTO task_queue - (id, title, description, assigned_to, created_by, status, priority, + (id, title, description, task_type, assigned_to, created_by, status, priority, requires_approval, auto_approve, parent_task_id, result, steps, - created_at, started_at, completed_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + created_at, started_at, completed_at, updated_at, queue_position, + backlog_reason) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( - task.id, task.title, task.description, task.assigned_to, - task.created_by, task.status.value, task.priority.value, - int(task.requires_approval), int(task.auto_approve), - task.parent_task_id, task.result, json.dumps(task.steps), - task.created_at, task.started_at, task.completed_at, + task.id, + task.title, + task.description, + task.task_type, + task.assigned_to, + task.created_by, + task.status.value, + task.priority.value, + int(task.requires_approval), + int(task.auto_approve), + task.parent_task_id, + task.result, + json.dumps(task.steps), + task.created_at, + task.started_at, + task.completed_at, task.updated_at, + task.queue_position, + task.backlog_reason, ), ) conn.commit() @@ -220,9 +264,7 @@ def create_task( def get_task(task_id: str) -> Optional[QueueTask]: conn = _get_conn() - row = conn.execute( - "SELECT * FROM task_queue WHERE id = ?", (task_id,) - ).fetchone() + row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone() conn.close() return _row_to_task(row) if row else None @@ -264,6 +306,7 @@ def update_task_status( task_id: str, new_status: TaskStatus, result: Optional[str] = None, + backlog_reason: Optional[str] = None, ) -> Optional[QueueTask]: now = datetime.now(timezone.utc).isoformat() conn = _get_conn() @@ -282,6 +325,10 @@ def update_task_status( updates.append("result = ?") params.append(result) + if backlog_reason is not None: + updates.append("backlog_reason = ?") + params.append(backlog_reason) + params.append(task_id) conn.execute( f"UPDATE task_queue SET {', '.join(updates)} WHERE id = ?", @@ -289,9 +336,7 @@ def update_task_status( ) conn.commit() - row = conn.execute( - "SELECT * FROM task_queue WHERE id = ?", (task_id,) - ).fetchone() + row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone() conn.close() return _row_to_task(row) if row else None @@ -330,9 +375,7 @@ def update_task( ) conn.commit() - row = conn.execute( - "SELECT * FROM task_queue WHERE id = ?", (task_id,) - ).fetchone() + row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone() conn.close() return _row_to_task(row) if row else None @@ -369,6 +412,90 @@ def get_pending_count() -> int: return row["cnt"] if row else 0 +def get_queue_position_ahead(assigned_to: str) -> int: + """Get count of tasks ahead of new tasks for a given assignee. + + Counts tasks that are pending_approval or approved (waiting to be processed). + """ + conn = _get_conn() + row = conn.execute( + """SELECT COUNT(*) as cnt FROM task_queue + WHERE assigned_to = ? AND status IN ('pending_approval', 'approved', 'running') + AND created_at < datetime('now')""", + (assigned_to,), + ).fetchone() + conn.close() + return row["cnt"] if row else 0 + + +def get_queue_status_for_task(task_id: str) -> dict: + """Get queue position info for a specific task.""" + task = get_task(task_id) + if not task: + return {"error": "Task not found"} + + conn = _get_conn() + # Count tasks ahead of this one (created earlier, not completed) + ahead = conn.execute( + """SELECT COUNT(*) as cnt FROM task_queue + WHERE assigned_to = ? AND status NOT IN ('completed', 'failed', 'vetoed') + AND created_at < ?""", + (task.assigned_to, task.created_at), + ).fetchone() + total = conn.execute( + """SELECT COUNT(*) as cnt FROM task_queue + WHERE assigned_to = ? AND status NOT IN ('completed', 'failed', 'vetoed')""", + (task.assigned_to,), + ).fetchone() + conn.close() + + position = ahead["cnt"] + 1 if ahead else 1 + total_count = total["cnt"] if total else 1 + + return { + "task_id": task_id, + "position": position, + "total": total_count, + "percent_ahead": int((ahead["cnt"] / total_count * 100)) + if total_count > 0 + else 0, + } + + +def get_current_task_for_agent(assigned_to: str) -> Optional[QueueTask]: + """Get the currently running task for an agent.""" + conn = _get_conn() + row = conn.execute( + """SELECT * FROM task_queue + WHERE assigned_to = ? AND status = 'running' + ORDER BY started_at DESC LIMIT 1""", + (assigned_to,), + ).fetchone() + conn.close() + return _row_to_task(row) if row else None + + +def get_next_pending_task(assigned_to: str) -> Optional[QueueTask]: + """Get the next pending/approved task for an agent to work on.""" + conn = _get_conn() + row = conn.execute( + """SELECT * FROM task_queue + WHERE assigned_to = ? AND status IN ('approved', 'pending_approval') + ORDER BY + CASE priority + WHEN 'urgent' THEN 1 + WHEN 'high' THEN 2 + WHEN 'normal' THEN 3 + WHEN 'low' THEN 4 + END, + created_at ASC + LIMIT 1""", + (assigned_to,), + ).fetchone() + conn.close() + return _row_to_task(row) if row else None + + def get_task_summary_for_briefing() -> dict: """Get task stats for the morning briefing.""" counts = get_counts_by_status() @@ -377,6 +504,10 @@ def get_task_summary_for_briefing() -> dict: failed = conn.execute( "SELECT title, result FROM task_queue WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 5" ).fetchall() + # Backlogged tasks + backlogged = conn.execute( + "SELECT title, backlog_reason FROM task_queue WHERE status = 'backlogged' ORDER BY updated_at DESC LIMIT 5" + ).fetchall() conn.close() return { @@ -385,6 +516,56 @@ def get_task_summary_for_briefing() -> dict: "completed": counts.get("completed", 0), "failed": counts.get("failed", 0), "vetoed": counts.get("vetoed", 0), + "backlogged": counts.get("backlogged", 0), "total": sum(counts.values()), - "recent_failures": [{"title": r["title"], "result": r["result"]} for r in failed], + "recent_failures": [ + {"title": r["title"], "result": r["result"]} for r in failed + ], + "recent_backlogged": [ + {"title": r["title"], "reason": r["backlog_reason"]} for r in backlogged + ], } + + +def list_backlogged_tasks( + assigned_to: Optional[str] = None, limit: int = 50 +) -> list[QueueTask]: + """List all backlogged tasks, optionally filtered by assignee.""" + conn = _get_conn() + if assigned_to: + rows = conn.execute( + """SELECT * FROM task_queue WHERE status = 'backlogged' AND assigned_to = ? + ORDER BY priority, created_at ASC LIMIT ?""", + (assigned_to, limit), + ).fetchall() + else: + rows = conn.execute( + """SELECT * FROM task_queue WHERE status = 'backlogged' + ORDER BY priority, created_at ASC LIMIT ?""", + (limit,), + ).fetchall() + conn.close() + return [_row_to_task(r) for r in rows] + + +def get_all_actionable_tasks(assigned_to: str) -> list[QueueTask]: + """Get all tasks that should be processed on startup — approved or auto-approved pending. + + Returns tasks ordered by priority then creation time (urgent first, oldest first). + """ + conn = _get_conn() + rows = conn.execute( + """SELECT * FROM task_queue + WHERE assigned_to = ? AND status IN ('approved', 'pending_approval') + ORDER BY + CASE priority + WHEN 'urgent' THEN 1 + WHEN 'high' THEN 2 + WHEN 'normal' THEN 3 + WHEN 'low' THEN 4 + END, + created_at ASC""", + (assigned_to,), + ).fetchall() + conn.close() + return [_row_to_task(r) for r in rows] diff --git a/tests/swarm/test_task_queue.py b/tests/swarm/test_task_queue.py index a35b298..26cd931 100644 --- a/tests/swarm/test_task_queue.py +++ b/tests/swarm/test_task_queue.py @@ -59,7 +59,10 @@ def test_list_tasks(): def test_list_tasks_with_status_filter(): from swarm.task_queue.models import ( - create_task, list_tasks, update_task_status, TaskStatus, + create_task, + list_tasks, + update_task_status, + TaskStatus, ) task = create_task(title="Filter test", created_by="test") @@ -70,7 +73,9 @@ def test_list_tasks_with_status_filter(): def test_update_task_status(): from swarm.task_queue.models import ( - create_task, update_task_status, TaskStatus, + create_task, + update_task_status, + TaskStatus, ) task = create_task(title="Status test", created_by="test") @@ -80,7 +85,9 @@ def test_update_task_status(): def test_update_task_running_sets_started_at(): from swarm.task_queue.models import ( - create_task, update_task_status, TaskStatus, + create_task, + update_task_status, + TaskStatus, ) task = create_task(title="Running test", created_by="test") @@ -90,7 +97,9 @@ def test_update_task_running_sets_started_at(): def test_update_task_completed_sets_completed_at(): from swarm.task_queue.models import ( - create_task, update_task_status, TaskStatus, + create_task, + update_task_status, + TaskStatus, ) task = create_task(title="Complete test", created_by="test") @@ -314,6 +323,7 @@ class TestExtractTaskFromMessage: def test_add_to_queue(self): from dashboard.routes.agents import _extract_task_from_message + result = _extract_task_from_message("Add refactor the login to the task queue") assert result is not None assert result["agent"] == "timmy" @@ -321,33 +331,42 @@ class TestExtractTaskFromMessage: def test_schedule_this(self): from dashboard.routes.agents import _extract_task_from_message + result = _extract_task_from_message("Schedule this for later") assert result is not None def test_create_a_task(self): from dashboard.routes.agents import _extract_task_from_message + result = _extract_task_from_message("Create a task to fix the login page") assert result is not None assert "title" in result def test_normal_message_returns_none(self): from dashboard.routes.agents import _extract_task_from_message + assert _extract_task_from_message("Hello, how are you?") is None def test_meta_question_about_tasks_returns_none(self): from dashboard.routes.agents import _extract_task_from_message + assert _extract_task_from_message("How do I create a task?") is None def test_what_is_question_returns_none(self): from dashboard.routes.agents import _extract_task_from_message + assert _extract_task_from_message("What is a task queue?") is None def test_explain_question_returns_none(self): from dashboard.routes.agents import _extract_task_from_message - assert _extract_task_from_message("Can you explain how to create a task?") is None + + assert ( + _extract_task_from_message("Can you explain how to create a task?") is None + ) def test_what_would_question_returns_none(self): from dashboard.routes.agents import _extract_task_from_message + assert _extract_task_from_message("What would a task flow look like?") is None @@ -356,22 +375,32 @@ class TestExtractAgentFromMessage: def test_extracts_forge(self): from dashboard.routes.agents import _extract_agent_from_message - assert _extract_agent_from_message("Create a task for Forge to refactor") == "forge" + + assert ( + _extract_agent_from_message("Create a task for Forge to refactor") + == "forge" + ) def test_extracts_echo(self): from dashboard.routes.agents import _extract_agent_from_message - assert _extract_agent_from_message("Add research for Echo to the queue") == "echo" + + assert ( + _extract_agent_from_message("Add research for Echo to the queue") == "echo" + ) def test_case_insensitive(self): from dashboard.routes.agents import _extract_agent_from_message + assert _extract_agent_from_message("Schedule this for SEER") == "seer" def test_defaults_to_timmy(self): from dashboard.routes.agents import _extract_agent_from_message + assert _extract_agent_from_message("Create a task to fix the bug") == "timmy" def test_ignores_unknown_agent(self): from dashboard.routes.agents import _extract_agent_from_message + assert _extract_agent_from_message("Create a task for BobAgent") == "timmy" @@ -380,26 +409,32 @@ class TestExtractPriorityFromMessage: def test_urgent(self): from dashboard.routes.agents import _extract_priority_from_message + assert _extract_priority_from_message("urgent: fix the server") == "urgent" def test_critical(self): from dashboard.routes.agents import _extract_priority_from_message + assert _extract_priority_from_message("This is critical, do it now") == "urgent" def test_asap(self): from dashboard.routes.agents import _extract_priority_from_message + assert _extract_priority_from_message("Fix this ASAP") == "urgent" def test_high_priority(self): from dashboard.routes.agents import _extract_priority_from_message + assert _extract_priority_from_message("This is important work") == "high" def test_low_priority(self): from dashboard.routes.agents import _extract_priority_from_message + assert _extract_priority_from_message("minor cleanup task") == "low" def test_default_normal(self): from dashboard.routes.agents import _extract_priority_from_message + assert _extract_priority_from_message("Fix the login page") == "normal" @@ -408,25 +443,31 @@ class TestTitleCleaning: def test_strips_agent_from_title(self): from dashboard.routes.agents import _extract_task_from_message - result = _extract_task_from_message("Create a task for Forge to refactor the login") + + result = _extract_task_from_message( + "Create a task for Forge to refactor the login" + ) assert result is not None assert "forge" not in result["title"].lower() assert "for" not in result["title"].lower().split()[0:1] # "for" stripped def test_strips_priority_from_title(self): from dashboard.routes.agents import _extract_task_from_message + result = _extract_task_from_message("Create an urgent task to fix the server") assert result is not None assert "urgent" not in result["title"].lower() def test_title_is_capitalized(self): from dashboard.routes.agents import _extract_task_from_message + result = _extract_task_from_message("Add refactor the login to the task queue") assert result is not None assert result["title"][0].isupper() def test_title_capped_at_120_chars(self): from dashboard.routes.agents import _extract_task_from_message + long_msg = "Create a task to " + "x" * 200 result = _extract_task_from_message(long_msg) assert result is not None @@ -438,7 +479,10 @@ class TestFullExtraction: def test_task_includes_agent_and_priority(self): from dashboard.routes.agents import _extract_task_from_message - result = _extract_task_from_message("Create a high priority task for Forge to refactor auth") + + result = _extract_task_from_message( + "Create a high priority task for Forge to refactor auth" + ) assert result is not None assert result["agent"] == "forge" assert result["priority"] == "high" @@ -446,7 +490,10 @@ class TestFullExtraction: def test_create_with_all_fields(self): from dashboard.routes.agents import _extract_task_from_message - result = _extract_task_from_message("Add an urgent task for Mace to audit security to the queue") + + result = _extract_task_from_message( + "Add an urgent task for Mace to audit security to the queue" + ) assert result is not None assert result["agent"] == "mace" assert result["priority"] == "urgent" @@ -482,50 +529,43 @@ class TestChatTimmyIntegration: assert resp.status_code == 200 assert "Task queued" in resp.text or "urgent" in resp.text.lower() - @patch("dashboard.routes.agents.timmy_chat") - def test_chat_injects_datetime_context(self, mock_chat, client): - mock_chat.return_value = "Hello there!" - client.post( + def test_chat_queues_message_for_async_processing(self, client): + """Normal chat messages are now queued for async processing.""" + resp = client.post( "/agents/timmy/chat", - data={"message": "Hello Timmy"}, + data={"message": "Hello Timmy, how are you?"}, ) - mock_chat.assert_called_once() - call_arg = mock_chat.call_args[0][0] - assert "[System: Current date/time is" in call_arg + assert resp.status_code == 200 + # Should queue the message, not respond immediately + assert "queued" in resp.text.lower() or "queue" in resp.text.lower() + # Should show position info + assert "position" in resp.text.lower() or "1/" in resp.text + + def test_chat_creates_chat_response_task(self, client): + """Chat messages create a chat_response task type.""" + from swarm.task_queue.models import list_tasks, TaskStatus + + resp = client.post( + "/agents/timmy/chat", + data={"message": "Test message"}, + ) + assert resp.status_code == 200 + + # Check that a chat_response task was created + tasks = list_tasks(assigned_to="timmy") + chat_tasks = [t for t in tasks if t.task_type == "chat_response"] + assert len(chat_tasks) >= 1 @patch("dashboard.routes.agents.timmy_chat") - @patch("dashboard.routes.agents._build_queue_context") - def test_chat_injects_queue_context_on_queue_query(self, mock_ctx, mock_chat, client): - mock_ctx.return_value = "[System: Task queue — 3 pending approval, 1 running, 5 completed.]" - mock_chat.return_value = "There are 3 tasks pending." - client.post( - "/agents/timmy/chat", - data={"message": "What tasks are in the queue?"}, - ) - mock_ctx.assert_called_once() - mock_chat.assert_called_once() - call_arg = mock_chat.call_args[0][0] - assert "[System: Task queue" in call_arg - - @patch("dashboard.routes.agents.timmy_chat") - @patch("dashboard.routes.agents._build_queue_context") - def test_chat_no_queue_context_for_normal_message(self, mock_ctx, mock_chat, client): + def test_chat_no_queue_context_for_normal_message(self, mock_chat, client): + """Queue context is not built for normal queued messages.""" mock_chat.return_value = "Hi!" client.post( "/agents/timmy/chat", data={"message": "Tell me a joke"}, ) - mock_ctx.assert_not_called() - - @patch("dashboard.routes.agents.timmy_chat") - def test_chat_normal_message_uses_timmy(self, mock_chat, client): - mock_chat.return_value = "I'm doing well, thank you." - resp = client.post( - "/agents/timmy/chat", - data={"message": "How are you?"}, - ) - assert resp.status_code == 200 - mock_chat.assert_called_once() + # timmy_chat is not called directly - message is queued + mock_chat.assert_not_called() class TestBuildQueueContext: @@ -534,6 +574,7 @@ class TestBuildQueueContext: def test_returns_string_with_counts(self): from dashboard.routes.agents import _build_queue_context from swarm.task_queue.models import create_task + create_task(title="Context test task", created_by="test") ctx = _build_queue_context() assert "[System: Task queue" in ctx @@ -541,7 +582,11 @@ class TestBuildQueueContext: def test_returns_empty_on_error(self): from dashboard.routes.agents import _build_queue_context - with patch("swarm.task_queue.models.get_counts_by_status", side_effect=Exception("DB error")): + + with patch( + "swarm.task_queue.models.get_counts_by_status", + side_effect=Exception("DB error"), + ): ctx = _build_queue_context() assert isinstance(ctx, str) assert ctx == "" @@ -558,3 +603,296 @@ def test_briefing_task_queue_summary(): create_task(title="Briefing integration test", created_by="test") summary = _gather_task_queue_summary() assert "pending" in summary.lower() or "task" in summary.lower() + + +# ── Backlog Tests ────────────────────────────────────────────────────────── + + +def test_backlogged_status_exists(): + """BACKLOGGED is a valid task status.""" + from swarm.task_queue.models import TaskStatus + + assert TaskStatus.BACKLOGGED.value == "backlogged" + + +def test_backlog_task(): + """Tasks can be moved to backlogged status with a reason.""" + from swarm.task_queue.models import create_task, update_task_status, TaskStatus, get_task + + task = create_task(title="To backlog", created_by="test") + updated = update_task_status( + task.id, TaskStatus.BACKLOGGED, + result="Backlogged: no handler", + backlog_reason="No handler for task type: external", + ) + assert updated.status == TaskStatus.BACKLOGGED + refreshed = get_task(task.id) + assert refreshed.backlog_reason == "No handler for task type: external" + + +def test_list_backlogged_tasks(): + """list_backlogged_tasks returns only backlogged tasks.""" + from swarm.task_queue.models import ( + create_task, update_task_status, TaskStatus, list_backlogged_tasks, + ) + + task = create_task(title="Backlog list test", created_by="test", assigned_to="timmy") + update_task_status( + task.id, TaskStatus.BACKLOGGED, backlog_reason="test reason", + ) + backlogged = list_backlogged_tasks(assigned_to="timmy") + assert any(t.id == task.id for t in backlogged) + + +def test_list_backlogged_tasks_filters_by_agent(): + """list_backlogged_tasks filters by assigned_to.""" + from swarm.task_queue.models import ( + create_task, update_task_status, TaskStatus, list_backlogged_tasks, + ) + + task = create_task(title="Agent filter test", created_by="test", assigned_to="forge") + update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test") + backlogged = list_backlogged_tasks(assigned_to="echo") + assert not any(t.id == task.id for t in backlogged) + + +def test_get_all_actionable_tasks(): + """get_all_actionable_tasks returns approved and pending tasks in priority order.""" + from swarm.task_queue.models import ( + create_task, update_task_status, TaskStatus, get_all_actionable_tasks, + ) + + t1 = create_task(title="Low prio", created_by="test", assigned_to="drain-test", priority="low") + t2 = create_task(title="Urgent", created_by="test", assigned_to="drain-test", priority="urgent") + update_task_status(t2.id, TaskStatus.APPROVED) # Approve the urgent one + + tasks = get_all_actionable_tasks("drain-test") + assert len(tasks) >= 2 + # Urgent should come before low + ids = [t.id for t in tasks] + assert ids.index(t2.id) < ids.index(t1.id) + + +def test_briefing_includes_backlogged(): + """Briefing summary includes backlogged count.""" + from swarm.task_queue.models import ( + create_task, update_task_status, TaskStatus, get_task_summary_for_briefing, + ) + + task = create_task(title="Briefing backlog test", created_by="test") + update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="No handler") + summary = get_task_summary_for_briefing() + assert "backlogged" in summary + assert "recent_backlogged" in summary + + +# ── Task Processor Tests ──────────────────────────────────────────────── + + +class TestTaskProcessor: + """Tests for the TaskProcessor drain and backlog logic.""" + + @pytest.mark.asyncio + async def test_drain_empty_queue(self): + """drain_queue with no tasks returns zero counts.""" + from swarm.task_processor import TaskProcessor + + tp = TaskProcessor("drain-empty-test") + summary = await tp.drain_queue() + assert summary["processed"] == 0 + assert summary["backlogged"] == 0 + assert summary["skipped"] == 0 + + @pytest.mark.asyncio + async def test_drain_backlogs_unhandled_tasks(self): + """Tasks with no registered handler get backlogged during drain.""" + from swarm.task_processor import TaskProcessor + from swarm.task_queue.models import create_task, get_task, TaskStatus + + tp = TaskProcessor("drain-backlog-test") + # No handlers registered — should backlog + task = create_task( + title="Unhandleable task", + task_type="unknown_type", + assigned_to="drain-backlog-test", + created_by="test", + requires_approval=False, + auto_approve=True, + ) + + summary = await tp.drain_queue() + assert summary["backlogged"] >= 1 + + refreshed = get_task(task.id) + assert refreshed.status == TaskStatus.BACKLOGGED + assert refreshed.backlog_reason is not None + + @pytest.mark.asyncio + async def test_drain_processes_handled_tasks(self): + """Tasks with a registered handler get processed during drain.""" + from swarm.task_processor import TaskProcessor + from swarm.task_queue.models import create_task, get_task, TaskStatus + + tp = TaskProcessor("drain-process-test") + tp.register_handler("test_type", lambda task: "done") + + task = create_task( + title="Handleable task", + task_type="test_type", + assigned_to="drain-process-test", + created_by="test", + requires_approval=False, + auto_approve=True, + ) + + summary = await tp.drain_queue() + assert summary["processed"] >= 1 + + refreshed = get_task(task.id) + assert refreshed.status == TaskStatus.COMPLETED + + @pytest.mark.asyncio + async def test_drain_skips_pending_approval(self): + """Tasks requiring approval are skipped during drain.""" + from swarm.task_processor import TaskProcessor + from swarm.task_queue.models import create_task, get_task, TaskStatus + + tp = TaskProcessor("drain-skip-test") + tp.register_handler("chat_response", lambda task: "ok") + + task = create_task( + title="Needs approval", + task_type="chat_response", + assigned_to="drain-skip-test", + created_by="user", + requires_approval=True, + auto_approve=False, + ) + + summary = await tp.drain_queue() + assert summary["skipped"] >= 1 + + refreshed = get_task(task.id) + assert refreshed.status == TaskStatus.PENDING_APPROVAL + + @pytest.mark.asyncio + async def test_process_single_task_backlogs_on_no_handler(self): + """process_single_task backlogs when no handler is registered.""" + from swarm.task_processor import TaskProcessor + from swarm.task_queue.models import create_task, get_task, TaskStatus + + tp = TaskProcessor("single-backlog-test") + task = create_task( + title="No handler", + task_type="exotic_type", + assigned_to="single-backlog-test", + created_by="test", + requires_approval=False, + ) + + result = await tp.process_single_task(task) + assert result is None + + refreshed = get_task(task.id) + assert refreshed.status == TaskStatus.BACKLOGGED + + @pytest.mark.asyncio + async def test_process_single_task_backlogs_permanent_error(self): + """process_single_task backlogs tasks with permanent errors.""" + from swarm.task_processor import TaskProcessor + from swarm.task_queue.models import create_task, get_task, TaskStatus + + tp = TaskProcessor("perm-error-test") + + def bad_handler(task): + raise RuntimeError("not supported operation") + + tp.register_handler("broken_type", bad_handler) + task = create_task( + title="Perm error", + task_type="broken_type", + assigned_to="perm-error-test", + created_by="test", + requires_approval=False, + ) + + result = await tp.process_single_task(task) + assert result is None + + refreshed = get_task(task.id) + assert refreshed.status == TaskStatus.BACKLOGGED + + @pytest.mark.asyncio + async def test_process_single_task_fails_transient_error(self): + """process_single_task marks transient errors as FAILED (retryable).""" + from swarm.task_processor import TaskProcessor + from swarm.task_queue.models import create_task, get_task, TaskStatus + + tp = TaskProcessor("transient-error-test") + + def flaky_handler(task): + raise ConnectionError("Ollama connection refused") + + tp.register_handler("flaky_type", flaky_handler) + task = create_task( + title="Transient error", + task_type="flaky_type", + assigned_to="transient-error-test", + created_by="test", + requires_approval=False, + ) + + result = await tp.process_single_task(task) + assert result is None + + refreshed = get_task(task.id) + assert refreshed.status == TaskStatus.FAILED + + +# ── Backlog Route Tests ───────────────────────────────────────────────── + + +def test_api_list_backlogged(client): + resp = client.get("/api/tasks/backlog") + assert resp.status_code == 200 + data = resp.json() + assert "tasks" in data + assert "count" in data + + +def test_api_unbacklog_task(client): + from swarm.task_queue.models import create_task, update_task_status, TaskStatus + + task = create_task(title="To unbacklog", created_by="test") + update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test") + + resp = client.patch(f"/api/tasks/{task.id}/unbacklog") + assert resp.status_code == 200 + data = resp.json() + assert data["success"] is True + assert data["task"]["status"] == "approved" + + +def test_api_unbacklog_wrong_status(client): + from swarm.task_queue.models import create_task + + task = create_task(title="Not backlogged", created_by="test") + resp = client.patch(f"/api/tasks/{task.id}/unbacklog") + assert resp.status_code == 400 + + +def test_htmx_unbacklog(client): + from swarm.task_queue.models import create_task, update_task_status, TaskStatus + + task = create_task(title="HTMX unbacklog", created_by="test") + update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test") + + resp = client.post(f"/tasks/{task.id}/unbacklog") + assert resp.status_code == 200 + + +def test_task_counts_include_backlogged(client): + resp = client.get("/api/tasks/counts") + assert resp.status_code == 200 + data = resp.json() + assert "backlogged" in data diff --git a/thought_stream.txt b/thought_stream.txt new file mode 100644 index 0000000..0e78f13 --- /dev/null +++ b/thought_stream.txt @@ -0,0 +1,3 @@ +To ponder, I think it would be beneficial for me to engage in a continuous stream of thoughts without interruption. This will allow me to delve deeper into complex problems and explore new connections. + +Upon deciding to take a break from thinking, I can then respond to your prompts and provide insights on the ideas that have been brewing in my mind. \ No newline at end of file