feat: task queue system with startup drain and backlogging (#76)

* feat: add task queue system for Timmy - all work goes through the queue

- Add queue position tracking to task_queue models with task_type field
- Add TaskProcessor class that consumes tasks from queue one at a time
- Modify chat route to queue all messages for async processing
- Chat responses get 'high' priority to jump ahead of thought tasks
- Add queue status API endpoints for position polling
- Update UI to show queue position (x/y) and current task banner
- Replace thinking loop with task-based approach - thoughts are queued tasks
- Push responses to user via WebSocket instead of immediate HTTP response
- Add database migrations for existing tables

* feat: Timmy drains task queue on startup, backlogs unhandleable tasks

On spin-up, Timmy now iterates through all pending/approved tasks
immediately instead of waiting for the polling loop. Tasks without a
registered handler or with permanent errors are moved to a new
BACKLOGGED status with a reason, keeping the queue clear for work
Timmy can actually do.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Alexander Payne <apayne@MM.local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-02-27 01:52:42 -05:00
committed by GitHub
parent 849b5b1a8d
commit 5b6d33e05a
12 changed files with 1286 additions and 120 deletions

View File

@@ -34,10 +34,8 @@
## User Profile
**Name:** (not set)
**Interests:** (to be learned)
**Name:** Not
---
## Key Decisions

View File

@@ -4,7 +4,7 @@
## Basic Information
**Name:** TestUser
**Name:** Not
**Location:** (unknown)
**Occupation:** (unknown)
**Technical Level:** (to be assessed)
@@ -40,4 +40,4 @@
---
*Last updated: 2026-02-25*
*Last updated: 2026-02-27*

4
message_to_alexander.txt Normal file
View File

@@ -0,0 +1,4 @@
Hello Alexander,
I'm feeling good today, thanks for asking.
My team is doing well and we're making progress on our projects.

View File

@@ -84,29 +84,139 @@ async def _briefing_scheduler() -> None:
async def _thinking_loop() -> None:
"""Background task: Timmy's default thinking thread.
Starts shortly after server boot and runs on a configurable cadence.
Timmy ponders his existence, recent swarm activity, scripture, creative
ideas, or continues a previous train of thought.
Instead of thinking directly, this creates thought tasks in the queue
for the task processor to handle. This ensures all of Timmy's work
goes through the unified task system.
"""
from timmy.thinking import thinking_engine
from swarm.task_queue.models import create_task
from datetime import datetime
await asyncio.sleep(10) # Let server finish starting before first thought
while True:
try:
await thinking_engine.think_once()
# Create a thought task instead of thinking directly
now = datetime.now()
create_task(
title=f"Thought: {now.strftime('%A %B %d, %I:%M %p')}",
description="Continue thinking about your existence, recent events, scripture, creative ideas, or a previous thread of thought.",
assigned_to="timmy",
created_by="timmy", # Self-generated
priority="low",
requires_approval=False,
auto_approve=True,
task_type="thought",
)
logger.debug("Created thought task in queue")
except Exception as exc:
logger.error("Thinking loop error: %s", exc)
await asyncio.sleep(settings.thinking_interval_seconds)
async def _task_processor_loop() -> None:
"""Background task: Timmy's task queue processor.
On startup, drains all pending/approved tasks immediately — iterating
through the queue and processing what can be handled, backlogging what
can't. Then enters the steady-state polling loop.
"""
from swarm.task_processor import task_processor
from swarm.task_queue.models import update_task_status, TaskStatus
from timmy.session import chat as timmy_chat
from datetime import datetime
import json
import asyncio
await asyncio.sleep(5) # Let server finish starting
def handle_chat_response(task):
"""Handler for chat_response tasks - calls Timmy and returns response."""
try:
now = datetime.now()
context = f"[System: Current date/time is {now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n\n"
response = timmy_chat(context + task.description)
# Push response to user via WebSocket
try:
from infrastructure.ws_manager.handler import ws_manager
asyncio.create_task(
ws_manager.broadcast(
"timmy_response",
{
"task_id": task.id,
"response": response,
},
)
)
except Exception as e:
logger.debug("Failed to push response via WS: %s", e)
return response
except Exception as e:
logger.error("Chat response failed: %s", e)
return f"Error: {str(e)}"
def handle_thought(task):
"""Handler for thought tasks - Timmy's internal thinking."""
from timmy.thinking import thinking_engine
try:
result = thinking_engine.think_once()
return str(result) if result else "Thought completed"
except Exception as e:
logger.error("Thought processing failed: %s", e)
return f"Error: {str(e)}"
# Register handlers
task_processor.register_handler("chat_response", handle_chat_response)
task_processor.register_handler("thought", handle_thought)
task_processor.register_handler("internal", handle_thought)
# ── Startup drain: iterate through all pending tasks immediately ──
logger.info("Draining task queue on startup…")
try:
summary = await task_processor.drain_queue()
if summary["processed"] or summary["backlogged"]:
logger.info(
"Startup drain: %d processed, %d backlogged, %d skipped, %d failed",
summary["processed"],
summary["backlogged"],
summary["skipped"],
summary["failed"],
)
# Notify via WebSocket so the dashboard updates
try:
from infrastructure.ws_manager.handler import ws_manager
asyncio.create_task(
ws_manager.broadcast_json(
{
"type": "task_event",
"event": "startup_drain_complete",
"summary": summary,
}
)
)
except Exception:
pass
except Exception as exc:
logger.error("Startup drain failed: %s", exc)
# ── Steady-state: poll for new tasks ──
logger.info("Task processor entering steady-state loop")
await task_processor.run_loop(interval_seconds=3.0)
@asynccontextmanager
async def lifespan(app: FastAPI):
task = asyncio.create_task(_briefing_scheduler())
# Register Timmy in the swarm registry so it shows up alongside other agents
from swarm import registry as swarm_registry
swarm_registry.register(
name="Timmy",
capabilities="chat,reasoning,research,planning",
@@ -115,6 +225,7 @@ async def lifespan(app: FastAPI):
# Log swarm recovery summary (reconciliation ran during coordinator init)
from swarm.coordinator import coordinator as swarm_coordinator
rec = swarm_coordinator._recovery_summary
if rec["tasks_failed"] or rec["agents_offlined"]:
logger.info(
@@ -138,6 +249,7 @@ async def lifespan(app: FastAPI):
# Log system startup event so the Events page is never empty
try:
from swarm.event_log import log_event, EventType
log_event(
EventType.SYSTEM_INFO,
source="coordinator",
@@ -148,6 +260,7 @@ async def lifespan(app: FastAPI):
# Auto-bootstrap MCP tools
from mcp.bootstrap import auto_bootstrap, get_bootstrap_status
try:
registered = auto_bootstrap()
if registered:
@@ -157,6 +270,7 @@ async def lifespan(app: FastAPI):
# Initialise Spark Intelligence engine
from spark.engine import spark_engine
if spark_engine.enabled:
logger.info("Spark Intelligence active — event capture enabled")
@@ -169,10 +283,17 @@ async def lifespan(app: FastAPI):
settings.thinking_interval_seconds,
)
# Start Timmy's task queue processor (skip in test mode)
task_processor_task = None
if os.environ.get("TIMMY_TEST_MODE") != "1":
task_processor_task = asyncio.create_task(_task_processor_loop())
logger.info("Task queue processor started")
# Auto-start chat integrations (skip silently if unconfigured)
from integrations.telegram_bot.bot import telegram_bot
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.chat_bridge.registry import platform_registry
platform_registry.register(discord_bot)
if settings.telegram_token:
@@ -195,6 +316,12 @@ async def lifespan(app: FastAPI):
await thinking_task
except asyncio.CancelledError:
pass
if task_processor_task:
task_processor_task.cancel()
try:
await task_processor_task
except asyncio.CancelledError:
pass
task.cancel()
try:
await task
@@ -272,4 +399,5 @@ async def index(request: Request):
async def shortcuts_setup():
"""Siri Shortcuts setup guide."""
from integrations.shortcuts.siri import get_setup_guide
return get_setup_guide()

View File

@@ -237,16 +237,19 @@ async def clear_history(request: Request):
@router.post("/timmy/chat", response_class=HTMLResponse)
async def chat_timmy(request: Request, message: str = Form(...)):
"""Chat with Timmy - queues message as task for async processing."""
from swarm.task_queue.models import create_task, get_queue_status_for_task
timestamp = datetime.now().strftime("%H:%M:%S")
task_id = None
response_text = None
error_text = None
queue_info = None
# Check if the user wants to queue a task instead of chatting
# Check if the user wants to queue a task (explicit queue request)
task_info = _extract_task_from_message(message)
if task_info:
try:
from swarm.task_queue.models import create_task
task = create_task(
title=task_info["title"],
description=task_info["description"],
@@ -254,7 +257,9 @@ async def chat_timmy(request: Request, message: str = Form(...)):
assigned_to=task_info.get("agent", "timmy"),
priority=task_info.get("priority", "normal"),
requires_approval=True,
task_type="task_request",
)
task_id = task.id
priority_label = (
f" | Priority: `{task.priority.value}`"
if task.priority.value != "normal"
@@ -276,27 +281,54 @@ async def chat_timmy(request: Request, message: str = Form(...)):
logger.error("Failed to create task from chat: %s", exc)
task_info = None
# Normal chat path (also used as fallback if task creation failed)
# Normal chat: always queue for async processing
if not task_info:
try:
now = datetime.now()
context_parts = [
f"[System: Current date/time is {now.strftime('%A, %B %d, %Y at %I:%M %p')}]"
]
if _QUEUE_QUERY_PATTERN.search(message):
queue_ctx = _build_queue_context()
if queue_ctx:
context_parts.append(queue_ctx)
context_prefix = "\n".join(context_parts) + "\n\n"
response_text = timmy_chat(context_prefix + message)
except Exception as exc:
error_text = f"Timmy is offline: {exc}"
# Create a chat response task (auto-approved for timmy)
# Priority is "high" to jump ahead of Timmy's self-generated "thought" tasks
# but below any "urgent" tasks Timmy might create
task = create_task(
title=message[:100] + ("..." if len(message) > 100 else ""),
description=message,
created_by="user",
assigned_to="timmy",
priority="high", # Higher than thought tasks, lower than urgent
requires_approval=True,
auto_approve=True, # Auto-approve chat responses
task_type="chat_response",
)
task_id = task.id
queue_info = get_queue_status_for_task(task.id)
# Acknowledge queuing
position = queue_info.get("position", 1)
total = queue_info.get("total", 1)
percent_ahead = queue_info.get("percent_ahead", 0)
response_text = (
f"Message queued for Timmy's attention.\n\n"
f"**Queue position:** {position}/{total} ({100 - percent_ahead}% complete ahead of you)\n\n"
f"_Timmy will respond shortly..._"
)
logger.info(
"Chat → queued: %s (id=%s, position=%d/%d)",
message[:50],
task.id,
position,
total,
)
except Exception as exc:
logger.error("Failed to queue chat message: %s", exc)
error_text = f"Failed to queue message: {exc}"
# Log to message history (for context, even though async)
message_log.append(role="user", content=message, timestamp=timestamp)
if response_text is not None:
message_log.append(role="agent", content=response_text, timestamp=timestamp)
else:
message_log.append(role="error", content=error_text, timestamp=timestamp)
message_log.append(
role="error", content=error_text or "Unknown error", timestamp=timestamp
)
return templates.TemplateResponse(
request,
@@ -306,5 +338,7 @@ async def chat_timmy(request: Request, message: str = Form(...)):
"response": response_text,
"error": error_text,
"timestamp": timestamp,
"task_id": task_id,
"queue_info": queue_info,
},
)

View File

@@ -32,6 +32,7 @@ from swarm.task_queue.models import (
get_counts_by_status,
get_pending_count,
get_task,
list_backlogged_tasks,
list_tasks,
update_task,
update_task_status,
@@ -45,6 +46,7 @@ templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templa
# ── Helper to broadcast task events via WebSocket ────────────────────────
def _broadcast_task_event(event_type: str, task: QueueTask):
"""Best-effort broadcast a task event to connected WebSocket clients."""
try:
@@ -74,25 +76,30 @@ def _broadcast_task_event(event_type: str, task: QueueTask):
# ── Dashboard page ───────────────────────────────────────────────────────
@router.get("/tasks", response_class=HTMLResponse)
async def task_queue_page(request: Request, assign: Optional[str] = None):
"""Task queue dashboard with three columns."""
pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + \
list_tasks(status=TaskStatus.APPROVED)
active = list_tasks(status=TaskStatus.RUNNING) + \
list_tasks(status=TaskStatus.PAUSED)
completed = list_tasks(status=TaskStatus.COMPLETED, limit=20) + \
list_tasks(status=TaskStatus.VETOED, limit=10) + \
list_tasks(status=TaskStatus.FAILED, limit=10)
pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + list_tasks(
status=TaskStatus.APPROVED
)
active = list_tasks(status=TaskStatus.RUNNING) + list_tasks(
status=TaskStatus.PAUSED
)
backlogged = list_backlogged_tasks(limit=20)
completed = (
list_tasks(status=TaskStatus.COMPLETED, limit=20)
+ list_tasks(status=TaskStatus.VETOED, limit=10)
+ list_tasks(status=TaskStatus.FAILED, limit=10)
+ backlogged
)
# Get agents for the create modal
agents = []
try:
from swarm.coordinator import coordinator
agents = [
{"id": a.id, "name": a.name}
for a in coordinator.list_swarm_agents()
]
agents = [{"id": a.id, "name": a.name} for a in coordinator.list_swarm_agents()]
except Exception:
pass
# Always include core agents
@@ -120,11 +127,13 @@ async def task_queue_page(request: Request, assign: Optional[str] = None):
# ── HTMX partials ───────────────────────────────────────────────────────
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending_partial(request: Request):
"""HTMX partial: pending approval tasks."""
pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + \
list_tasks(status=TaskStatus.APPROVED)
pending = list_tasks(status=TaskStatus.PENDING_APPROVAL) + list_tasks(
status=TaskStatus.APPROVED
)
return templates.TemplateResponse(
request,
"partials/task_cards.html",
@@ -135,8 +144,9 @@ async def tasks_pending_partial(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active_partial(request: Request):
"""HTMX partial: active tasks."""
active = list_tasks(status=TaskStatus.RUNNING) + \
list_tasks(status=TaskStatus.PAUSED)
active = list_tasks(status=TaskStatus.RUNNING) + list_tasks(
status=TaskStatus.PAUSED
)
return templates.TemplateResponse(
request,
"partials/task_cards.html",
@@ -147,9 +157,12 @@ async def tasks_active_partial(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed_partial(request: Request):
"""HTMX partial: completed tasks."""
completed = list_tasks(status=TaskStatus.COMPLETED, limit=20) + \
list_tasks(status=TaskStatus.VETOED, limit=10) + \
list_tasks(status=TaskStatus.FAILED, limit=10)
completed = (
list_tasks(status=TaskStatus.COMPLETED, limit=20)
+ list_tasks(status=TaskStatus.VETOED, limit=10)
+ list_tasks(status=TaskStatus.FAILED, limit=10)
+ list_backlogged_tasks(limit=20)
)
return templates.TemplateResponse(
request,
"partials/task_cards.html",
@@ -159,6 +172,7 @@ async def tasks_completed_partial(request: Request):
# ── JSON API ─────────────────────────────────────────────────────────────
@router.get("/api/tasks", response_class=JSONResponse)
async def api_list_tasks(
status: Optional[str] = None,
@@ -242,10 +256,24 @@ async def api_task_counts():
"completed": counts.get("completed", 0),
"failed": counts.get("failed", 0),
"vetoed": counts.get("vetoed", 0),
"backlogged": counts.get("backlogged", 0),
"total": sum(counts.values()),
}
# ── Backlog API (must be before {task_id} catch-all) ─────────────────────
@router.get("/api/tasks/backlog", response_class=JSONResponse)
async def api_list_backlogged(assigned_to: Optional[str] = None, limit: int = 50):
"""List all backlogged tasks."""
tasks = list_backlogged_tasks(assigned_to=assigned_to, limit=limit)
return {
"tasks": [_task_to_dict(t) for t in tasks],
"count": len(tasks),
}
@router.get("/api/tasks/{task_id}", response_class=JSONResponse)
async def api_get_task(task_id: str):
"""Get a single task by ID."""
@@ -257,6 +285,7 @@ async def api_get_task(task_id: str):
# ── Workflow actions ─────────────────────────────────────────────────────
@router.patch("/api/tasks/{task_id}/approve", response_class=JSONResponse)
async def api_approve_task(task_id: str):
"""Approve a pending task."""
@@ -436,10 +465,101 @@ async def htmx_retry_task(request: Request, task_id: str):
)
@router.patch("/api/tasks/{task_id}/unbacklog", response_class=JSONResponse)
async def api_unbacklog_task(task_id: str):
"""Move a backlogged task back to approved for re-processing."""
task = get_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task.status != TaskStatus.BACKLOGGED:
raise HTTPException(400, "Can only unbacklog backlogged tasks")
updated = update_task_status(
task_id, TaskStatus.APPROVED, result=None, backlog_reason=None
)
_broadcast_task_event("task_unbacklogged", updated)
return {"success": True, "task": _task_to_dict(updated)}
@router.post("/tasks/{task_id}/unbacklog", response_class=HTMLResponse)
async def htmx_unbacklog_task(request: Request, task_id: str):
"""Move a backlogged task back to approved (HTMX)."""
task = get_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
updated = update_task_status(
task_id, TaskStatus.APPROVED, result=None, backlog_reason=None
)
_broadcast_task_event("task_unbacklogged", updated)
return templates.TemplateResponse(
request, "partials/task_card.html", {"task": updated}
)
# ── Queue Status API ─────────────────────────────────────────────────────
@router.get("/api/queue/status", response_class=JSONResponse)
async def api_queue_status(assigned_to: str = "timmy"):
"""Get queue status for an agent - position info for polling."""
from swarm.task_queue.models import (
get_current_task_for_agent,
get_queue_position_ahead,
get_next_pending_task,
)
current = get_current_task_for_agent(assigned_to)
next_task = get_next_pending_task(assigned_to)
ahead = get_queue_position_ahead(assigned_to)
return {
"agent": assigned_to,
"is_working": current is not None,
"current_task": _task_to_dict(current) if current else None,
"next_task": _task_to_dict(next_task) if next_task else None,
"tasks_ahead": ahead,
}
@router.get("/api/queue/position/{task_id}", response_class=JSONResponse)
async def api_queue_position(task_id: str):
"""Get queue position for a specific task."""
from swarm.task_queue.models import get_queue_status_for_task
status = get_queue_status_for_task(task_id)
if "error" in status:
raise HTTPException(404, status["error"])
return status
@router.get("/api/queue/agent/{assigned_to}", response_class=JSONResponse)
async def api_agent_queue(assigned_to: str, limit: int = 20):
"""Get all pending tasks for an agent."""
from swarm.task_queue.models import list_tasks, TaskStatus
tasks = list_tasks(
assigned_to=assigned_to,
status=None, # All statuses
limit=limit,
)
# Filter to pending/running tasks
pending = [
t
for t in tasks
if t.status not in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.VETOED)
]
return {
"assigned_to": assigned_to,
"tasks": [_task_to_dict(t) for t in pending],
"count": len(pending),
}
# ── Helpers ──────────────────────────────────────────────────────────────
def _task_to_dict(task: QueueTask) -> dict:
return {
d = {
"id": task.id,
"title": task.title,
"description": task.description,
@@ -457,11 +577,15 @@ def _task_to_dict(task: QueueTask) -> dict:
"completed_at": task.completed_at,
"updated_at": task.updated_at,
}
if task.backlog_reason:
d["backlog_reason"] = task.backlog_reason
return d
def _notify_task_created(task: QueueTask):
try:
from infrastructure.notifications.push import notifier
notifier.notify(
title="New Task",
message=f"{task.created_by} created: {task.title}",

View File

@@ -7,6 +7,11 @@
<div class="msg-meta">TIMMY // {{ timestamp }}</div>
<div class="msg-body timmy-md">{{ response | e }}</div>
</div>
{% if queue_info %}
<div class="queue-status" data-task-id="{{ task_id }}" data-position="{{ queue_info.position }}" data-total="{{ queue_info.total }}">
<small class="text-muted">Position in queue: {{ queue_info.position }}/{{ queue_info.total }}</small>
</div>
{% endif %}
<script>
(function() {
var el = document.currentScript.previousElementSibling.querySelector('.timmy-md');

View File

@@ -7,6 +7,9 @@
<span class="status-dot {{ 'green' if agent.status == 'idle' else 'amber' }}"></span>
{% endif %}
// TIMMY INTERFACE
<span id="timmy-status" class="ms-2" style="font-size: 0.75rem; color: #888;">
<span class="htmx-indicator">checking...</span>
</span>
</span>
<button class="mc-btn-clear"
hx-delete="/agents/timmy/history"
@@ -15,6 +18,11 @@
hx-confirm="Clear conversation history?">CLEAR</button>
</div>
<div id="current-task-banner" class="current-task-banner" style="display: none; background: #1a1a2e; padding: 8px 12px; border-bottom: 1px solid #333;">
<small style="color: #00ff88;">&#9679; WORKING:</small>
<span id="current-task-title" style="color: #fff;"></span>
</div>
<div class="chat-log flex-grow-1 overflow-auto p-3" id="chat-log"
hx-get="/agents/timmy/history"
hx-trigger="load"
@@ -87,4 +95,84 @@
htmx.process(form);
}, 100);
}
// Poll for Timmy's queue status (fallback) + WebSocket for real-time
(function() {
var statusEl = document.getElementById('timmy-status');
var banner = document.getElementById('current-task-banner');
var taskTitle = document.getElementById('current-task-title');
var protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
var ws;
var chatLog = document.getElementById('chat-log');
function updateFromData(data) {
if (data.is_working && data.current_task) {
statusEl.innerHTML = '<span style="color: #ffaa00;">working...</span>';
banner.style.display = 'block';
taskTitle.textContent = data.current_task.title;
} else if (data.tasks_ahead > 0) {
statusEl.innerHTML = '<span style="color: #888;">queue: ' + data.tasks_ahead + ' ahead</span>';
banner.style.display = 'none';
} else {
statusEl.innerHTML = '<span style="color: #00ff88;">ready</span>';
banner.style.display = 'none';
}
}
function fetchStatus() {
fetch('/api/queue/status?assigned_to=timmy')
.then(r => r.json())
.then(updateFromData)
.catch(() => {});
}
function appendMessage(role, content, timestamp) {
var div = document.createElement('div');
div.className = 'chat-message ' + role;
div.innerHTML = '<div class="msg-meta">' + (role === 'user' ? 'YOU' : 'TIMMY') + ' // ' + timestamp + '</div><div class="msg-body timmy-md">' + content.replace(/</g, '&lt;').replace(/>/g, '&gt;') + '</div>';
chatLog.appendChild(div);
// Render markdown if available
if (typeof marked !== 'undefined' && typeof DOMPurify !== 'undefined') {
var md = div.querySelector('.timmy-md');
md.innerHTML = DOMPurify.sanitize(marked.parse(md.textContent));
}
// Scroll to bottom
chatLog.scrollTop = chatLog.scrollHeight;
}
function connectWs() {
try {
ws = new WebSocket(protocol + '//' + window.location.host + '/ws');
ws.onmessage = function(event) {
try {
var msg = JSON.parse(event.data);
// Refresh on task events
if (msg.type === 'task_event') {
fetchStatus();
} else if (msg.type === 'timmy_thought') {
// Timmy thought - could show in UI
} else if (msg.event === 'task_created' || msg.event === 'task_completed' ||
msg.event === 'task_approved') {
fetchStatus();
} else if (msg.type === 'timmy_response') {
// Timmy pushed a response!
var now = new Date();
var ts = now.getHours().toString().padStart(2,'0') + ':' + now.getMinutes().toString().padStart(2,'0');
appendMessage('agent', msg.response, ts);
}
} catch(e) {}
};
ws.onclose = function() {
setTimeout(connectWs, 5000);
};
} catch(e) {}
}
// Initial fetch + start WebSocket
fetchStatus();
connectWs();
// Also poll periodically as fallback (every 10s)
setInterval(fetchStatus, 10000);
})();
</script>

263
src/swarm/task_processor.py Normal file
View File

@@ -0,0 +1,263 @@
"""Task processor for Timmy — consumes tasks from the queue one at a time.
This module provides a background loop that Timmy uses to process tasks
from the queue, including chat responses and self-generated tasks.
On startup, the processor drains all pending/approved tasks before entering
the steady-state polling loop. Tasks that have no registered handler are
moved to BACKLOGGED so they don't block the queue.
"""
import asyncio
import logging
from typing import Optional, Callable
from swarm.task_queue.models import (
QueueTask,
TaskStatus,
get_all_actionable_tasks,
get_current_task_for_agent,
get_next_pending_task,
update_task_status,
update_task_steps,
get_task,
)
logger = logging.getLogger(__name__)
class TaskProcessor:
"""Processes tasks from the queue for a specific agent."""
def __init__(self, agent_id: str = "timmy"):
self.agent_id = agent_id
self._current_task: Optional[QueueTask] = None
self._running = False
self._handlers: dict[str, Callable] = {}
self._user_callback: Optional[Callable[[str, str], None]] = (
None # (message_type, content)
)
def register_handler(self, task_type: str, handler: Callable[[QueueTask], str]):
"""Register a handler for a specific task type.
Handler receives the task and returns the result string.
"""
self._handlers[task_type] = handler
def set_user_callback(self, callback: Callable[[str, str], None]):
"""Set callback for pushing messages to the user.
Args:
callback: Function that takes (message_type, content)
message_type: 'response', 'progress', 'notification'
"""
self._user_callback = callback
def push_to_user(self, message_type: str, content: str):
"""Push a message to the user via the registered callback."""
if self._user_callback:
try:
self._user_callback(message_type, content)
except Exception as e:
logger.error("Failed to push message to user: %s", e)
else:
logger.debug("No user callback set, message not pushed: %s", content[:100])
def _backlog_task(self, task: QueueTask, reason: str) -> None:
"""Move a task to the backlog with a reason."""
update_task_status(
task.id,
TaskStatus.BACKLOGGED,
result=f"Backlogged: {reason}",
backlog_reason=reason,
)
update_task_steps(
task.id,
[{"description": f"Backlogged: {reason}", "status": "backlogged"}],
)
logger.info("Task backlogged: %s%s", task.title, reason)
async def process_single_task(self, task: QueueTask) -> Optional[QueueTask]:
"""Process one specific task. Backlog it if we can't handle it.
Returns the task on success, or None if backlogged/failed.
"""
# No handler → backlog immediately
handler = self._handlers.get(task.task_type)
if not handler:
self._backlog_task(task, f"No handler for task type: {task.task_type}")
return None
# Tasks still awaiting approval shouldn't be auto-executed
if task.status == TaskStatus.PENDING_APPROVAL and task.requires_approval:
logger.debug("Skipping task %s — needs human approval", task.id)
return None
self._current_task = task
update_task_status(task.id, TaskStatus.RUNNING)
try:
logger.info("Processing task: %s (type: %s)", task.title, task.task_type)
update_task_steps(
task.id,
[{"description": f"Processing: {task.title}", "status": "running"}],
)
result = handler(task)
update_task_status(task.id, TaskStatus.COMPLETED, result=result)
update_task_steps(
task.id,
[{"description": f"Completed: {task.title}", "status": "completed"}],
)
logger.info("Task completed: %s", task.id)
return task
except Exception as e:
error_msg = str(e)
logger.error("Task failed: %s - %s", task.id, error_msg)
# Determine if this is a permanent (backlog) or transient (fail) error
if self._is_permanent_failure(e):
self._backlog_task(task, error_msg)
else:
update_task_status(task.id, TaskStatus.FAILED, result=error_msg)
return None
finally:
self._current_task = None
def _is_permanent_failure(self, error: Exception) -> bool:
"""Decide whether an error means the task can never succeed.
Permanent failures get backlogged; transient ones stay as FAILED
so they can be retried.
"""
msg = str(error).lower()
permanent_indicators = [
"no handler",
"not implemented",
"unsupported",
"not supported",
"permission denied",
"forbidden",
"not found",
"invalid task",
]
return any(indicator in msg for indicator in permanent_indicators)
async def drain_queue(self) -> dict:
"""Iterate through ALL actionable tasks right now — called on startup.
Processes every approved/auto-approved task in priority order.
Tasks that can't be handled are backlogged. Tasks still requiring
human approval are skipped (left in PENDING_APPROVAL).
Returns a summary dict with counts of processed, backlogged, skipped.
"""
tasks = get_all_actionable_tasks(self.agent_id)
summary = {"processed": 0, "backlogged": 0, "skipped": 0, "failed": 0}
if not tasks:
logger.info("Startup drain: no pending tasks for %s", self.agent_id)
return summary
logger.info(
"Startup drain: %d task(s) to iterate through for %s",
len(tasks),
self.agent_id,
)
for task in tasks:
# Skip tasks that need human approval
if task.status == TaskStatus.PENDING_APPROVAL and task.requires_approval:
logger.debug("Drain: skipping %s (needs approval)", task.title)
summary["skipped"] += 1
continue
# No handler? Backlog it
if task.task_type not in self._handlers:
self._backlog_task(task, f"No handler for task type: {task.task_type}")
summary["backlogged"] += 1
continue
# Try to process
result = await self.process_single_task(task)
if result:
summary["processed"] += 1
else:
# Check if it was backlogged vs failed
refreshed = get_task(task.id)
if refreshed and refreshed.status == TaskStatus.BACKLOGGED:
summary["backlogged"] += 1
else:
summary["failed"] += 1
logger.info(
"Startup drain complete: %d processed, %d backlogged, %d skipped, %d failed",
summary["processed"],
summary["backlogged"],
summary["skipped"],
summary["failed"],
)
return summary
async def process_next_task(self) -> Optional[QueueTask]:
"""Process the next available task for this agent.
Returns the task that was processed, or None if no tasks available.
"""
# Check if already working on something
current = get_current_task_for_agent(self.agent_id)
if current:
logger.debug("Already processing task: %s", current.id)
return None
# Get next task
task = get_next_pending_task(self.agent_id)
if not task:
logger.debug("No pending tasks for %s", self.agent_id)
return None
return await self.process_single_task(task)
async def run_loop(self, interval_seconds: float = 5.0):
"""Run the task processing loop.
This should be called as a background task.
"""
self._running = True
logger.info("Task processor started for %s", self.agent_id)
while self._running:
try:
await self.process_next_task()
except Exception as e:
logger.error("Task processor error: %s", e)
await asyncio.sleep(interval_seconds)
logger.info("Task processor stopped for %s", self.agent_id)
def stop(self):
"""Stop the task processing loop."""
self._running = False
@property
def current_task(self) -> Optional[QueueTask]:
"""Get the currently processing task."""
if self._current_task:
return get_task(self._current_task.id)
return get_current_task_for_agent(self.agent_id)
# Global processor instance
task_processor = TaskProcessor("timmy")
def get_task_processor() -> TaskProcessor:
"""Get the global task processor instance."""
return task_processor

View File

@@ -24,6 +24,7 @@ class TaskStatus(str, Enum):
COMPLETED = "completed"
VETOED = "vetoed"
FAILED = "failed"
BACKLOGGED = "backlogged"
class TaskPriority(str, Enum):
@@ -47,6 +48,7 @@ class QueueTask:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
title: str = ""
description: str = ""
task_type: str = "chat_response" # chat_response, thought, internal, external
assigned_to: str = "timmy"
created_by: str = "user"
status: TaskStatus = TaskStatus.PENDING_APPROVAL
@@ -64,6 +66,8 @@ class QueueTask:
updated_at: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
queue_position: Optional[int] = None # Position in queue when created
backlog_reason: Optional[str] = None # Why the task was backlogged
# ── Auto-Approve Rules ──────────────────────────────────────────────────
@@ -97,6 +101,7 @@ def should_auto_approve(task: QueueTask) -> bool:
# ── Database ─────────────────────────────────────────────────────────────
def _get_conn() -> sqlite3.Connection:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
@@ -107,6 +112,7 @@ def _get_conn() -> sqlite3.Connection:
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
task_type TEXT DEFAULT 'chat_response',
assigned_to TEXT DEFAULT 'timmy',
created_by TEXT DEFAULT 'user',
status TEXT DEFAULT 'pending_approval',
@@ -119,18 +125,31 @@ def _get_conn() -> sqlite3.Connection:
created_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
updated_at TEXT NOT NULL
updated_at TEXT NOT NULL,
queue_position INTEGER,
backlog_reason TEXT
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_tq_status ON task_queue(status)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_tq_priority ON task_queue(priority)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_tq_created ON task_queue(created_at)"
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_status ON task_queue(status)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_priority ON task_queue(priority)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tq_created ON task_queue(created_at)")
# Migrate existing tables - add new columns if they don't exist
try:
conn.execute(
"ALTER TABLE task_queue ADD COLUMN task_type TEXT DEFAULT 'chat_response'"
)
except sqlite3.OperationalError:
pass # Column already exists
try:
conn.execute("ALTER TABLE task_queue ADD COLUMN queue_position INTEGER")
except sqlite3.OperationalError:
pass # Column already exists
try:
conn.execute("ALTER TABLE task_queue ADD COLUMN backlog_reason TEXT")
except sqlite3.OperationalError:
pass # Column already exists
conn.commit()
return conn
@@ -146,6 +165,7 @@ def _row_to_task(row: sqlite3.Row) -> QueueTask:
id=d["id"],
title=d["title"],
description=d.get("description", ""),
task_type=d.get("task_type", "chat_response"),
assigned_to=d.get("assigned_to", "timmy"),
created_by=d.get("created_by", "user"),
status=TaskStatus(d["status"]),
@@ -159,11 +179,14 @@ def _row_to_task(row: sqlite3.Row) -> QueueTask:
started_at=d.get("started_at"),
completed_at=d.get("completed_at"),
updated_at=d["updated_at"],
queue_position=d.get("queue_position"),
backlog_reason=d.get("backlog_reason"),
)
# ── CRUD ─────────────────────────────────────────────────────────────────
def create_task(
title: str,
description: str = "",
@@ -174,12 +197,18 @@ def create_task(
auto_approve: bool = False,
parent_task_id: Optional[str] = None,
steps: Optional[list] = None,
task_type: str = "chat_response",
) -> QueueTask:
"""Create a new task in the queue."""
now = datetime.now(timezone.utc).isoformat()
# Calculate queue position - count tasks ahead in queue (pending or approved)
queue_position = get_queue_position_ahead(assigned_to)
task = QueueTask(
title=title,
description=description,
task_type=task_type,
assigned_to=assigned_to,
created_by=created_by,
status=TaskStatus.PENDING_APPROVAL,
@@ -190,6 +219,7 @@ def create_task(
steps=steps or [],
created_at=now,
updated_at=now,
queue_position=queue_position,
)
# Check auto-approve
@@ -200,17 +230,31 @@ def create_task(
conn = _get_conn()
conn.execute(
"""INSERT INTO task_queue
(id, title, description, assigned_to, created_by, status, priority,
(id, title, description, task_type, assigned_to, created_by, status, priority,
requires_approval, auto_approve, parent_task_id, result, steps,
created_at, started_at, completed_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
created_at, started_at, completed_at, updated_at, queue_position,
backlog_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
task.id, task.title, task.description, task.assigned_to,
task.created_by, task.status.value, task.priority.value,
int(task.requires_approval), int(task.auto_approve),
task.parent_task_id, task.result, json.dumps(task.steps),
task.created_at, task.started_at, task.completed_at,
task.id,
task.title,
task.description,
task.task_type,
task.assigned_to,
task.created_by,
task.status.value,
task.priority.value,
int(task.requires_approval),
int(task.auto_approve),
task.parent_task_id,
task.result,
json.dumps(task.steps),
task.created_at,
task.started_at,
task.completed_at,
task.updated_at,
task.queue_position,
task.backlog_reason,
),
)
conn.commit()
@@ -220,9 +264,7 @@ def create_task(
def get_task(task_id: str) -> Optional[QueueTask]:
conn = _get_conn()
row = conn.execute(
"SELECT * FROM task_queue WHERE id = ?", (task_id,)
).fetchone()
row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone()
conn.close()
return _row_to_task(row) if row else None
@@ -264,6 +306,7 @@ def update_task_status(
task_id: str,
new_status: TaskStatus,
result: Optional[str] = None,
backlog_reason: Optional[str] = None,
) -> Optional[QueueTask]:
now = datetime.now(timezone.utc).isoformat()
conn = _get_conn()
@@ -282,6 +325,10 @@ def update_task_status(
updates.append("result = ?")
params.append(result)
if backlog_reason is not None:
updates.append("backlog_reason = ?")
params.append(backlog_reason)
params.append(task_id)
conn.execute(
f"UPDATE task_queue SET {', '.join(updates)} WHERE id = ?",
@@ -289,9 +336,7 @@ def update_task_status(
)
conn.commit()
row = conn.execute(
"SELECT * FROM task_queue WHERE id = ?", (task_id,)
).fetchone()
row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone()
conn.close()
return _row_to_task(row) if row else None
@@ -330,9 +375,7 @@ def update_task(
)
conn.commit()
row = conn.execute(
"SELECT * FROM task_queue WHERE id = ?", (task_id,)
).fetchone()
row = conn.execute("SELECT * FROM task_queue WHERE id = ?", (task_id,)).fetchone()
conn.close()
return _row_to_task(row) if row else None
@@ -369,6 +412,90 @@ def get_pending_count() -> int:
return row["cnt"] if row else 0
def get_queue_position_ahead(assigned_to: str) -> int:
"""Get count of tasks ahead of new tasks for a given assignee.
Counts tasks that are pending_approval or approved (waiting to be processed).
"""
conn = _get_conn()
row = conn.execute(
"""SELECT COUNT(*) as cnt FROM task_queue
WHERE assigned_to = ? AND status IN ('pending_approval', 'approved', 'running')
AND created_at < datetime('now')""",
(assigned_to,),
).fetchone()
conn.close()
return row["cnt"] if row else 0
def get_queue_status_for_task(task_id: str) -> dict:
"""Get queue position info for a specific task."""
task = get_task(task_id)
if not task:
return {"error": "Task not found"}
conn = _get_conn()
# Count tasks ahead of this one (created earlier, not completed)
ahead = conn.execute(
"""SELECT COUNT(*) as cnt FROM task_queue
WHERE assigned_to = ? AND status NOT IN ('completed', 'failed', 'vetoed')
AND created_at < ?""",
(task.assigned_to, task.created_at),
).fetchone()
total = conn.execute(
"""SELECT COUNT(*) as cnt FROM task_queue
WHERE assigned_to = ? AND status NOT IN ('completed', 'failed', 'vetoed')""",
(task.assigned_to,),
).fetchone()
conn.close()
position = ahead["cnt"] + 1 if ahead else 1
total_count = total["cnt"] if total else 1
return {
"task_id": task_id,
"position": position,
"total": total_count,
"percent_ahead": int((ahead["cnt"] / total_count * 100))
if total_count > 0
else 0,
}
def get_current_task_for_agent(assigned_to: str) -> Optional[QueueTask]:
"""Get the currently running task for an agent."""
conn = _get_conn()
row = conn.execute(
"""SELECT * FROM task_queue
WHERE assigned_to = ? AND status = 'running'
ORDER BY started_at DESC LIMIT 1""",
(assigned_to,),
).fetchone()
conn.close()
return _row_to_task(row) if row else None
def get_next_pending_task(assigned_to: str) -> Optional[QueueTask]:
"""Get the next pending/approved task for an agent to work on."""
conn = _get_conn()
row = conn.execute(
"""SELECT * FROM task_queue
WHERE assigned_to = ? AND status IN ('approved', 'pending_approval')
ORDER BY
CASE priority
WHEN 'urgent' THEN 1
WHEN 'high' THEN 2
WHEN 'normal' THEN 3
WHEN 'low' THEN 4
END,
created_at ASC
LIMIT 1""",
(assigned_to,),
).fetchone()
conn.close()
return _row_to_task(row) if row else None
def get_task_summary_for_briefing() -> dict:
"""Get task stats for the morning briefing."""
counts = get_counts_by_status()
@@ -377,6 +504,10 @@ def get_task_summary_for_briefing() -> dict:
failed = conn.execute(
"SELECT title, result FROM task_queue WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 5"
).fetchall()
# Backlogged tasks
backlogged = conn.execute(
"SELECT title, backlog_reason FROM task_queue WHERE status = 'backlogged' ORDER BY updated_at DESC LIMIT 5"
).fetchall()
conn.close()
return {
@@ -385,6 +516,56 @@ def get_task_summary_for_briefing() -> dict:
"completed": counts.get("completed", 0),
"failed": counts.get("failed", 0),
"vetoed": counts.get("vetoed", 0),
"backlogged": counts.get("backlogged", 0),
"total": sum(counts.values()),
"recent_failures": [{"title": r["title"], "result": r["result"]} for r in failed],
"recent_failures": [
{"title": r["title"], "result": r["result"]} for r in failed
],
"recent_backlogged": [
{"title": r["title"], "reason": r["backlog_reason"]} for r in backlogged
],
}
def list_backlogged_tasks(
assigned_to: Optional[str] = None, limit: int = 50
) -> list[QueueTask]:
"""List all backlogged tasks, optionally filtered by assignee."""
conn = _get_conn()
if assigned_to:
rows = conn.execute(
"""SELECT * FROM task_queue WHERE status = 'backlogged' AND assigned_to = ?
ORDER BY priority, created_at ASC LIMIT ?""",
(assigned_to, limit),
).fetchall()
else:
rows = conn.execute(
"""SELECT * FROM task_queue WHERE status = 'backlogged'
ORDER BY priority, created_at ASC LIMIT ?""",
(limit,),
).fetchall()
conn.close()
return [_row_to_task(r) for r in rows]
def get_all_actionable_tasks(assigned_to: str) -> list[QueueTask]:
"""Get all tasks that should be processed on startup — approved or auto-approved pending.
Returns tasks ordered by priority then creation time (urgent first, oldest first).
"""
conn = _get_conn()
rows = conn.execute(
"""SELECT * FROM task_queue
WHERE assigned_to = ? AND status IN ('approved', 'pending_approval')
ORDER BY
CASE priority
WHEN 'urgent' THEN 1
WHEN 'high' THEN 2
WHEN 'normal' THEN 3
WHEN 'low' THEN 4
END,
created_at ASC""",
(assigned_to,),
).fetchall()
conn.close()
return [_row_to_task(r) for r in rows]

View File

@@ -59,7 +59,10 @@ def test_list_tasks():
def test_list_tasks_with_status_filter():
from swarm.task_queue.models import (
create_task, list_tasks, update_task_status, TaskStatus,
create_task,
list_tasks,
update_task_status,
TaskStatus,
)
task = create_task(title="Filter test", created_by="test")
@@ -70,7 +73,9 @@ def test_list_tasks_with_status_filter():
def test_update_task_status():
from swarm.task_queue.models import (
create_task, update_task_status, TaskStatus,
create_task,
update_task_status,
TaskStatus,
)
task = create_task(title="Status test", created_by="test")
@@ -80,7 +85,9 @@ def test_update_task_status():
def test_update_task_running_sets_started_at():
from swarm.task_queue.models import (
create_task, update_task_status, TaskStatus,
create_task,
update_task_status,
TaskStatus,
)
task = create_task(title="Running test", created_by="test")
@@ -90,7 +97,9 @@ def test_update_task_running_sets_started_at():
def test_update_task_completed_sets_completed_at():
from swarm.task_queue.models import (
create_task, update_task_status, TaskStatus,
create_task,
update_task_status,
TaskStatus,
)
task = create_task(title="Complete test", created_by="test")
@@ -314,6 +323,7 @@ class TestExtractTaskFromMessage:
def test_add_to_queue(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Add refactor the login to the task queue")
assert result is not None
assert result["agent"] == "timmy"
@@ -321,33 +331,42 @@ class TestExtractTaskFromMessage:
def test_schedule_this(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Schedule this for later")
assert result is not None
def test_create_a_task(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Create a task to fix the login page")
assert result is not None
assert "title" in result
def test_normal_message_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
assert _extract_task_from_message("Hello, how are you?") is None
def test_meta_question_about_tasks_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
assert _extract_task_from_message("How do I create a task?") is None
def test_what_is_question_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
assert _extract_task_from_message("What is a task queue?") is None
def test_explain_question_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
assert _extract_task_from_message("Can you explain how to create a task?") is None
assert (
_extract_task_from_message("Can you explain how to create a task?") is None
)
def test_what_would_question_returns_none(self):
from dashboard.routes.agents import _extract_task_from_message
assert _extract_task_from_message("What would a task flow look like?") is None
@@ -356,22 +375,32 @@ class TestExtractAgentFromMessage:
def test_extracts_forge(self):
from dashboard.routes.agents import _extract_agent_from_message
assert _extract_agent_from_message("Create a task for Forge to refactor") == "forge"
assert (
_extract_agent_from_message("Create a task for Forge to refactor")
== "forge"
)
def test_extracts_echo(self):
from dashboard.routes.agents import _extract_agent_from_message
assert _extract_agent_from_message("Add research for Echo to the queue") == "echo"
assert (
_extract_agent_from_message("Add research for Echo to the queue") == "echo"
)
def test_case_insensitive(self):
from dashboard.routes.agents import _extract_agent_from_message
assert _extract_agent_from_message("Schedule this for SEER") == "seer"
def test_defaults_to_timmy(self):
from dashboard.routes.agents import _extract_agent_from_message
assert _extract_agent_from_message("Create a task to fix the bug") == "timmy"
def test_ignores_unknown_agent(self):
from dashboard.routes.agents import _extract_agent_from_message
assert _extract_agent_from_message("Create a task for BobAgent") == "timmy"
@@ -380,26 +409,32 @@ class TestExtractPriorityFromMessage:
def test_urgent(self):
from dashboard.routes.agents import _extract_priority_from_message
assert _extract_priority_from_message("urgent: fix the server") == "urgent"
def test_critical(self):
from dashboard.routes.agents import _extract_priority_from_message
assert _extract_priority_from_message("This is critical, do it now") == "urgent"
def test_asap(self):
from dashboard.routes.agents import _extract_priority_from_message
assert _extract_priority_from_message("Fix this ASAP") == "urgent"
def test_high_priority(self):
from dashboard.routes.agents import _extract_priority_from_message
assert _extract_priority_from_message("This is important work") == "high"
def test_low_priority(self):
from dashboard.routes.agents import _extract_priority_from_message
assert _extract_priority_from_message("minor cleanup task") == "low"
def test_default_normal(self):
from dashboard.routes.agents import _extract_priority_from_message
assert _extract_priority_from_message("Fix the login page") == "normal"
@@ -408,25 +443,31 @@ class TestTitleCleaning:
def test_strips_agent_from_title(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Create a task for Forge to refactor the login")
result = _extract_task_from_message(
"Create a task for Forge to refactor the login"
)
assert result is not None
assert "forge" not in result["title"].lower()
assert "for" not in result["title"].lower().split()[0:1] # "for" stripped
def test_strips_priority_from_title(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Create an urgent task to fix the server")
assert result is not None
assert "urgent" not in result["title"].lower()
def test_title_is_capitalized(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Add refactor the login to the task queue")
assert result is not None
assert result["title"][0].isupper()
def test_title_capped_at_120_chars(self):
from dashboard.routes.agents import _extract_task_from_message
long_msg = "Create a task to " + "x" * 200
result = _extract_task_from_message(long_msg)
assert result is not None
@@ -438,7 +479,10 @@ class TestFullExtraction:
def test_task_includes_agent_and_priority(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Create a high priority task for Forge to refactor auth")
result = _extract_task_from_message(
"Create a high priority task for Forge to refactor auth"
)
assert result is not None
assert result["agent"] == "forge"
assert result["priority"] == "high"
@@ -446,7 +490,10 @@ class TestFullExtraction:
def test_create_with_all_fields(self):
from dashboard.routes.agents import _extract_task_from_message
result = _extract_task_from_message("Add an urgent task for Mace to audit security to the queue")
result = _extract_task_from_message(
"Add an urgent task for Mace to audit security to the queue"
)
assert result is not None
assert result["agent"] == "mace"
assert result["priority"] == "urgent"
@@ -482,50 +529,43 @@ class TestChatTimmyIntegration:
assert resp.status_code == 200
assert "Task queued" in resp.text or "urgent" in resp.text.lower()
@patch("dashboard.routes.agents.timmy_chat")
def test_chat_injects_datetime_context(self, mock_chat, client):
mock_chat.return_value = "Hello there!"
client.post(
def test_chat_queues_message_for_async_processing(self, client):
"""Normal chat messages are now queued for async processing."""
resp = client.post(
"/agents/timmy/chat",
data={"message": "Hello Timmy"},
data={"message": "Hello Timmy, how are you?"},
)
mock_chat.assert_called_once()
call_arg = mock_chat.call_args[0][0]
assert "[System: Current date/time is" in call_arg
assert resp.status_code == 200
# Should queue the message, not respond immediately
assert "queued" in resp.text.lower() or "queue" in resp.text.lower()
# Should show position info
assert "position" in resp.text.lower() or "1/" in resp.text
def test_chat_creates_chat_response_task(self, client):
"""Chat messages create a chat_response task type."""
from swarm.task_queue.models import list_tasks, TaskStatus
resp = client.post(
"/agents/timmy/chat",
data={"message": "Test message"},
)
assert resp.status_code == 200
# Check that a chat_response task was created
tasks = list_tasks(assigned_to="timmy")
chat_tasks = [t for t in tasks if t.task_type == "chat_response"]
assert len(chat_tasks) >= 1
@patch("dashboard.routes.agents.timmy_chat")
@patch("dashboard.routes.agents._build_queue_context")
def test_chat_injects_queue_context_on_queue_query(self, mock_ctx, mock_chat, client):
mock_ctx.return_value = "[System: Task queue — 3 pending approval, 1 running, 5 completed.]"
mock_chat.return_value = "There are 3 tasks pending."
client.post(
"/agents/timmy/chat",
data={"message": "What tasks are in the queue?"},
)
mock_ctx.assert_called_once()
mock_chat.assert_called_once()
call_arg = mock_chat.call_args[0][0]
assert "[System: Task queue" in call_arg
@patch("dashboard.routes.agents.timmy_chat")
@patch("dashboard.routes.agents._build_queue_context")
def test_chat_no_queue_context_for_normal_message(self, mock_ctx, mock_chat, client):
def test_chat_no_queue_context_for_normal_message(self, mock_chat, client):
"""Queue context is not built for normal queued messages."""
mock_chat.return_value = "Hi!"
client.post(
"/agents/timmy/chat",
data={"message": "Tell me a joke"},
)
mock_ctx.assert_not_called()
@patch("dashboard.routes.agents.timmy_chat")
def test_chat_normal_message_uses_timmy(self, mock_chat, client):
mock_chat.return_value = "I'm doing well, thank you."
resp = client.post(
"/agents/timmy/chat",
data={"message": "How are you?"},
)
assert resp.status_code == 200
mock_chat.assert_called_once()
# timmy_chat is not called directly - message is queued
mock_chat.assert_not_called()
class TestBuildQueueContext:
@@ -534,6 +574,7 @@ class TestBuildQueueContext:
def test_returns_string_with_counts(self):
from dashboard.routes.agents import _build_queue_context
from swarm.task_queue.models import create_task
create_task(title="Context test task", created_by="test")
ctx = _build_queue_context()
assert "[System: Task queue" in ctx
@@ -541,7 +582,11 @@ class TestBuildQueueContext:
def test_returns_empty_on_error(self):
from dashboard.routes.agents import _build_queue_context
with patch("swarm.task_queue.models.get_counts_by_status", side_effect=Exception("DB error")):
with patch(
"swarm.task_queue.models.get_counts_by_status",
side_effect=Exception("DB error"),
):
ctx = _build_queue_context()
assert isinstance(ctx, str)
assert ctx == ""
@@ -558,3 +603,296 @@ def test_briefing_task_queue_summary():
create_task(title="Briefing integration test", created_by="test")
summary = _gather_task_queue_summary()
assert "pending" in summary.lower() or "task" in summary.lower()
# ── Backlog Tests ──────────────────────────────────────────────────────────
def test_backlogged_status_exists():
"""BACKLOGGED is a valid task status."""
from swarm.task_queue.models import TaskStatus
assert TaskStatus.BACKLOGGED.value == "backlogged"
def test_backlog_task():
"""Tasks can be moved to backlogged status with a reason."""
from swarm.task_queue.models import create_task, update_task_status, TaskStatus, get_task
task = create_task(title="To backlog", created_by="test")
updated = update_task_status(
task.id, TaskStatus.BACKLOGGED,
result="Backlogged: no handler",
backlog_reason="No handler for task type: external",
)
assert updated.status == TaskStatus.BACKLOGGED
refreshed = get_task(task.id)
assert refreshed.backlog_reason == "No handler for task type: external"
def test_list_backlogged_tasks():
"""list_backlogged_tasks returns only backlogged tasks."""
from swarm.task_queue.models import (
create_task, update_task_status, TaskStatus, list_backlogged_tasks,
)
task = create_task(title="Backlog list test", created_by="test", assigned_to="timmy")
update_task_status(
task.id, TaskStatus.BACKLOGGED, backlog_reason="test reason",
)
backlogged = list_backlogged_tasks(assigned_to="timmy")
assert any(t.id == task.id for t in backlogged)
def test_list_backlogged_tasks_filters_by_agent():
"""list_backlogged_tasks filters by assigned_to."""
from swarm.task_queue.models import (
create_task, update_task_status, TaskStatus, list_backlogged_tasks,
)
task = create_task(title="Agent filter test", created_by="test", assigned_to="forge")
update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test")
backlogged = list_backlogged_tasks(assigned_to="echo")
assert not any(t.id == task.id for t in backlogged)
def test_get_all_actionable_tasks():
"""get_all_actionable_tasks returns approved and pending tasks in priority order."""
from swarm.task_queue.models import (
create_task, update_task_status, TaskStatus, get_all_actionable_tasks,
)
t1 = create_task(title="Low prio", created_by="test", assigned_to="drain-test", priority="low")
t2 = create_task(title="Urgent", created_by="test", assigned_to="drain-test", priority="urgent")
update_task_status(t2.id, TaskStatus.APPROVED) # Approve the urgent one
tasks = get_all_actionable_tasks("drain-test")
assert len(tasks) >= 2
# Urgent should come before low
ids = [t.id for t in tasks]
assert ids.index(t2.id) < ids.index(t1.id)
def test_briefing_includes_backlogged():
"""Briefing summary includes backlogged count."""
from swarm.task_queue.models import (
create_task, update_task_status, TaskStatus, get_task_summary_for_briefing,
)
task = create_task(title="Briefing backlog test", created_by="test")
update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="No handler")
summary = get_task_summary_for_briefing()
assert "backlogged" in summary
assert "recent_backlogged" in summary
# ── Task Processor Tests ────────────────────────────────────────────────
class TestTaskProcessor:
"""Tests for the TaskProcessor drain and backlog logic."""
@pytest.mark.asyncio
async def test_drain_empty_queue(self):
"""drain_queue with no tasks returns zero counts."""
from swarm.task_processor import TaskProcessor
tp = TaskProcessor("drain-empty-test")
summary = await tp.drain_queue()
assert summary["processed"] == 0
assert summary["backlogged"] == 0
assert summary["skipped"] == 0
@pytest.mark.asyncio
async def test_drain_backlogs_unhandled_tasks(self):
"""Tasks with no registered handler get backlogged during drain."""
from swarm.task_processor import TaskProcessor
from swarm.task_queue.models import create_task, get_task, TaskStatus
tp = TaskProcessor("drain-backlog-test")
# No handlers registered — should backlog
task = create_task(
title="Unhandleable task",
task_type="unknown_type",
assigned_to="drain-backlog-test",
created_by="test",
requires_approval=False,
auto_approve=True,
)
summary = await tp.drain_queue()
assert summary["backlogged"] >= 1
refreshed = get_task(task.id)
assert refreshed.status == TaskStatus.BACKLOGGED
assert refreshed.backlog_reason is not None
@pytest.mark.asyncio
async def test_drain_processes_handled_tasks(self):
"""Tasks with a registered handler get processed during drain."""
from swarm.task_processor import TaskProcessor
from swarm.task_queue.models import create_task, get_task, TaskStatus
tp = TaskProcessor("drain-process-test")
tp.register_handler("test_type", lambda task: "done")
task = create_task(
title="Handleable task",
task_type="test_type",
assigned_to="drain-process-test",
created_by="test",
requires_approval=False,
auto_approve=True,
)
summary = await tp.drain_queue()
assert summary["processed"] >= 1
refreshed = get_task(task.id)
assert refreshed.status == TaskStatus.COMPLETED
@pytest.mark.asyncio
async def test_drain_skips_pending_approval(self):
"""Tasks requiring approval are skipped during drain."""
from swarm.task_processor import TaskProcessor
from swarm.task_queue.models import create_task, get_task, TaskStatus
tp = TaskProcessor("drain-skip-test")
tp.register_handler("chat_response", lambda task: "ok")
task = create_task(
title="Needs approval",
task_type="chat_response",
assigned_to="drain-skip-test",
created_by="user",
requires_approval=True,
auto_approve=False,
)
summary = await tp.drain_queue()
assert summary["skipped"] >= 1
refreshed = get_task(task.id)
assert refreshed.status == TaskStatus.PENDING_APPROVAL
@pytest.mark.asyncio
async def test_process_single_task_backlogs_on_no_handler(self):
"""process_single_task backlogs when no handler is registered."""
from swarm.task_processor import TaskProcessor
from swarm.task_queue.models import create_task, get_task, TaskStatus
tp = TaskProcessor("single-backlog-test")
task = create_task(
title="No handler",
task_type="exotic_type",
assigned_to="single-backlog-test",
created_by="test",
requires_approval=False,
)
result = await tp.process_single_task(task)
assert result is None
refreshed = get_task(task.id)
assert refreshed.status == TaskStatus.BACKLOGGED
@pytest.mark.asyncio
async def test_process_single_task_backlogs_permanent_error(self):
"""process_single_task backlogs tasks with permanent errors."""
from swarm.task_processor import TaskProcessor
from swarm.task_queue.models import create_task, get_task, TaskStatus
tp = TaskProcessor("perm-error-test")
def bad_handler(task):
raise RuntimeError("not supported operation")
tp.register_handler("broken_type", bad_handler)
task = create_task(
title="Perm error",
task_type="broken_type",
assigned_to="perm-error-test",
created_by="test",
requires_approval=False,
)
result = await tp.process_single_task(task)
assert result is None
refreshed = get_task(task.id)
assert refreshed.status == TaskStatus.BACKLOGGED
@pytest.mark.asyncio
async def test_process_single_task_fails_transient_error(self):
"""process_single_task marks transient errors as FAILED (retryable)."""
from swarm.task_processor import TaskProcessor
from swarm.task_queue.models import create_task, get_task, TaskStatus
tp = TaskProcessor("transient-error-test")
def flaky_handler(task):
raise ConnectionError("Ollama connection refused")
tp.register_handler("flaky_type", flaky_handler)
task = create_task(
title="Transient error",
task_type="flaky_type",
assigned_to="transient-error-test",
created_by="test",
requires_approval=False,
)
result = await tp.process_single_task(task)
assert result is None
refreshed = get_task(task.id)
assert refreshed.status == TaskStatus.FAILED
# ── Backlog Route Tests ─────────────────────────────────────────────────
def test_api_list_backlogged(client):
resp = client.get("/api/tasks/backlog")
assert resp.status_code == 200
data = resp.json()
assert "tasks" in data
assert "count" in data
def test_api_unbacklog_task(client):
from swarm.task_queue.models import create_task, update_task_status, TaskStatus
task = create_task(title="To unbacklog", created_by="test")
update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test")
resp = client.patch(f"/api/tasks/{task.id}/unbacklog")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["task"]["status"] == "approved"
def test_api_unbacklog_wrong_status(client):
from swarm.task_queue.models import create_task
task = create_task(title="Not backlogged", created_by="test")
resp = client.patch(f"/api/tasks/{task.id}/unbacklog")
assert resp.status_code == 400
def test_htmx_unbacklog(client):
from swarm.task_queue.models import create_task, update_task_status, TaskStatus
task = create_task(title="HTMX unbacklog", created_by="test")
update_task_status(task.id, TaskStatus.BACKLOGGED, backlog_reason="test")
resp = client.post(f"/tasks/{task.id}/unbacklog")
assert resp.status_code == 200
def test_task_counts_include_backlogged(client):
resp = client.get("/api/tasks/counts")
assert resp.status_code == 200
data = resp.json()
assert "backlogged" in data

3
thought_stream.txt Normal file
View File

@@ -0,0 +1,3 @@
To ponder, I think it would be beneficial for me to engage in a continuous stream of thoughts without interruption. This will allow me to delve deeper into complex problems and explore new connections.
Upon deciding to take a break from thinking, I can then respond to your prompts and provide insights on the ideas that have been brewing in my mind.