Files
Timmy-time-dashboard/src/dashboard/routes/tasks.py
Claude (Opus 4.6) 43030b7db2
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
[claude] DRY up tasks_pending/active/completed in tasks.py (#942) (#1020)
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-23 15:10:05 +00:00

414 lines
14 KiB
Python

"""Task Queue routes — SQLite-backed CRUD for the task management dashboard."""
import logging
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
from pathlib import Path
from fastapi import APIRouter, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from config import settings
from dashboard.templating import templates
logger = logging.getLogger(__name__)
router = APIRouter(tags=["tasks"])
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
DB_PATH = Path(settings.repo_root) / "data" / "tasks.db"
VALID_STATUSES = {
"pending_approval",
"approved",
"running",
"paused",
"completed",
"vetoed",
"failed",
"backlogged",
}
VALID_PRIORITIES = {"low", "normal", "high", "urgent"}
@contextmanager
def _get_db() -> Generator[sqlite3.Connection, None, None]:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
status TEXT DEFAULT 'pending_approval',
priority TEXT DEFAULT 'normal',
assigned_to TEXT DEFAULT '',
created_by TEXT DEFAULT 'operator',
result TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT
)
""")
conn.commit()
yield conn
def _row_to_dict(row: sqlite3.Row) -> dict:
return dict(row)
class _EnumLike:
"""Thin wrapper so Jinja templates can use task.status.value."""
def __init__(self, v: str):
self.value = v
def __str__(self):
return self.value
def __eq__(self, other):
if isinstance(other, str):
return self.value == other
return NotImplemented
class _TaskView:
"""Lightweight view object for Jinja template rendering."""
def __init__(self, row: dict):
self.id = row["id"]
self.title = row.get("title", "")
self.description = row.get("description", "")
self.status = _EnumLike(row.get("status", "pending_approval"))
self.priority = _EnumLike(row.get("priority", "normal"))
self.assigned_to = row.get("assigned_to", "")
self.created_by = row.get("created_by", "operator")
self.result = row.get("result", "")
self.created_at = row.get("created_at", "")
self.completed_at = row.get("completed_at")
self.steps = [] # reserved for future use
# ---------------------------------------------------------------------------
# Page routes
# ---------------------------------------------------------------------------
@router.get("/tasks", response_class=HTMLResponse)
async def tasks_page(request: Request):
"""Render the main task queue page with 3-column layout."""
with _get_db() as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
).fetchall()
]
active = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
]
completed = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
return templates.TemplateResponse(
request,
"tasks.html",
{
"pending_count": len(pending),
"pending": pending,
"active": active,
"completed": completed,
"agents": [], # no agent roster wired yet
"pre_assign": "",
},
)
# ---------------------------------------------------------------------------
# HTMX partials (polled by the template)
# ---------------------------------------------------------------------------
def _render_task_list(request: Request, query: str, empty_msg: str) -> HTMLResponse:
"""Fetch tasks by query and render as HTMX task-card partials."""
with _get_db() as db:
rows = db.execute(query).fetchall()
parts = [
templates.TemplateResponse(
request, "partials/task_card.html", {"task": _TaskView(_row_to_dict(r))}
).body.decode()
for r in rows
]
if not parts:
return HTMLResponse(f'<div class="empty-column">{empty_msg}</div>')
return HTMLResponse("".join(parts))
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
return _render_task_list(
request,
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC",
"No pending tasks",
)
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
return _render_task_list(
request,
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC",
"No active tasks",
)
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
return _render_task_list(
request,
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50",
"No completed tasks yet",
)
# ---------------------------------------------------------------------------
# Form-based create (used by the modal in tasks.html)
# ---------------------------------------------------------------------------
@router.post("/tasks/create", response_class=HTMLResponse)
async def create_task_form(
request: Request,
title: str = Form(...),
description: str = Form(""),
priority: str = Form("normal"),
assigned_to: str = Form(""),
):
"""Create a task from the modal form and return a task card partial."""
title = title.strip()
if not title:
raise HTTPException(status_code=400, detail="Task title cannot be empty")
task_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
priority = priority if priority in VALID_PRIORITIES else "normal"
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assigned_to, now),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
task = _TaskView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
# ---------------------------------------------------------------------------
# Task action endpoints (approve, veto, modify, pause, cancel, retry)
# ---------------------------------------------------------------------------
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
async def approve_task(request: Request, task_id: str):
"""Approve a pending task and move it to active queue."""
return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
async def veto_task(request: Request, task_id: str):
"""Veto a task, marking it as rejected."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
async def pause_task(request: Request, task_id: str):
"""Pause a running or approved task."""
return await _set_status(request, task_id, "paused")
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
async def cancel_task(request: Request, task_id: str):
"""Cancel a task (marks as vetoed)."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
async def retry_task(request: Request, task_id: str):
"""Retry a failed/vetoed task by moving it back to approved."""
return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/modify", response_class=HTMLResponse)
async def modify_task(
request: Request,
task_id: str,
title: str = Form(...),
description: str = Form(""),
):
"""Update task title and description."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",
(title, description, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
async def _set_status(request: Request, task_id: str, new_status: str):
"""Helper to update status and return refreshed task card."""
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
# ---------------------------------------------------------------------------
# JSON API (for programmatic access / Timmy's tool calls)
# ---------------------------------------------------------------------------
@router.post("/api/tasks", response_class=JSONResponse, status_code=201)
async def api_create_task(request: Request):
"""Create a task via JSON API."""
body = await request.json()
title = body.get("title")
if not title:
raise HTTPException(422, "title is required")
task_id = str(uuid.uuid4())
now = datetime.now(UTC).isoformat()
priority = body.get("priority", "normal")
if priority not in VALID_PRIORITIES:
priority = "normal"
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
task_id,
title,
body.get("description", ""),
priority,
body.get("assigned_to", ""),
body.get("created_by", "operator"),
now,
),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
return JSONResponse(_row_to_dict(row), status_code=201)
@router.get("/api/tasks", response_class=JSONResponse)
async def api_list_tasks():
"""List all tasks as JSON."""
with _get_db() as db:
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
return JSONResponse([_row_to_dict(r) for r in rows])
@router.patch("/api/tasks/{task_id}/status", response_class=JSONResponse)
async def api_update_status(task_id: str, request: Request):
"""Update task status via JSON API."""
body = await request.json()
new_status = body.get("status")
if not new_status or new_status not in VALID_STATUSES:
raise HTTPException(422, f"Invalid status. Must be one of: {VALID_STATUSES}")
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
if not row:
raise HTTPException(404, "Task not found")
return JSONResponse(_row_to_dict(row))
@router.delete("/api/tasks/{task_id}", response_class=JSONResponse)
async def api_delete_task(task_id: str):
"""Delete a task."""
with _get_db() as db:
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
db.commit()
if cursor.rowcount == 0:
raise HTTPException(404, "Task not found")
return JSONResponse({"success": True, "id": task_id})
# ---------------------------------------------------------------------------
# Queue status (polled by the chat panel every 10 seconds)
# ---------------------------------------------------------------------------
@router.get("/api/queue/status", response_class=JSONResponse)
async def queue_status(assigned_to: str = "default"):
"""Return queue status for the chat panel's agent status indicator."""
with _get_db() as db:
running = db.execute(
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
(assigned_to,),
).fetchone()
ahead = db.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
(assigned_to,),
).fetchone()
if running:
return JSONResponse(
{
"is_working": True,
"current_task": {"id": running["id"], "title": running["title"]},
"tasks_ahead": 0,
}
)
return JSONResponse(
{
"is_working": False,
"current_task": None,
"tasks_ahead": ahead["cnt"] if ahead else 0,
}
)