diff --git a/pyproject.toml b/pyproject.toml index 06f2b46e..51f294eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ packages = [ { include = "config.py", from = "src" }, { include = "bannerlord", from = "src" }, + { include = "brain", from = "src" }, { include = "dashboard", from = "src" }, { include = "infrastructure", from = "src" }, { include = "integrations", from = "src" }, diff --git a/src/brain/__init__.py b/src/brain/__init__.py new file mode 100644 index 00000000..2b39daa1 --- /dev/null +++ b/src/brain/__init__.py @@ -0,0 +1 @@ +"""Brain — identity system and task coordination.""" diff --git a/src/brain/worker.py b/src/brain/worker.py new file mode 100644 index 00000000..76592f86 --- /dev/null +++ b/src/brain/worker.py @@ -0,0 +1,314 @@ +"""DistributedWorker — task lifecycle management and backend routing. + +Routes delegated tasks to appropriate execution backends: + +- agentic_loop: local multi-step execution via Timmy's agentic loop +- kimi: heavy research tasks dispatched via Gitea kimi-ready issues +- paperclip: task submission to the Paperclip API + +Task lifecycle: queued → running → completed | failed + +Failure handling: auto-retry up to MAX_RETRIES, then mark failed. 
+""" + +from __future__ import annotations + +import asyncio +import logging +import threading +import uuid +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any, ClassVar + +logger = logging.getLogger(__name__) + +MAX_RETRIES = 2 + + +# --------------------------------------------------------------------------- +# Task record +# --------------------------------------------------------------------------- + + +@dataclass +class DelegatedTask: + """Record of one delegated task and its execution state.""" + + task_id: str + agent_name: str + agent_role: str + task_description: str + priority: str + backend: str # "agentic_loop" | "kimi" | "paperclip" + status: str = "queued" # queued | running | completed | failed + created_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) + result: dict[str, Any] | None = None + error: str | None = None + retries: int = 0 + + +# --------------------------------------------------------------------------- +# Worker +# --------------------------------------------------------------------------- + + +class DistributedWorker: + """Routes and tracks delegated task execution across multiple backends. + + All methods are class-methods; DistributedWorker is a singleton-style + service — no instantiation needed. + + Usage:: + + from brain.worker import DistributedWorker + + task_id = DistributedWorker.submit("researcher", "research", "summarise X") + status = DistributedWorker.get_status(task_id) + """ + + _tasks: ClassVar[dict[str, DelegatedTask]] = {} + _lock: ClassVar[threading.Lock] = threading.Lock() + + @classmethod + def submit( + cls, + agent_name: str, + agent_role: str, + task_description: str, + priority: str = "normal", + ) -> str: + """Submit a task for execution. Returns task_id immediately. + + The task is registered as 'queued' and a daemon thread begins + execution in the background. Use get_status(task_id) to poll. 
+ """ + task_id = uuid.uuid4().hex[:8] + backend = cls._select_backend(agent_role, task_description) + + record = DelegatedTask( + task_id=task_id, + agent_name=agent_name, + agent_role=agent_role, + task_description=task_description, + priority=priority, + backend=backend, + ) + + with cls._lock: + cls._tasks[task_id] = record + + thread = threading.Thread( + target=cls._run_task, + args=(record,), + daemon=True, + name=f"worker-{task_id}", + ) + thread.start() + + logger.info( + "Task %s queued: %s → %.60s (backend=%s, priority=%s)", + task_id, + agent_name, + task_description, + backend, + priority, + ) + return task_id + + @classmethod + def get_status(cls, task_id: str) -> dict[str, Any]: + """Return current status of a task by ID.""" + record = cls._tasks.get(task_id) + if record is None: + return {"found": False, "task_id": task_id} + return { + "found": True, + "task_id": record.task_id, + "agent": record.agent_name, + "role": record.agent_role, + "status": record.status, + "backend": record.backend, + "priority": record.priority, + "created_at": record.created_at, + "retries": record.retries, + "result": record.result, + "error": record.error, + } + + @classmethod + def list_tasks(cls) -> list[dict[str, Any]]: + """Return a summary list of all tracked tasks.""" + with cls._lock: + return [ + { + "task_id": t.task_id, + "agent": t.agent_name, + "status": t.status, + "backend": t.backend, + "created_at": t.created_at, + } + for t in cls._tasks.values() + ] + + @classmethod + def clear(cls) -> None: + """Clear the task registry (for tests).""" + with cls._lock: + cls._tasks.clear() + + # ------------------------------------------------------------------ + # Backend selection + # ------------------------------------------------------------------ + + @classmethod + def _select_backend(cls, agent_role: str, task_description: str) -> str: + """Choose the execution backend for a given agent role and task. + + Priority: + 1. 
kimi — research role + Gitea enabled + task exceeds local capacity + 2. paperclip — paperclip API key is configured + 3. agentic_loop — local fallback (always available) + """ + try: + from config import settings + from timmy.kimi_delegation import exceeds_local_capacity + + if ( + agent_role == "research" + and getattr(settings, "gitea_enabled", False) + and getattr(settings, "gitea_token", "") + and exceeds_local_capacity(task_description) + ): + return "kimi" + + if getattr(settings, "paperclip_api_key", ""): + return "paperclip" + + except Exception as exc: + logger.debug("Backend selection error — defaulting to agentic_loop: %s", exc) + + return "agentic_loop" + + # ------------------------------------------------------------------ + # Task execution + # ------------------------------------------------------------------ + + @classmethod + def _run_task(cls, record: DelegatedTask) -> None: + """Execute a task with retry logic. Runs inside a daemon thread.""" + record.status = "running" + + for attempt in range(MAX_RETRIES + 1): + try: + if attempt > 0: + logger.info( + "Retrying task %s (attempt %d/%d)", + record.task_id, + attempt + 1, + MAX_RETRIES + 1, + ) + record.retries = attempt + + result = cls._dispatch(record) + record.status = "completed" + record.result = result + logger.info( + "Task %s completed via %s", + record.task_id, + record.backend, + ) + return + + except Exception as exc: + logger.warning( + "Task %s attempt %d failed: %s", + record.task_id, + attempt + 1, + exc, + ) + if attempt == MAX_RETRIES: + record.status = "failed" + record.error = str(exc) + logger.error( + "Task %s exhausted %d retries. Final error: %s", + record.task_id, + MAX_RETRIES, + exc, + ) + + @classmethod + def _dispatch(cls, record: DelegatedTask) -> dict[str, Any]: + """Route to the selected backend. 
Raises on failure.""" + if record.backend == "kimi": + return asyncio.run(cls._execute_kimi(record)) + if record.backend == "paperclip": + return asyncio.run(cls._execute_paperclip(record)) + return asyncio.run(cls._execute_agentic_loop(record)) + + @classmethod + async def _execute_kimi(cls, record: DelegatedTask) -> dict[str, Any]: + """Create a kimi-ready Gitea issue for the task. + + Kimi picks up the issue via the kimi-ready label and executes it. + """ + from timmy.kimi_delegation import create_kimi_research_issue + + result = await create_kimi_research_issue( + task=record.task_description[:120], + context=f"Delegated by agent '{record.agent_name}' via delegate_task.", + question=record.task_description, + priority=record.priority, + ) + if not result.get("success"): + raise RuntimeError(f"Kimi issue creation failed: {result.get('error')}") + return result + + @classmethod + async def _execute_paperclip(cls, record: DelegatedTask) -> dict[str, Any]: + """Submit the task to the Paperclip API.""" + import httpx + + from timmy.paperclip import PaperclipClient + + client = PaperclipClient() + async with httpx.AsyncClient(timeout=client.timeout) as http: + resp = await http.post( + f"{client.base_url}/api/tasks", + headers={"Authorization": f"Bearer {client.api_key}"}, + json={ + "kind": record.agent_role, + "agent_id": client.agent_id, + "company_id": client.company_id, + "priority": record.priority, + "context": {"task": record.task_description}, + }, + ) + + if resp.status_code in (200, 201): + data = resp.json() + logger.info( + "Task %s submitted to Paperclip (paperclip_id=%s)", + record.task_id, + data.get("id"), + ) + return { + "success": True, + "paperclip_task_id": data.get("id"), + "backend": "paperclip", + } + raise RuntimeError(f"Paperclip API error {resp.status_code}: {resp.text[:200]}") + + @classmethod + async def _execute_agentic_loop(cls, record: DelegatedTask) -> dict[str, Any]: + """Execute the task via Timmy's local agentic loop.""" + from 
timmy.agentic_loop import run_agentic_loop + + result = await run_agentic_loop(record.task_description) + return { + "success": result.status != "failed", + "agentic_task_id": result.task_id, + "summary": result.summary, + "status": result.status, + "backend": "agentic_loop", + } diff --git a/src/timmy/tools_delegation/__init__.py b/src/timmy/tools_delegation/__init__.py index dee9e8a5..8ea41214 100644 --- a/src/timmy/tools_delegation/__init__.py +++ b/src/timmy/tools_delegation/__init__.py @@ -41,17 +41,38 @@ def delegate_task( if priority not in valid_priorities: priority = "normal" + agent_role = available[agent_name] + + # Wire to DistributedWorker for actual execution + task_id: str | None = None + status = "queued" + try: + from brain.worker import DistributedWorker + + task_id = DistributedWorker.submit(agent_name, agent_role, task_description, priority) + except Exception as exc: + logger.warning("DistributedWorker unavailable — task noted only: %s", exc) + status = "noted" + logger.info( - "Delegation intent: %s → %s (priority=%s)", agent_name, task_description[:80], priority + "Delegated task %s: %s → %s (priority=%s, status=%s)", + task_id or "?", + agent_name, + task_description[:80], + priority, + status, ) return { "success": True, - "task_id": None, + "task_id": task_id, "agent": agent_name, - "role": available[agent_name], - "status": "noted", - "message": f"Delegation to {agent_name} ({available[agent_name]}): {task_description[:100]}", + "role": agent_role, + "status": status, + "message": ( + f"Task {task_id or 'noted'}: delegated to {agent_name} ({agent_role}): " + f"{task_description[:100]}" + ), } diff --git a/tests/unit/test_brain_worker.py b/tests/unit/test_brain_worker.py new file mode 100644 index 00000000..14c50fe4 --- /dev/null +++ b/tests/unit/test_brain_worker.py @@ -0,0 +1,235 @@ +"""Unit tests for brain.worker.DistributedWorker.""" + +from __future__ import annotations + +import threading +from unittest.mock import MagicMock, patch + 
import pytest

from brain.worker import MAX_RETRIES, DelegatedTask, DistributedWorker


def _make_record(
    task_id: str,
    agent_name: str = "coder",
    agent_role: str = "code",
    task_description: str = "task",
    priority: str = "normal",
) -> DelegatedTask:
    """Build an unregistered agentic_loop record for the _run_task tests."""
    return DelegatedTask(
        task_id=task_id,
        agent_name=agent_name,
        agent_role=agent_role,
        task_description=task_description,
        priority=priority,
        backend="agentic_loop",
    )


def _make_settings(
    gitea_enabled: bool = False,
    gitea_token: str = "",
    paperclip_api_key: str = "",
) -> MagicMock:
    """Build a stand-in for config.settings with only the fields the worker reads."""
    return MagicMock(
        gitea_enabled=gitea_enabled,
        gitea_token=gitea_token,
        paperclip_api_key=paperclip_api_key,
    )


@pytest.fixture(autouse=True)
def clear_task_registry():
    """Reset the worker registry before and after each test."""
    DistributedWorker.clear()
    yield
    DistributedWorker.clear()


class TestSubmit:
    """submit() registers the task and hands it to a background thread."""

    def test_returns_task_id(self):
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("researcher", "research", "find something")
        assert isinstance(task_id, str)
        assert len(task_id) == 8

    def test_task_registered_as_queued(self):
        # _run_task is stubbed out, so the record must still be in its initial state.
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("coder", "code", "fix the bug")
        status = DistributedWorker.get_status(task_id)
        assert status["found"] is True
        assert status["task_id"] == task_id
        assert status["agent"] == "coder"
        assert status["status"] == "queued"

    def test_unique_task_ids(self):
        with patch.object(DistributedWorker, "_run_task"):
            ids = [DistributedWorker.submit("coder", "code", "task") for _ in range(10)]
        assert len(set(ids)) == 10

    def test_starts_daemon_thread(self):
        started = threading.Event()
        observed = {}

        def fake_run_task(record):
            # Runs on the thread submit() created; capture its daemon flag.
            observed["daemon"] = threading.current_thread().daemon
            started.set()

        with patch.object(DistributedWorker, "_run_task", side_effect=fake_run_task):
            DistributedWorker.submit("coder", "code", "something")
            assert started.wait(timeout=2), "Background thread did not start"

        assert observed["daemon"] is True

    def test_priority_stored(self):
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("coder", "code", "task", priority="high")
        status = DistributedWorker.get_status(task_id)
        assert status["priority"] == "high"


class TestGetStatus:
    """get_status() reports unknown ids and full snapshots of known ones."""

    def test_unknown_task_id(self):
        result = DistributedWorker.get_status("deadbeef")
        assert result["found"] is False
        assert result["task_id"] == "deadbeef"

    def test_known_task_has_all_fields(self):
        with patch.object(DistributedWorker, "_run_task"):
            task_id = DistributedWorker.submit("writer", "writing", "write a blog post")
        status = DistributedWorker.get_status(task_id)
        expected_keys = (
            "found",
            "task_id",
            "agent",
            "role",
            "status",
            "backend",
            "priority",
            "created_at",
            "retries",
            "result",
            "error",
        )
        for key in expected_keys:
            assert key in status, f"Missing key: {key}"


class TestListTasks:
    """list_tasks() summarises every registered task."""

    def test_empty_initially(self):
        assert DistributedWorker.list_tasks() == []

    def test_returns_registered_tasks(self):
        with patch.object(DistributedWorker, "_run_task"):
            DistributedWorker.submit("coder", "code", "task A")
            DistributedWorker.submit("writer", "writing", "task B")
        tasks = DistributedWorker.list_tasks()
        assert len(tasks) == 2
        agents = {t["agent"] for t in tasks}
        assert agents == {"coder", "writer"}


class TestSelectBackend:
    """_select_backend() follows the kimi > paperclip > agentic_loop priority."""

    def test_defaults_to_agentic_loop(self):
        # Patch the settings so the outcome does not depend on whatever
        # credentials happen to be configured on the machine running the test.
        with patch("config.settings", _make_settings()):
            backend = DistributedWorker._select_backend("code", "fix the bug")
        assert backend == "agentic_loop"

    def test_kimi_for_heavy_research_with_gitea(self):
        mock_settings = _make_settings(gitea_enabled=True, gitea_token="tok")

        with (
            patch("timmy.kimi_delegation.exceeds_local_capacity", return_value=True),
            patch("config.settings", mock_settings),
        ):
            backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
        assert backend == "kimi"

    def test_agentic_loop_when_no_gitea(self):
        with patch("config.settings", _make_settings()):
            backend = DistributedWorker._select_backend("research", "comprehensive survey " * 10)
        assert backend == "agentic_loop"

    def test_paperclip_when_api_key_configured(self):
        mock_settings = _make_settings(paperclip_api_key="pk_test_123")

        with patch("config.settings", mock_settings):
            backend = DistributedWorker._select_backend("code", "build a widget")
        assert backend == "paperclip"


class TestRunTask:
    """_run_task() retries failed dispatches and records the final outcome."""

    def test_marks_completed_on_success(self):
        record = _make_record("abc12345", task_description="fix bug")

        with patch.object(DistributedWorker, "_dispatch", return_value={"success": True}):
            DistributedWorker._run_task(record)

        assert record.status == "completed"
        assert record.result == {"success": True}
        assert record.error is None

    def test_marks_failed_after_exhausting_retries(self):
        record = _make_record("fail1234", task_description="broken task")

        with patch.object(DistributedWorker, "_dispatch", side_effect=RuntimeError("boom")):
            DistributedWorker._run_task(record)

        assert record.status == "failed"
        assert "boom" in record.error
        assert record.result is None
        assert record.retries == MAX_RETRIES

    def test_retries_before_failing(self):
        record = _make_record("retry001", task_description="flaky task")

        call_count = 0

        def flaky_dispatch(r):
            # Fail on every attempt except the last permitted one.
            nonlocal call_count
            call_count += 1
            if call_count < MAX_RETRIES + 1:
                raise RuntimeError("transient failure")
            return {"success": True}

        with patch.object(DistributedWorker, "_dispatch", side_effect=flaky_dispatch):
            DistributedWorker._run_task(record)

        assert record.status == "completed"
        assert call_count == MAX_RETRIES + 1

    def test_succeeds_on_first_attempt(self):
        record = _make_record(
            "ok000001",
            agent_name="writer",
            agent_role="writing",
            task_description="write summary",
            priority="low",
        )

        with patch.object(DistributedWorker, "_dispatch", return_value={"summary": "done"}):
            DistributedWorker._run_task(record)

        assert record.status == "completed"
        assert record.retries == 0
class TestDelegatetaskIntegration:
    """Integration: delegate_task should wire to DistributedWorker."""

    @staticmethod
    def _delegate(agent_name, description):
        """Run delegate_task with the background runner stubbed out."""
        # Imported lazily, as each test did, so collection does not need timmy.
        from timmy.tools_delegation import delegate_task

        with patch.object(DistributedWorker, "_run_task"):
            return delegate_task(agent_name, description)

    def test_delegate_task_returns_task_id(self):
        outcome = self._delegate("researcher", "research something for me")

        assert outcome["success"] is True
        assert outcome["task_id"] is not None
        assert outcome["status"] == "queued"

    def test_delegate_task_status_queued_for_valid_agent(self):
        outcome = self._delegate("coder", "implement feature X")

        assert outcome["status"] == "queued"
        assert len(outcome["task_id"]) == 8

    def test_task_in_registry_after_delegation(self):
        outcome = self._delegate("writer", "write documentation")

        # The registry lookup happens after the stub has been removed.
        snapshot = DistributedWorker.get_status(outcome["task_id"])
        assert snapshot["found"] is True
        assert snapshot["agent"] == "writer"