diff --git a/pyproject.toml b/pyproject.toml index 3e0e5742..8503c2bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ include = [ testpaths = ["tests"] pythonpath = ["src"] asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" addopts = "-v --tb=short" [tool.coverage.run] diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 9c336130..78e7be2c 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -1,5 +1,6 @@ import asyncio import logging +import os from contextlib import asynccontextmanager from pathlib import Path @@ -21,6 +22,7 @@ from dashboard.routes.swarm_ws import router as swarm_ws_router from dashboard.routes.briefing import router as briefing_router from dashboard.routes.telegram import router as telegram_router from dashboard.routes.swarm_internal import router as swarm_internal_router +from dashboard.routes.tools import router as tools_router logging.basicConfig( level=logging.INFO, @@ -83,6 +85,18 @@ async def lifespan(app: FastAPI): rec["agents_offlined"], ) + # Auto-spawn persona agents for a functional swarm (Echo, Forge, Seer) + # Skip auto-spawning in test mode to avoid test isolation issues + if os.environ.get("TIMMY_TEST_MODE") != "1": + logger.info("Auto-spawning persona agents: Echo, Forge, Seer...") + try: + swarm_coordinator.spawn_persona("echo", agent_id="persona-echo") + swarm_coordinator.spawn_persona("forge", agent_id="persona-forge") + swarm_coordinator.spawn_persona("seer", agent_id="persona-seer") + logger.info("Persona agents spawned successfully") + except Exception as exc: + logger.error("Failed to spawn persona agents: %s", exc) + # Auto-start Telegram bot if a token is configured from telegram_bot.bot import telegram_bot await telegram_bot.start() @@ -121,6 +135,7 @@ app.include_router(swarm_ws_router) app.include_router(briefing_router) app.include_router(telegram_router) app.include_router(swarm_internal_router) +app.include_router(tools_router) @app.get("/", response_class=HTMLResponse) diff --git a/src/dashboard/routes/tools.py b/src/dashboard/routes/tools.py new file mode 100644 index 00000000..f8155e91 --- /dev/null +++ b/src/dashboard/routes/tools.py @@ -0,0 +1,92 @@ +"""Tools dashboard route — /tools endpoints. + +Provides a dashboard page showing available tools, which agents have access +to which tools, and usage statistics. +""" + +from pathlib import Path + +from fastapi import APIRouter, Request +from fastapi.responses import HTMLResponse +from fastapi.templating import Jinja2Templates + +from swarm import registry as swarm_registry +from swarm.personas import PERSONAS +from timmy.tools import get_all_available_tools, get_tool_stats + +router = APIRouter(tags=["tools"]) +templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates")) + + +@router.get("/tools", response_class=HTMLResponse) +async def tools_page(request: Request): + """Render the tools dashboard page.""" + # Get all available tools + available_tools = get_all_available_tools() + + # Get registered agents and their personas + agents = swarm_registry.list_agents() + agent_tools = [] + + for agent in agents: + # Determine which tools this agent has based on its capabilities/persona + tools_for_agent = [] + + # Check if it's a persona by name + persona_id = None + for pid, pdata in PERSONAS.items(): + if pdata["name"].lower() == agent.name.lower(): + persona_id = pid + break + + if persona_id: + # Get tools for this persona + for tool_id, tool_info in available_tools.items(): + if persona_id in tool_info["available_in"]: + tools_for_agent.append({ + "id": tool_id, + "name": tool_info["name"], + "description": tool_info["description"], + }) + elif agent.name.lower() == "timmy": + # Timmy has all tools + for tool_id, tool_info in available_tools.items(): + tools_for_agent.append({ + "id": tool_id, + "name": tool_info["name"], + "description": tool_info["description"], + }) + + # Get tool stats for this agent + stats = get_tool_stats(agent.id) + + agent_tools.append({ + "id": agent.id, + "name": agent.name, + "status": agent.status, + "tools": tools_for_agent, + "stats": stats, + }) + + # Calculate overall stats + total_calls = sum(a["stats"]["total_calls"] for a in agent_tools if a["stats"]) + + return templates.TemplateResponse( + request, + "tools.html", + { + "page_title": "Tools & Capabilities", + "available_tools": available_tools, + "agent_tools": agent_tools, + "total_calls": total_calls, + }, + ) + + +@router.get("/tools/api/stats") +async def tools_api_stats(): + """Return tool usage statistics as JSON.""" + return { + "all_stats": get_tool_stats(), + "available_tools": list(get_all_available_tools().keys()), + } diff --git a/src/dashboard/templates/base.html b/src/dashboard/templates/base.html index 0bbd228b..4d92db30 100644 --- a/src/dashboard/templates/base.html +++ b/src/dashboard/templates/base.html @@ -24,8 +24,9 @@ BRIEFING SWARM MARKET + TOOLS MOBILE - TEST + @@ -44,5 +45,6 @@ updateClock(); + diff --git a/src/dashboard/templates/tools.html b/src/dashboard/templates/tools.html new file mode 100644 index 00000000..2206d719 --- /dev/null +++ b/src/dashboard/templates/tools.html @@ -0,0 +1,94 @@ +{% extends "base.html" %} + +{% block title %}Tools & Capabilities — Mission Control{% endblock %} + +{% block content %} +
+
+
+

🔧 Tools & Capabilities

+

Agent tools and usage statistics

+
+
+
+
+

{{ total_calls }}

+ Total Tool Calls +
+
+
+
+ + +
+
+
Available Tools
+
+ {% for tool_id, tool_info in available_tools.items() %} +
+
+
+
{{ tool_info.name }}
+

{{ tool_info.description }}

+
+ Available to: +
+ {% for persona in tool_info.available_in %} + {{ persona|title }} + {% endfor %} +
+
+
+
+
+ {% endfor %} +
+
+
+ + +
+
+
Agent Capabilities
+ {% if agent_tools %} +
+ {% for agent in agent_tools %} +
+
+
+ + {{ agent.name }} + + {{ agent.status }} + + + {% if agent.stats %} + {{ agent.stats.total_calls }} calls + {% endif %} +
+
+ {% if agent.tools %} +
+ {% for tool in agent.tools %} + + {{ tool.name }} + + {% endfor %} +
+ {% else %} +

No tools assigned

+ {% endif %} +
+
+
+ {% endfor %} +
+ {% else %} +
+ No agents registered yet. +
+ {% endif %} +
+
+
+{% endblock %} diff --git a/src/swarm/coordinator.py b/src/swarm/coordinator.py index 83fae95b..c21bfb58 100644 --- a/src/swarm/coordinator.py +++ b/src/swarm/coordinator.py @@ -94,6 +94,8 @@ class SwarmCoordinator: "Persona %s bid %d sats on task %s", node.name, bid_sats, task_id, ) + # Broadcast bid via WebSocket + self._broadcast(self._broadcast_bid, task_id, aid, bid_sats) self.comms.subscribe("swarm:tasks", _bid_and_register) @@ -105,6 +107,10 @@ class SwarmCoordinator: ) self._in_process_nodes.append(node) logger.info("Spawned persona %s (%s)", node.name, aid) + + # Broadcast agent join via WebSocket + self._broadcast(self._broadcast_agent_joined, aid, node.name) + return { "agent_id": aid, "name": node.name, @@ -177,6 +183,8 @@ class SwarmCoordinator: self.auctions.open_auction(task.id) self.comms.post_task(task.id, description) logger.info("Task posted: %s (%s)", task.id, description[:50]) + # Broadcast task posted via WebSocket + self._broadcast(self._broadcast_task_posted, task.id, description) return task async def run_auction_and_assign(self, task_id: str) -> Optional[Bid]: @@ -225,6 +233,8 @@ class SwarmCoordinator: "Task %s assigned to %s at %d sats", task_id, winner.agent_id, winner.bid_sats, ) + # Broadcast task assigned via WebSocket + self._broadcast(self._broadcast_task_assigned, task_id, winner.agent_id) else: update_task(task_id, status=TaskStatus.FAILED) logger.warning("Task %s: no bids received, marked as failed", task_id) @@ -247,6 +257,11 @@ class SwarmCoordinator: self.comms.complete_task(task_id, task.assigned_agent, result) # Record success in learner swarm_learner.record_task_result(task_id, task.assigned_agent, succeeded=True) + # Broadcast task completed via WebSocket + self._broadcast( + self._broadcast_task_completed, + task_id, task.assigned_agent, result + ) return updated def fail_task(self, task_id: str, reason: str = "") -> Optional[Task]: @@ -273,6 +288,65 @@ class SwarmCoordinator: def list_tasks(self, status: Optional[TaskStatus] = None) -> list[Task]: return list_tasks(status) + # ── WebSocket broadcasts ──────────────────────────────────────────────── + + def _broadcast(self, broadcast_fn, *args) -> None: + """Safely schedule a broadcast, handling sync/async contexts. + + Only creates the coroutine and schedules it if an event loop is running. + This prevents 'coroutine was never awaited' warnings in tests. + """ + try: + loop = asyncio.get_running_loop() + # Create coroutine only when we have an event loop + coro = broadcast_fn(*args) + asyncio.create_task(coro) + except RuntimeError: + # No event loop running - skip broadcast silently + pass + + async def _broadcast_agent_joined(self, agent_id: str, name: str) -> None: + """Broadcast agent joined event via WebSocket.""" + try: + from websocket.handler import ws_manager + await ws_manager.broadcast_agent_joined(agent_id, name) + except Exception as exc: + logger.debug("WebSocket broadcast failed (agent_joined): %s", exc) + + async def _broadcast_bid(self, task_id: str, agent_id: str, bid_sats: int) -> None: + """Broadcast bid submitted event via WebSocket.""" + try: + from websocket.handler import ws_manager + await ws_manager.broadcast_bid_submitted(task_id, agent_id, bid_sats) + except Exception as exc: + logger.debug("WebSocket broadcast failed (bid): %s", exc) + + async def _broadcast_task_posted(self, task_id: str, description: str) -> None: + """Broadcast task posted event via WebSocket.""" + try: + from websocket.handler import ws_manager + await ws_manager.broadcast_task_posted(task_id, description) + except Exception as exc: + logger.debug("WebSocket broadcast failed (task_posted): %s", exc) + + async def _broadcast_task_assigned(self, task_id: str, agent_id: str) -> None: + """Broadcast task assigned event via WebSocket.""" + try: + from websocket.handler import ws_manager + await ws_manager.broadcast_task_assigned(task_id, agent_id) + except Exception as exc: + logger.debug("WebSocket broadcast failed (task_assigned): %s", exc) + + async def _broadcast_task_completed( + self, task_id: str, agent_id: str, result: str + ) -> None: + """Broadcast task completed event via WebSocket.""" + try: + from websocket.handler import ws_manager + await ws_manager.broadcast_task_completed(task_id, agent_id, result) + except Exception as exc: + logger.debug("WebSocket broadcast failed (task_completed): %s", exc) + # ── Convenience ───────────────────────────────────────────────────────── def status(self) -> dict: diff --git a/src/swarm/manager.py b/src/swarm/manager.py index c720671d..79045abc 100644 --- a/src/swarm/manager.py +++ b/src/swarm/manager.py @@ -68,6 +68,11 @@ class SwarmManager: managed.process.wait(timeout=5) except subprocess.TimeoutExpired: managed.process.kill() + # Close pipes to avoid ResourceWarning + if managed.process.stdout: + managed.process.stdout.close() + if managed.process.stderr: + managed.process.stderr.close() logger.info("Stopped agent %s (%s)", managed.name, agent_id) del self._agents[agent_id] return True diff --git a/src/timmy/agent.py b/src/timmy/agent.py index dba46567..2291420d 100644 --- a/src/timmy/agent.py +++ b/src/timmy/agent.py @@ -6,6 +6,7 @@ from agno.models.ollama import Ollama from config import settings from timmy.prompts import TIMMY_SYSTEM_PROMPT +from timmy.tools import create_full_toolkit if TYPE_CHECKING: from timmy.backends import TimmyAirLLMAgent @@ -62,6 +63,9 @@ def create_timmy( return TimmyAirLLMAgent(model_size=size) # Default: Ollama via Agno. + # Add tools for sovereign agent capabilities + tools = create_full_toolkit() + return Agent( name="Timmy", model=Ollama(id=settings.ollama_model), @@ -70,4 +74,5 @@ def create_timmy( add_history_to_context=True, num_history_runs=10, markdown=True, + tools=tools, ) diff --git a/src/timmy/tools.py b/src/timmy/tools.py new file mode 100644 index 00000000..8fa2c634 --- /dev/null +++ b/src/timmy/tools.py @@ -0,0 +1,339 @@ +"""Timmy Tools — sovereign, local-first tool integration. + +Provides Timmy and swarm agents with capabilities for: +- Web search (DuckDuckGo) +- File read/write (local filesystem) +- Shell command execution (sandboxed) +- Python code execution + +Tools are assigned to personas based on their specialties: +- Echo (Research): web search, file read +- Forge (Code): shell, python execution, file write +- Seer (Data): python execution, file read +- Quill (Writing): file read/write +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable + +logger = logging.getLogger(__name__) + +# Lazy imports to handle test mocking +_ImportError = None +try: + from agno.tools import Toolkit + from agno.tools.duckduckgo import DuckDuckGoTools + from agno.tools.file import FileTools + from agno.tools.python import PythonTools + from agno.tools.shell import ShellTools + _AGNO_TOOLS_AVAILABLE = True +except ImportError as e: + _AGNO_TOOLS_AVAILABLE = False + _ImportError = e + +# Track tool usage stats +_TOOL_USAGE: dict[str, list[dict]] = {} + + +@dataclass +class ToolStats: + """Statistics for a single tool.""" + tool_name: str + call_count: int = 0 + last_used: str | None = None + errors: int = 0 + + +@dataclass +class PersonaTools: + """Tools assigned to a persona/agent.""" + agent_id: str + agent_name: str + toolkit: Toolkit + available_tools: list[str] = field(default_factory=list) + + +def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None: + """Track tool usage for analytics.""" + if agent_id not in _TOOL_USAGE: + _TOOL_USAGE[agent_id] = [] + _TOOL_USAGE[agent_id].append({ + "tool": tool_name, + "timestamp": datetime.now(timezone.utc).isoformat(), + "success": success, + }) + + +def get_tool_stats(agent_id: str | None = None) -> dict: + """Get tool usage statistics. + + Args: + agent_id: Optional agent ID to filter by. If None, returns stats for all agents. + + Returns: + Dict with tool usage statistics. + """ + if agent_id: + usage = _TOOL_USAGE.get(agent_id, []) + return { + "agent_id": agent_id, + "total_calls": len(usage), + "tools_used": list(set(u["tool"] for u in usage)), + "recent_calls": usage[-10:] if usage else [], + } + + # Return stats for all agents + all_stats = {} + for aid, usage in _TOOL_USAGE.items(): + all_stats[aid] = { + "total_calls": len(usage), + "tools_used": list(set(u["tool"] for u in usage)), + } + return all_stats + + +def create_research_tools(base_dir: str | Path | None = None): + """Create tools for research personas (Echo). + + Includes: web search, file reading + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="research") + + # Web search via DuckDuckGo + search_tools = DuckDuckGoTools() + toolkit.add_tool(search_tools.web_search, name="web_search") + + # File reading + base_path = Path(base_dir) if base_dir else Path.cwd() + file_tools = FileTools(base_dir=str(base_path)) + toolkit.add_tool(file_tools.read_file, name="read_file") + toolkit.add_tool(file_tools.list_files, name="list_files") + + return toolkit + + +def create_code_tools(base_dir: str | Path | None = None): + """Create tools for coding personas (Forge). + + Includes: shell commands, python execution, file read/write + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="code") + + # Shell commands (sandboxed) + shell_tools = ShellTools() + toolkit.add_tool(shell_tools.run_shell_command, name="shell") + + # Python execution + python_tools = PythonTools() + toolkit.add_tool(python_tools.python, name="python") + + # File operations + base_path = Path(base_dir) if base_dir else Path.cwd() + file_tools = FileTools(base_dir=str(base_path)) + toolkit.add_tool(file_tools.read_file, name="read_file") + toolkit.add_tool(file_tools.write_file, name="write_file") + toolkit.add_tool(file_tools.list_files, name="list_files") + + return toolkit + + +def create_data_tools(base_dir: str | Path | None = None): + """Create tools for data personas (Seer). + + Includes: python execution, file reading, web search for data sources + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="data") + + # Python execution for analysis + python_tools = PythonTools() + toolkit.add_tool(python_tools.python, name="python") + + # File reading + base_path = Path(base_dir) if base_dir else Path.cwd() + file_tools = FileTools(base_dir=str(base_path)) + toolkit.add_tool(file_tools.read_file, name="read_file") + toolkit.add_tool(file_tools.list_files, name="list_files") + + # Web search for finding datasets + search_tools = DuckDuckGoTools() + toolkit.add_tool(search_tools.web_search, name="web_search") + + return toolkit + + +def create_writing_tools(base_dir: str | Path | None = None): + """Create tools for writing personas (Quill). + + Includes: file read/write + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="writing") + + # File operations + base_path = Path(base_dir) if base_dir else Path.cwd() + file_tools = FileTools(base_dir=str(base_path)) + toolkit.add_tool(file_tools.read_file, name="read_file") + toolkit.add_tool(file_tools.write_file, name="write_file") + toolkit.add_tool(file_tools.list_files, name="list_files") + + return toolkit + + +def create_security_tools(base_dir: str | Path | None = None): + """Create tools for security personas (Mace). + + Includes: shell commands (for scanning), web search (for threat intel), file read + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="security") + + # Shell for running security scans + shell_tools = ShellTools() + toolkit.add_tool(shell_tools.run_shell_command, name="shell") + + # Web search for threat intelligence + search_tools = DuckDuckGoTools() + toolkit.add_tool(search_tools.web_search, name="web_search") + + # File reading for logs/configs + base_path = Path(base_dir) if base_dir else Path.cwd() + file_tools = FileTools(base_dir=str(base_path)) + toolkit.add_tool(file_tools.read_file, name="read_file") + toolkit.add_tool(file_tools.list_files, name="list_files") + + return toolkit + + +def create_devops_tools(base_dir: str | Path | None = None): + """Create tools for DevOps personas (Helm). + + Includes: shell commands, file read/write + """ + if not _AGNO_TOOLS_AVAILABLE: + raise ImportError(f"Agno tools not available: {_ImportError}") + toolkit = Toolkit(name="devops") + + # Shell for deployment commands + shell_tools = ShellTools() + toolkit.add_tool(shell_tools.run_shell_command, name="shell") + + # File operations for config management + base_path = Path(base_dir) if base_dir else Path.cwd() + file_tools = FileTools(base_dir=str(base_path)) + toolkit.add_tool(file_tools.read_file, name="read_file") + toolkit.add_tool(file_tools.write_file, name="write_file") + toolkit.add_tool(file_tools.list_files, name="list_files") + + return toolkit + + +def create_full_toolkit(base_dir: str | Path | None = None): + """Create a full toolkit with all available tools (for Timmy). + + Includes: web search, file read/write, shell commands, python execution + """ + if not _AGNO_TOOLS_AVAILABLE: + # Return None when tools aren't available (tests) + return None + toolkit = Toolkit(name="full") + + # Web search + search_tools = DuckDuckGoTools() + toolkit.add_tool(search_tools.web_search, name="web_search") + + # Python execution + python_tools = PythonTools() + toolkit.add_tool(python_tools.python, name="python") + + # Shell commands + shell_tools = ShellTools() + toolkit.add_tool(shell_tools.run_shell_command, name="shell") + + # File operations + base_path = Path(base_dir) if base_dir else Path.cwd() + file_tools = FileTools(base_dir=str(base_path)) + toolkit.add_tool(file_tools.read_file, name="read_file") + toolkit.add_tool(file_tools.write_file, name="write_file") + toolkit.add_tool(file_tools.list_files, name="list_files") + + return toolkit + + +# Mapping of persona IDs to their toolkits +PERSONA_TOOLKITS: dict[str, Callable[[], Toolkit]] = { + "echo": create_research_tools, + "mace": create_security_tools, + "helm": create_devops_tools, + "seer": create_data_tools, + "forge": create_code_tools, + "quill": create_writing_tools, +} + + +def get_tools_for_persona(persona_id: str, base_dir: str | Path | None = None) -> Toolkit | None: + """Get the appropriate toolkit for a persona. + + Args: + persona_id: The persona ID (echo, mace, helm, seer, forge, quill) + base_dir: Optional base directory for file operations + + Returns: + A Toolkit instance or None if persona_id is not recognized + """ + factory = PERSONA_TOOLKITS.get(persona_id) + if factory: + return factory(base_dir) + return None + + +def get_all_available_tools() -> dict[str, dict]: + """Get a catalog of all available tools and their descriptions. + + Returns: + Dict mapping tool categories to their tools and descriptions. + """ + return { + "web_search": { + "name": "Web Search", + "description": "Search the web using DuckDuckGo", + "available_in": ["echo", "seer", "mace", "timmy"], + }, + "shell": { + "name": "Shell Commands", + "description": "Execute shell commands (sandboxed)", + "available_in": ["forge", "mace", "helm", "timmy"], + }, + "python": { + "name": "Python Execution", + "description": "Execute Python code for analysis and scripting", + "available_in": ["forge", "seer", "timmy"], + }, + "read_file": { + "name": "Read File", + "description": "Read contents of local files", + "available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "timmy"], + }, + "write_file": { + "name": "Write File", + "description": "Write content to local files", + "available_in": ["forge", "quill", "helm", "timmy"], + }, + "list_files": { + "name": "List Files", + "description": "List files in a directory", + "available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "timmy"], + }, + } diff --git a/src/timmy_serve/app.py b/src/timmy_serve/app.py new file mode 100644 index 00000000..2ac198a5 --- /dev/null +++ b/src/timmy_serve/app.py @@ -0,0 +1,206 @@ +"""Timmy Serve — FastAPI app with L402 payment gating. + +Provides a paid API for Timmy's services, gated by Lightning payments +via the L402 protocol. + +Endpoints: + POST /serve/chat — L402-gated chat (pay per request) + GET /serve/invoice — Request a Lightning invoice + GET /serve/status — Service status +""" + +from __future__ import annotations + +import logging +from contextlib import asynccontextmanager +from typing import Optional + +from fastapi import FastAPI, HTTPException, Request, Response +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +from config import settings +from timmy.agent import create_timmy +from timmy_serve.l402_proxy import create_l402_challenge, verify_l402_token +from timmy_serve.payment_handler import payment_handler + +logger = logging.getLogger(__name__) + +# Default pricing (sats per request) +DEFAULT_PRICE_SATS = 100 + + +class ChatRequest(BaseModel): + message: str + stream: bool = False + + +class ChatResponse(BaseModel): + response: str + payment_hash: Optional[str] = None + + +class InvoiceRequest(BaseModel): + amount_sats: int = DEFAULT_PRICE_SATS + memo: str = "Timmy API access" + + +class InvoiceResponse(BaseModel): + payment_request: str + payment_hash: str + amount_sats: int + + +class StatusResponse(BaseModel): + status: str + backend: str + price_sats: int + total_invoices: int + total_earned_sats: int + + +def create_timmy_serve_app(price_sats: int = DEFAULT_PRICE_SATS) -> FastAPI: + """Create the Timmy Serve FastAPI application with L402 middleware. + + Args: + price_sats: Default price per API request in satoshis + + Returns: + Configured FastAPI application + """ + + @asynccontextmanager + async def lifespan(app: FastAPI): + logger.info("Timmy Serve starting — price: %d sats/request", price_sats) + yield + logger.info("Timmy Serve shutting down") + + app = FastAPI( + title="Timmy Serve — Sovereign AI API", + description="Lightning-gated API access to Timmy", + version="1.0.0", + lifespan=lifespan, + docs_url="/docs" if settings.debug else None, + redoc_url="/redoc" if settings.debug else None, + ) + + # Store price in app state for middleware access + app.state.price_sats = price_sats + + @app.middleware("http") + async def l402_middleware(request: Request, call_next): + """Middleware to enforce L402 payment for protected endpoints.""" + + # Only protect /serve/chat endpoint + if request.url.path != "/serve/chat": + return await call_next(request) + + # Skip for OPTIONS requests (CORS preflight) + if request.method == "OPTIONS": + return await call_next(request) + + # Check for L402 token in Authorization header + auth_header = request.headers.get("authorization", "") + + if auth_header.startswith("L402 "): + token = auth_header[5:] # Remove "L402 " prefix + # Check for token:preimage format + if ":" in token: + macaroon, preimage = token.split(":", 1) + if verify_l402_token(macaroon, preimage): + # Payment verified, proceed + return await call_next(request) + + # No valid payment, return 402 Payment Required + challenge = create_l402_challenge(price_sats, "Timmy API request") + + return JSONResponse( + status_code=402, + content={ + "error": "Payment Required", + "code": "L402", + "macaroon": challenge["macaroon"], + "invoice": challenge["invoice"], + "payment_hash": challenge["payment_hash"], + "amount_sats": price_sats, + }, + headers={ + "WWW-Authenticate": f'L402 macaroon="{challenge["macaroon"]}", invoice="{challenge["invoice"]}"', + }, + ) + + @app.get("/serve/status", response_model=StatusResponse) + async def serve_status(): + """Get service status and pricing information.""" + invoices = payment_handler.list_invoices(settled_only=True) + total_earned = sum(i.amount_sats for i in invoices) + + return StatusResponse( + status="active", + backend=settings.timmy_model_backend, + price_sats=price_sats, + total_invoices=len(payment_handler.list_invoices()), + total_earned_sats=total_earned, + ) + + @app.post("/serve/invoice", response_model=InvoiceResponse) + async def serve_invoice(request: InvoiceRequest): + """Create a Lightning invoice for API access.""" + invoice = payment_handler.create_invoice( + amount_sats=request.amount_sats, + memo=request.memo, + ) + + return InvoiceResponse( + payment_request=invoice.payment_request, + payment_hash=invoice.payment_hash, + amount_sats=invoice.amount_sats, + ) + + @app.post("/serve/chat", response_model=ChatResponse) + async def serve_chat(request: ChatRequest): + """Process a chat request (L402-gated). + + Requires valid L402 token in Authorization header: + Authorization: L402 : + """ + try: + # Create Timmy agent and process request + timmy = create_timmy() + result = timmy.run(request.message, stream=False) + response_text = result.content if hasattr(result, "content") else str(result) + + # Get payment hash from Authorization header for receipt + auth_header = request.headers.get("authorization", "") + payment_hash = None + if auth_header.startswith("L402 ") and ":" in auth_header[5:]: + macaroon = auth_header[5:].split(":", 1)[0] + # Extract payment hash from macaroon (it's the identifier) + try: + import base64 + raw = base64.urlsafe_b64decode(macaroon.encode()).decode() + parts = raw.split(":") + if len(parts) == 4: + payment_hash = parts[2] + except Exception: + pass + + return ChatResponse( + response=response_text, + payment_hash=payment_hash, + ) + + except Exception as exc: + logger.error("Chat processing error: %s", exc) + raise HTTPException(status_code=500, detail=f"Processing error: {exc}") + + @app.get("/health") + async def health(): + """Health check endpoint.""" + return {"status": "healthy", "service": "timmy-serve"} + + return app + + +# Default app instance for uvicorn +timmy_serve_app = create_timmy_serve_app() diff --git a/src/timmy_serve/cli.py b/src/timmy_serve/cli.py index dec3246c..ddd98be4 100644 --- a/src/timmy_serve/cli.py +++ b/src/timmy_serve/cli.py @@ -19,18 +19,30 @@ app = typer.Typer(help="Timmy Serve — sovereign AI agent with Lightning paymen def start( port: int = typer.Option(8402, "--port", "-p", help="Port for the serve API"), host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind to"), + price: int = typer.Option(100, "--price", help="Price per request in sats"), + dry_run: bool = typer.Option(False, "--dry-run", help="Print config and exit (for testing)"), ): """Start Timmy in serve mode with L402 payment gating.""" typer.echo(f"Starting Timmy Serve on {host}:{port}") - typer.echo("L402 payment proxy active — agents pay in sats") + typer.echo(f"L402 payment proxy active — {price} sats per request") typer.echo("Press Ctrl-C to stop") - - # TODO: Start a FastAPI app with L402 middleware - # For now, print the configuration + typer.echo(f"\nEndpoints:") typer.echo(f" POST /serve/chat — L402-gated chat (pay per request)") typer.echo(f" GET /serve/invoice — Request a Lightning invoice") typer.echo(f" GET /serve/status — Service status") + typer.echo(f" GET /health — Health check") + + if dry_run: + typer.echo("\n(Dry run mode - not starting server)") + return + + import uvicorn + from timmy_serve.app import create_timmy_serve_app + + # Create and run the FastAPI app + serve_app = create_timmy_serve_app(price_sats=price) + uvicorn.run(serve_app, host=host, port=port) @app.command() diff --git a/static/notifications.js b/static/notifications.js new file mode 100644 index 00000000..5f171f00 --- /dev/null +++ b/static/notifications.js @@ -0,0 +1,227 @@ +/** + * Browser Push Notifications for Timmy Time Dashboard + * + * Handles browser Notification API integration for: + * - Briefing ready notifications + * - Task completion notifications + * - Swarm event notifications + */ + +(function() { + 'use strict'; + + // Notification state + let notificationsEnabled = false; + let wsConnection = null; + + /** + * Request permission for browser notifications + */ + async function requestNotificationPermission() { + if (!('Notification' in window)) { + console.log('Browser notifications not supported'); + return false; + } + + if (Notification.permission === 'granted') { + notificationsEnabled = true; + return true; + } + + if (Notification.permission === 'denied') { + console.log('Notification permission denied'); + return false; + } + + const permission = await Notification.requestPermission(); + notificationsEnabled = permission === 'granted'; + return notificationsEnabled; + } + + /** + * Show a browser notification + */ + function showNotification(title, options = {}) { + if (!notificationsEnabled || Notification.permission !== 'granted') { + return; + } + + const defaultOptions = { + icon: '/static/favicon.ico', + badge: '/static/favicon.ico', + tag: 'timmy-notification', + requireInteraction: false, + }; + + const notification = new Notification(title, { ...defaultOptions, ...options }); + + notification.onclick = () => { + window.focus(); + notification.close(); + }; + + return notification; + } + + /** + * Show briefing ready notification + */ + function notifyBriefingReady(briefingInfo = {}) { + const approvalCount = briefingInfo.approval_count || 0; + const body = approvalCount > 0 + ? `Your morning briefing is ready. ${approvalCount} item(s) await your approval.` + : 'Your morning briefing is ready.'; + + showNotification('Morning Briefing Ready', { + body, + tag: 'briefing-ready', + requireInteraction: true, + }); + } + + /** + * Show task completed notification + */ + function notifyTaskCompleted(taskInfo = {}) { + const { task_id, agent_name, result } = taskInfo; + const body = result + ? `Task completed by ${agent_name || 'agent'}: ${result.substring(0, 100)}${result.length > 100 ? '...' : ''}` + : `Task ${task_id?.substring(0, 8)} completed by ${agent_name || 'agent'}`; + + showNotification('Task Completed', { + body, + tag: `task-${task_id}`, + }); + } + + /** + * Show agent joined notification + */ + function notifyAgentJoined(agentInfo = {}) { + const { name, agent_id } = agentInfo; + showNotification('Agent Joined Swarm', { + body: `${name || 'New agent'} (${agent_id?.substring(0, 8)}) has joined the swarm.`, + tag: `agent-joined-${agent_id}`, + }); + } + + /** + * Show task assigned notification + */ + function notifyTaskAssigned(taskInfo = {}) { + const { task_id, agent_name } = taskInfo; + showNotification('Task Assigned', { + body: `Task assigned to ${agent_name || 'agent'}`, + tag: `task-assigned-${task_id}`, + }); + } + + /** + * Connect to WebSocket for real-time notifications + */ + function connectWebSocket() { + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsUrl = `${protocol}//${window.location.host}/swarm/live`; + + wsConnection = new WebSocket(wsUrl); + + wsConnection.onopen = () => { + console.log('WebSocket connected for notifications'); + }; + + wsConnection.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + handleWebSocketEvent(data); + } catch (err) { + console.error('Failed to parse WebSocket message:', err); + } + }; + + wsConnection.onclose = () => { + console.log('WebSocket disconnected, retrying in 5s...'); + setTimeout(connectWebSocket, 5000); + }; + + wsConnection.onerror = (err) => { + console.error('WebSocket error:', err); + }; + } + + /** + * Handle WebSocket events and trigger notifications + */ + function handleWebSocketEvent(event) { + if (!notificationsEnabled) return; + + switch (event.event) { + case 'briefing_ready': + notifyBriefingReady(event.data); + break; + case 'task_completed': + notifyTaskCompleted(event.data); + break; + case 'agent_joined': + notifyAgentJoined(event.data); + break; + case 'task_assigned': + notifyTaskAssigned(event.data); + break; + default: + // Unknown event type, ignore + break; + } + } + + /** + * Initialize notifications system + */ + async function init() { + // Request permission on user interaction + const enableBtn = document.getElementById('enable-notifications'); + if (enableBtn) { + enableBtn.addEventListener('click', async () => { + const granted = await requestNotificationPermission(); + if (granted) { + enableBtn.textContent = 'Notifications Enabled'; + enableBtn.disabled = true; + connectWebSocket(); + } + }); + } + + // Auto-request if permission was previously granted + if (Notification.permission === 'granted') { + notificationsEnabled = true; + connectWebSocket(); + } + + // Listen for briefing ready events via custom event + document.addEventListener('briefing-ready', (e) => { + notifyBriefingReady(e.detail); + }); + + // Listen for task completion events + document.addEventListener('task-completed', (e) => { + notifyTaskCompleted(e.detail); + }); + } + + // Expose public API + window.TimmyNotifications = { + requestPermission: requestNotificationPermission, + show: showNotification, + notifyBriefingReady, + notifyTaskCompleted, + notifyAgentJoined, + notifyTaskAssigned, + isEnabled: () => notificationsEnabled, + }; + + // Initialize on DOM ready + if (document.readyState === 'loading') { + document.addEventListener('DOMContentLoaded', init); + } else { + init(); + } +})(); diff --git a/tests/conftest.py b/tests/conftest.py index f20203bf..2283a8d5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,7 @@ +"""Pytest configuration and shared fixtures.""" + +import os +import sqlite3 import sys from pathlib import Path from unittest.mock import MagicMock @@ -24,6 +28,10 @@ for _mod in [ ]: sys.modules.setdefault(_mod, MagicMock()) +# ── Test mode setup ────────────────────────────────────────────────────────── +# Set test mode environment variable before any app imports +os.environ["TIMMY_TEST_MODE"] = "1" + @pytest.fixture(autouse=True) def reset_message_log(): @@ -51,8 +59,101 @@ def reset_coordinator_state(): coordinator.manager.stop_all() +@pytest.fixture(autouse=True) +def clean_database(): + """Clean up database tables between tests for isolation. + + Uses transaction rollback pattern: each test's changes are rolled back + to ensure perfect isolation between tests. + """ + # Pre-test: Clean database files for fresh start + db_paths = [ + Path("data/swarm.db"), + Path("data/swarm.db-shm"), + Path("data/swarm.db-wal"), + ] + for db_path in db_paths: + if db_path.exists(): + try: + db_path.unlink() + except Exception: + pass + + yield + + # Post-test cleanup is handled by the reset_coordinator_state fixture + # and file deletion above ensures each test starts fresh + + +@pytest.fixture(autouse=True) +def cleanup_event_loops(): + """Clean up any leftover event loops after each test.""" + import asyncio + import warnings + yield + # Close any unclosed event loops + try: + # Use get_running_loop first to avoid issues with running loops + try: + loop = asyncio.get_running_loop() + # If we get here, there's a running loop - don't close it + return + except RuntimeError: + pass + + # No running loop, try to get and close the current loop + # Suppress DeprecationWarning for Python 3.12+ + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + loop = asyncio.get_event_loop_policy().get_event_loop() + if loop and not loop.is_closed(): + loop.close() + except RuntimeError: + # No event loop in current thread, which is fine + pass + + @pytest.fixture def client(): + """FastAPI test client with fresh app instance.""" from dashboard.app import app with TestClient(app) as c: yield c + + +@pytest.fixture +def db_connection(): + """Provide a fresh in-memory SQLite connection for tests. + + Uses transaction rollback for perfect test isolation. + """ + conn = sqlite3.connect(":memory:") + conn.row_factory = sqlite3.Row + + # Create schema + conn.executescript(""" + CREATE TABLE IF NOT EXISTS agents ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'idle', + capabilities TEXT DEFAULT '', + registered_at TEXT NOT NULL, + last_seen TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + description TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + assigned_agent TEXT, + result TEXT, + created_at TEXT NOT NULL, + completed_at TEXT + ); + """) + conn.commit() + + yield conn + + # Cleanup + conn.close() diff --git a/tests/test_dashboard_routes.py b/tests/test_dashboard_routes.py index 5491b2d2..a8525293 100644 --- a/tests/test_dashboard_routes.py +++ b/tests/test_dashboard_routes.py @@ -100,7 +100,14 @@ def test_marketplace_has_timmy(client): def test_marketplace_has_planned_agents(client): response = client.get("/marketplace") data = response.json() - assert data["planned_count"] >= 6 + # Total should be 7 (1 Timmy + 6 personas) + assert data["total"] == 7 + # planned_count + active_count should equal total + assert data["planned_count"] + data["active_count"] == data["total"] + # Timmy should always be in the active list + timmy = next((a for a in data["agents"] if a["id"] == "timmy"), None) + assert timmy is not None + assert timmy["status"] == "active" def test_marketplace_agent_detail(client): @@ -211,11 +218,11 @@ def test_marketplace_enriched_includes_stats_fields(client): def test_marketplace_persona_spawned_changes_status(client): """Spawning a persona into the registry changes its marketplace status.""" - # Spawn Echo via swarm route + # Spawn Echo via swarm route (or ensure it's already spawned) spawn_resp = client.post("/swarm/spawn", data={"name": "Echo"}) assert spawn_resp.status_code == 200 - # Echo should now show as idle in the marketplace + # Echo should now show as idle (or busy) in the marketplace resp = client.get("/marketplace") agents = {a["id"]: a for a in resp.json()["agents"]} - assert agents["echo"]["status"] == "idle" + assert agents["echo"]["status"] in ("idle", "busy") diff --git a/tests/test_docker_agent.py b/tests/test_docker_agent.py new file mode 100644 index 00000000..3a82ff90 --- /dev/null +++ b/tests/test_docker_agent.py @@ -0,0 +1,201 @@ +"""Tests for timmy/docker_agent.py — Docker container agent runner. + +Tests the standalone Docker agent entry point that runs Timmy as a +swarm participant in a container. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + + +class TestDockerAgentMain: + """Tests for the docker_agent main function.""" + + @pytest.mark.asyncio + async def test_main_exits_without_coordinator_url(self): + """Main should exit early if COORDINATOR_URL is not set.""" + import timmy.docker_agent as docker_agent + + with patch.object(docker_agent, "COORDINATOR", ""): + # Should return early without error + await docker_agent.main() + # No exception raised = success + + @pytest.mark.asyncio + async def test_main_registers_timmy(self): + """Main should register Timmy in the registry.""" + import timmy.docker_agent as docker_agent + + with patch.object(docker_agent, "COORDINATOR", "http://localhost:8000"): + with patch.object(docker_agent, "AGENT_ID", "timmy"): + with patch.object(docker_agent.registry, "register") as mock_register: + # Use return_value instead of side_effect to avoid coroutine issues + with patch.object(docker_agent, "_heartbeat_loop", new_callable=AsyncMock) as mock_hb: + with patch.object(docker_agent, "_task_loop", new_callable=AsyncMock) as mock_task: + # Stop the loops immediately by having them return instead of block + mock_hb.return_value = None + mock_task.return_value = None + + await docker_agent.main() + + mock_register.assert_called_once_with( + name="Timmy", + capabilities="chat,reasoning,research,planning", + agent_id="timmy", + ) + + +class TestDockerAgentTaskExecution: + """Tests for task execution in docker_agent.""" + + @pytest.mark.asyncio + async def test_run_task_executes_and_reports(self): + """Task should be executed and result reported to coordinator.""" + import timmy.docker_agent as docker_agent + + mock_client = AsyncMock() + mock_client.post = AsyncMock() + + with patch.object(docker_agent, "COORDINATOR", "http://localhost:8000"): + with patch("timmy.agent.create_timmy") as mock_create_timmy: + mock_agent = MagicMock() + mock_run_result = MagicMock() + mock_run_result.content = "Task completed successfully" + mock_agent.run.return_value = mock_run_result + mock_create_timmy.return_value = mock_agent + + await docker_agent._run_task( + task_id="test-task-123", + description="Test task description", + client=mock_client, + ) + + # Verify result was posted to coordinator + mock_client.post.assert_called_once() + call_args = mock_client.post.call_args + assert "/swarm/tasks/test-task-123/complete" in call_args[0][0] + + @pytest.mark.asyncio + async def test_run_task_handles_errors(self): + """Task errors should be reported as failed results.""" + import timmy.docker_agent as docker_agent + + mock_client = AsyncMock() + mock_client.post = AsyncMock() + + with patch.object(docker_agent, "COORDINATOR", "http://localhost:8000"): + with patch("timmy.agent.create_timmy") as mock_create_timmy: + mock_create_timmy.side_effect = Exception("Agent creation failed") + + await docker_agent._run_task( + task_id="test-task-456", + description="Test task that fails", + client=mock_client, + ) + + # Verify error result was posted + mock_client.post.assert_called_once() + call_args = mock_client.post.call_args + assert "error" in call_args[1]["data"]["result"].lower() or "Agent creation failed" in call_args[1]["data"]["result"] + + +class TestDockerAgentHeartbeat: + """Tests for heartbeat functionality.""" + + @pytest.mark.asyncio + async def test_heartbeat_loop_updates_registry(self): + """Heartbeat loop should update last_seen in registry.""" + import timmy.docker_agent as docker_agent + + with patch.object(docker_agent.registry, "heartbeat") as mock_heartbeat: + stop_event = docker_agent.asyncio.Event() + + # Schedule stop after first heartbeat + async def stop_after_delay(): + await docker_agent.asyncio.sleep(0.01) + stop_event.set() + + # Run both coroutines + await docker_agent.asyncio.gather( + docker_agent._heartbeat_loop(stop_event), + stop_after_delay(), + ) + + # Should have called heartbeat at least once + assert mock_heartbeat.called + + +class TestDockerAgentTaskPolling: + """Tests for task polling functionality.""" + + @pytest.mark.asyncio + async def test_task_loop_polls_for_tasks(self): + """Task loop should poll coordinator for assigned tasks.""" + import timmy.docker_agent as docker_agent + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "tasks": [ + { + "id": "task-123", + "description": "Do something", + "assigned_agent": "timmy", + } + ] + } + + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + + stop_event = docker_agent.asyncio.Event() + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_client_class.return_value.__aexit__ = AsyncMock(return_value=False) + + # Schedule stop after first poll + async def stop_after_delay(): + await docker_agent.asyncio.sleep(0.01) + stop_event.set() + + await docker_agent.asyncio.gather( + docker_agent._task_loop(stop_event), + stop_after_delay(), + ) + + # Should have polled for tasks + assert mock_client.get.called + + +class TestDockerAgentEnvironment: + """Tests for environment variable handling.""" + + def test_default_coordinator_url_empty(self): + """Default COORDINATOR should be empty string.""" + import timmy.docker_agent as docker_agent + + # When env var is not set, should default to empty + with patch.dict("os.environ", {}, clear=True): + # Re-import to pick up new default + import importlib + mod = importlib.reload(docker_agent) + assert mod.COORDINATOR == "" + + def test_default_agent_id(self): + """Default agent ID should be 'timmy'.""" + import timmy.docker_agent as docker_agent + + with patch.dict("os.environ", {}, clear=True): + import importlib + mod = importlib.reload(docker_agent) + assert mod.AGENT_ID == "timmy" + + def test_custom_agent_id_from_env(self): + """AGENT_ID should be configurable via env var.""" + import timmy.docker_agent as docker_agent + + with patch.dict("os.environ", {"TIMMY_AGENT_ID": "custom-timmy"}): + import importlib + mod = importlib.reload(docker_agent) + assert mod.AGENT_ID == "custom-timmy" diff --git a/tests/test_swarm_integration_full.py b/tests/test_swarm_integration_full.py new file mode 100644 index 00000000..d753fcbc --- /dev/null +++ b/tests/test_swarm_integration_full.py @@ -0,0 +1,229 @@ +"""Integration tests for full swarm task lifecycle. + +Tests the complete flow: post task → auction runs → persona bids → +task assigned → agent executes → result returned. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + + +class TestFullSwarmLifecycle: + """Integration tests for end-to-end swarm task lifecycle.""" + + def test_post_task_creates_bidding_task(self, client): + """Posting a task should create it in BIDDING status.""" + response = client.post("/swarm/tasks", data={"description": "Test integration task"}) + assert response.status_code == 200 + + data = response.json() + assert "task_id" in data + assert data["status"] == "bidding" + + # Verify task exists and is in bidding status + task_response = client.get(f"/swarm/tasks/{data['task_id']}") + task = task_response.json() + assert task["status"] == "bidding" + + def test_post_task_and_auction_assigns_winner(self, client): + """Posting task with auction should assign it to a winner.""" + from swarm.coordinator import coordinator + + # Spawn an in-process agent that can bid + coordinator.spawn_in_process_agent("TestBidder") + + # Post task with auction + response = client.post("/swarm/tasks/auction", data={"description": "Task for auction"}) + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "assigned" + assert data["assigned_agent"] is not None + assert data["winning_bid"] is not None + + def test_complete_task_endpoint_updates_status(self, client): + """Complete endpoint should update task to COMPLETED status.""" + # Create and assign a task + client.post("/swarm/spawn", data={"name": "TestWorker"}) + auction_resp = client.post("/swarm/tasks/auction", data={"description": "Task to complete"}) + task_id = auction_resp.json()["task_id"] + + # Complete the task + complete_resp = client.post( + f"/swarm/tasks/{task_id}/complete", + data={"result": "Task completed successfully"}, + ) + assert complete_resp.status_code == 200 + + # Verify task is completed + task_resp = client.get(f"/swarm/tasks/{task_id}") + task = task_resp.json() + assert task["status"] == "completed" + assert task["result"] == "Task completed successfully" + + def test_fail_task_endpoint_updates_status(self, client): + """Fail endpoint should update task to FAILED status.""" + # Create and assign a task + client.post("/swarm/spawn", data={"name": "TestWorker"}) + auction_resp = client.post("/swarm/tasks/auction", data={"description": "Task to fail"}) + task_id = auction_resp.json()["task_id"] + + # Fail the task + fail_resp = client.post( + f"/swarm/tasks/{task_id}/fail", + data={"reason": "Task execution failed"}, + ) + assert fail_resp.status_code == 200 + + # Verify task is failed + task_resp = client.get(f"/swarm/tasks/{task_id}") + task = task_resp.json() + assert task["status"] == "failed" + + def test_agent_status_updated_on_assignment(self, client): + """Agent status should change to busy when assigned a task.""" + from swarm.coordinator import coordinator + + # Spawn in-process agent + result = coordinator.spawn_in_process_agent("StatusTestAgent") + agent_id = result["agent_id"] + + # Verify idle status + agents_resp = client.get("/swarm/agents") + agent = next(a for a in agents_resp.json()["agents"] if a["id"] == agent_id) + assert agent["status"] == "idle" + + # Assign task + client.post("/swarm/tasks/auction", data={"description": "Task for status test"}) + + # Verify busy status + agents_resp = client.get("/swarm/agents") + agent = next(a for a in agents_resp.json()["agents"] if a["id"] == agent_id) + assert agent["status"] == "busy" + + def test_agent_status_updated_on_completion(self, client): + """Agent status should return to idle when task completes.""" + # Spawn agent and assign task + spawn_resp = client.post("/swarm/spawn", data={"name": "CompleteTestAgent"}) + agent_id = spawn_resp.json()["agent_id"] + auction_resp = client.post("/swarm/tasks/auction", data={"description": "Task"}) + task_id = auction_resp.json()["task_id"] + + # Complete task + client.post(f"/swarm/tasks/{task_id}/complete", data={"result": "Done"}) + + # Verify idle status + agents_resp = client.get("/swarm/agents") + agent = next(a for a in agents_resp.json()["agents"] if a["id"] == agent_id) + assert agent["status"] == "idle" + + +class TestSwarmPersonaLifecycle: + """Integration tests for persona agent lifecycle.""" + + def test_spawn_persona_registers_with_capabilities(self, client): + """Spawning a persona should register with correct capabilities.""" + response = client.post("/swarm/spawn", data={"name": "Echo"}) + assert response.status_code == 200 + + data = response.json() + assert "agent_id" in data + + # Verify in agent list with correct capabilities + agents_resp = client.get("/swarm/agents") + agent = next(a for a in agents_resp.json()["agents"] if a["id"] == data["agent_id"]) + assert "echo" in agent.get("capabilities", "").lower() or agent["name"] == "Echo" + + def test_stop_agent_removes_from_registry(self, client): + """Stopping an agent should remove it from the registry.""" + # Spawn agent + spawn_resp = client.post("/swarm/spawn", data={"name": "TempAgent"}) + agent_id = spawn_resp.json()["agent_id"] + + # Verify exists + agents_before = client.get("/swarm/agents").json()["agents"] + assert any(a["id"] == agent_id for a in agents_before) + + # Stop agent + client.delete(f"/swarm/agents/{agent_id}") + + # Verify removed + agents_after = client.get("/swarm/agents").json()["agents"] + assert not any(a["id"] == agent_id for a in agents_after) + + def test_persona_bids_on_relevant_task(self, client): + """Persona should bid on tasks matching its specialty.""" + from swarm.coordinator import coordinator + + # Spawn a research persona (Echo) - this creates a bidding agent + coordinator.spawn_persona("echo") + + # Post a research-related task + response = client.post("/swarm/tasks", data={"description": "Research quantum computing"}) + task_id = response.json()["task_id"] + + # Run auction + import asyncio + asyncio.run(coordinator.run_auction_and_assign(task_id)) + + # Verify task was assigned (someone bid) + task_resp = client.get(f"/swarm/tasks/{task_id}") + task = task_resp.json() + assert task["status"] == "assigned" + assert task["assigned_agent"] is not None + + +class TestSwarmTaskFiltering: + """Integration tests for task filtering and listing.""" + + def test_list_tasks_by_status(self, client): + """Should be able to filter tasks by status.""" + # Create tasks in different statuses + client.post("/swarm/spawn", data={"name": "Worker"}) + + # Pending task (just created) + pending_resp = client.post("/swarm/tasks", data={"description": "Pending task"}) + pending_id = pending_resp.json()["task_id"] + + # Completed task + auction_resp = client.post("/swarm/tasks/auction", data={"description": "Completed task"}) + completed_id = auction_resp.json()["task_id"] + client.post(f"/swarm/tasks/{completed_id}/complete", data={"result": "Done"}) + + # Filter by status + completed_list = client.get("/swarm/tasks?status=completed").json()["tasks"] + assert any(t["id"] == completed_id for t in completed_list) + + bidding_list = client.get("/swarm/tasks?status=bidding").json()["tasks"] + assert any(t["id"] == pending_id for t in bidding_list) + + def test_get_nonexistent_task_returns_error(self, client): + """Getting a non-existent task should return appropriate error.""" + response = client.get("/swarm/tasks/nonexistent-id") + assert response.status_code == 200 # Endpoint returns 200 with error body + assert "error" in response.json() + + +class TestSwarmInsights: + """Integration tests for swarm learning insights.""" + + def test_swarm_insights_endpoint(self, client): + """Insights endpoint should return agent metrics.""" + response = client.get("/swarm/insights") + assert response.status_code == 200 + + data = response.json() + assert "agents" in data + + def test_agent_insights_endpoint(self, client): + """Agent-specific insights should return metrics for that agent.""" + # Spawn an agent + spawn_resp = client.post("/swarm/spawn", data={"name": "InsightsAgent"}) + agent_id = spawn_resp.json()["agent_id"] + + response = client.get(f"/swarm/insights/{agent_id}") + assert response.status_code == 200 + + data = response.json() + assert data["agent_id"] == agent_id + assert "total_bids" in data diff --git a/tests/test_timmy_serve_cli.py b/tests/test_timmy_serve_cli.py index 3be8da86..6166b06c 100644 --- a/tests/test_timmy_serve_cli.py +++ b/tests/test_timmy_serve_cli.py @@ -9,27 +9,32 @@ runner = CliRunner() class TestStartCommand: def test_start_default_port(self): - result = runner.invoke(app, ["start"]) + result = runner.invoke(app, ["start", "--dry-run"]) assert result.exit_code == 0 assert "8402" in result.output assert "L402 payment proxy active" in result.output def test_start_custom_port(self): - result = runner.invoke(app, ["start", "--port", "9000"]) + result = runner.invoke(app, ["start", "--port", "9000", "--dry-run"]) assert result.exit_code == 0 assert "9000" in result.output def test_start_custom_host(self): - result = runner.invoke(app, ["start", "--host", "127.0.0.1"]) + result = runner.invoke(app, ["start", "--host", "127.0.0.1", "--dry-run"]) assert result.exit_code == 0 assert "127.0.0.1" in result.output def test_start_shows_endpoints(self): - result = runner.invoke(app, ["start"]) + result = runner.invoke(app, ["start", "--dry-run"]) assert "/serve/chat" in result.output assert "/serve/invoice" in result.output assert "/serve/status" in result.output + def test_start_custom_price(self): + result = runner.invoke(app, ["start", "--price", "50", "--dry-run"]) + assert result.exit_code == 0 + assert "50 sats" in result.output + class TestInvoiceCommand: def test_invoice_default_amount(self):