1
0

[claude] Wire delegate_task to DistributedWorker for actual execution (#985) (#1273)

Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
This commit is contained in:
2026-03-24 01:47:09 +00:00
committed by Timmy Time
parent 55beaf241f
commit 75ecfaba64
5 changed files with 577 additions and 5 deletions

View File

@@ -15,6 +15,7 @@ packages = [
{ include = "config.py", from = "src" },
{ include = "bannerlord", from = "src" },
{ include = "brain", from = "src" },
{ include = "dashboard", from = "src" },
{ include = "infrastructure", from = "src" },
{ include = "integrations", from = "src" },

1
src/brain/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Brain — identity system and task coordination."""

314
src/brain/worker.py Normal file
View File

@@ -0,0 +1,314 @@
"""DistributedWorker — task lifecycle management and backend routing.
Routes delegated tasks to appropriate execution backends:
- agentic_loop: local multi-step execution via Timmy's agentic loop
- kimi: heavy research tasks dispatched via Gitea kimi-ready issues
- paperclip: task submission to the Paperclip API
Task lifecycle: queued → running → completed | failed
Failure handling: auto-retry up to MAX_RETRIES, then mark failed.
"""
from __future__ import annotations
import asyncio
import logging
import threading
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any, ClassVar
logger = logging.getLogger(__name__)
MAX_RETRIES = 2
# ---------------------------------------------------------------------------
# Task record
# ---------------------------------------------------------------------------
@dataclass
class DelegatedTask:
"""Record of one delegated task and its execution state."""
task_id: str
agent_name: str
agent_role: str
task_description: str
priority: str
backend: str # "agentic_loop" | "kimi" | "paperclip"
status: str = "queued" # queued | running | completed | failed
created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
result: dict[str, Any] | None = None
error: str | None = None
retries: int = 0
# ---------------------------------------------------------------------------
# Worker
# ---------------------------------------------------------------------------
class DistributedWorker:
"""Routes and tracks delegated task execution across multiple backends.
All methods are class-methods; DistributedWorker is a singleton-style
service — no instantiation needed.
Usage::
from brain.worker import DistributedWorker
task_id = DistributedWorker.submit("researcher", "research", "summarise X")
status = DistributedWorker.get_status(task_id)
"""
_tasks: ClassVar[dict[str, DelegatedTask]] = {}
_lock: ClassVar[threading.Lock] = threading.Lock()
@classmethod
def submit(
cls,
agent_name: str,
agent_role: str,
task_description: str,
priority: str = "normal",
) -> str:
"""Submit a task for execution. Returns task_id immediately.
The task is registered as 'queued' and a daemon thread begins
execution in the background. Use get_status(task_id) to poll.
"""
task_id = uuid.uuid4().hex[:8]
backend = cls._select_backend(agent_role, task_description)
record = DelegatedTask(
task_id=task_id,
agent_name=agent_name,
agent_role=agent_role,
task_description=task_description,
priority=priority,
backend=backend,
)
with cls._lock:
cls._tasks[task_id] = record
thread = threading.Thread(
target=cls._run_task,
args=(record,),
daemon=True,
name=f"worker-{task_id}",
)
thread.start()
logger.info(
"Task %s queued: %s%.60s (backend=%s, priority=%s)",
task_id,
agent_name,
task_description,
backend,
priority,
)
return task_id
@classmethod
def get_status(cls, task_id: str) -> dict[str, Any]:
"""Return current status of a task by ID."""
record = cls._tasks.get(task_id)
if record is None:
return {"found": False, "task_id": task_id}
return {
"found": True,
"task_id": record.task_id,
"agent": record.agent_name,
"role": record.agent_role,
"status": record.status,
"backend": record.backend,
"priority": record.priority,
"created_at": record.created_at,
"retries": record.retries,
"result": record.result,
"error": record.error,
}
@classmethod
def list_tasks(cls) -> list[dict[str, Any]]:
"""Return a summary list of all tracked tasks."""
with cls._lock:
return [
{
"task_id": t.task_id,
"agent": t.agent_name,
"status": t.status,
"backend": t.backend,
"created_at": t.created_at,
}
for t in cls._tasks.values()
]
@classmethod
def clear(cls) -> None:
"""Clear the task registry (for tests)."""
with cls._lock:
cls._tasks.clear()
# ------------------------------------------------------------------
# Backend selection
# ------------------------------------------------------------------
@classmethod
def _select_backend(cls, agent_role: str, task_description: str) -> str:
"""Choose the execution backend for a given agent role and task.
Priority:
1. kimi — research role + Gitea enabled + task exceeds local capacity
2. paperclip — paperclip API key is configured
3. agentic_loop — local fallback (always available)
"""
try:
from config import settings
from timmy.kimi_delegation import exceeds_local_capacity
if (
agent_role == "research"
and getattr(settings, "gitea_enabled", False)
and getattr(settings, "gitea_token", "")
and exceeds_local_capacity(task_description)
):
return "kimi"
if getattr(settings, "paperclip_api_key", ""):
return "paperclip"
except Exception as exc:
logger.debug("Backend selection error — defaulting to agentic_loop: %s", exc)
return "agentic_loop"
# ------------------------------------------------------------------
# Task execution
# ------------------------------------------------------------------
@classmethod
def _run_task(cls, record: DelegatedTask) -> None:
"""Execute a task with retry logic. Runs inside a daemon thread."""
record.status = "running"
for attempt in range(MAX_RETRIES + 1):
try:
if attempt > 0:
logger.info(
"Retrying task %s (attempt %d/%d)",
record.task_id,
attempt + 1,
MAX_RETRIES + 1,
)
record.retries = attempt
result = cls._dispatch(record)
record.status = "completed"
record.result = result
logger.info(
"Task %s completed via %s",
record.task_id,
record.backend,
)
return
except Exception as exc:
logger.warning(
"Task %s attempt %d failed: %s",
record.task_id,
attempt + 1,
exc,
)
if attempt == MAX_RETRIES:
record.status = "failed"
record.error = str(exc)
logger.error(
"Task %s exhausted %d retries. Final error: %s",
record.task_id,
MAX_RETRIES,
exc,
)
@classmethod
def _dispatch(cls, record: DelegatedTask) -> dict[str, Any]:
"""Route to the selected backend. Raises on failure."""
if record.backend == "kimi":
return asyncio.run(cls._execute_kimi(record))
if record.backend == "paperclip":
return asyncio.run(cls._execute_paperclip(record))
return asyncio.run(cls._execute_agentic_loop(record))
@classmethod
async def _execute_kimi(cls, record: DelegatedTask) -> dict[str, Any]:
"""Create a kimi-ready Gitea issue for the task.
Kimi picks up the issue via the kimi-ready label and executes it.
"""
from timmy.kimi_delegation import create_kimi_research_issue
result = await create_kimi_research_issue(
task=record.task_description[:120],
context=f"Delegated by agent '{record.agent_name}' via delegate_task.",
question=record.task_description,
priority=record.priority,
)
if not result.get("success"):
raise RuntimeError(f"Kimi issue creation failed: {result.get('error')}")
return result
@classmethod
async def _execute_paperclip(cls, record: DelegatedTask) -> dict[str, Any]:
"""Submit the task to the Paperclip API."""
import httpx
from timmy.paperclip import PaperclipClient
client = PaperclipClient()
async with httpx.AsyncClient(timeout=client.timeout) as http:
resp = await http.post(
f"{client.base_url}/api/tasks",
headers={"Authorization": f"Bearer {client.api_key}"},
json={
"kind": record.agent_role,
"agent_id": client.agent_id,
"company_id": client.company_id,
"priority": record.priority,
"context": {"task": record.task_description},
},
)
if resp.status_code in (200, 201):
data = resp.json()
logger.info(
"Task %s submitted to Paperclip (paperclip_id=%s)",
record.task_id,
data.get("id"),
)
return {
"success": True,
"paperclip_task_id": data.get("id"),
"backend": "paperclip",
}
raise RuntimeError(f"Paperclip API error {resp.status_code}: {resp.text[:200]}")
@classmethod
async def _execute_agentic_loop(cls, record: DelegatedTask) -> dict[str, Any]:
"""Execute the task via Timmy's local agentic loop."""
from timmy.agentic_loop import run_agentic_loop
result = await run_agentic_loop(record.task_description)
return {
"success": result.status != "failed",
"agentic_task_id": result.task_id,
"summary": result.summary,
"status": result.status,
"backend": "agentic_loop",
}

View File

@@ -41,17 +41,38 @@ def delegate_task(
if priority not in valid_priorities:
priority = "normal"
agent_role = available[agent_name]
# Wire to DistributedWorker for actual execution
task_id: str | None = None
status = "queued"
try:
from brain.worker import DistributedWorker
task_id = DistributedWorker.submit(agent_name, agent_role, task_description, priority)
except Exception as exc:
logger.warning("DistributedWorker unavailable — task noted only: %s", exc)
status = "noted"
logger.info(
"Delegation intent: %s%s (priority=%s)", agent_name, task_description[:80], priority
"Delegated task %s: %s%s (priority=%s, status=%s)",
task_id or "?",
agent_name,
task_description[:80],
priority,
status,
)
return {
"success": True,
"task_id": None,
"task_id": task_id,
"agent": agent_name,
"role": available[agent_name],
"status": "noted",
"message": f"Delegation to {agent_name} ({available[agent_name]}): {task_description[:100]}",
"role": agent_role,
"status": status,
"message": (
f"Task {task_id or 'noted'}: delegated to {agent_name} ({agent_role}): "
f"{task_description[:100]}"
),
}

View File

@@ -0,0 +1,235 @@
"""Unit tests for brain.worker.DistributedWorker."""
from __future__ import annotations
import threading
from unittest.mock import MagicMock, patch
import pytest
from brain.worker import MAX_RETRIES, DelegatedTask, DistributedWorker
@pytest.fixture(autouse=True)
def clear_task_registry():
"""Reset the worker registry before each test."""
DistributedWorker.clear()
yield
DistributedWorker.clear()
class TestSubmit:
def test_returns_task_id(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("researcher", "research", "find something")
assert isinstance(task_id, str)
assert len(task_id) == 8
def test_task_registered_as_queued(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("coder", "code", "fix the bug")
status = DistributedWorker.get_status(task_id)
assert status["found"] is True
assert status["task_id"] == task_id
assert status["agent"] == "coder"
def test_unique_task_ids(self):
with patch.object(DistributedWorker, "_run_task"):
ids = [DistributedWorker.submit("coder", "code", "task") for _ in range(10)]
assert len(set(ids)) == 10
def test_starts_daemon_thread(self):
event = threading.Event()
def fake_run_task(record):
event.set()
with patch.object(DistributedWorker, "_run_task", side_effect=fake_run_task):
DistributedWorker.submit("coder", "code", "something")
assert event.wait(timeout=2), "Background thread did not start"
def test_priority_stored(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("coder", "code", "task", priority="high")
status = DistributedWorker.get_status(task_id)
assert status["priority"] == "high"
class TestGetStatus:
def test_unknown_task_id(self):
result = DistributedWorker.get_status("deadbeef")
assert result["found"] is False
assert result["task_id"] == "deadbeef"
def test_known_task_has_all_fields(self):
with patch.object(DistributedWorker, "_run_task"):
task_id = DistributedWorker.submit("writer", "writing", "write a blog post")
status = DistributedWorker.get_status(task_id)
for key in ("found", "task_id", "agent", "role", "status", "backend", "created_at"):
assert key in status, f"Missing key: {key}"
class TestListTasks:
def test_empty_initially(self):
assert DistributedWorker.list_tasks() == []
def test_returns_registered_tasks(self):
with patch.object(DistributedWorker, "_run_task"):
DistributedWorker.submit("coder", "code", "task A")
DistributedWorker.submit("writer", "writing", "task B")
tasks = DistributedWorker.list_tasks()
assert len(tasks) == 2
agents = {t["agent"] for t in tasks}
assert agents == {"coder", "writer"}
class TestSelectBackend:
def test_defaults_to_agentic_loop(self):
with patch("brain.worker.logger"):
backend = DistributedWorker._select_backend("code", "fix the bug")
assert backend == "agentic_loop"
def test_kimi_for_heavy_research_with_gitea(self):
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.paperclip_api_key = ""
with (
patch("timmy.kimi_delegation.exceeds_local_capacity", return_value=True),
patch("config.settings", mock_settings),
):
backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
assert backend == "kimi"
def test_agentic_loop_when_no_gitea(self):
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
mock_settings.paperclip_api_key = ""
with patch("config.settings", mock_settings):
backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
assert backend == "agentic_loop"
def test_paperclip_when_api_key_configured(self):
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
mock_settings.paperclip_api_key = "pk_test_123"
with patch("config.settings", mock_settings):
backend = DistributedWorker._select_backend("code", "build a widget")
assert backend == "paperclip"
class TestRunTask:
def test_marks_completed_on_success(self):
record = DelegatedTask(
task_id="abc12345",
agent_name="coder",
agent_role="code",
task_description="fix bug",
priority="normal",
backend="agentic_loop",
)
with patch.object(DistributedWorker, "_dispatch", return_value={"success": True}):
DistributedWorker._run_task(record)
assert record.status == "completed"
assert record.result == {"success": True}
assert record.error is None
def test_marks_failed_after_exhausting_retries(self):
record = DelegatedTask(
task_id="fail1234",
agent_name="coder",
agent_role="code",
task_description="broken task",
priority="normal",
backend="agentic_loop",
)
with patch.object(DistributedWorker, "_dispatch", side_effect=RuntimeError("boom")):
DistributedWorker._run_task(record)
assert record.status == "failed"
assert "boom" in record.error
assert record.retries == MAX_RETRIES
def test_retries_before_failing(self):
record = DelegatedTask(
task_id="retry001",
agent_name="coder",
agent_role="code",
task_description="flaky task",
priority="normal",
backend="agentic_loop",
)
call_count = 0
def flaky_dispatch(r):
nonlocal call_count
call_count += 1
if call_count < MAX_RETRIES + 1:
raise RuntimeError("transient failure")
return {"success": True}
with patch.object(DistributedWorker, "_dispatch", side_effect=flaky_dispatch):
DistributedWorker._run_task(record)
assert record.status == "completed"
assert call_count == MAX_RETRIES + 1
def test_succeeds_on_first_attempt(self):
record = DelegatedTask(
task_id="ok000001",
agent_name="writer",
agent_role="writing",
task_description="write summary",
priority="low",
backend="agentic_loop",
)
with patch.object(DistributedWorker, "_dispatch", return_value={"summary": "done"}):
DistributedWorker._run_task(record)
assert record.status == "completed"
assert record.retries == 0
class TestDelegatetaskIntegration:
"""Integration: delegate_task should wire to DistributedWorker."""
def test_delegate_task_returns_task_id(self):
from timmy.tools_delegation import delegate_task
with patch.object(DistributedWorker, "_run_task"):
result = delegate_task("researcher", "research something for me")
assert result["success"] is True
assert result["task_id"] is not None
assert result["status"] == "queued"
def test_delegate_task_status_queued_for_valid_agent(self):
from timmy.tools_delegation import delegate_task
with patch.object(DistributedWorker, "_run_task"):
result = delegate_task("coder", "implement feature X")
assert result["status"] == "queued"
assert len(result["task_id"]) == 8
def test_task_in_registry_after_delegation(self):
from timmy.tools_delegation import delegate_task
with patch.object(DistributedWorker, "_run_task"):
result = delegate_task("writer", "write documentation")
task_id = result["task_id"]
status = DistributedWorker.get_status(task_id)
assert status["found"] is True
assert status["agent"] == "writer"