+ {% endif %}
+
+
+ {% if wo.status.value in ('submitted', 'triaged') %}
+
+
+
+
+ {% elif wo.status.value == 'approved' %}
+
+
+
+ {% elif wo.status.value == 'in_progress' %}
+
+ Executing...
+
+ {% endif %}
+
+
diff --git a/src/dashboard/templates/partials/work_order_cards.html b/src/dashboard/templates/partials/work_order_cards.html
new file mode 100644
index 0000000..f180358
--- /dev/null
+++ b/src/dashboard/templates/partials/work_order_cards.html
@@ -0,0 +1,15 @@
+{% if orders %}
+ {% for wo in orders %}
+ {% include "partials/work_order_card.html" %}
+ {% endfor %}
+{% else %}
+
+ {% if section == "pending" %}
+ No pending work orders.
+ {% elif section == "active" %}
+ No active work orders.
+ {% else %}
+ No work orders found.
+ {% endif %}
+
+ {% if pending %}
+ {% for wo in pending %}
+ {% include "partials/work_order_card.html" %}
+ {% endfor %}
+ {% else %}
+
+ No pending work orders. Use the + NEW button or the API to submit one.
+
+ {% endif %}
+
+
+
+
+
+
+ ACTIVE WORK
+
+
+ {% if active %}
+ {% for wo in active %}
+ {% include "partials/work_order_card.html" %}
+ {% endfor %}
+ {% else %}
+
+ No work orders currently in progress.
+
+ {% endif %}
+
+
+
+
+
+
+ HISTORY
+
+ {% if completed or rejected %}
+ {% for wo in completed %}
+ {% include "partials/work_order_card.html" %}
+ {% endfor %}
+ {% for wo in rejected %}
+ {% include "partials/work_order_card.html" %}
+ {% endfor %}
+ {% else %}
+
+ No completed or rejected work orders yet.
+
+ {% endif %}
+
+
+
+{% endblock %}
diff --git a/src/memory/vector_store.py b/src/memory/vector_store.py
index 638233a..a8171d2 100644
--- a/src/memory/vector_store.py
+++ b/src/memory/vector_store.py
@@ -377,6 +377,35 @@ def recall_personal_facts(agent_id: Optional[str] = None) -> list[str]:
return [r["content"] for r in rows]
+def recall_personal_facts_with_ids(agent_id: Optional[str] = None) -> list[dict]:
+ """Recall personal facts with their IDs for edit/delete operations."""
+ conn = _get_conn()
+ if agent_id:
+ rows = conn.execute(
+ "SELECT id, content FROM memory_entries WHERE context_type = 'fact' AND agent_id = ? ORDER BY timestamp DESC LIMIT 100",
+ (agent_id,),
+ ).fetchall()
+ else:
+ rows = conn.execute(
+ "SELECT id, content FROM memory_entries WHERE context_type = 'fact' ORDER BY timestamp DESC LIMIT 100",
+ ).fetchall()
+ conn.close()
+ return [{"id": r["id"], "content": r["content"]} for r in rows]
+
+
+def update_personal_fact(memory_id: str, new_content: str) -> bool:
+ """Update a personal fact's content."""
+ conn = _get_conn()
+ cursor = conn.execute(
+ "UPDATE memory_entries SET content = ? WHERE id = ? AND context_type = 'fact'",
+ (new_content, memory_id),
+ )
+ conn.commit()
+ updated = cursor.rowcount > 0
+ conn.close()
+ return updated
+
+
def store_personal_fact(fact: str, agent_id: Optional[str] = None) -> MemoryEntry:
"""Store a personal fact about the user or system.
diff --git a/src/task_queue/__init__.py b/src/task_queue/__init__.py
new file mode 100644
index 0000000..a6136e4
--- /dev/null
+++ b/src/task_queue/__init__.py
@@ -0,0 +1 @@
+"""Task Queue — Human-in-the-loop approval and execution system."""
diff --git a/src/task_queue/models.py b/src/task_queue/models.py
new file mode 100644
index 0000000..25f3a6b
--- /dev/null
+++ b/src/task_queue/models.py
@@ -0,0 +1,390 @@
+"""Task Queue data model — SQLite-backed CRUD with human-in-the-loop states.
+
+Table: task_queue in data/swarm.db
+"""
+
+import json
+import sqlite3
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from enum import Enum
+from pathlib import Path
+from typing import Optional
+
+
+DB_PATH = Path(__file__).resolve().parent.parent.parent / "data" / "swarm.db"
+
+
+class TaskStatus(str, Enum):
+ PENDING_APPROVAL = "pending_approval"
+ APPROVED = "approved"
+ RUNNING = "running"
+ PAUSED = "paused"
+ COMPLETED = "completed"
+ VETOED = "vetoed"
+ FAILED = "failed"
+
+
+class TaskPriority(str, Enum):
+ LOW = "low"
+ NORMAL = "normal"
+ HIGH = "high"
+ URGENT = "urgent"
+
+
+@dataclass
+class TaskStep:
+ description: str
+ status: str = "pending" # pending, running, completed, failed
+ started_at: Optional[str] = None
+ completed_at: Optional[str] = None
+ output: Optional[str] = None
+
+
+@dataclass
+class QueueTask:
+ id: str = field(default_factory=lambda: str(uuid.uuid4()))
+ title: str = ""
+ description: str = ""
+ assigned_to: str = "timmy"
+ created_by: str = "user"
+ status: TaskStatus = TaskStatus.PENDING_APPROVAL
+ priority: TaskPriority = TaskPriority.NORMAL
+ requires_approval: bool = True
+ auto_approve: bool = False
+ parent_task_id: Optional[str] = None
+ result: Optional[str] = None
+ steps: list = field(default_factory=list)
+ created_at: str = field(
+ default_factory=lambda: datetime.now(timezone.utc).isoformat()
+ )
+ started_at: Optional[str] = None
+ completed_at: Optional[str] = None
+ updated_at: str = field(
+ default_factory=lambda: datetime.now(timezone.utc).isoformat()
+ )
+
+
+# ── Auto-Approve Rules ──────────────────────────────────────────────────
+
+AUTO_APPROVE_RULES = [
+ {"assigned_to": "timmy", "type": "chat_response"},
+ {"assigned_to": "forge", "type": "run_tests"},
+ {"priority": "urgent", "created_by": "timmy"},
+]
+
+
+def should_auto_approve(task: QueueTask) -> bool:
+ """Check if a task matches any auto-approve rule."""
+ if not task.auto_approve:
+ return False
+ for rule in AUTO_APPROVE_RULES:
+ match = True
+ for key, val in rule.items():
+ if key == "type":
+ continue # type matching is informational for now
+ task_val = getattr(task, key, None)
+ if isinstance(task_val, Enum):
+ task_val = task_val.value
+ if task_val != val:
+ match = False
+ break
+ if match:
+ return True
+ return False
+
+
+# ── Database ─────────────────────────────────────────────────────────────
+
+def _get_conn() -> sqlite3.Connection:
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(DB_PATH))
+ conn.row_factory = sqlite3.Row
+ conn.execute("PRAGMA journal_mode=WAL")
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS task_queue (
+ id TEXT PRIMARY KEY,
+ title TEXT NOT NULL,
+ description TEXT DEFAULT '',
+ assigned_to TEXT DEFAULT 'timmy',
+ created_by TEXT DEFAULT 'user',
+ status TEXT DEFAULT 'pending_approval',
+ priority TEXT DEFAULT 'normal',
+ requires_approval INTEGER DEFAULT 1,
+ auto_approve INTEGER DEFAULT 0,
+ parent_task_id TEXT,
+ result TEXT,
+ steps TEXT DEFAULT '[]',
+ created_at TEXT NOT NULL,
+ started_at TEXT,
+ completed_at TEXT,
+ updated_at TEXT NOT NULL
+ )
+ """)
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_tq_status ON task_queue(status)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_tq_priority ON task_queue(priority)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_tq_created ON task_queue(created_at)"
+ )
+ conn.commit()
+ return conn
+
+
+def _row_to_task(row: sqlite3.Row) -> QueueTask:
+ d = dict(row)
+ steps_raw = d.pop("steps", "[]")
+ try:
+ steps = json.loads(steps_raw) if steps_raw else []
+ except (json.JSONDecodeError, TypeError):
+ steps = []
+ return QueueTask(
+ id=d["id"],
+ title=d["title"],
+ description=d.get("description", ""),
+ assigned_to=d.get("assigned_to", "timmy"),
+ created_by=d.get("created_by", "user"),
+ status=TaskStatus(d["status"]),
+ priority=TaskPriority(d.get("priority", "normal")),
+ requires_approval=bool(d.get("requires_approval", 1)),
+ auto_approve=bool(d.get("auto_approve", 0)),
+ parent_task_id=d.get("parent_task_id"),
+ result=d.get("result"),
+ steps=steps,
+ created_at=d["created_at"],
+ started_at=d.get("started_at"),
+ completed_at=d.get("completed_at"),
+ updated_at=d["updated_at"],
+ )
+
+
+# ── CRUD ─────────────────────────────────────────────────────────────────
+
+def create_task(
+ title: str,
+ description: str = "",
+ assigned_to: str = "timmy",
+ created_by: str = "user",
+ priority: str = "normal",
+ requires_approval: bool = True,
+ auto_approve: bool = False,
+ parent_task_id: Optional[str] = None,
+ steps: Optional[list] = None,
+) -> QueueTask:
+ """Create a new task in the queue."""
+ now = datetime.now(timezone.utc).isoformat()
+ task = QueueTask(
+ title=title,
+ description=description,
+ assigned_to=assigned_to,
+ created_by=created_by,
+ status=TaskStatus.PENDING_APPROVAL,
+ priority=TaskPriority(priority),
+ requires_approval=requires_approval,
+ auto_approve=auto_approve,
+ parent_task_id=parent_task_id,
+ steps=steps or [],
+ created_at=now,
+ updated_at=now,
+ )
+
+ # Check auto-approve
+ if should_auto_approve(task):
+ task.status = TaskStatus.APPROVED
+ task.requires_approval = False
+
+ conn = _get_conn()
+ conn.execute(
+ """INSERT INTO task_queue
+ (id, title, description, assigned_to, created_by, status, priority,
+ requires_approval, auto_approve, parent_task_id, result, steps,
+ created_at, started_at, completed_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
+ (
+ task.id, task.title, task.description, task.assigned_to,
+ task.created_by, task.status.value, task.priority.value,
+ int(task.requires_approval), int(task.auto_approve),
+ task.parent_task_id, task.result, json.dumps(task.steps),
+ task.created_at, task.started_at, task.completed_at,
+ task.updated_at,
+ ),
+ )
+ conn.commit()
+ conn.close()
+ return task
+
+
+def get_task(task_id: str) -> Optional[QueueTask]:
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT * FROM task_queue WHERE id = ?", (task_id,)
+ ).fetchone()
+ conn.close()
+ return _row_to_task(row) if row else None
+
+
+def list_tasks(
+ status: Optional[TaskStatus] = None,
+ priority: Optional[TaskPriority] = None,
+ assigned_to: Optional[str] = None,
+ created_by: Optional[str] = None,
+ limit: int = 100,
+) -> list[QueueTask]:
+ clauses, params = [], []
+ if status:
+ clauses.append("status = ?")
+ params.append(status.value)
+ if priority:
+ clauses.append("priority = ?")
+ params.append(priority.value)
+ if assigned_to:
+ clauses.append("assigned_to = ?")
+ params.append(assigned_to)
+ if created_by:
+ clauses.append("created_by = ?")
+ params.append(created_by)
+
+ where = " WHERE " + " AND ".join(clauses) if clauses else ""
+ params.append(limit)
+
+ conn = _get_conn()
+ rows = conn.execute(
+ f"SELECT * FROM task_queue{where} ORDER BY created_at DESC LIMIT ?",
+ params,
+ ).fetchall()
+ conn.close()
+ return [_row_to_task(r) for r in rows]
+
+
+def update_task_status(
+ task_id: str,
+ new_status: TaskStatus,
+ result: Optional[str] = None,
+) -> Optional[QueueTask]:
+ now = datetime.now(timezone.utc).isoformat()
+ conn = _get_conn()
+
+ updates = ["status = ?", "updated_at = ?"]
+ params = [new_status.value, now]
+
+ if new_status == TaskStatus.RUNNING:
+ updates.append("started_at = ?")
+ params.append(now)
+ elif new_status in (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.VETOED):
+ updates.append("completed_at = ?")
+ params.append(now)
+
+ if result is not None:
+ updates.append("result = ?")
+ params.append(result)
+
+ params.append(task_id)
+ conn.execute(
+ f"UPDATE task_queue SET {', '.join(updates)} WHERE id = ?",
+ params,
+ )
+ conn.commit()
+
+ row = conn.execute(
+ "SELECT * FROM task_queue WHERE id = ?", (task_id,)
+ ).fetchone()
+ conn.close()
+ return _row_to_task(row) if row else None
+
+
+def update_task(
+ task_id: str,
+ title: Optional[str] = None,
+ description: Optional[str] = None,
+ assigned_to: Optional[str] = None,
+ priority: Optional[str] = None,
+) -> Optional[QueueTask]:
+ """Update task fields (for MODIFY action)."""
+ now = datetime.now(timezone.utc).isoformat()
+ conn = _get_conn()
+
+ updates = ["updated_at = ?"]
+ params = [now]
+
+ if title is not None:
+ updates.append("title = ?")
+ params.append(title)
+ if description is not None:
+ updates.append("description = ?")
+ params.append(description)
+ if assigned_to is not None:
+ updates.append("assigned_to = ?")
+ params.append(assigned_to)
+ if priority is not None:
+ updates.append("priority = ?")
+ params.append(priority)
+
+ params.append(task_id)
+ conn.execute(
+ f"UPDATE task_queue SET {', '.join(updates)} WHERE id = ?",
+ params,
+ )
+ conn.commit()
+
+ row = conn.execute(
+ "SELECT * FROM task_queue WHERE id = ?", (task_id,)
+ ).fetchone()
+ conn.close()
+ return _row_to_task(row) if row else None
+
+
+def update_task_steps(task_id: str, steps: list) -> bool:
+ """Update the steps array for a running task."""
+ now = datetime.now(timezone.utc).isoformat()
+ conn = _get_conn()
+ cursor = conn.execute(
+ "UPDATE task_queue SET steps = ?, updated_at = ? WHERE id = ?",
+ (json.dumps(steps), now, task_id),
+ )
+ conn.commit()
+ ok = cursor.rowcount > 0
+ conn.close()
+ return ok
+
+
+def get_counts_by_status() -> dict[str, int]:
+ conn = _get_conn()
+ rows = conn.execute(
+ "SELECT status, COUNT(*) as cnt FROM task_queue GROUP BY status"
+ ).fetchall()
+ conn.close()
+ return {r["status"]: r["cnt"] for r in rows}
+
+
+def get_pending_count() -> int:
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT COUNT(*) as cnt FROM task_queue WHERE status = 'pending_approval'"
+ ).fetchone()
+ conn.close()
+ return row["cnt"] if row else 0
+
+
+def get_task_summary_for_briefing() -> dict:
+ """Get task stats for the morning briefing."""
+ counts = get_counts_by_status()
+ conn = _get_conn()
+ # Failed tasks
+ failed = conn.execute(
+ "SELECT title, result FROM task_queue WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 5"
+ ).fetchall()
+ conn.close()
+
+ return {
+ "pending_approval": counts.get("pending_approval", 0),
+ "running": counts.get("running", 0),
+ "completed": counts.get("completed", 0),
+ "failed": counts.get("failed", 0),
+ "vetoed": counts.get("vetoed", 0),
+ "total": sum(counts.values()),
+ "recent_failures": [{"title": r["title"], "result": r["result"]} for r in failed],
+ }
diff --git a/src/work_orders/__init__.py b/src/work_orders/__init__.py
new file mode 100644
index 0000000..ddf43ba
--- /dev/null
+++ b/src/work_orders/__init__.py
@@ -0,0 +1 @@
+"""Work Order system for external and internal task submission."""
diff --git a/src/work_orders/executor.py b/src/work_orders/executor.py
new file mode 100644
index 0000000..8c8fd0b
--- /dev/null
+++ b/src/work_orders/executor.py
@@ -0,0 +1,49 @@
+"""Work order execution — bridges work orders to self-modify and swarm."""
+
+import logging
+
+from work_orders.models import WorkOrder, WorkOrderCategory
+
+logger = logging.getLogger(__name__)
+
+
+class WorkOrderExecutor:
+ """Dispatches approved work orders to the appropriate execution backend."""
+
+ def execute(self, wo: WorkOrder) -> tuple[bool, str]:
+ """Execute a work order.
+
+ Returns:
+ (success, result_message) tuple
+ """
+ if self._is_code_task(wo):
+ return self._execute_via_swarm(wo, code_hint=True)
+ return self._execute_via_swarm(wo)
+
+ def _is_code_task(self, wo: WorkOrder) -> bool:
+ """Check if this work order involves code changes."""
+ code_categories = {WorkOrderCategory.BUG, WorkOrderCategory.OPTIMIZATION}
+ if wo.category in code_categories:
+ return True
+ if wo.related_files:
+ return any(f.endswith(".py") for f in wo.related_files)
+ return False
+
+ def _execute_via_swarm(self, wo: WorkOrder, code_hint: bool = False) -> tuple[bool, str]:
+ """Dispatch as a swarm task for agent bidding."""
+ try:
+ from swarm.coordinator import coordinator
+ prefix = "[Code] " if code_hint else ""
+ description = f"{prefix}[WO-{wo.id[:8]}] {wo.title}"
+ if wo.description:
+ description += f": {wo.description}"
+ task = coordinator.post_task(description)
+ logger.info("Work order %s dispatched as swarm task %s", wo.id[:8], task.id)
+ return True, f"Dispatched as swarm task {task.id}"
+ except Exception as exc:
+ logger.error("Failed to dispatch work order %s: %s", wo.id[:8], exc)
+ return False, str(exc)
+
+
+# Module-level singleton
+work_order_executor = WorkOrderExecutor()
diff --git a/src/work_orders/models.py b/src/work_orders/models.py
new file mode 100644
index 0000000..55c7476
--- /dev/null
+++ b/src/work_orders/models.py
@@ -0,0 +1,286 @@
+"""Database models for Work Order system."""
+
+import json
+import sqlite3
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from enum import Enum
+from pathlib import Path
+from typing import Optional
+
+DB_PATH = Path(__file__).resolve().parent.parent.parent / "data" / "swarm.db"
+
+
+class WorkOrderStatus(str, Enum):
+ SUBMITTED = "submitted"
+ TRIAGED = "triaged"
+ APPROVED = "approved"
+ IN_PROGRESS = "in_progress"
+ COMPLETED = "completed"
+ REJECTED = "rejected"
+
+
+class WorkOrderPriority(str, Enum):
+ CRITICAL = "critical"
+ HIGH = "high"
+ MEDIUM = "medium"
+ LOW = "low"
+
+
+class WorkOrderCategory(str, Enum):
+ BUG = "bug"
+ FEATURE = "feature"
+ IMPROVEMENT = "improvement"
+ OPTIMIZATION = "optimization"
+ SUGGESTION = "suggestion"
+
+
+@dataclass
+class WorkOrder:
+ """A work order / suggestion submitted by a user or agent."""
+ id: str = field(default_factory=lambda: str(uuid.uuid4()))
+ title: str = ""
+ description: str = ""
+ priority: WorkOrderPriority = WorkOrderPriority.MEDIUM
+ category: WorkOrderCategory = WorkOrderCategory.SUGGESTION
+ status: WorkOrderStatus = WorkOrderStatus.SUBMITTED
+ submitter: str = "unknown"
+ submitter_type: str = "user" # user | agent | system
+ estimated_effort: Optional[str] = None # small | medium | large
+ related_files: list[str] = field(default_factory=list)
+ execution_mode: Optional[str] = None # auto | manual
+ swarm_task_id: Optional[str] = None
+ result: Optional[str] = None
+ rejection_reason: Optional[str] = None
+ created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
+ triaged_at: Optional[str] = None
+ approved_at: Optional[str] = None
+ started_at: Optional[str] = None
+ completed_at: Optional[str] = None
+ updated_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
+
+
+def _get_conn() -> sqlite3.Connection:
+ """Get database connection with schema initialized."""
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(DB_PATH))
+ conn.row_factory = sqlite3.Row
+
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS work_orders (
+ id TEXT PRIMARY KEY,
+ title TEXT NOT NULL,
+ description TEXT NOT NULL DEFAULT '',
+ priority TEXT NOT NULL DEFAULT 'medium',
+ category TEXT NOT NULL DEFAULT 'suggestion',
+ status TEXT NOT NULL DEFAULT 'submitted',
+ submitter TEXT NOT NULL DEFAULT 'unknown',
+ submitter_type TEXT NOT NULL DEFAULT 'user',
+ estimated_effort TEXT,
+ related_files TEXT,
+ execution_mode TEXT,
+ swarm_task_id TEXT,
+ result TEXT,
+ rejection_reason TEXT,
+ created_at TEXT NOT NULL,
+ triaged_at TEXT,
+ approved_at TEXT,
+ started_at TEXT,
+ completed_at TEXT,
+ updated_at TEXT NOT NULL
+ )
+ """
+ )
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_wo_status ON work_orders(status)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_wo_priority ON work_orders(priority)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_wo_submitter ON work_orders(submitter)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_wo_created ON work_orders(created_at)")
+ conn.commit()
+ return conn
+
+
+def _row_to_work_order(row: sqlite3.Row) -> WorkOrder:
+ """Convert a database row to a WorkOrder."""
+ return WorkOrder(
+ id=row["id"],
+ title=row["title"],
+ description=row["description"],
+ priority=WorkOrderPriority(row["priority"]),
+ category=WorkOrderCategory(row["category"]),
+ status=WorkOrderStatus(row["status"]),
+ submitter=row["submitter"],
+ submitter_type=row["submitter_type"],
+ estimated_effort=row["estimated_effort"],
+ related_files=json.loads(row["related_files"]) if row["related_files"] else [],
+ execution_mode=row["execution_mode"],
+ swarm_task_id=row["swarm_task_id"],
+ result=row["result"],
+ rejection_reason=row["rejection_reason"],
+ created_at=row["created_at"],
+ triaged_at=row["triaged_at"],
+ approved_at=row["approved_at"],
+ started_at=row["started_at"],
+ completed_at=row["completed_at"],
+ updated_at=row["updated_at"],
+ )
+
+
+def create_work_order(
+ title: str,
+ description: str = "",
+ priority: str = "medium",
+ category: str = "suggestion",
+ submitter: str = "unknown",
+ submitter_type: str = "user",
+ estimated_effort: Optional[str] = None,
+ related_files: Optional[list[str]] = None,
+) -> WorkOrder:
+ """Create a new work order."""
+ wo = WorkOrder(
+ title=title,
+ description=description,
+ priority=WorkOrderPriority(priority),
+ category=WorkOrderCategory(category),
+ submitter=submitter,
+ submitter_type=submitter_type,
+ estimated_effort=estimated_effort,
+ related_files=related_files or [],
+ )
+
+ conn = _get_conn()
+ conn.execute(
+ """
+ INSERT INTO work_orders (
+ id, title, description, priority, category, status,
+ submitter, submitter_type, estimated_effort, related_files,
+ created_at, updated_at
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ wo.id, wo.title, wo.description,
+ wo.priority.value, wo.category.value, wo.status.value,
+ wo.submitter, wo.submitter_type, wo.estimated_effort,
+ json.dumps(wo.related_files) if wo.related_files else None,
+ wo.created_at, wo.updated_at,
+ ),
+ )
+ conn.commit()
+ conn.close()
+ return wo
+
+
+def get_work_order(wo_id: str) -> Optional[WorkOrder]:
+ """Get a work order by ID."""
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT * FROM work_orders WHERE id = ?", (wo_id,)
+ ).fetchone()
+ conn.close()
+ if not row:
+ return None
+ return _row_to_work_order(row)
+
+
+def list_work_orders(
+ status: Optional[WorkOrderStatus] = None,
+ priority: Optional[WorkOrderPriority] = None,
+ category: Optional[WorkOrderCategory] = None,
+ submitter: Optional[str] = None,
+ limit: int = 100,
+) -> list[WorkOrder]:
+ """List work orders with optional filters."""
+ conn = _get_conn()
+ conditions = []
+ params: list = []
+
+ if status:
+ conditions.append("status = ?")
+ params.append(status.value)
+ if priority:
+ conditions.append("priority = ?")
+ params.append(priority.value)
+ if category:
+ conditions.append("category = ?")
+ params.append(category.value)
+ if submitter:
+ conditions.append("submitter = ?")
+ params.append(submitter)
+
+ where = "WHERE " + " AND ".join(conditions) if conditions else ""
+ rows = conn.execute(
+ f"SELECT * FROM work_orders {where} ORDER BY created_at DESC LIMIT ?",
+ params + [limit],
+ ).fetchall()
+ conn.close()
+ return [_row_to_work_order(r) for r in rows]
+
+
+def update_work_order_status(
+ wo_id: str,
+ new_status: WorkOrderStatus,
+ **kwargs,
+) -> Optional[WorkOrder]:
+ """Update a work order's status and optional fields."""
+ now = datetime.now(timezone.utc).isoformat()
+ sets = ["status = ?", "updated_at = ?"]
+ params: list = [new_status.value, now]
+
+ # Auto-set timestamp fields based on status transition
+ timestamp_map = {
+ WorkOrderStatus.TRIAGED: "triaged_at",
+ WorkOrderStatus.APPROVED: "approved_at",
+ WorkOrderStatus.IN_PROGRESS: "started_at",
+ WorkOrderStatus.COMPLETED: "completed_at",
+ WorkOrderStatus.REJECTED: "completed_at",
+ }
+ ts_field = timestamp_map.get(new_status)
+ if ts_field:
+ sets.append(f"{ts_field} = ?")
+ params.append(now)
+
+ # Apply additional keyword fields
+ allowed_fields = {
+ "execution_mode", "swarm_task_id", "result",
+ "rejection_reason", "estimated_effort",
+ }
+ for key, val in kwargs.items():
+ if key in allowed_fields:
+ sets.append(f"{key} = ?")
+ params.append(val)
+
+ params.append(wo_id)
+ conn = _get_conn()
+ cursor = conn.execute(
+ f"UPDATE work_orders SET {', '.join(sets)} WHERE id = ?",
+ params,
+ )
+ conn.commit()
+ updated = cursor.rowcount > 0
+ conn.close()
+
+ if not updated:
+ return None
+ return get_work_order(wo_id)
+
+
+def get_pending_count() -> int:
+ """Get count of submitted/triaged work orders awaiting review."""
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT COUNT(*) as count FROM work_orders WHERE status IN (?, ?)",
+ (WorkOrderStatus.SUBMITTED.value, WorkOrderStatus.TRIAGED.value),
+ ).fetchone()
+ conn.close()
+ return row["count"]
+
+
+def get_counts_by_status() -> dict[str, int]:
+ """Get work order counts grouped by status."""
+ conn = _get_conn()
+ rows = conn.execute(
+ "SELECT status, COUNT(*) as count FROM work_orders GROUP BY status"
+ ).fetchall()
+ conn.close()
+ return {r["status"]: r["count"] for r in rows}
diff --git a/src/work_orders/risk.py b/src/work_orders/risk.py
new file mode 100644
index 0000000..7a93996
--- /dev/null
+++ b/src/work_orders/risk.py
@@ -0,0 +1,74 @@
+"""Risk scoring and auto-execution threshold logic for work orders."""
+
+from work_orders.models import WorkOrder, WorkOrderCategory, WorkOrderPriority
+
+
+PRIORITY_WEIGHTS = {
+ WorkOrderPriority.CRITICAL: 4,
+ WorkOrderPriority.HIGH: 3,
+ WorkOrderPriority.MEDIUM: 2,
+ WorkOrderPriority.LOW: 1,
+}
+
+CATEGORY_WEIGHTS = {
+ WorkOrderCategory.BUG: 3,
+ WorkOrderCategory.FEATURE: 3,
+ WorkOrderCategory.IMPROVEMENT: 2,
+ WorkOrderCategory.OPTIMIZATION: 2,
+ WorkOrderCategory.SUGGESTION: 1,
+}
+
+SENSITIVE_PATHS = [
+ "swarm/coordinator",
+ "l402",
+ "lightning/",
+ "config.py",
+ "security",
+ "auth",
+]
+
+
+def compute_risk_score(wo: WorkOrder) -> int:
+ """Compute a risk score for a work order. Higher = riskier.
+
+ Score components:
+ - Priority weight: critical=4, high=3, medium=2, low=1
+ - Category weight: bug/feature=3, improvement/optimization=2, suggestion=1
+ - File sensitivity: +2 per related file in security-sensitive areas
+ """
+ score = PRIORITY_WEIGHTS.get(wo.priority, 2)
+ score += CATEGORY_WEIGHTS.get(wo.category, 1)
+
+ for f in wo.related_files:
+ if any(s in f for s in SENSITIVE_PATHS):
+ score += 2
+
+ return score
+
+
+def should_auto_execute(wo: WorkOrder) -> bool:
+ """Determine if a work order can auto-execute without human approval.
+
+ Checks:
+ 1. Global auto-execute must be enabled
+ 2. Work order priority must be at or below the configured threshold
+ 3. Total risk score must be <= 3
+ """
+ from config import settings
+
+ if not settings.work_orders_auto_execute:
+ return False
+
+ threshold_map = {"none": 0, "low": 1, "medium": 2, "high": 3}
+ max_auto = threshold_map.get(settings.work_orders_auto_threshold, 1)
+
+ priority_values = {
+ WorkOrderPriority.LOW: 1,
+ WorkOrderPriority.MEDIUM: 2,
+ WorkOrderPriority.HIGH: 3,
+ WorkOrderPriority.CRITICAL: 4,
+ }
+ if priority_values.get(wo.priority, 2) > max_auto:
+ return False
+
+ return compute_risk_score(wo) <= 3
diff --git a/tests/test_task_queue.py b/tests/test_task_queue.py
new file mode 100644
index 0000000..2726093
--- /dev/null
+++ b/tests/test_task_queue.py
@@ -0,0 +1,306 @@
+"""Tests for the Task Queue system."""
+
+import json
+import os
+import sqlite3
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+# Set test mode before importing app modules
+os.environ["TIMMY_TEST_MODE"] = "1"
+
+
+# ── Model Tests ──────────────────────────────────────────────────────────
+
+
+def test_create_task():
+ from task_queue.models import create_task, TaskStatus, TaskPriority
+
+ task = create_task(
+ title="Test task",
+ description="A test description",
+ assigned_to="timmy",
+ created_by="user",
+ priority="normal",
+ )
+ assert task.id
+ assert task.title == "Test task"
+ assert task.status == TaskStatus.PENDING_APPROVAL
+ assert task.priority == TaskPriority.NORMAL
+ assert task.assigned_to == "timmy"
+ assert task.created_by == "user"
+
+
+def test_get_task():
+ from task_queue.models import create_task, get_task
+
+ task = create_task(title="Get me", created_by="test")
+ retrieved = get_task(task.id)
+ assert retrieved is not None
+ assert retrieved.title == "Get me"
+
+
+def test_get_task_not_found():
+ from task_queue.models import get_task
+
+ assert get_task("nonexistent-id") is None
+
+
+def test_list_tasks():
+ from task_queue.models import create_task, list_tasks, TaskStatus
+
+ create_task(title="List test 1", created_by="test")
+ create_task(title="List test 2", created_by="test")
+ tasks = list_tasks()
+ assert len(tasks) >= 2
+
+
+def test_list_tasks_with_status_filter():
+ from task_queue.models import (
+ create_task, list_tasks, update_task_status, TaskStatus,
+ )
+
+ task = create_task(title="Filter test", created_by="test")
+ update_task_status(task.id, TaskStatus.APPROVED)
+ approved = list_tasks(status=TaskStatus.APPROVED)
+ assert any(t.id == task.id for t in approved)
+
+
+def test_update_task_status():
+ from task_queue.models import (
+ create_task, update_task_status, TaskStatus,
+ )
+
+ task = create_task(title="Status test", created_by="test")
+ updated = update_task_status(task.id, TaskStatus.APPROVED)
+ assert updated.status == TaskStatus.APPROVED
+
+
+def test_update_task_running_sets_started_at():
+ from task_queue.models import (
+ create_task, update_task_status, TaskStatus,
+ )
+
+ task = create_task(title="Running test", created_by="test")
+ updated = update_task_status(task.id, TaskStatus.RUNNING)
+ assert updated.started_at is not None
+
+
+def test_update_task_completed_sets_completed_at():
+ from task_queue.models import (
+ create_task, update_task_status, TaskStatus,
+ )
+
+ task = create_task(title="Complete test", created_by="test")
+ updated = update_task_status(task.id, TaskStatus.COMPLETED, result="Done!")
+ assert updated.completed_at is not None
+ assert updated.result == "Done!"
+
+
+def test_update_task_fields():
+ from task_queue.models import create_task, update_task
+
+ task = create_task(title="Modify test", created_by="test")
+ updated = update_task(task.id, title="Modified title", priority="high")
+ assert updated.title == "Modified title"
+ assert updated.priority.value == "high"
+
+
+def test_get_counts_by_status():
+ from task_queue.models import create_task, get_counts_by_status
+
+ create_task(title="Count test", created_by="test")
+ counts = get_counts_by_status()
+ assert "pending_approval" in counts
+
+
+def test_get_pending_count():
+ from task_queue.models import create_task, get_pending_count
+
+ create_task(title="Pending count test", created_by="test")
+ count = get_pending_count()
+ assert count >= 1
+
+
+def test_update_task_steps():
+ from task_queue.models import create_task, update_task_steps, get_task
+
+ task = create_task(title="Steps test", created_by="test")
+ steps = [
+ {"description": "Step 1", "status": "completed"},
+ {"description": "Step 2", "status": "running"},
+ ]
+ ok = update_task_steps(task.id, steps)
+ assert ok
+ retrieved = get_task(task.id)
+ assert len(retrieved.steps) == 2
+ assert retrieved.steps[0]["description"] == "Step 1"
+
+
+def test_auto_approve_not_triggered_by_default():
+ from task_queue.models import create_task, TaskStatus
+
+ task = create_task(title="No auto", created_by="user", auto_approve=False)
+ assert task.status == TaskStatus.PENDING_APPROVAL
+
+
+def test_get_task_summary_for_briefing():
+ from task_queue.models import create_task, get_task_summary_for_briefing
+
+ create_task(title="Briefing test", created_by="test")
+ summary = get_task_summary_for_briefing()
+ assert "pending_approval" in summary
+ assert "total" in summary
+
+
+# ── Route Tests ──────────────────────────────────────────────────────────
+
+
+@pytest.fixture
+def client():
+ """FastAPI test client."""
+ from fastapi.testclient import TestClient
+ from dashboard.app import app
+
+ return TestClient(app)
+
+
+def test_tasks_page(client):
+ resp = client.get("/tasks")
+ assert resp.status_code == 200
+ assert "TASK QUEUE" in resp.text
+
+
+def test_api_list_tasks(client):
+ resp = client.get("/api/tasks")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "tasks" in data
+ assert "count" in data
+
+
+def test_api_create_task(client):
+ resp = client.post(
+ "/api/tasks",
+ json={
+ "title": "API created task",
+ "description": "Test via API",
+ "assigned_to": "timmy",
+ "priority": "high",
+ },
+ )
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["success"] is True
+ assert data["task"]["title"] == "API created task"
+ assert data["task"]["status"] == "pending_approval"
+
+
+def test_api_task_counts(client):
+ resp = client.get("/api/tasks/counts")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "pending" in data
+ assert "total" in data
+
+
+def test_form_create_task(client):
+ resp = client.post(
+ "/tasks/create",
+ data={
+ "title": "Form created task",
+ "description": "From form",
+ "assigned_to": "forge",
+ "priority": "normal",
+ },
+ )
+ assert resp.status_code == 200
+ assert "Form created task" in resp.text
+
+
+def test_approve_task_htmx(client):
+ # Create then approve
+ create_resp = client.post(
+ "/api/tasks",
+ json={"title": "To approve", "assigned_to": "timmy"},
+ )
+ task_id = create_resp.json()["task"]["id"]
+
+ resp = client.post(f"/tasks/{task_id}/approve")
+ assert resp.status_code == 200
+ assert "approved" in resp.text.lower()
+
+
+def test_veto_task_htmx(client):
+ create_resp = client.post(
+ "/api/tasks",
+ json={"title": "To veto", "assigned_to": "timmy"},
+ )
+ task_id = create_resp.json()["task"]["id"]
+
+ resp = client.post(f"/tasks/{task_id}/veto")
+ assert resp.status_code == 200
+ assert "vetoed" in resp.text.lower()
+
+
+def test_modify_task_htmx(client):
+ create_resp = client.post(
+ "/api/tasks",
+ json={"title": "To modify", "assigned_to": "timmy"},
+ )
+ task_id = create_resp.json()["task"]["id"]
+
+ resp = client.post(
+ f"/tasks/{task_id}/modify",
+ data={"title": "Modified via HTMX"},
+ )
+ assert resp.status_code == 200
+ assert "Modified via HTMX" in resp.text
+
+
+def test_cancel_task_htmx(client):
+ create_resp = client.post(
+ "/api/tasks",
+ json={"title": "To cancel", "assigned_to": "timmy"},
+ )
+ task_id = create_resp.json()["task"]["id"]
+
+ resp = client.post(f"/tasks/{task_id}/cancel")
+ assert resp.status_code == 200
+
+
+def test_retry_failed_task(client):
+ from task_queue.models import create_task, update_task_status, TaskStatus
+
+ task = create_task(title="To retry", created_by="test")
+ update_task_status(task.id, TaskStatus.FAILED, result="Something broke")
+
+ resp = client.post(f"/tasks/{task.id}/retry")
+ assert resp.status_code == 200
+
+
+def test_pending_partial(client):
+ resp = client.get("/tasks/pending")
+ assert resp.status_code == 200
+
+
+def test_active_partial(client):
+ resp = client.get("/tasks/active")
+ assert resp.status_code == 200
+
+
+def test_completed_partial(client):
+ resp = client.get("/tasks/completed")
+ assert resp.status_code == 200
+
+
+def test_api_approve_nonexistent(client):
+ resp = client.patch("/api/tasks/nonexistent/approve")
+ assert resp.status_code == 404
+
+
+def test_api_veto_nonexistent(client):
+ resp = client.patch("/api/tasks/nonexistent/veto")
+ assert resp.status_code == 404
diff --git a/tests/test_work_orders.py b/tests/test_work_orders.py
new file mode 100644
index 0000000..1a86552
--- /dev/null
+++ b/tests/test_work_orders.py
@@ -0,0 +1,285 @@
+"""Tests for the work order system."""
+
+from work_orders.models import (
+ WorkOrder,
+ WorkOrderCategory,
+ WorkOrderPriority,
+ WorkOrderStatus,
+ create_work_order,
+ get_counts_by_status,
+ get_pending_count,
+ get_work_order,
+ list_work_orders,
+ update_work_order_status,
+)
+from work_orders.risk import compute_risk_score, should_auto_execute
+
+
+# ── Model CRUD tests ──────────────────────────────────────────────────────────
+
+
+def test_create_work_order():
+ wo = create_work_order(
+ title="Fix the login bug",
+ description="Login fails on mobile",
+ priority="high",
+ category="bug",
+ submitter="comet",
+ )
+ assert wo.id
+ assert wo.title == "Fix the login bug"
+ assert wo.priority == WorkOrderPriority.HIGH
+ assert wo.category == WorkOrderCategory.BUG
+ assert wo.status == WorkOrderStatus.SUBMITTED
+ assert wo.submitter == "comet"
+
+
+def test_get_work_order():
+ wo = create_work_order(title="Test get", submitter="test")
+ fetched = get_work_order(wo.id)
+ assert fetched is not None
+ assert fetched.title == "Test get"
+ assert fetched.submitter == "test"
+
+
+def test_get_work_order_not_found():
+ assert get_work_order("nonexistent-id") is None
+
+
+def test_list_work_orders_no_filter():
+ create_work_order(title="Order A", submitter="a")
+ create_work_order(title="Order B", submitter="b")
+ orders = list_work_orders()
+ assert len(orders) >= 2
+
+
+def test_list_work_orders_by_status():
+ wo = create_work_order(title="Status test")
+ update_work_order_status(wo.id, WorkOrderStatus.APPROVED)
+ approved = list_work_orders(status=WorkOrderStatus.APPROVED)
+ assert any(o.id == wo.id for o in approved)
+
+
+def test_list_work_orders_by_priority():
+ create_work_order(title="Critical item", priority="critical")
+ critical = list_work_orders(priority=WorkOrderPriority.CRITICAL)
+ assert len(critical) >= 1
+ assert all(o.priority == WorkOrderPriority.CRITICAL for o in critical)
+
+
+def test_update_work_order_status():
+ wo = create_work_order(title="Update test")
+ updated = update_work_order_status(wo.id, WorkOrderStatus.APPROVED)
+ assert updated is not None
+ assert updated.status == WorkOrderStatus.APPROVED
+ assert updated.approved_at is not None
+
+
+def test_update_work_order_with_kwargs():
+ wo = create_work_order(title="Kwargs test")
+ updated = update_work_order_status(
+ wo.id, WorkOrderStatus.REJECTED, rejection_reason="Not needed"
+ )
+ assert updated is not None
+ assert updated.rejection_reason == "Not needed"
+
+
+def test_update_nonexistent():
+ result = update_work_order_status("fake-id", WorkOrderStatus.APPROVED)
+ assert result is None
+
+
+def test_get_pending_count():
+ create_work_order(title="Pending 1")
+ create_work_order(title="Pending 2")
+ count = get_pending_count()
+ assert count >= 2
+
+
+def test_get_counts_by_status():
+ create_work_order(title="Count test")
+ counts = get_counts_by_status()
+ assert "submitted" in counts
+ assert counts["submitted"] >= 1
+
+
+def test_related_files_roundtrip():
+ wo = create_work_order(
+ title="Files test",
+ related_files=["src/config.py", "src/timmy/agent.py"],
+ )
+ fetched = get_work_order(wo.id)
+ assert fetched.related_files == ["src/config.py", "src/timmy/agent.py"]
+
+
+# ── Risk scoring tests ────────────────────────────────────────────────────────
+
+
+def test_risk_score_low_suggestion():
+ wo = WorkOrder(
+ priority=WorkOrderPriority.LOW,
+ category=WorkOrderCategory.SUGGESTION,
+ )
+ score = compute_risk_score(wo)
+ assert score == 2 # 1 (low) + 1 (suggestion)
+
+
+def test_risk_score_critical_bug():
+ wo = WorkOrder(
+ priority=WorkOrderPriority.CRITICAL,
+ category=WorkOrderCategory.BUG,
+ )
+ score = compute_risk_score(wo)
+ assert score == 7 # 4 (critical) + 3 (bug)
+
+
+def test_risk_score_sensitive_files():
+ wo = WorkOrder(
+ priority=WorkOrderPriority.LOW,
+ category=WorkOrderCategory.SUGGESTION,
+ related_files=["src/swarm/coordinator.py"],
+ )
+ score = compute_risk_score(wo)
+ assert score == 4 # 1 + 1 + 2 (sensitive)
+
+
+def test_should_auto_execute_disabled(monkeypatch):
+ monkeypatch.setattr("config.settings.work_orders_auto_execute", False)
+ wo = WorkOrder(
+ priority=WorkOrderPriority.LOW,
+ category=WorkOrderCategory.SUGGESTION,
+ )
+ assert should_auto_execute(wo) is False
+
+
+def test_should_auto_execute_low_risk(monkeypatch):
+ monkeypatch.setattr("config.settings.work_orders_auto_execute", True)
+ monkeypatch.setattr("config.settings.work_orders_auto_threshold", "low")
+ wo = WorkOrder(
+ priority=WorkOrderPriority.LOW,
+ category=WorkOrderCategory.SUGGESTION,
+ )
+ assert should_auto_execute(wo) is True
+
+
+def test_should_auto_execute_high_priority_blocked(monkeypatch):
+ monkeypatch.setattr("config.settings.work_orders_auto_execute", True)
+ monkeypatch.setattr("config.settings.work_orders_auto_threshold", "low")
+ wo = WorkOrder(
+ priority=WorkOrderPriority.HIGH,
+ category=WorkOrderCategory.BUG,
+ )
+ assert should_auto_execute(wo) is False
+
+
+# ── Route tests ───────────────────────────────────────────────────────────────
+
+
+def test_submit_work_order(client):
+ resp = client.post(
+ "/work-orders/submit",
+ data={
+ "title": "Test submission",
+ "description": "Testing the API",
+ "priority": "low",
+ "category": "suggestion",
+ "submitter": "test-agent",
+ "submitter_type": "agent",
+ "related_files": "",
+ },
+ )
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["success"] is True
+ assert data["work_order_id"]
+ assert data["execution_mode"] in ("auto", "manual")
+
+
+def test_submit_json(client):
+ resp = client.post(
+ "/work-orders/submit/json",
+ json={
+ "title": "JSON test",
+ "description": "Testing JSON API",
+ "priority": "medium",
+ "category": "improvement",
+ "submitter": "comet",
+ },
+ )
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["success"] is True
+
+
+def test_list_work_orders_route(client):
+ client.post(
+ "/work-orders/submit",
+ data={"title": "List test", "submitter": "test"},
+ )
+ resp = client.get("/work-orders")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "work_orders" in data
+ assert data["count"] >= 1
+
+
+def test_get_work_order_route(client):
+ submit = client.post(
+ "/work-orders/submit",
+ data={"title": "Get test", "submitter": "test"},
+ )
+ wo_id = submit.json()["work_order_id"]
+ resp = client.get(f"/work-orders/{wo_id}")
+ assert resp.status_code == 200
+ assert resp.json()["title"] == "Get test"
+
+
+def test_get_work_order_not_found_route(client):
+ resp = client.get("/work-orders/nonexistent-id")
+ assert resp.status_code == 404
+
+
+def test_approve_work_order(client):
+ submit = client.post(
+ "/work-orders/submit",
+ data={"title": "Approve test", "submitter": "test"},
+ )
+ wo_id = submit.json()["work_order_id"]
+ resp = client.post(f"/work-orders/{wo_id}/approve")
+ assert resp.status_code == 200
+
+
+def test_reject_work_order(client):
+ submit = client.post(
+ "/work-orders/submit",
+ data={"title": "Reject test", "submitter": "test"},
+ )
+ wo_id = submit.json()["work_order_id"]
+ resp = client.post(
+ f"/work-orders/{wo_id}/reject",
+ data={"reason": "Not needed"},
+ )
+ assert resp.status_code == 200
+
+
+def test_work_order_counts(client):
+ client.post(
+ "/work-orders/submit",
+ data={"title": "Count test", "submitter": "test"},
+ )
+ resp = client.get("/work-orders/api/counts")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "pending" in data
+ assert "total" in data
+
+
+def test_work_order_queue_page(client):
+ resp = client.get("/work-orders/queue")
+ assert resp.status_code == 200
+ assert b"WORK ORDERS" in resp.content
+
+
+def test_work_order_pending_partial(client):
+ resp = client.get("/work-orders/queue/pending")
+ assert resp.status_code == 200