# Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
# Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
# Viewer metadata: 315 lines, 10 KiB, Python
"""DistributedWorker — task lifecycle management and backend routing.

Routes delegated tasks to appropriate execution backends:

- agentic_loop: local multi-step execution via Timmy's agentic loop
- kimi: heavy research tasks dispatched via Gitea kimi-ready issues
- paperclip: task submission to the Paperclip API

Task lifecycle: queued → running → completed | failed

Failure handling: auto-retry up to MAX_RETRIES, then mark failed.
"""

from __future__ import annotations

import asyncio
import logging
import threading
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any, ClassVar

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Number of extra attempts after the first one: the worker loops over
# range(MAX_RETRIES + 1), so a task is dispatched at most MAX_RETRIES + 1
# times before it is marked "failed".
MAX_RETRIES = 2


# ---------------------------------------------------------------------------
# Task record
# ---------------------------------------------------------------------------


@dataclass
|
|
class DelegatedTask:
|
|
"""Record of one delegated task and its execution state."""
|
|
|
|
task_id: str
|
|
agent_name: str
|
|
agent_role: str
|
|
task_description: str
|
|
priority: str
|
|
backend: str # "agentic_loop" | "kimi" | "paperclip"
|
|
status: str = "queued" # queued | running | completed | failed
|
|
created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
|
|
result: dict[str, Any] | None = None
|
|
error: str | None = None
|
|
retries: int = 0


# ---------------------------------------------------------------------------
# Worker
# ---------------------------------------------------------------------------


class DistributedWorker:
    """Routes and tracks delegated task execution across multiple backends.

    All methods are class-methods; DistributedWorker is a singleton-style
    service — no instantiation needed.

    The registry ``_tasks`` is shared between the submitting thread, the
    per-task worker threads and any polling caller. Every registry access
    and every state transition of a record is therefore performed while
    holding ``_lock``, so ``get_status`` / ``list_tasks`` never observe a
    half-written record (e.g. status "completed" with ``result`` still None).

    Usage::

        from brain.worker import DistributedWorker

        task_id = DistributedWorker.submit("researcher", "research", "summarise X")
        status = DistributedWorker.get_status(task_id)
    """

    _tasks: ClassVar[dict[str, DelegatedTask]] = {}
    # Plain (non-reentrant) lock: never call another locking method while
    # holding it.
    _lock: ClassVar[threading.Lock] = threading.Lock()

    @classmethod
    def submit(
        cls,
        agent_name: str,
        agent_role: str,
        task_description: str,
        priority: str = "normal",
    ) -> str:
        """Submit a task for execution. Returns task_id immediately.

        The task is registered as 'queued' and a daemon thread begins
        execution in the background. Use get_status(task_id) to poll.

        Args:
            agent_name: Name of the agent the task is delegated to.
            agent_role: Role of that agent; used for backend selection.
            task_description: Free-text description of the work.
            priority: Priority label stored on the record and forwarded to
                the backend.

        Returns:
            The 8-character hexadecimal ID of the newly registered task.
        """
        backend = cls._select_backend(agent_role, task_description)

        with cls._lock:
            # The ID is only 32 bits of a UUID; redraw on a collision so an
            # existing record is never silently overwritten.
            task_id = uuid.uuid4().hex[:8]
            while task_id in cls._tasks:
                task_id = uuid.uuid4().hex[:8]

            record = DelegatedTask(
                task_id=task_id,
                agent_name=agent_name,
                agent_role=agent_role,
                task_description=task_description,
                priority=priority,
                backend=backend,
            )
            cls._tasks[task_id] = record

        thread = threading.Thread(
            target=cls._run_task,
            args=(record,),
            daemon=True,
            name=f"worker-{task_id}",
        )
        thread.start()

        logger.info(
            "Task %s queued: %s → %.60s (backend=%s, priority=%s)",
            task_id,
            agent_name,
            task_description,
            backend,
            priority,
        )
        return task_id

    @classmethod
    def get_status(cls, task_id: str) -> dict[str, Any]:
        """Return current status of a task by ID.

        Returns:
            ``{"found": False, "task_id": task_id}`` for an unknown ID,
            otherwise a dict with ``found=True`` and the record's fields.
            The snapshot is taken under the registry lock.
        """
        with cls._lock:
            record = cls._tasks.get(task_id)
            if record is None:
                return {"found": False, "task_id": task_id}
            return {
                "found": True,
                "task_id": record.task_id,
                "agent": record.agent_name,
                "role": record.agent_role,
                "status": record.status,
                "backend": record.backend,
                "priority": record.priority,
                "created_at": record.created_at,
                "retries": record.retries,
                "result": record.result,
                "error": record.error,
            }

    @classmethod
    def list_tasks(cls) -> list[dict[str, Any]]:
        """Return a summary list of all tracked tasks."""
        with cls._lock:
            return [
                {
                    "task_id": t.task_id,
                    "agent": t.agent_name,
                    "status": t.status,
                    "backend": t.backend,
                    "created_at": t.created_at,
                }
                for t in cls._tasks.values()
            ]

    @classmethod
    def clear(cls) -> None:
        """Clear the task registry (for tests)."""
        with cls._lock:
            cls._tasks.clear()

    # ------------------------------------------------------------------
    # Backend selection
    # ------------------------------------------------------------------

    @classmethod
    def _select_backend(cls, agent_role: str, task_description: str) -> str:
        """Choose the execution backend for a given agent role and task.

        Priority:
        1. kimi — research role + Gitea enabled + task exceeds local capacity
        2. paperclip — paperclip API key is configured
        3. agentic_loop — local fallback (always available)

        Any exception raised while importing or consulting the settings is
        logged at debug level and results in the local fallback.
        """
        try:
            # Imported lazily; a failure here is handled by the except below.
            from config import settings
            from timmy.kimi_delegation import exceeds_local_capacity

            if (
                agent_role == "research"
                and getattr(settings, "gitea_enabled", False)
                and getattr(settings, "gitea_token", "")
                and exceeds_local_capacity(task_description)
            ):
                return "kimi"

            if getattr(settings, "paperclip_api_key", ""):
                return "paperclip"

        except Exception as exc:
            # Deliberate best-effort: selection problems must not block
            # submission, so fall through to the local backend.
            logger.debug("Backend selection error — defaulting to agentic_loop: %s", exc)

        return "agentic_loop"

    # ------------------------------------------------------------------
    # Task execution
    # ------------------------------------------------------------------

    @classmethod
    def _run_task(cls, record: DelegatedTask) -> None:
        """Execute a task with retry logic. Runs inside a daemon thread.

        The task is dispatched up to ``MAX_RETRIES + 1`` times. On the first
        successful dispatch the record becomes "completed" with its result;
        if the last attempt also raises, it becomes "failed" with the text
        of that exception. Each transition is written under ``_lock`` with
        the payload assigned before the status.
        """
        with cls._lock:
            record.status = "running"

        for attempt in range(MAX_RETRIES + 1):
            if attempt > 0:
                logger.info(
                    "Retrying task %s (attempt %d/%d)",
                    record.task_id,
                    attempt + 1,
                    MAX_RETRIES + 1,
                )
                with cls._lock:
                    record.retries = attempt

            try:
                result = cls._dispatch(record)
            except Exception as exc:
                logger.warning(
                    "Task %s attempt %d failed: %s",
                    record.task_id,
                    attempt + 1,
                    exc,
                )
                if attempt == MAX_RETRIES:
                    with cls._lock:
                        record.error = str(exc)
                        record.status = "failed"
                    logger.error(
                        "Task %s exhausted %d retries. Final error: %s",
                        record.task_id,
                        MAX_RETRIES,
                        exc,
                    )
            else:
                with cls._lock:
                    record.result = result
                    record.status = "completed"
                logger.info(
                    "Task %s completed via %s",
                    record.task_id,
                    record.backend,
                )
                return

    @classmethod
    def _dispatch(cls, record: DelegatedTask) -> dict[str, Any]:
        """Route to the selected backend. Raises on failure.

        Each backend coroutine is driven to completion with ``asyncio.run``;
        any backend value other than "kimi" or "paperclip" goes to the
        agentic loop.
        """
        if record.backend == "kimi":
            return asyncio.run(cls._execute_kimi(record))
        if record.backend == "paperclip":
            return asyncio.run(cls._execute_paperclip(record))
        return asyncio.run(cls._execute_agentic_loop(record))

    @classmethod
    async def _execute_kimi(cls, record: DelegatedTask) -> dict[str, Any]:
        """Create a kimi-ready Gitea issue for the task.

        Kimi picks up the issue via the kimi-ready label and executes it.

        Raises:
            RuntimeError: If the issue-creation result has no truthy
                "success" entry.
        """
        from timmy.kimi_delegation import create_kimi_research_issue

        result = await create_kimi_research_issue(
            # The short task field is capped at 120 characters; the full
            # description is passed as the question.
            task=record.task_description[:120],
            context=f"Delegated by agent '{record.agent_name}' via delegate_task.",
            question=record.task_description,
            priority=record.priority,
        )
        if not result.get("success"):
            raise RuntimeError(f"Kimi issue creation failed: {result.get('error')}")
        return result

    @classmethod
    async def _execute_paperclip(cls, record: DelegatedTask) -> dict[str, Any]:
        """Submit the task to the Paperclip API.

        Raises:
            RuntimeError: If the API answers with a status other than
                200 or 201.
        """
        import httpx

        from timmy.paperclip import PaperclipClient

        client = PaperclipClient()
        async with httpx.AsyncClient(timeout=client.timeout) as http:
            resp = await http.post(
                f"{client.base_url}/api/tasks",
                headers={"Authorization": f"Bearer {client.api_key}"},
                json={
                    "kind": record.agent_role,
                    "agent_id": client.agent_id,
                    "company_id": client.company_id,
                    "priority": record.priority,
                    "context": {"task": record.task_description},
                },
            )

        if resp.status_code in (200, 201):
            data = resp.json()
            logger.info(
                "Task %s submitted to Paperclip (paperclip_id=%s)",
                record.task_id,
                data.get("id"),
            )
            return {
                "success": True,
                "paperclip_task_id": data.get("id"),
                "backend": "paperclip",
            }
        # Only the first 200 characters of the body go into the error text.
        raise RuntimeError(f"Paperclip API error {resp.status_code}: {resp.text[:200]}")

    @classmethod
    async def _execute_agentic_loop(cls, record: DelegatedTask) -> dict[str, Any]:
        """Execute the task via Timmy's local agentic loop.

        Returns a dict whose "success" entry is False when the loop reports
        status "failed".
        """
        from timmy.agentic_loop import run_agentic_loop

        result = await run_agentic_loop(record.task_description)
        # NOTE(review): unlike the kimi and paperclip backends, a "failed"
        # loop result is returned rather than raised, so the task is still
        # recorded as "completed" with success=False and is not retried.
        # Confirm this is intended before changing it.
        return {
            "success": result.status != "failed",
            "agentic_task_id": result.task_id,
            "summary": result.summary,
            "status": result.status,
            "backend": "agentic_loop",
        }