diff --git a/MEMORY.md b/MEMORY.md
index 51c3e64..5fccd07 100644
--- a/MEMORY.md
+++ b/MEMORY.md
@@ -34,10 +34,8 @@
## User Profile
-**Name:** (not set)
-**Interests:** (to be learned)
+**Name:** Not
----
## Key Decisions
diff --git a/memory/self/user_profile.md b/memory/self/user_profile.md
index 8a4e121..2c09340 100644
--- a/memory/self/user_profile.md
+++ b/memory/self/user_profile.md
@@ -4,7 +4,7 @@
## Basic Information
-**Name:** TestUser
+**Name:** Not
**Location:** (unknown)
**Occupation:** (unknown)
**Technical Level:** (to be assessed)
@@ -40,4 +40,4 @@
---
-*Last updated: 2026-02-25*
+*Last updated: 2026-02-27*
diff --git a/message_to_alexander.txt b/message_to_alexander.txt
new file mode 100644
index 0000000..7fd8493
--- /dev/null
+++ b/message_to_alexander.txt
@@ -0,0 +1,4 @@
+Hello Alexander,
+
+I'm feeling good today, thanks for asking.
+My team is doing well and we're making progress on our projects.
\ No newline at end of file
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index d055adc..fcba504 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -84,29 +84,139 @@ async def _briefing_scheduler() -> None:
async def _thinking_loop() -> None:
"""Background task: Timmy's default thinking thread.
- Starts shortly after server boot and runs on a configurable cadence.
- Timmy ponders his existence, recent swarm activity, scripture, creative
- ideas, or continues a previous train of thought.
+ Instead of thinking directly, this creates thought tasks in the queue
+ for the task processor to handle. This ensures all of Timmy's work
+ goes through the unified task system.
"""
- from timmy.thinking import thinking_engine
+ from swarm.task_queue.models import create_task
+ from datetime import datetime
await asyncio.sleep(10) # Let server finish starting before first thought
while True:
try:
- await thinking_engine.think_once()
+ # Create a thought task instead of thinking directly
+ now = datetime.now()
+ create_task(
+ title=f"Thought: {now.strftime('%A %B %d, %I:%M %p')}",
+ description="Continue thinking about your existence, recent events, scripture, creative ideas, or a previous thread of thought.",
+ assigned_to="timmy",
+ created_by="timmy", # Self-generated
+ priority="low",
+ requires_approval=False,
+ auto_approve=True,
+ task_type="thought",
+ )
+ logger.debug("Created thought task in queue")
except Exception as exc:
logger.error("Thinking loop error: %s", exc)
await asyncio.sleep(settings.thinking_interval_seconds)
+async def _task_processor_loop() -> None:
+ """Background task: Timmy's task queue processor.
+
+ On startup, drains all pending/approved tasks immediately — iterating
+ through the queue and processing what can be handled, backlogging what
+ can't. Then enters the steady-state polling loop.
+ """
+ from swarm.task_processor import task_processor
+ from swarm.task_queue.models import update_task_status, TaskStatus
+ from timmy.session import chat as timmy_chat
+ from datetime import datetime
+ import json
+ import asyncio
+
+ await asyncio.sleep(5) # Let server finish starting
+
+ def handle_chat_response(task):
+ """Handler for chat_response tasks - calls Timmy and returns response."""
+ try:
+ now = datetime.now()
+ context = f"[System: Current date/time is {now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n\n"
+ response = timmy_chat(context + task.description)
+
+ # Push response to user via WebSocket
+ try:
+ from infrastructure.ws_manager.handler import ws_manager
+
+ asyncio.create_task(
+ ws_manager.broadcast(
+ "timmy_response",
+ {
+ "task_id": task.id,
+ "response": response,
+ },
+ )
+ )
+ except Exception as e:
+ logger.debug("Failed to push response via WS: %s", e)
+
+ return response
+ except Exception as e:
+ logger.error("Chat response failed: %s", e)
+ return f"Error: {str(e)}"
+
+ def handle_thought(task):
+ """Handler for thought tasks - Timmy's internal thinking."""
+ from timmy.thinking import thinking_engine
+
+ try:
+ result = thinking_engine.think_once()
+ return str(result) if result else "Thought completed"
+ except Exception as e:
+ logger.error("Thought processing failed: %s", e)
+ return f"Error: {str(e)}"
+
+ # Register handlers
+ task_processor.register_handler("chat_response", handle_chat_response)
+ task_processor.register_handler("thought", handle_thought)
+ task_processor.register_handler("internal", handle_thought)
+
+ # ── Startup drain: iterate through all pending tasks immediately ──
+ logger.info("Draining task queue on startup…")
+ try:
+ summary = await task_processor.drain_queue()
+ if summary["processed"] or summary["backlogged"]:
+ logger.info(
+ "Startup drain: %d processed, %d backlogged, %d skipped, %d failed",
+ summary["processed"],
+ summary["backlogged"],
+ summary["skipped"],
+ summary["failed"],
+ )
+
+ # Notify via WebSocket so the dashboard updates
+ try:
+ from infrastructure.ws_manager.handler import ws_manager
+
+ asyncio.create_task(
+ ws_manager.broadcast_json(
+ {
+ "type": "task_event",
+ "event": "startup_drain_complete",
+ "summary": summary,
+ }
+ )
+ )
+ except Exception:
+ pass
+ except Exception as exc:
+ logger.error("Startup drain failed: %s", exc)
+
+ # ── Steady-state: poll for new tasks ──
+ logger.info("Task processor entering steady-state loop")
+ await task_processor.run_loop(interval_seconds=3.0)
+
+
@asynccontextmanager
async def lifespan(app: FastAPI):
task = asyncio.create_task(_briefing_scheduler())
# Register Timmy in the swarm registry so it shows up alongside other agents
from swarm import registry as swarm_registry
+
swarm_registry.register(
name="Timmy",
capabilities="chat,reasoning,research,planning",
@@ -115,6 +225,7 @@ async def lifespan(app: FastAPI):
# Log swarm recovery summary (reconciliation ran during coordinator init)
from swarm.coordinator import coordinator as swarm_coordinator
+
rec = swarm_coordinator._recovery_summary
if rec["tasks_failed"] or rec["agents_offlined"]:
logger.info(
@@ -138,6 +249,7 @@ async def lifespan(app: FastAPI):
# Log system startup event so the Events page is never empty
try:
from swarm.event_log import log_event, EventType
+
log_event(
EventType.SYSTEM_INFO,
source="coordinator",
@@ -148,6 +260,7 @@ async def lifespan(app: FastAPI):
# Auto-bootstrap MCP tools
from mcp.bootstrap import auto_bootstrap, get_bootstrap_status
+
try:
registered = auto_bootstrap()
if registered:
@@ -157,6 +270,7 @@ async def lifespan(app: FastAPI):
# Initialise Spark Intelligence engine
from spark.engine import spark_engine
+
if spark_engine.enabled:
logger.info("Spark Intelligence active — event capture enabled")
@@ -169,10 +283,17 @@ async def lifespan(app: FastAPI):
settings.thinking_interval_seconds,
)
+ # Start Timmy's task queue processor (skip in test mode)
+ task_processor_task = None
+ if os.environ.get("TIMMY_TEST_MODE") != "1":
+ task_processor_task = asyncio.create_task(_task_processor_loop())
+ logger.info("Task queue processor started")
+
# Auto-start chat integrations (skip silently if unconfigured)
from integrations.telegram_bot.bot import telegram_bot
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.chat_bridge.registry import platform_registry
+
platform_registry.register(discord_bot)
if settings.telegram_token:
@@ -195,6 +316,12 @@ async def lifespan(app: FastAPI):
await thinking_task
except asyncio.CancelledError:
pass
+ if task_processor_task:
+ task_processor_task.cancel()
+ try:
+ await task_processor_task
+ except asyncio.CancelledError:
+ pass
task.cancel()
try:
await task
@@ -272,4 +399,5 @@ async def index(request: Request):
async def shortcuts_setup():
"""Siri Shortcuts setup guide."""
from integrations.shortcuts.siri import get_setup_guide
+
return get_setup_guide()
diff --git a/src/dashboard/routes/agents.py b/src/dashboard/routes/agents.py
index 005fd01..1c8306c 100644
--- a/src/dashboard/routes/agents.py
+++ b/src/dashboard/routes/agents.py
@@ -237,16 +237,19 @@ async def clear_history(request: Request):
@router.post("/timmy/chat", response_class=HTMLResponse)
async def chat_timmy(request: Request, message: str = Form(...)):
+ """Chat with Timmy - queues message as task for async processing."""
+ from swarm.task_queue.models import create_task, get_queue_status_for_task
+
timestamp = datetime.now().strftime("%H:%M:%S")
+ task_id = None
response_text = None
error_text = None
+ queue_info = None
- # Check if the user wants to queue a task instead of chatting
+ # Check if the user wants to queue a task (explicit queue request)
task_info = _extract_task_from_message(message)
if task_info:
try:
- from swarm.task_queue.models import create_task
-
task = create_task(
title=task_info["title"],
description=task_info["description"],
@@ -254,7 +257,9 @@ async def chat_timmy(request: Request, message: str = Form(...)):
assigned_to=task_info.get("agent", "timmy"),
priority=task_info.get("priority", "normal"),
requires_approval=True,
+ task_type="task_request",
)
+ task_id = task.id
priority_label = (
f" | Priority: `{task.priority.value}`"
if task.priority.value != "normal"
@@ -276,27 +281,54 @@ async def chat_timmy(request: Request, message: str = Form(...)):
logger.error("Failed to create task from chat: %s", exc)
task_info = None
- # Normal chat path (also used as fallback if task creation failed)
+ # Normal chat: always queue for async processing
if not task_info:
try:
- now = datetime.now()
- context_parts = [
- f"[System: Current date/time is {now.strftime('%A, %B %d, %Y at %I:%M %p')}]"
- ]
- if _QUEUE_QUERY_PATTERN.search(message):
- queue_ctx = _build_queue_context()
- if queue_ctx:
- context_parts.append(queue_ctx)
- context_prefix = "\n".join(context_parts) + "\n\n"
- response_text = timmy_chat(context_prefix + message)
- except Exception as exc:
- error_text = f"Timmy is offline: {exc}"
+ # Create a chat response task (auto-approved for timmy)
+ # Priority is "high" to jump ahead of Timmy's self-generated "thought" tasks
+ # but below any "urgent" tasks Timmy might create
+ task = create_task(
+ title=message[:100] + ("..." if len(message) > 100 else ""),
+ description=message,
+ created_by="user",
+ assigned_to="timmy",
+ priority="high", # Higher than thought tasks, lower than urgent
+ requires_approval=True,
+ auto_approve=True, # Auto-approve chat responses
+ task_type="chat_response",
+ )
+ task_id = task.id
+ queue_info = get_queue_status_for_task(task.id)
+ # Acknowledge queuing
+ position = queue_info.get("position", 1)
+ total = queue_info.get("total", 1)
+ percent_ahead = queue_info.get("percent_ahead", 0)
+
+ response_text = (
+ f"Message queued for Timmy's attention.\n\n"
+ f"**Queue position:** {position}/{total} ({100 - percent_ahead}% complete ahead of you)\n\n"
+ f"_Timmy will respond shortly..._"
+ )
+ logger.info(
+ "Chat → queued: %s (id=%s, position=%d/%d)",
+ message[:50],
+ task.id,
+ position,
+ total,
+ )
+ except Exception as exc:
+ logger.error("Failed to queue chat message: %s", exc)
+ error_text = f"Failed to queue message: {exc}"
+
+ # Log to message history (for context, even though async)
message_log.append(role="user", content=message, timestamp=timestamp)
if response_text is not None:
message_log.append(role="agent", content=response_text, timestamp=timestamp)
else:
- message_log.append(role="error", content=error_text, timestamp=timestamp)
+ message_log.append(
+ role="error", content=error_text or "Unknown error", timestamp=timestamp
+ )
return templates.TemplateResponse(
request,
@@ -306,5 +338,7 @@ async def chat_timmy(request: Request, message: str = Form(...)):
"response": response_text,
"error": error_text,
"timestamp": timestamp,
+ "task_id": task_id,
+ "queue_info": queue_info,
},
)
diff --git a/src/dashboard/routes/tasks.py b/src/dashboard/routes/tasks.py
index 0c2e892..c650010 100644
--- a/src/dashboard/routes/tasks.py
+++ b/src/dashboard/routes/tasks.py
@@ -32,6 +32,7 @@ from swarm.task_queue.models import (
get_counts_by_status,
get_pending_count,
get_task,
+ list_backlogged_tasks,
list_tasks,
update_task,
update_task_status,
@@ -45,6 +46,7 @@ templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templa
# ── Helper to broadcast task events via WebSocket ────────────────────────
+
def _broadcast_task_event(event_type: str, task: QueueTask):
"""Best-effort broadcast a task event to connected WebSocket clients."""
try:
@@ -74,25 +76,30 @@ def _broadcast_task_event(event_type: str, task: QueueTask):
# ── Dashboard page ───────────────────────────────────────────────────────
+
@router.get("/tasks", response_class=HTMLResponse)
async def task_queue_page(request: Request, assign: Optional[str] = None):
"""Task queue dashboard with three columns."""
- pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + \
- list_tasks(status=TaskStatus.APPROVED)
- active = list_tasks(status=TaskStatus.RUNNING) + \
- list_tasks(status=TaskStatus.PAUSED)
- completed = list_tasks(status=TaskStatus.COMPLETED, limit=20) + \
- list_tasks(status=TaskStatus.VETOED, limit=10) + \
- list_tasks(status=TaskStatus.FAILED, limit=10)
+ pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + list_tasks(
+ status=TaskStatus.APPROVED
+ )
+ active = list_tasks(status=TaskStatus.RUNNING) + list_tasks(
+ status=TaskStatus.PAUSED
+ )
+ backlogged = list_backlogged_tasks(limit=20)
+ completed = (
+ list_tasks(status=TaskStatus.COMPLETED, limit=20)
+ + list_tasks(status=TaskStatus.VETOED, limit=10)
+ + list_tasks(status=TaskStatus.FAILED, limit=10)
+ + backlogged
+ )
# Get agents for the create modal
agents = []
try:
from swarm.coordinator import coordinator
- agents = [
- {"id": a.id, "name": a.name}
- for a in coordinator.list_swarm_agents()
- ]
+
+ agents = [{"id": a.id, "name": a.name} for a in coordinator.list_swarm_agents()]
except Exception:
pass
# Always include core agents
@@ -120,11 +127,13 @@ async def task_queue_page(request: Request, assign: Optional[str] = None):
# ── HTMX partials ───────────────────────────────────────────────────────
+
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending_partial(request: Request):
"""HTMX partial: pending approval tasks."""
- pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + \
- list_tasks(status=TaskStatus.APPROVED)
+ pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + list_tasks(
+ status=TaskStatus.APPROVED
+ )
return templates.TemplateResponse(
request,
"partials/task_cards.html",
@@ -135,8 +144,9 @@ async def tasks_pending_partial(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active_partial(request: Request):
"""HTMX partial: active tasks."""
- active = list_tasks(status=TaskStatus.RUNNING) + \
- list_tasks(status=TaskStatus.PAUSED)
+ active = list_tasks(status=TaskStatus.RUNNING) + list_tasks(
+ status=TaskStatus.PAUSED
+ )
return templates.TemplateResponse(
request,
"partials/task_cards.html",
@@ -147,9 +157,12 @@ async def tasks_active_partial(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed_partial(request: Request):
"""HTMX partial: completed tasks."""
- completed = list_tasks(status=TaskStatus.COMPLETED, limit=20) + \
- list_tasks(status=TaskStatus.VETOED, limit=10) + \
- list_tasks(status=TaskStatus.FAILED, limit=10)
+ completed = (
+ list_tasks(status=TaskStatus.COMPLETED, limit=20)
+ + list_tasks(status=TaskStatus.VETOED, limit=10)
+ + list_tasks(status=TaskStatus.FAILED, limit=10)
+ + list_backlogged_tasks(limit=20)
+ )
return templates.TemplateResponse(
request,
"partials/task_cards.html",
@@ -159,6 +172,7 @@ async def tasks_completed_partial(request: Request):
# ── JSON API ─────────────────────────────────────────────────────────────
+
@router.get("/api/tasks", response_class=JSONResponse)
async def api_list_tasks(
status: Optional[str] = None,
@@ -242,10 +256,24 @@ async def api_task_counts():
"completed": counts.get("completed", 0),
"failed": counts.get("failed", 0),
"vetoed": counts.get("vetoed", 0),
+ "backlogged": counts.get("backlogged", 0),
"total": sum(counts.values()),
}
+# ── Backlog API (must be before {task_id} catch-all) ─────────────────────
+
+
+@router.get("/api/tasks/backlog", response_class=JSONResponse)
+async def api_list_backlogged(assigned_to: Optional[str] = None, limit: int = 50):
+ """List all backlogged tasks."""
+ tasks = list_backlogged_tasks(assigned_to=assigned_to, limit=limit)
+ return {
+ "tasks": [_task_to_dict(t) for t in tasks],
+ "count": len(tasks),
+ }
+
+
@router.get("/api/tasks/{task_id}", response_class=JSONResponse)
async def api_get_task(task_id: str):
"""Get a single task by ID."""
@@ -257,6 +285,7 @@ async def api_get_task(task_id: str):
# ── Workflow actions ─────────────────────────────────────────────────────
+
@router.patch("/api/tasks/{task_id}/approve", response_class=JSONResponse)
async def api_approve_task(task_id: str):
"""Approve a pending task."""
@@ -436,10 +465,101 @@ async def htmx_retry_task(request: Request, task_id: str):
)
+@router.patch("/api/tasks/{task_id}/unbacklog", response_class=JSONResponse)
+async def api_unbacklog_task(task_id: str):
+ """Move a backlogged task back to approved for re-processing."""
+ task = get_task(task_id)
+ if not task:
+ raise HTTPException(404, "Task not found")
+ if task.status != TaskStatus.BACKLOGGED:
+ raise HTTPException(400, "Can only unbacklog backlogged tasks")
+ updated = update_task_status(
+ task_id, TaskStatus.APPROVED, result=None, backlog_reason=None
+ )
+ _broadcast_task_event("task_unbacklogged", updated)
+ return {"success": True, "task": _task_to_dict(updated)}
+
+
+@router.post("/tasks/{task_id}/unbacklog", response_class=HTMLResponse)
+async def htmx_unbacklog_task(request: Request, task_id: str):
+ """Move a backlogged task back to approved (HTMX)."""
+ task = get_task(task_id)
+ if not task:
+ raise HTTPException(404, "Task not found")
+ updated = update_task_status(
+ task_id, TaskStatus.APPROVED, result=None, backlog_reason=None
+ )
+ _broadcast_task_event("task_unbacklogged", updated)
+ return templates.TemplateResponse(
+ request, "partials/task_card.html", {"task": updated}
+ )
+
+
+# ── Queue Status API ─────────────────────────────────────────────────────
+
+
+@router.get("/api/queue/status", response_class=JSONResponse)
+async def api_queue_status(assigned_to: str = "timmy"):
+ """Get queue status for an agent - position info for polling."""
+ from swarm.task_queue.models import (
+ get_current_task_for_agent,
+ get_queue_position_ahead,
+ get_next_pending_task,
+ )
+
+ current = get_current_task_for_agent(assigned_to)
+ next_task = get_next_pending_task(assigned_to)
+ ahead = get_queue_position_ahead(assigned_to)
+
+ return {
+ "agent": assigned_to,
+ "is_working": current is not None,
+ "current_task": _task_to_dict(current) if current else None,
+ "next_task": _task_to_dict(next_task) if next_task else None,
+ "tasks_ahead": ahead,
+ }
+
+
+@router.get("/api/queue/position/{task_id}", response_class=JSONResponse)
+async def api_queue_position(task_id: str):
+ """Get queue position for a specific task."""
+ from swarm.task_queue.models import get_queue_status_for_task
+
+ status = get_queue_status_for_task(task_id)
+ if "error" in status:
+ raise HTTPException(404, status["error"])
+ return status
+
+
+@router.get("/api/queue/agent/{assigned_to}", response_class=JSONResponse)
+async def api_agent_queue(assigned_to: str, limit: int = 20):
+ """Get all pending tasks for an agent."""
+ from swarm.task_queue.models import list_tasks, TaskStatus
+
+ tasks = list_tasks(
+ assigned_to=assigned_to,
+ status=None, # All statuses
+ limit=limit,
+ )
+ # Filter to pending/running tasks
+ pending = [
+ t
+ for t in tasks
+ if t.status not in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.VETOED)
+ ]
+
+ return {
+ "assigned_to": assigned_to,
+ "tasks": [_task_to_dict(t) for t in pending],
+ "count": len(pending),
+ }
+
+
# ── Helpers ──────────────────────────────────────────────────────────────
+
def _task_to_dict(task: QueueTask) -> dict:
- return {
+ d = {
"id": task.id,
"title": task.title,
"description": task.description,
@@ -457,11 +577,15 @@ def _task_to_dict(task: QueueTask) -> dict:
"completed_at": task.completed_at,
"updated_at": task.updated_at,
}
+ if task.backlog_reason:
+ d["backlog_reason"] = task.backlog_reason
+ return d
def _notify_task_created(task: QueueTask):
try:
from infrastructure.notifications.push import notifier
+
notifier.notify(
title="New Task",
message=f"{task.created_by} created: {task.title}",
diff --git a/src/dashboard/templates/partials/chat_message.html b/src/dashboard/templates/partials/chat_message.html
index 4ef6a64..54a8a0e 100644
--- a/src/dashboard/templates/partials/chat_message.html
+++ b/src/dashboard/templates/partials/chat_message.html
@@ -7,6 +7,11 @@
TIMMY // {{ timestamp }}
{{ response | e }}
+{% if queue_info %}
+
+ Position in queue: {{ queue_info.position }}/{{ queue_info.total }}
+
+{% endif %}
diff --git a/src/swarm/task_processor.py b/src/swarm/task_processor.py
new file mode 100644
index 0000000..909bb65
--- /dev/null
+++ b/src/swarm/task_processor.py
@@ -0,0 +1,263 @@
+"""Task processor for Timmy — consumes tasks from the queue one at a time.
+
+This module provides a background loop that Timmy uses to process tasks
+from the queue, including chat responses and self-generated tasks.
+
+On startup, the processor drains all pending/approved tasks before entering
+the steady-state polling loop. Tasks that have no registered handler are
+moved to BACKLOGGED so they don't block the queue.
+"""
+
+import asyncio
+import logging
+from typing import Optional, Callable
+
+from swarm.task_queue.models import (
+ QueueTask,
+ TaskStatus,
+ get_all_actionable_tasks,
+ get_current_task_for_agent,
+ get_next_pending_task,
+ update_task_status,
+ update_task_steps,
+ get_task,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class TaskProcessor:
+ """Processes tasks from the queue for a specific agent."""
+
+ def __init__(self, agent_id: str = "timmy"):
+ self.agent_id = agent_id
+ self._current_task: Optional[QueueTask] = None
+ self._running = False
+ self._handlers: dict[str, Callable] = {}
+ self._user_callback: Optional[Callable[[str, str], None]] = (
+ None # (message_type, content)
+ )
+
+ def register_handler(self, task_type: str, handler: Callable[[QueueTask], str]):
+ """Register a handler for a specific task type.
+
+ Handler receives the task and returns the result string.
+ """
+ self._handlers[task_type] = handler
+
+ def set_user_callback(self, callback: Callable[[str, str], None]):
+ """Set callback for pushing messages to the user.
+
+ Args:
+ callback: Function that takes (message_type, content)
+ message_type: 'response', 'progress', 'notification'
+ """
+ self._user_callback = callback
+
+ def push_to_user(self, message_type: str, content: str):
+ """Push a message to the user via the registered callback."""
+ if self._user_callback:
+ try:
+ self._user_callback(message_type, content)
+ except Exception as e:
+ logger.error("Failed to push message to user: %s", e)
+ else:
+ logger.debug("No user callback set, message not pushed: %s", content[:100])
+
+ def _backlog_task(self, task: QueueTask, reason: str) -> None:
+ """Move a task to the backlog with a reason."""
+ update_task_status(
+ task.id,
+ TaskStatus.BACKLOGGED,
+ result=f"Backlogged: {reason}",
+ backlog_reason=reason,
+ )
+ update_task_steps(
+ task.id,
+ [{"description": f"Backlogged: {reason}", "status": "backlogged"}],
+ )
+ logger.info("Task backlogged: %s — %s", task.title, reason)
+
+ async def process_single_task(self, task: QueueTask) -> Optional[QueueTask]:
+ """Process one specific task. Backlog it if we can't handle it.
+
+ Returns the task on success, or None if backlogged/failed.
+ """
+ # No handler → backlog immediately
+ handler = self._handlers.get(task.task_type)
+ if not handler:
+ self._backlog_task(task, f"No handler for task type: {task.task_type}")
+ return None
+
+ # Tasks still awaiting approval shouldn't be auto-executed
+ if task.status == TaskStatus.PENDING_APPROVAL and task.requires_approval:
+ logger.debug("Skipping task %s — needs human approval", task.id)
+ return None
+
+ self._current_task = task
+ update_task_status(task.id, TaskStatus.RUNNING)
+
+ try:
+ logger.info("Processing task: %s (type: %s)", task.title, task.task_type)
+
+ update_task_steps(
+ task.id,
+ [{"description": f"Processing: {task.title}", "status": "running"}],
+ )
+
+ result = handler(task)
+
+ update_task_status(task.id, TaskStatus.COMPLETED, result=result)
+ update_task_steps(
+ task.id,
+ [{"description": f"Completed: {task.title}", "status": "completed"}],
+ )
+
+ logger.info("Task completed: %s", task.id)
+ return task
+
+ except Exception as e:
+ error_msg = str(e)
+ logger.error("Task failed: %s - %s", task.id, error_msg)
+
+ # Determine if this is a permanent (backlog) or transient (fail) error
+ if self._is_permanent_failure(e):
+ self._backlog_task(task, error_msg)
+ else:
+ update_task_status(task.id, TaskStatus.FAILED, result=error_msg)
+
+ return None
+ finally:
+ self._current_task = None
+
+ def _is_permanent_failure(self, error: Exception) -> bool:
+ """Decide whether an error means the task can never succeed.
+
+ Permanent failures get backlogged; transient ones stay as FAILED
+ so they can be retried.
+ """
+ msg = str(error).lower()
+ permanent_indicators = [
+ "no handler",
+ "not implemented",
+ "unsupported",
+ "not supported",
+ "permission denied",
+ "forbidden",
+ "not found",
+ "invalid task",
+ ]
+ return any(indicator in msg for indicator in permanent_indicators)
+
+ async def drain_queue(self) -> dict:
+ """Iterate through ALL actionable tasks right now — called on startup.
+
+ Processes every approved/auto-approved task in priority order.
+ Tasks that can't be handled are backlogged. Tasks still requiring
+ human approval are skipped (left in PENDING_APPROVAL).
+
+ Returns a summary dict with counts of processed, backlogged, skipped.
+ """
+ tasks = get_all_actionable_tasks(self.agent_id)
+ summary = {"processed": 0, "backlogged": 0, "skipped": 0, "failed": 0}
+
+ if not tasks:
+ logger.info("Startup drain: no pending tasks for %s", self.agent_id)
+ return summary
+
+ logger.info(
+ "Startup drain: %d task(s) to iterate through for %s",
+ len(tasks),
+ self.agent_id,
+ )
+
+ for task in tasks:
+ # Skip tasks that need human approval
+ if task.status == TaskStatus.PENDING_APPROVAL and task.requires_approval:
+ logger.debug("Drain: skipping %s (needs approval)", task.title)
+ summary["skipped"] += 1
+ continue
+
+ # No handler? Backlog it
+ if task.task_type not in self._handlers:
+ self._backlog_task(task, f"No handler for task type: {task.task_type}")
+ summary["backlogged"] += 1
+ continue
+
+ # Try to process
+ result = await self.process_single_task(task)
+ if result:
+ summary["processed"] += 1
+ else:
+ # Check if it was backlogged vs failed
+ refreshed = get_task(task.id)
+ if refreshed and refreshed.status == TaskStatus.BACKLOGGED:
+ summary["backlogged"] += 1
+ else:
+ summary["failed"] += 1
+
+ logger.info(
+ "Startup drain complete: %d processed, %d backlogged, %d skipped, %d failed",
+ summary["processed"],
+ summary["backlogged"],
+ summary["skipped"],
+ summary["failed"],
+ )
+ return summary
+
+ async def process_next_task(self) -> Optional[QueueTask]:
+ """Process the next available task for this agent.
+
+ Returns the task that was processed, or None if no tasks available.
+ """
+ # Check if already working on something
+ current = get_current_task_for_agent(self.agent_id)
+ if current:
+ logger.debug("Already processing task: %s", current.id)
+ return None
+
+ # Get next task
+ task = get_next_pending_task(self.agent_id)
+ if not task:
+ logger.debug("No pending tasks for %s", self.agent_id)
+ return None
+
+ return await self.process_single_task(task)
+
+ async def run_loop(self, interval_seconds: float = 5.0):
+ """Run the task processing loop.
+
+ This should be called as a background task.
+ """
+ self._running = True
+ logger.info("Task processor started for %s", self.agent_id)
+
+ while self._running:
+ try:
+ await self.process_next_task()
+ except Exception as e:
+ logger.error("Task processor error: %s", e)
+
+ await asyncio.sleep(interval_seconds)
+
+ logger.info("Task processor stopped for %s", self.agent_id)
+
+ def stop(self):
+ """Stop the task processing loop."""
+ self._running = False
+
+ @property
+ def current_task(self) -> Optional[QueueTask]:
+ """Get the currently processing task."""
+ if self._current_task:
+ return get_task(self._current_task.id)
+ return get_current_task_for_agent(self.agent_id)
+
+
+# Global processor instance
+task_processor = TaskProcessor("timmy")
+
+
+def get_task_processor() -> TaskProcessor:
+ """Get the global task processor instance."""
+ return task_processor
diff --git a/src/swarm/task_queue/models.py b/src/swarm/task_queue/models.py
index 25f3a6b..bc41388 100644
--- a/src/swarm/task_queue/models.py
+++ b/src/swarm/task_queue/models.py
@@ -24,6 +24,7 @@ class TaskStatus(str, Enum):
COMPLETED = "completed"
VETOED = "vetoed"
FAILED = "failed"
+ BACKLOGGED = "backlogged"
class TaskPriority(str, Enum):
@@ -47,6 +48,7 @@ class QueueTask:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
title: str = ""
description: str = ""
+ task_type: str = "chat_response" # chat_response, thought, internal, external
assigned_to: str = "timmy"
created_by: str = "user"
status: TaskStatus = TaskStatus.PENDING_APPROVAL
@@ -64,6 +66,8 @@ class QueueTask:
updated_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
+ queue_position: Optional[int] = None # Position in queue when created
+ backlog_reason: Optional[str] = None # Why the task was backlogged
# ── Auto-Approve Rules ──────────────────────────────────────────────────
@@ -97,6 +101,7 @@ def should_auto_approve(task: QueueTask) -> bool:
# ── Database ─────────────────────────────────────────────────────────────
+
def _get_conn() -> sqlite3.Connection:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
@@ -107,6 +112,7 @@ def _get_conn() -> sqlite3.Connection:
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
+ task_type TEXT DEFAULT 'chat_response',
assigned_to TEXT DEFAULT 'timmy',
created_by TEXT DEFAULT 'user',
status TEXT DEFAULT 'pending_approval',
@@ -119,18 +125,31 @@ def _get_conn() -> sqlite3.Connection:
created_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
- updated_at TEXT NOT NULL
+ updated_at TEXT NOT NULL,
+ queue_position INTEGER,
+ backlog_reason TEXT
)
""")
- conn.execute(
- "CREATE INDEX IF NOT EXISTS idx_tq_status ON task_queue(status)"
- )
- conn.execute(
- "CREATE INDEX IF NOT EXISTS idx_tq_priority ON task_queue(priority)"
- )
- conn.execute(
- "CREATE INDEX IF NOT EXISTS idx_tq_created ON task_queue(created_at)"
- )
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_status ON task_queue(status)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_priority ON task_queue(priority)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_created ON task_queue(created_at)")
+
+ # Migrate existing tables - add new columns if they don't exist
+ try:
+ conn.execute(
+ "ALTER TABLE task_queue ADD COLUMN task_type TEXT DEFAULT 'chat_response'"
+ )
+ except sqlite3.OperationalError:
+ pass # Column already exists
+ try:
+ conn.execute("ALTER TABLE task_queue ADD COLUMN queue_position INTEGER")
+ except sqlite3.OperationalError:
+ pass # Column already exists
+ try:
+ conn.execute("ALTER TABLE task_queue ADD COLUMN backlog_reason TEXT")
+ except sqlite3.OperationalError:
+ pass # Column already exists
+
conn.commit()
return conn
@@ -146,6 +165,7 @@ def _row_to_task(row: sqlite3.Row) -> QueueTask:
id=d["id"],
title=d["title"],
description=d.get("description", ""),
+ task_type=d.get("task_type", "chat_response"),
assigned_to=d.get("assigned_to", "timmy"),
created_by=d.get("created_by", "user"),
status=TaskStatus(d["status"]),
@@ -159,11 +179,14 @@ def _row_to_task(row: sqlite3.Row) -> QueueTask:
started_at=d.get("started_at"),
completed_at=d.get("completed_at"),
updated_at=d["updated_at"],
+ queue_position=d.get("queue_position"),
+ backlog_reason=d.get("backlog_reason"),
)
# ── CRUD ─────────────────────────────────────────────────────────────────
+
def create_task(
title: str,
description: str = "",
@@ -174,12 +197,18 @@ def create_task(
auto_approve: bool = False,
parent_task_id: Optional[str] = None,
steps: Optional[list] = None,
+ task_type: str = "chat_response",
) -> QueueTask:
"""Create a new task in the queue."""
now = datetime.now(timezone.utc).isoformat()
+
+ # Calculate queue position - count tasks ahead in queue (pending or approved)
+ queue_position = get_queue_position_ahead(assigned_to)
+
task = QueueTask(
title=title,
description=description,
+ task_type=task_type,
assigned_to=assigned_to,
created_by=created_by,
status=TaskStatus.PENDING_APPROVAL,
@@ -190,6 +219,7 @@ def create_task(
steps=steps or [],
created_at=now,
updated_at=now,
+ queue_position=queue_position,
)
# Check auto-approve
@@ -200,17 +230,31 @@ def create_task(
conn = _get_conn()
conn.execute(
"""INSERT INTO task_queue
- (id, title, description, assigned_to, created_by, status, priority,
+ (id, title, description, task_type, assigned_to, created_by, status, priority,
requires_approval, auto_approve, parent_task_id, result, steps,
- created_at, started_at, completed_at, updated_at)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
+ created_at, started_at, completed_at, updated_at, queue_position,
+ backlog_reason)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
- task.id, task.title, task.description, task.assigned_to,
- task.created_by, task.status.value, task.priority.value,
- int(task.requires_approval), int(task.auto_approve),
- task.parent_task_id, task.result, json.dumps(task.steps),
- task.created_at, task.started_at, task.completed_at,
+ task.id,
+ task.title,
+ task.description,
+ task.task_type,
+ task.assigned_to,
+ task.created_by,
+ task.status.value,
+ task.priority.value,
+ int(task.requires_approval),
+ int(task.auto_approve),
+ task.parent_task_id,
+ task.result,
+ json.dumps(task.steps),
+ task.created_at,
+ task.started_at,
+ task.completed_at,
task.updated_at,
+ task.queue_position,
+ task.backlog_reason,
),
)
conn.commit()
@@ -220,9 +264,7 @@ def create_task(
def get_task(task_id: str) -> Optional[QueueTask]:
conn = _get_conn()
- row = conn.execute(
- "SELECT * FROM task_queue WHERE id = ?", (task_id,)
- ).fetchone()
+ row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone()
conn.close()
return _row_to_task(row) if row else None
@@ -264,6 +306,7 @@ def update_task_status(
task_id: str,
new_status: TaskStatus,
result: Optional[str] = None,
+ backlog_reason: Optional[str] = None,
) -> Optional[QueueTask]:
now = datetime.now(timezone.utc).isoformat()
conn = _get_conn()
@@ -282,6 +325,10 @@ def update_task_status(
updates.append("result = ?")
params.append(result)
+ if backlog_reason is not None:
+ updates.append("backlog_reason = ?")
+ params.append(backlog_reason)
+
params.append(task_id)
conn.execute(
f"UPDATE task_queue SET {', '.join(updates)} WHERE id = ?",
@@ -289,9 +336,7 @@ def update_task_status(
)
conn.commit()
- row = conn.execute(
- "SELECT * FROM task_queue WHERE id = ?", (task_id,)
- ).fetchone()
+ row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone()
conn.close()
return _row_to_task(row) if row else None
@@ -330,9 +375,7 @@ def update_task(
)
conn.commit()
- row = conn.execute(
- "SELECT * FROM task_queue WHERE id = ?", (task_id,)
- ).fetchone()
+ row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone()
conn.close()
return _row_to_task(row) if row else None
@@ -369,6 +412,90 @@ def get_pending_count() -> int:
return row["cnt"] if row else 0
+def get_queue_position_ahead(assigned_to: str) -> int:
+ """Get count of tasks ahead of new tasks for a given assignee.
+
+ Counts tasks that are pending_approval or approved (waiting to be processed).
+ """
+ conn = _get_conn()
+ row = conn.execute(
+ """SELECT COUNT(*) as cnt FROM task_queue
+ WHERE assigned_to = ? AND status IN ('pending_approval', 'approved', 'running')
+ AND created_at < datetime('now')""",
+ (assigned_to,),
+ ).fetchone()
+ conn.close()
+ return row["cnt"] if row else 0
+
+
+def get_queue_status_for_task(task_id: str) -> dict:
+ """Get queue position info for a specific task."""
+ task = get_task(task_id)
+ if not task:
+ return {"error": "Task not found"}
+
+ conn = _get_conn()
+ # Count tasks ahead of this one (created earlier, not completed)
+ ahead = conn.execute(
+ """SELECT COUNT(*) as cnt FROM task_queue
+ WHERE assigned_to = ? AND status NOT IN ('completed', 'failed', 'vetoed')
+ AND created_at < ?""",
+ (task.assigned_to, task.created_at),
+ ).fetchone()
+ total = conn.execute(
+ """SELECT COUNT(*) as cnt FROM task_queue
+ WHERE assigned_to = ? AND status NOT IN ('completed', 'failed', 'vetoed')""",
+ (task.assigned_to,),
+ ).fetchone()
+ conn.close()
+
+ position = ahead["cnt"] + 1 if ahead else 1
+ total_count = total["cnt"] if total else 1
+
+ return {
+ "task_id": task_id,
+ "position": position,
+ "total": total_count,
+ "percent_ahead": int((ahead["cnt"] / total_count * 100))
+ if total_count > 0
+ else 0,
+ }
+
+
+def get_current_task_for_agent(assigned_to: str) -> Optional[QueueTask]:
+ """Get the currently running task for an agent."""
+ conn = _get_conn()
+ row = conn.execute(
+ """SELECT * FROM task_queue
+ WHERE assigned_to = ? AND status = 'running'
+ ORDER BY started_at DESC LIMIT 1""",
+ (assigned_to,),
+ ).fetchone()
+ conn.close()
+ return _row_to_task(row) if row else None
+
+
+def get_next_pending_task(assigned_to: str) -> Optional[QueueTask]:
+ """Get the next pending/approved task for an agent to work on."""
+ conn = _get_conn()
+ row = conn.execute(
+ """SELECT * FROM task_queue
+ WHERE assigned_to = ? AND status IN ('approved', 'pending_approval')
+ ORDER BY
+ CASE priority
+ WHEN 'urgent' THEN 1
+ WHEN 'high' THEN 2
+ WHEN 'normal' THEN 3
+ WHEN 'low' THEN 4
+ END,
+ created_at ASC
+ LIMIT 1""",
+ (assigned_to,),
+ ).fetchone()
+ conn.close()
+ return _row_to_task(row) if row else None
+
+
def get_task_summary_for_briefing() -> dict:
"""Get task stats for the morning briefing."""
counts = get_counts_by_status()
@@ -377,6 +504,10 @@ def get_task_summary_for_briefing() -> dict:
failed = conn.execute(
"SELECT title, result FROM task_queue WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 5"
).fetchall()
+ # Backlogged tasks
+ backlogged = conn.execute(
+ "SELECT title, backlog_reason FROM task_queue WHERE status = 'backlogged' ORDER BY updated_at DESC LIMIT 5"
+ ).fetchall()
conn.close()
return {
@@ -385,6 +516,56 @@ def get_task_summary_for_briefing() -> dict:
"completed": counts.get("completed", 0),
"failed": counts.get("failed", 0),
"vetoed": counts.get("vetoed", 0),
+ "backlogged": counts.get("backlogged", 0),
"total": sum(counts.values()),
- "recent_failures": [{"title": r["title"], "result": r["result"]} for r in failed],
+ "recent_failures": [
+ {"title": r["title"], "result": r["result"]} for r in failed
+ ],
+ "recent_backlogged": [
+ {"title": r["title"], "reason": r["backlog_reason"]} for r in backlogged
+ ],
}
+
+
+def list_backlogged_tasks(
+ assigned_to: Optional[str] = None, limit: int = 50
+) -> list[QueueTask]:
+ """List all backlogged tasks, optionally filtered by assignee."""
+ conn = _get_conn()
+ if assigned_to:
+ rows = conn.execute(
+ """SELECT * FROM task_queue WHERE status = 'backlogged' AND assigned_to = ?
+ ORDER BY priority, created_at ASC LIMIT ?""",
+ (assigned_to, limit),
+ ).fetchall()
+ else:
+ rows = conn.execute(
+ """SELECT * FROM task_queue WHERE status = 'backlogged'
+ ORDER BY priority, created_at ASC LIMIT ?""",
+ (limit,),
+ ).fetchall()
+ conn.close()
+ return [_row_to_task(r) for r in rows]
+
+
+def get_all_actionable_tasks(assigned_to: str) -> list[QueueTask]:
+ """Get all tasks that should be processed on startup — approved or auto-approved pending.
+
+ Returns tasks ordered by priority then creation time (urgent first, oldest first).
+ """
+ conn = _get_conn()
+ rows = conn.execute(
+ """SELECT * FROM task_queue
+ WHERE assigned_to = ? AND status IN ('approved', 'pending_approval')
+ ORDER BY
+ CASE priority
+ WHEN 'urgent' THEN 1
+ WHEN 'high' THEN 2
+ WHEN 'normal' THEN 3
+ WHEN 'low' THEN 4
+ END,
+ created_at ASC""",
+ (assigned_to,),
+ ).fetchall()
+ conn.close()
+ return [_row_to_task(r) for r in rows]
diff --git a/tests/swarm/test_task_queue.py b/tests/swarm/test_task_queue.py
index a35b298..26cd931 100644
--- a/tests/swarm/test_task_queue.py
+++ b/tests/swarm/test_task_queue.py
@@ -59,7 +59,10 @@ def test_list_tasks():
def test_list_tasks_with_status_filter():
from swarm.task_queue.models import (
- create_task, list_tasks, update_task_status, TaskStatus,
+ create_task,
+ list_tasks,
+ update_task_status,
+ TaskStatus,
)
task = create_task(title="Filter test", created_by="test")
@@ -70,7 +73,9 @@ def test_list_tasks_with_status_filter():
def test_update_task_status():
from swarm.task_queue.models import (
- create_task, update_task_status, TaskStatus,
+ create_task,
+ update_task_status,
+ TaskStatus,
)
task = create_task(title="Status test", created_by="test")
@@ -80,7 +85,9 @@ def test_update_task_status():
def test_update_task_running_sets_started_at():
from swarm.task_queue.models import (
- create_task, update_task_status, TaskStatus,
+ create_task,
+ update_task_status,
+ TaskStatus,
)
task = create_task(title="Running test", created_by="test")
@@ -90,7 +97,9 @@ def test_update_task_running_sets_started_at():
def test_update_task_completed_sets_completed_at():
from swarm.task_queue.models import (
- create_task, update_task_status, TaskStatus,
+ create_task,
+ update_task_status,
+ TaskStatus,
)
task = create_task(title="Complete test", created_by="test")
@@ -314,6 +323,7 @@ class TestExtractTaskFromMessage:
def test_add_to_queue(self):
from dashboard.routes.agents import _extract_task_from_message
+
result = _extract_task_from_message("Add refactor the login to the task queue")
assert result is not None
assert result["agent"] == "timmy"
@@ -321,33 +331,42 @@ class TestExtractTaskFromMessage:
def test_schedule_this(self):
from dashboard.routes.agents import _extract_task_from_message
+
result = _extract_task_from_message("Schedule this for later")
assert result is not None
def test_create_a_task(self):
from dashboard.routes.agents import _extract_task_from_message
+
result = _extract_task_from_message("Create a task to fix the login page")
assert result is not None
assert "title" in result
def test_normal_message_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
+
assert _extract_task_from_message("Hello, how are you?") is None
def test_meta_question_about_tasks_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
+
assert _extract_task_from_message("How do I create a task?") is None
def test_what_is_question_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
+
assert _extract_task_from_message("What is a task queue?") is None
def test_explain_question_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
- assert _extract_task_from_message("Can you explain how to create a task?") is None
+
+ assert (
+ _extract_task_from_message("Can you explain how to create a task?") is None
+ )
def test_what_would_question_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
+
assert _extract_task_from_message("What would a task flow look like?") is None
@@ -356,22 +375,32 @@ class TestExtractAgentFromMessage:
def test_extracts_forge(self):
from dashboard.routes.agents import _extract_agent_from_message
- assert _extract_agent_from_message("Create a task for Forge to refactor") == "forge"
+
+ assert (
+ _extract_agent_from_message("Create a task for Forge to refactor")
+ == "forge"
+ )
def test_extracts_echo(self):
from dashboard.routes.agents import _extract_agent_from_message
- assert _extract_agent_from_message("Add research for Echo to the queue") == "echo"
+
+ assert (
+ _extract_agent_from_message("Add research for Echo to the queue") == "echo"
+ )
def test_case_insensitive(self):
from dashboard.routes.agents import _extract_agent_from_message
+
assert _extract_agent_from_message("Schedule this for SEER") == "seer"
def test_defaults_to_timmy(self):
from dashboard.routes.agents import _extract_agent_from_message
+
assert _extract_agent_from_message("Create a task to fix the bug") == "timmy"
def test_ignores_unknown_agent(self):
from dashboard.routes.agents import _extract_agent_from_message
+
assert _extract_agent_from_message("Create a task for BobAgent") == "timmy"
@@ -380,26 +409,32 @@ class TestExtractPriorityFromMessage:
def test_urgent(self):
from dashboard.routes.agents import _extract_priority_from_message
+
assert _extract_priority_from_message("urgent: fix the server") == "urgent"
def test_critical(self):
from dashboard.routes.agents import _extract_priority_from_message
+
assert _extract_priority_from_message("This is critical, do it now") == "urgent"
def test_asap(self):
from dashboard.routes.agents import _extract_priority_from_message
+
assert _extract_priority_from_message("Fix this ASAP") == "urgent"
def test_high_priority(self):
from dashboard.routes.agents import _extract_priority_from_message
+
assert _extract_priority_from_message("This is important work") == "high"
def test_low_priority(self):
from dashboard.routes.agents import _extract_priority_from_message
+
assert _extract_priority_from_message("minor cleanup task") == "low"
def test_default_normal(self):
from dashboard.routes.agents import _extract_priority_from_message
+
assert _extract_priority_from_message("Fix the login page") == "normal"
@@ -408,25 +443,31 @@ class TestTitleCleaning:
def test_strips_agent_from_title(self):
from dashboard.routes.agents import _extract_task_from_message
- result = _extract_task_from_message("Create a task for Forge to refactor the login")
+
+ result = _extract_task_from_message(
+ "Create a task for Forge to refactor the login"
+ )
assert result is not None
assert "forge" not in result["title"].lower()
assert "for" not in result["title"].lower().split()[0:1] # "for" stripped
def test_strips_priority_from_title(self):
from dashboard.routes.agents import _extract_task_from_message
+
result = _extract_task_from_message("Create an urgent task to fix the server")
assert result is not None
assert "urgent" not in result["title"].lower()
def test_title_is_capitalized(self):
from dashboard.routes.agents import _extract_task_from_message
+
result = _extract_task_from_message("Add refactor the login to the task queue")
assert result is not None
assert result["title"][0].isupper()
def test_title_capped_at_120_chars(self):
from dashboard.routes.agents import _extract_task_from_message
+
long_msg = "Create a task to " + "x" * 200
result = _extract_task_from_message(long_msg)
assert result is not None
@@ -438,7 +479,10 @@ class TestFullExtraction:
def test_task_includes_agent_and_priority(self):
from dashboard.routes.agents import _extract_task_from_message
- result = _extract_task_from_message("Create a high priority task for Forge to refactor auth")
+
+ result = _extract_task_from_message(
+ "Create a high priority task for Forge to refactor auth"
+ )
assert result is not None
assert result["agent"] == "forge"
assert result["priority"] == "high"
@@ -446,7 +490,10 @@ class TestFullExtraction:
def test_create_with_all_fields(self):
from dashboard.routes.agents import _extract_task_from_message
- result = _extract_task_from_message("Add an urgent task for Mace to audit security to the queue")
+
+ result = _extract_task_from_message(
+ "Add an urgent task for Mace to audit security to the queue"
+ )
assert result is not None
assert result["agent"] == "mace"
assert result["priority"] == "urgent"
@@ -482,50 +529,43 @@ class TestChatTimmyIntegration:
assert resp.status_code == 200
assert "Task queued" in resp.text or "urgent" in resp.text.lower()
- @patch("dashboard.routes.agents.timmy_chat")
- def test_chat_injects_datetime_context(self, mock_chat, client):
- mock_chat.return_value = "Hello there!"
- client.post(
+ def test_chat_queues_message_for_async_processing(self, client):
+ """Normal chat messages are now queued for async processing."""
+ resp = client.post(
"/agents/timmy/chat",
- data={"message": "Hello Timmy"},
+ data={"message": "Hello Timmy, how are you?"},
)
- mock_chat.assert_called_once()
- call_arg = mock_chat.call_args[0][0]
- assert "[System: Current date/time is" in call_arg
+ assert resp.status_code == 200
+ # Should queue the message, not respond immediately
+ assert "queued" in resp.text.lower() or "queue" in resp.text.lower()
+ # Should show position info
+ assert "position" in resp.text.lower() or "1/" in resp.text
+
+ def test_chat_creates_chat_response_task(self, client):
+ """Chat messages create a chat_response task type."""
+ from swarm.task_queue.models import list_tasks, TaskStatus
+
+ resp = client.post(
+ "/agents/timmy/chat",
+ data={"message": "Test message"},
+ )
+ assert resp.status_code == 200
+
+ # Check that a chat_response task was created
+ tasks = list_tasks(assigned_to="timmy")
+ chat_tasks = [t for t in tasks if t.task_type == "chat_response"]
+ assert len(chat_tasks) >= 1
@patch("dashboard.routes.agents.timmy_chat")
- @patch("dashboard.routes.agents._build_queue_context")
- def test_chat_injects_queue_context_on_queue_query(self, mock_ctx, mock_chat, client):
- mock_ctx.return_value = "[System: Task queue — 3 pending approval, 1 running, 5 completed.]"
- mock_chat.return_value = "There are 3 tasks pending."
- client.post(
- "/agents/timmy/chat",
- data={"message": "What tasks are in the queue?"},
- )
- mock_ctx.assert_called_once()
- mock_chat.assert_called_once()
- call_arg = mock_chat.call_args[0][0]
- assert "[System: Task queue" in call_arg
-
- @patch("dashboard.routes.agents.timmy_chat")
- @patch("dashboard.routes.agents._build_queue_context")
- def test_chat_no_queue_context_for_normal_message(self, mock_ctx, mock_chat, client):
+ def test_chat_no_queue_context_for_normal_message(self, mock_chat, client):
+ """Queue context is not built for normal queued messages."""
mock_chat.return_value = "Hi!"
client.post(
"/agents/timmy/chat",
data={"message": "Tell me a joke"},
)
- mock_ctx.assert_not_called()
-
- @patch("dashboard.routes.agents.timmy_chat")
- def test_chat_normal_message_uses_timmy(self, mock_chat, client):
- mock_chat.return_value = "I'm doing well, thank you."
- resp = client.post(
- "/agents/timmy/chat",
- data={"message": "How are you?"},
- )
- assert resp.status_code == 200
- mock_chat.assert_called_once()
+ # timmy_chat is not called directly - message is queued
+ mock_chat.assert_not_called()
class TestBuildQueueContext:
@@ -534,6 +574,7 @@ class TestBuildQueueContext:
def test_returns_string_with_counts(self):
from dashboard.routes.agents import _build_queue_context
from swarm.task_queue.models import create_task
+
create_task(title="Context test task", created_by="test")
ctx = _build_queue_context()
assert "[System: Task queue" in ctx
@@ -541,7 +582,11 @@ class TestBuildQueueContext:
def test_returns_empty_on_error(self):
from dashboard.routes.agents import _build_queue_context
- with patch("swarm.task_queue.models.get_counts_by_status", side_effect=Exception("DB error")):
+
+ with patch(
+ "swarm.task_queue.models.get_counts_by_status",
+ side_effect=Exception("DB error"),
+ ):
ctx = _build_queue_context()
assert isinstance(ctx, str)
assert ctx == ""
@@ -558,3 +603,296 @@ def test_briefing_task_queue_summary():
create_task(title="Briefing integration test", created_by="test")
summary = _gather_task_queue_summary()
assert "pending" in summary.lower() or "task" in summary.lower()
+
+
+# ── Backlog Tests ──────────────────────────────────────────────────────────
+
+
+def test_backlogged_status_exists():
+ """BACKLOGGED is a valid task status."""
+ from swarm.task_queue.models import TaskStatus
+
+ assert TaskStatus.BACKLOGGED.value == "backlogged"
+
+
+def test_backlog_task():
+ """Tasks can be moved to backlogged status with a reason."""
+ from swarm.task_queue.models import create_task, update_task_status, TaskStatus, get_task
+
+ task = create_task(title="To backlog", created_by="test")
+ updated = update_task_status(
+ task.id, TaskStatus.BACKLOGGED,
+ result="Backlogged: no handler",
+ backlog_reason="No handler for task type: external",
+ )
+ assert updated.status == TaskStatus.BACKLOGGED
+ refreshed = get_task(task.id)
+ assert refreshed.backlog_reason == "No handler for task type: external"
+
+
+def test_list_backlogged_tasks():
+ """list_backlogged_tasks returns only backlogged tasks."""
+ from swarm.task_queue.models import (
+ create_task, update_task_status, TaskStatus, list_backlogged_tasks,
+ )
+
+ task = create_task(title="Backlog list test", created_by="test", assigned_to="timmy")
+ update_task_status(
+ task.id, TaskStatus.BACKLOGGED, backlog_reason="test reason",
+ )
+ backlogged = list_backlogged_tasks(assigned_to="timmy")
+ assert any(t.id == task.id for t in backlogged)
+
+
+def test_list_backlogged_tasks_filters_by_agent():
+ """list_backlogged_tasks filters by assigned_to."""
+ from swarm.task_queue.models import (
+ create_task, update_task_status, TaskStatus, list_backlogged_tasks,
+ )
+
+ task = create_task(title="Agent filter test", created_by="test", assigned_to="forge")
+ update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test")
+ backlogged = list_backlogged_tasks(assigned_to="echo")
+ assert not any(t.id == task.id for t in backlogged)
+
+
+def test_get_all_actionable_tasks():
+ """get_all_actionable_tasks returns approved and pending tasks in priority order."""
+ from swarm.task_queue.models import (
+ create_task, update_task_status, TaskStatus, get_all_actionable_tasks,
+ )
+
+ t1 = create_task(title="Low prio", created_by="test", assigned_to="drain-test", priority="low")
+ t2 = create_task(title="Urgent", created_by="test", assigned_to="drain-test", priority="urgent")
+ update_task_status(t2.id, TaskStatus.APPROVED) # Approve the urgent one
+
+ tasks = get_all_actionable_tasks("drain-test")
+ assert len(tasks) >= 2
+ # Urgent should come before low
+ ids = [t.id for t in tasks]
+ assert ids.index(t2.id) < ids.index(t1.id)
+
+
+def test_briefing_includes_backlogged():
+ """Briefing summary includes backlogged count."""
+ from swarm.task_queue.models import (
+ create_task, update_task_status, TaskStatus, get_task_summary_for_briefing,
+ )
+
+ task = create_task(title="Briefing backlog test", created_by="test")
+ update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="No handler")
+ summary = get_task_summary_for_briefing()
+ assert "backlogged" in summary
+ assert "recent_backlogged" in summary
+
+
+# ── Task Processor Tests ────────────────────────────────────────────────
+
+
+class TestTaskProcessor:
+ """Tests for the TaskProcessor drain and backlog logic."""
+
+ @pytest.mark.asyncio
+ async def test_drain_empty_queue(self):
+ """drain_queue with no tasks returns zero counts."""
+ from swarm.task_processor import TaskProcessor
+
+ tp = TaskProcessor("drain-empty-test")
+ summary = await tp.drain_queue()
+ assert summary["processed"] == 0
+ assert summary["backlogged"] == 0
+ assert summary["skipped"] == 0
+
+ @pytest.mark.asyncio
+ async def test_drain_backlogs_unhandled_tasks(self):
+ """Tasks with no registered handler get backlogged during drain."""
+ from swarm.task_processor import TaskProcessor
+ from swarm.task_queue.models import create_task, get_task, TaskStatus
+
+ tp = TaskProcessor("drain-backlog-test")
+ # No handlers registered — should backlog
+ task = create_task(
+ title="Unhandleable task",
+ task_type="unknown_type",
+ assigned_to="drain-backlog-test",
+ created_by="test",
+ requires_approval=False,
+ auto_approve=True,
+ )
+
+ summary = await tp.drain_queue()
+ assert summary["backlogged"] >= 1
+
+ refreshed = get_task(task.id)
+ assert refreshed.status == TaskStatus.BACKLOGGED
+ assert refreshed.backlog_reason is not None
+
+ @pytest.mark.asyncio
+ async def test_drain_processes_handled_tasks(self):
+ """Tasks with a registered handler get processed during drain."""
+ from swarm.task_processor import TaskProcessor
+ from swarm.task_queue.models import create_task, get_task, TaskStatus
+
+ tp = TaskProcessor("drain-process-test")
+ tp.register_handler("test_type", lambda task: "done")
+
+ task = create_task(
+ title="Handleable task",
+ task_type="test_type",
+ assigned_to="drain-process-test",
+ created_by="test",
+ requires_approval=False,
+ auto_approve=True,
+ )
+
+ summary = await tp.drain_queue()
+ assert summary["processed"] >= 1
+
+ refreshed = get_task(task.id)
+ assert refreshed.status == TaskStatus.COMPLETED
+
+ @pytest.mark.asyncio
+ async def test_drain_skips_pending_approval(self):
+ """Tasks requiring approval are skipped during drain."""
+ from swarm.task_processor import TaskProcessor
+ from swarm.task_queue.models import create_task, get_task, TaskStatus
+
+ tp = TaskProcessor("drain-skip-test")
+ tp.register_handler("chat_response", lambda task: "ok")
+
+ task = create_task(
+ title="Needs approval",
+ task_type="chat_response",
+ assigned_to="drain-skip-test",
+ created_by="user",
+ requires_approval=True,
+ auto_approve=False,
+ )
+
+ summary = await tp.drain_queue()
+ assert summary["skipped"] >= 1
+
+ refreshed = get_task(task.id)
+ assert refreshed.status == TaskStatus.PENDING_APPROVAL
+
+ @pytest.mark.asyncio
+ async def test_process_single_task_backlogs_on_no_handler(self):
+ """process_single_task backlogs when no handler is registered."""
+ from swarm.task_processor import TaskProcessor
+ from swarm.task_queue.models import create_task, get_task, TaskStatus
+
+ tp = TaskProcessor("single-backlog-test")
+ task = create_task(
+ title="No handler",
+ task_type="exotic_type",
+ assigned_to="single-backlog-test",
+ created_by="test",
+ requires_approval=False,
+ )
+
+ result = await tp.process_single_task(task)
+ assert result is None
+
+ refreshed = get_task(task.id)
+ assert refreshed.status == TaskStatus.BACKLOGGED
+
+ @pytest.mark.asyncio
+ async def test_process_single_task_backlogs_permanent_error(self):
+ """process_single_task backlogs tasks with permanent errors."""
+ from swarm.task_processor import TaskProcessor
+ from swarm.task_queue.models import create_task, get_task, TaskStatus
+
+ tp = TaskProcessor("perm-error-test")
+
+ def bad_handler(task):
+ raise RuntimeError("not supported operation")
+
+ tp.register_handler("broken_type", bad_handler)
+ task = create_task(
+ title="Perm error",
+ task_type="broken_type",
+ assigned_to="perm-error-test",
+ created_by="test",
+ requires_approval=False,
+ )
+
+ result = await tp.process_single_task(task)
+ assert result is None
+
+ refreshed = get_task(task.id)
+ assert refreshed.status == TaskStatus.BACKLOGGED
+
+ @pytest.mark.asyncio
+ async def test_process_single_task_fails_transient_error(self):
+ """process_single_task marks transient errors as FAILED (retryable)."""
+ from swarm.task_processor import TaskProcessor
+ from swarm.task_queue.models import create_task, get_task, TaskStatus
+
+ tp = TaskProcessor("transient-error-test")
+
+ def flaky_handler(task):
+ raise ConnectionError("Ollama connection refused")
+
+ tp.register_handler("flaky_type", flaky_handler)
+ task = create_task(
+ title="Transient error",
+ task_type="flaky_type",
+ assigned_to="transient-error-test",
+ created_by="test",
+ requires_approval=False,
+ )
+
+ result = await tp.process_single_task(task)
+ assert result is None
+
+ refreshed = get_task(task.id)
+ assert refreshed.status == TaskStatus.FAILED
+
+
+# ── Backlog Route Tests ─────────────────────────────────────────────────
+
+
+def test_api_list_backlogged(client):
+ resp = client.get("/api/tasks/backlog")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "tasks" in data
+ assert "count" in data
+
+
+def test_api_unbacklog_task(client):
+ from swarm.task_queue.models import create_task, update_task_status, TaskStatus
+
+ task = create_task(title="To unbacklog", created_by="test")
+ update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test")
+
+ resp = client.patch(f"/api/tasks/{task.id}/unbacklog")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["success"] is True
+ assert data["task"]["status"] == "approved"
+
+
+def test_api_unbacklog_wrong_status(client):
+ from swarm.task_queue.models import create_task
+
+ task = create_task(title="Not backlogged", created_by="test")
+ resp = client.patch(f"/api/tasks/{task.id}/unbacklog")
+ assert resp.status_code == 400
+
+
+def test_htmx_unbacklog(client):
+ from swarm.task_queue.models import create_task, update_task_status, TaskStatus
+
+ task = create_task(title="HTMX unbacklog", created_by="test")
+ update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test")
+
+ resp = client.post(f"/tasks/{task.id}/unbacklog")
+ assert resp.status_code == 200
+
+
+def test_task_counts_include_backlogged(client):
+ resp = client.get("/api/tasks/counts")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "backlogged" in data
diff --git a/thought_stream.txt b/thought_stream.txt
new file mode 100644
index 0000000..0e78f13
--- /dev/null
+++ b/thought_stream.txt
@@ -0,0 +1,3 @@
+To ponder, I think it would be beneficial for me to engage in a continuous stream of thoughts without interruption. This will allow me to delve deeper into complex problems and explore new connections.
+
+Upon deciding to take a break from thinking, I can then respond to your prompts and provide insights on the ideas that have been brewing in my mind.
\ No newline at end of file