"""DistributedWorker — task lifecycle management and backend routing. Routes delegated tasks to appropriate execution backends: - agentic_loop: local multi-step execution via Timmy's agentic loop - kimi: heavy research tasks dispatched via Gitea kimi-ready issues - paperclip: task submission to the Paperclip API Task lifecycle: queued → running → completed | failed Failure handling: auto-retry up to MAX_RETRIES, then mark failed. """ from __future__ import annotations import asyncio import logging import threading import uuid from dataclasses import dataclass, field from datetime import UTC, datetime from typing import Any, ClassVar logger = logging.getLogger(__name__) MAX_RETRIES = 2 # --------------------------------------------------------------------------- # Task record # --------------------------------------------------------------------------- @dataclass class DelegatedTask: """Record of one delegated task and its execution state.""" task_id: str agent_name: str agent_role: str task_description: str priority: str backend: str # "agentic_loop" | "kimi" | "paperclip" status: str = "queued" # queued | running | completed | failed created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) result: dict[str, Any] | None = None error: str | None = None retries: int = 0 # --------------------------------------------------------------------------- # Worker # --------------------------------------------------------------------------- class DistributedWorker: """Routes and tracks delegated task execution across multiple backends. All methods are class-methods; DistributedWorker is a singleton-style service — no instantiation needed. Usage:: from brain.worker import DistributedWorker task_id = DistributedWorker.submit("researcher", "research", "summarise X") status = DistributedWorker.get_status(task_id) """ _tasks: ClassVar[dict[str, DelegatedTask]] = {} _lock: ClassVar[threading.Lock] = threading.Lock() @classmethod def submit( cls, agent_name: str, agent_role: str, task_description: str, priority: str = "normal", ) -> str: """Submit a task for execution. Returns task_id immediately. The task is registered as 'queued' and a daemon thread begins execution in the background. Use get_status(task_id) to poll. """ task_id = uuid.uuid4().hex[:8] backend = cls._select_backend(agent_role, task_description) record = DelegatedTask( task_id=task_id, agent_name=agent_name, agent_role=agent_role, task_description=task_description, priority=priority, backend=backend, ) with cls._lock: cls._tasks[task_id] = record thread = threading.Thread( target=cls._run_task, args=(record,), daemon=True, name=f"worker-{task_id}", ) thread.start() logger.info( "Task %s queued: %s → %.60s (backend=%s, priority=%s)", task_id, agent_name, task_description, backend, priority, ) return task_id @classmethod def get_status(cls, task_id: str) -> dict[str, Any]: """Return current status of a task by ID.""" record = cls._tasks.get(task_id) if record is None: return {"found": False, "task_id": task_id} return { "found": True, "task_id": record.task_id, "agent": record.agent_name, "role": record.agent_role, "status": record.status, "backend": record.backend, "priority": record.priority, "created_at": record.created_at, "retries": record.retries, "result": record.result, "error": record.error, } @classmethod def list_tasks(cls) -> list[dict[str, Any]]: """Return a summary list of all tracked tasks.""" with cls._lock: return [ { "task_id": t.task_id, "agent": t.agent_name, "status": t.status, "backend": t.backend, "created_at": t.created_at, } for t in cls._tasks.values() ] @classmethod def clear(cls) -> None: """Clear the task registry (for tests).""" with cls._lock: cls._tasks.clear() # ------------------------------------------------------------------ # Backend selection # ------------------------------------------------------------------ @classmethod def _select_backend(cls, agent_role: str, task_description: str) -> str: """Choose the execution backend for a given agent role and task. Priority: 1. kimi — research role + Gitea enabled + task exceeds local capacity 2. paperclip — paperclip API key is configured 3. agentic_loop — local fallback (always available) """ try: from config import settings from timmy.kimi_delegation import exceeds_local_capacity if ( agent_role == "research" and getattr(settings, "gitea_enabled", False) and getattr(settings, "gitea_token", "") and exceeds_local_capacity(task_description) ): return "kimi" if getattr(settings, "paperclip_api_key", ""): return "paperclip" except Exception as exc: logger.debug("Backend selection error — defaulting to agentic_loop: %s", exc) return "agentic_loop" # ------------------------------------------------------------------ # Task execution # ------------------------------------------------------------------ @classmethod def _run_task(cls, record: DelegatedTask) -> None: """Execute a task with retry logic. Runs inside a daemon thread.""" record.status = "running" for attempt in range(MAX_RETRIES + 1): try: if attempt > 0: logger.info( "Retrying task %s (attempt %d/%d)", record.task_id, attempt + 1, MAX_RETRIES + 1, ) record.retries = attempt result = cls._dispatch(record) record.status = "completed" record.result = result logger.info( "Task %s completed via %s", record.task_id, record.backend, ) return except Exception as exc: logger.warning( "Task %s attempt %d failed: %s", record.task_id, attempt + 1, exc, ) if attempt == MAX_RETRIES: record.status = "failed" record.error = str(exc) logger.error( "Task %s exhausted %d retries. Final error: %s", record.task_id, MAX_RETRIES, exc, ) @classmethod def _dispatch(cls, record: DelegatedTask) -> dict[str, Any]: """Route to the selected backend. Raises on failure.""" if record.backend == "kimi": return asyncio.run(cls._execute_kimi(record)) if record.backend == "paperclip": return asyncio.run(cls._execute_paperclip(record)) return asyncio.run(cls._execute_agentic_loop(record)) @classmethod async def _execute_kimi(cls, record: DelegatedTask) -> dict[str, Any]: """Create a kimi-ready Gitea issue for the task. Kimi picks up the issue via the kimi-ready label and executes it. """ from timmy.kimi_delegation import create_kimi_research_issue result = await create_kimi_research_issue( task=record.task_description[:120], context=f"Delegated by agent '{record.agent_name}' via delegate_task.", question=record.task_description, priority=record.priority, ) if not result.get("success"): raise RuntimeError(f"Kimi issue creation failed: {result.get('error')}") return result @classmethod async def _execute_paperclip(cls, record: DelegatedTask) -> dict[str, Any]: """Submit the task to the Paperclip API.""" import httpx from timmy.paperclip import PaperclipClient client = PaperclipClient() async with httpx.AsyncClient(timeout=client.timeout) as http: resp = await http.post( f"{client.base_url}/api/tasks", headers={"Authorization": f"Bearer {client.api_key}"}, json={ "kind": record.agent_role, "agent_id": client.agent_id, "company_id": client.company_id, "priority": record.priority, "context": {"task": record.task_description}, }, ) if resp.status_code in (200, 201): data = resp.json() logger.info( "Task %s submitted to Paperclip (paperclip_id=%s)", record.task_id, data.get("id"), ) return { "success": True, "paperclip_task_id": data.get("id"), "backend": "paperclip", } raise RuntimeError(f"Paperclip API error {resp.status_code}: {resp.text[:200]}") @classmethod async def _execute_agentic_loop(cls, record: DelegatedTask) -> dict[str, Any]: """Execute the task via Timmy's local agentic loop.""" from timmy.agentic_loop import run_agentic_loop result = await run_agentic_loop(record.task_description) return { "success": result.status != "failed", "agentic_task_id": result.task_id, "summary": result.summary, "status": result.status, "backend": "agentic_loop", }