Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
985488bcbe feat: add A2A task delegation over mTLS (#804)
All checks were successful
Lint / lint (pull_request) Successful in 11s
2026-04-22 11:14:26 -04:00
Alexander Whitestone
524868d4f4 test: add red coverage for A2A task delegation (#804) 2026-04-22 11:09:18 -04:00
10 changed files with 614 additions and 274 deletions

View File

@@ -29,6 +29,8 @@ import logging
import os
import ssl
import threading
import time
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Callable, Dict, Optional
@@ -441,3 +443,244 @@ class A2AMTLSClient:
def post(self, url: str, json: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
data = (__import__("json").dumps(json).encode() if json is not None else None)
return self._request("POST", url, data=data, **kwargs)
# ---------------------------------------------------------------------------
# Structured A2A task delegation over mTLS
# ---------------------------------------------------------------------------
_TERMINAL_TASK_STATES = {"completed", "failed", "canceled", "rejected"}
def _iso_now() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def _task_status(state: str, message: str) -> Dict[str, Any]:
return {
"state": state,
"message": message,
"timestamp": _iso_now(),
}
def _coerce_artifact(result: Any) -> Dict[str, Any]:
if isinstance(result, dict):
if "text" in result:
return result
if "artifact" in result and isinstance(result["artifact"], dict):
return result["artifact"]
return {"text": str(result)}
def _build_task_record(task_id: str, task: str, requester: Optional[str], metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return {
"taskId": task_id,
"task": task,
"requester": requester,
"metadata": metadata or {},
"artifacts": [],
"status": _task_status("submitted", "Task submitted"),
}
def _default_agent_card(host: str, port: int) -> Dict[str, Any]:
base_url = f"https://{host}:{port}"
try:
from agent.agent_card import build_agent_card
from dataclasses import asdict
card = asdict(build_agent_card())
except Exception as exc: # pragma: no cover - fallback only exercised when card build breaks
logger.warning("Falling back to minimal agent card: %s", exc)
card = {
"name": os.environ.get("HERMES_AGENT_NAME", "hermes"),
"description": "Hermes A2A task server",
"version": "unknown",
}
card["url"] = base_url
card["a2aTaskEndpoint"] = f"{base_url}/a2a/rpc"
return card
def _default_local_hermes_executor(task_payload: Dict[str, Any]) -> Dict[str, Any]:
task_text = str(task_payload.get("task", "")).strip()
if not task_text:
return {"text": ""}
from run_agent import AIAgent
agent = AIAgent(quiet_mode=True)
result = agent.chat(task_text)
return {
"text": result,
"metadata": {"executor": "local-hermes"},
}
class A2ATaskServer:
"""JSON-RPC A2A task server running over the routing mTLS server."""
def __init__(
self,
cert: str | Path,
key: str | Path,
ca: str | Path,
host: str = "127.0.0.1",
port: int = 9443,
executor: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
card_factory: Optional[Callable[[], Dict[str, Any]]] = None,
) -> None:
self.host = host
self.port = port
self._server = A2AMTLSServer(cert=cert, key=key, ca=ca, host=host, port=port)
self._executor = executor or _default_local_hermes_executor
self._card_factory = card_factory or (lambda: _default_agent_card(self.host, self.port))
self._tasks: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
self._server.add_route("/.well-known/agent-card.json", self._handle_agent_card)
self._server.add_route("/agent-card.json", self._handle_agent_card)
self._server.add_route("/a2a/rpc", self._handle_rpc)
def __enter__(self) -> "A2ATaskServer":
self.start()
return self
def __exit__(self, *_: Any) -> None:
self.stop()
def start(self) -> None:
self._server.start()
def stop(self) -> None:
self._server.stop()
def _handle_agent_card(self, payload: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
return self._card_factory()
def _handle_rpc(self, payload: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
req_id = payload.get("id")
if payload.get("jsonrpc") != "2.0":
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32600, "message": "invalid jsonrpc version"}}
method = payload.get("method")
params = payload.get("params") or {}
try:
if method == "tasks/send":
result = self._rpc_send_task(params, peer_cn=peer_cn)
elif method == "tasks/get":
result = self._rpc_get_task(params)
else:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"unknown method: {method}"}}
except Exception as exc:
logger.exception("A2A task RPC failed: %s", exc)
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": str(exc)}}
return {"jsonrpc": "2.0", "id": req_id, "result": result}
def _rpc_send_task(self, params: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
task_text = str(params.get("task", "")).strip()
if not task_text:
raise ValueError("task is required")
task_id = params.get("taskId") or uuid.uuid4().hex
requester = params.get("requester") or peer_cn
metadata = dict(params.get("metadata") or {})
if peer_cn:
metadata.setdefault("peer_cn", peer_cn)
record = _build_task_record(task_id, task_text, requester, metadata)
with self._lock:
self._tasks[task_id] = record
worker = threading.Thread(target=self._run_task, args=(task_id,), daemon=True, name=f"a2a-task-{task_id[:8]}")
worker.start()
return self._copy_task(task_id)
def _rpc_get_task(self, params: Dict[str, Any]) -> Dict[str, Any]:
task_id = str(params.get("taskId", "")).strip()
if not task_id:
raise ValueError("taskId is required")
return self._copy_task(task_id)
def _copy_task(self, task_id: str) -> Dict[str, Any]:
with self._lock:
if task_id not in self._tasks:
raise KeyError(f"unknown taskId: {task_id}")
return json.loads(json.dumps(self._tasks[task_id]))
def _run_task(self, task_id: str) -> None:
with self._lock:
task = self._tasks[task_id]
task["status"] = _task_status("working", "Task is running")
task_payload = {
"taskId": task["taskId"],
"task": task["task"],
"requester": task.get("requester"),
"metadata": dict(task.get("metadata") or {}),
}
try:
result = self._executor(task_payload)
artifact = _coerce_artifact(result)
with self._lock:
task = self._tasks[task_id]
task["artifacts"] = [artifact]
task["status"] = _task_status("completed", "Task completed")
except Exception as exc:
with self._lock:
task = self._tasks[task_id]
task["status"] = _task_status("failed", f"Task failed: {exc}")
class A2ATaskClient(A2AMTLSClient):
"""Client helper for A2A JSON-RPC task send/get flows."""
def discover_card(self, base_url: str) -> Dict[str, Any]:
return self.get(f"{base_url.rstrip('/')}/.well-known/agent-card.json")
def _rpc_call(self, base_url: str, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
payload = {
"jsonrpc": "2.0",
"id": uuid.uuid4().hex,
"method": method,
"params": params,
}
response = self.post(f"{base_url.rstrip('/')}/a2a/rpc", json=payload)
if "error" in response:
error = response["error"]
raise RuntimeError(error.get("message") or str(error))
return response.get("result", {})
def send_task(
self,
base_url: str,
*,
task: str,
requester: str | None = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
return self._rpc_call(
base_url,
"tasks/send",
{
"task": task,
"requester": requester,
"metadata": metadata or {},
},
)
def get_task(self, base_url: str, task_id: str) -> Dict[str, Any]:
return self._rpc_call(base_url, "tasks/get", {"taskId": task_id})
def wait_for_task(
self,
base_url: str,
task_id: str,
*,
timeout: float = 30.0,
poll_interval: float = 0.5,
) -> Dict[str, Any]:
deadline = time.monotonic() + timeout
while True:
task = self.get_task(base_url, task_id)
state = str(((task.get("status") or {}).get("state") or "")).lower()
if state in _TERMINAL_TASK_STATES:
return task
if time.monotonic() >= deadline:
raise TimeoutError(f"Timed out waiting for task {task_id}")
time.sleep(poll_interval)

132
hermes_cli/a2a_cmd.py Normal file
View File

@@ -0,0 +1,132 @@
"""CLI helpers for A2A task delegation."""
from __future__ import annotations
import json
import os
import re
import sys
import time
from pathlib import Path
from typing import Any
from agent.a2a_mtls import A2ATaskClient, A2ATaskServer
from hermes_cli.config import get_hermes_home
def _registry_path() -> Path:
return get_hermes_home() / "a2a_agents.json"
def _default_identity_paths() -> tuple[str, str, str]:
hermes_home = get_hermes_home()
agent_name = os.environ.get("HERMES_AGENT_NAME", "hermes").lower()
cert = os.environ.get(
"HERMES_A2A_CERT",
str(hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.crt"),
)
key = os.environ.get(
"HERMES_A2A_KEY",
str(hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.key"),
)
ca = os.environ.get(
"HERMES_A2A_CA",
str(hermes_home / "pki" / "ca" / "fleet-ca.crt"),
)
return cert, key, ca
def load_agent_registry(path: Path | None = None) -> dict[str, Any]:
registry_path = path or _registry_path()
if not registry_path.exists():
return {}
return json.loads(registry_path.read_text(encoding="utf-8"))
def resolve_agent_url(agent: str, *, registry_path: Path | None = None) -> str:
key = re.sub(r"[^A-Za-z0-9]+", "_", agent).upper()
env_value = os.getenv(f"HERMES_A2A_{key}_URL")
if env_value:
return env_value
registry = load_agent_registry(registry_path)
entry = registry.get(agent)
if isinstance(entry, str) and entry:
return entry
if isinstance(entry, dict):
url = entry.get("url") or entry.get("base_url") or entry.get("card_url")
if url:
return str(url)
if agent.startswith("https://") or agent.startswith("http://"):
return agent
raise SystemExit(f"Unknown A2A agent '{agent}'. Set HERMES_A2A_{key}_URL or add it to {_registry_path()}.")
def _print(data: dict[str, Any]) -> None:
print(json.dumps(data, indent=2, ensure_ascii=False))
def cmd_send(args) -> None:
base_url = args.url or resolve_agent_url(args.agent)
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
client = A2ATaskClient(cert=cert, key=key, ca=ca)
card = client.discover_card(base_url)
task = client.send_task(
base_url,
task=args.task,
requester=args.requester,
metadata={"agent": args.agent},
)
if args.wait:
task = client.wait_for_task(
base_url,
task["taskId"],
timeout=args.timeout,
poll_interval=args.poll_interval,
)
_print({
"agent": args.agent,
"url": base_url,
"card": card,
"task": task,
})
def cmd_status(args) -> None:
base_url = args.url or resolve_agent_url(args.agent)
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
client = A2ATaskClient(cert=cert, key=key, ca=ca)
task = client.get_task(base_url, args.task_id)
_print({"agent": args.agent, "url": base_url, "task": task})
def cmd_serve(args) -> None:
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
server = A2ATaskServer(cert=cert, key=key, ca=ca, host=args.host, port=args.port)
server.start()
print(f"A2A task server listening on https://{args.host}:{args.port}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop()
def cmd_a2a(args) -> None:
command = getattr(args, "a2a_command", None) or "send"
if command == "send":
cmd_send(args)
return
if command == "status":
cmd_status(args)
return
if command == "serve":
cmd_serve(args)
return
raise SystemExit(f"Unknown a2a command: {command}")

View File

@@ -173,6 +173,13 @@ from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
def cmd_a2a(args):
"""Dispatch A2A CLI subcommands lazily to avoid heavy imports at startup."""
from hermes_cli.a2a_cmd import cmd_a2a as _cmd_a2a
return _cmd_a2a(args)
def _relative_time(ts) -> str:
"""Format a timestamp as relative time (e.g., '2h ago', 'yesterday')."""
if not ts:
@@ -4781,6 +4788,45 @@ For more help on a command:
gateway_parser.set_defaults(func=cmd_gateway)
# =========================================================================
# a2a command
# =========================================================================
a2a_parser = subparsers.add_parser(
"a2a",
help="A2A task delegation over mutual TLS",
description="Send, inspect, and serve structured A2A tasks between Hermes agents",
)
a2a_subparsers = a2a_parser.add_subparsers(dest="a2a_command")
a2a_send = a2a_subparsers.add_parser("send", help="Send an A2A task to another agent")
a2a_send.add_argument("--agent", required=True, help="Agent alias or URL (for example: allegro)")
a2a_send.add_argument("--task", required=True, help="Task text to delegate")
a2a_send.add_argument("--url", help="Explicit base URL for the remote agent")
a2a_send.add_argument("--requester", default=None, help="Requester label included in task metadata")
a2a_send.add_argument("--wait", action="store_true", help="Poll until the task reaches a terminal state")
a2a_send.add_argument("--timeout", type=float, default=30.0, help="Wait timeout in seconds (default: 30)")
a2a_send.add_argument("--poll-interval", type=float, default=0.5, help="Polling interval in seconds while waiting (default: 0.5)")
a2a_send.add_argument("--cert", default=None, help="Client certificate path (defaults from HERMES_A2A_CERT)")
a2a_send.add_argument("--key", default=None, help="Client private key path (defaults from HERMES_A2A_KEY)")
a2a_send.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_status = a2a_subparsers.add_parser("status", help="Fetch the current status of an A2A task")
a2a_status.add_argument("--agent", required=True, help="Agent alias or URL (for example: allegro)")
a2a_status.add_argument("--task-id", required=True, help="Task identifier returned by a2a send")
a2a_status.add_argument("--url", help="Explicit base URL for the remote agent")
a2a_status.add_argument("--cert", default=None, help="Client certificate path (defaults from HERMES_A2A_CERT)")
a2a_status.add_argument("--key", default=None, help="Client private key path (defaults from HERMES_A2A_KEY)")
a2a_status.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_serve = a2a_subparsers.add_parser("serve", help="Run the local A2A task server")
a2a_serve.add_argument("--host", default=os.environ.get("HERMES_A2A_HOST", "127.0.0.1"), help="Bind host (default: HERMES_A2A_HOST or 127.0.0.1)")
a2a_serve.add_argument("--port", type=int, default=int(os.environ.get("HERMES_A2A_PORT", "9443")), help="Bind port (default: HERMES_A2A_PORT or 9443)")
a2a_serve.add_argument("--cert", default=None, help="Server certificate path (defaults from HERMES_A2A_CERT)")
a2a_serve.add_argument("--key", default=None, help="Server private key path (defaults from HERMES_A2A_KEY)")
a2a_serve.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_parser.set_defaults(func=cmd_a2a)
# =========================================================================
# setup command
# =========================================================================

View File

@@ -57,7 +57,7 @@ CONFIGURABLE_TOOLSETS = [
("moa", "🧠 Mixture of Agents", "mixture_of_agents"),
("tts", "🔊 Text-to-Speech", "text_to_speech"),
("skills", "📚 Skills", "list, view, manage"),
("todo", "📋 Task Planning", "todo, ultraplan"),
("todo", "📋 Task Planning", "todo"),
("memory", "💾 Memory", "persistent memory across sessions"),
("session_search", "🔎 Session Search", "search past conversations"),
("clarify", "❓ Clarifying Questions", "clarify"),

View File

@@ -572,3 +572,94 @@ class TestA2AMTLSServerAndClient:
assert not errors, f"Concurrent connection errors: {errors}"
assert len(results) == 3
@_requires_crypto
class TestA2ATaskServerAndClient:
"""Structured A2A task send/get flow over mTLS."""
@pytest.fixture(autouse=True)
def _pki(self, tmp_path):
ca_dir = tmp_path / "ca"
ca_dir.mkdir()
self.ca_crt, self.ca_key = _make_ca_keypair(ca_dir)
agent_dir = tmp_path / "agents"
agent_dir.mkdir()
self.srv_crt, self.srv_key = _make_agent_keypair(
agent_dir, "timmy", self.ca_crt, self.ca_key
)
self.cli_crt, self.cli_key = _make_agent_keypair(
agent_dir, "allegro", self.ca_crt, self.ca_key
)
@pytest.fixture()
def task_server(self):
from agent.a2a_mtls import A2ATaskServer
gate = threading.Event()
def analyze_executor(task: dict[str, object]) -> dict[str, object]:
gate.wait(timeout=2)
text = str(task.get("task", ""))
return {
"text": f"analysis:{text}",
"metadata": {"tool": "local-hermes-stub"},
}
port = _find_free_port()
server = A2ATaskServer(
cert=self.srv_crt,
key=self.srv_key,
ca=self.ca_crt,
host="127.0.0.1",
port=port,
executor=analyze_executor,
)
with server:
time.sleep(0.1)
yield server, port, gate
def test_task_send_get_and_completion_flow(self, task_server):
from agent.a2a_mtls import A2ATaskClient
server, port, gate = task_server
client = A2ATaskClient(cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt)
base_url = f"https://127.0.0.1:{port}"
card = client.discover_card(base_url)
assert card["name"]
submitted = client.send_task(base_url, task="Analyze README.md", requester="timmy")
assert submitted["status"]["state"] in {"submitted", "working"}
in_flight = client.get_task(base_url, submitted["taskId"])
assert in_flight["status"]["state"] in {"submitted", "working"}
gate.set()
completed = client.wait_for_task(base_url, submitted["taskId"], timeout=5.0, poll_interval=0.05)
assert completed["status"]["state"] == "completed"
assert completed["artifacts"][0]["text"] == "analysis:Analyze README.md"
def test_failed_executor_marks_task_failed(self):
from agent.a2a_mtls import A2ATaskClient, A2ATaskServer
def failing_executor(task: dict[str, object]) -> dict[str, object]:
raise RuntimeError("boom")
port = _find_free_port()
server = A2ATaskServer(
cert=self.srv_crt,
key=self.srv_key,
ca=self.ca_crt,
host="127.0.0.1",
port=port,
executor=failing_executor,
)
with server:
time.sleep(0.1)
client = A2ATaskClient(cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt)
base_url = f"https://127.0.0.1:{port}"
submitted = client.send_task(base_url, task="explode", requester="timmy")
failed = client.wait_for_task(base_url, submitted["taskId"], timeout=5.0, poll_interval=0.05)
assert failed["status"]["state"] == "failed"
assert "boom" in failed["status"]["message"]

View File

@@ -0,0 +1,95 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from unittest.mock import patch
import pytest
def test_cmd_send_uses_registry_and_waits_for_terminal_task(tmp_path, monkeypatch, capsys):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "a2a_agents.json").write_text(
json.dumps({"allegro": {"url": "https://127.0.0.1:9443"}}),
encoding="utf-8",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_cli.a2a_cmd import cmd_a2a
class FakeClient:
def __init__(self, **kwargs):
self.kwargs = kwargs
def discover_card(self, base_url: str):
assert base_url == "https://127.0.0.1:9443"
return {"name": "allegro", "url": base_url}
def send_task(self, base_url: str, *, task: str, requester: str | None = None, metadata=None):
assert task == "analyze README"
return {"taskId": "task-123", "status": {"state": "submitted"}}
def wait_for_task(self, base_url: str, task_id: str, *, timeout: float, poll_interval: float):
assert task_id == "task-123"
return {
"taskId": task_id,
"status": {"state": "completed"},
"artifacts": [{"text": "README looks healthy"}],
}
args = argparse.Namespace(
a2a_command="send",
agent="allegro",
task="analyze README",
url=None,
wait=True,
timeout=5.0,
poll_interval=0.01,
requester="timmy",
cert="cert.pem",
key="key.pem",
ca="ca.pem",
)
with patch("hermes_cli.a2a_cmd.A2ATaskClient", FakeClient):
cmd_a2a(args)
result = json.loads(capsys.readouterr().out)
assert result["agent"] == "allegro"
assert result["card"]["name"] == "allegro"
assert result["task"]["status"]["state"] == "completed"
assert result["task"]["artifacts"][0]["text"] == "README looks healthy"
def test_resolve_agent_url_supports_env_override(monkeypatch):
monkeypatch.setenv("HERMES_A2A_ALLEGRO_URL", "https://fleet-allegro:9443")
from hermes_cli.a2a_cmd import resolve_agent_url
assert resolve_agent_url("allegro") == "https://fleet-allegro:9443"
def test_cmd_send_requires_known_agent(tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_cli.a2a_cmd import cmd_a2a
args = argparse.Namespace(
a2a_command="send",
agent="unknown",
task="do work",
url=None,
wait=False,
timeout=5.0,
poll_interval=0.05,
requester=None,
cert="cert.pem",
key="key.pem",
ca="ca.pem",
)
with pytest.raises(SystemExit):
cmd_a2a(args)

View File

@@ -294,32 +294,22 @@ class TestBuiltinDiscovery:
"tools.browser_tool",
"tools.clarify_tool",
"tools.code_execution_tool",
"tools.crisis_tool",
"tools.cronjob_tools",
"tools.delegate_tool",
"tools.file_tools",
"tools.homeassistant_tool",
"tools.image_generation_tool",
"tools.local_inference_tool",
"tools.memory_tool",
"tools.mixture_of_agents_tool",
"tools.process_registry",
"tools.rl_training_tool",
"tools.scavenger_fixer",
"tools.send_message_tool",
"tools.session_search_tool",
"tools.skill_manager_tool",
"tools.skills_tool",
"tools.sovereign_router",
"tools.sovereign_scavenger",
"tools.sovereign_teleport",
"tools.static_analyzer",
"tools.symbolic_verify",
"tools.terminal_tool",
"tools.todo_tool",
"tools.tts_tool",
"tools.ultraplan",
"tools.verify_tool",
"tools.vision_tools",
"tools.web_tools",
}

View File

@@ -1,81 +0,0 @@
import json
from pathlib import Path
from toolsets import resolve_toolset
from tools.registry import registry
def test_create_action_saves_markdown_and_json(tmp_path):
from tools.ultraplan import ultraplan_tool
result = json.loads(
ultraplan_tool(
action="create",
mission="Daily autonomous planning",
streams=[
{
"id": "A",
"name": "Backlog burn",
"phases": [
{"id": "A1", "name": "Triage", "artifact": "issue list"},
{"id": "A2", "name": "Ship", "dependencies": ["A1"], "artifact": "PR"},
],
}
],
base_dir=str(tmp_path),
)
)
assert result["success"] is True
assert Path(result["file_path"]).exists()
assert Path(result["json_path"]).exists()
assert "Work Streams" in Path(result["file_path"]).read_text(encoding="utf-8")
def test_load_action_returns_saved_plan(tmp_path):
from tools.ultraplan import ultraplan_tool
created = json.loads(
ultraplan_tool(
action="create",
date="20260422",
mission="Mission from saved plan",
base_dir=str(tmp_path),
)
)
loaded = json.loads(
ultraplan_tool(
action="load",
date="20260422",
base_dir=str(tmp_path),
)
)
assert created["success"] is True
assert loaded["success"] is True
assert loaded["plan"]["mission"] == "Mission from saved plan"
assert loaded["file_path"].endswith("ultraplan_20260422.md")
def test_cron_spec_returns_daily_schedule_and_prompt():
from tools.ultraplan import ultraplan_tool
result = json.loads(ultraplan_tool(action="cron_spec"))
assert result["success"] is True
assert result["schedule"] == "0 6 * * *"
assert "Ultraplan" in result["prompt"]
assert "ultraplan_YYYYMMDD.md" in result["prompt"]
def test_registry_registers_ultraplan_tool():
import tools.ultraplan # noqa: F401
entry = registry.get_entry("ultraplan")
assert entry is not None
assert entry.toolset == "todo"
def test_default_toolsets_include_ultraplan():
assert "ultraplan" in resolve_toolset("todo")
assert "ultraplan" in resolve_toolset("hermes-cli")

View File

@@ -290,9 +290,6 @@ def load_ultraplan(date: str, base_dir: Path = None) -> Optional[Ultraplan]:
return None
DEFAULT_ULTRAPLAN_SCHEDULE = "0 6 * * *"
def generate_daily_cron_prompt() -> str:
"""Generate the prompt for the daily ultraplan cron job."""
return """Generate today's Ultraplan.
@@ -301,9 +298,9 @@ Steps:
1. Check open Gitea issues assigned to you
2. Check open PRs needing review
3. Check fleet health status
4. Decompose work into parallel streams with concrete phases and artifacts
5. Use the ultraplan tool to save ~/.timmy/cron/ultraplan_YYYYMMDD.md and the matching JSON sidecar
6. Optionally file a Gitea issue with the plan summary
4. Decompose work into parallel streams
5. Generate ultraplan_YYYYMMDD.md
6. File Gitea issue with the plan
Output format:
- Mission statement
@@ -311,176 +308,3 @@ Output format:
- Dependency map
- Success metrics
"""
def generate_daily_cron_job_spec(schedule: str = DEFAULT_ULTRAPLAN_SCHEDULE) -> Dict[str, str]:
"""Return a reusable cron job spec for daily Ultraplan generation."""
return {
"name": "Daily Ultraplan",
"schedule": schedule,
"prompt": generate_daily_cron_prompt(),
"path_pattern": "~/.timmy/cron/ultraplan_YYYYMMDD.md",
}
def _resolve_base_dir(base_dir: Optional[str | Path]) -> Path:
"""Normalize the requested Ultraplan base directory."""
if base_dir is None:
return Path.home() / ".timmy" / "cron"
return Path(base_dir).expanduser()
def ultraplan_tool(
action: str,
date: Optional[str] = None,
mission: str = "",
streams: Optional[List[Dict[str, Any]]] = None,
metrics: Optional[Dict[str, Any]] = None,
notes: str = "",
base_dir: Optional[str] = None,
) -> str:
"""Create/load Ultraplan artifacts and expose a daily cron spec."""
from tools.registry import tool_error, tool_result
action = (action or "").strip().lower()
resolved_base_dir = _resolve_base_dir(base_dir)
try:
if action == "create":
plan = create_ultraplan(date=date, mission=mission, streams=streams or [])
if metrics:
plan.metrics = metrics
if notes:
plan.notes = notes
md_path = save_ultraplan(plan, base_dir=resolved_base_dir)
json_path = resolved_base_dir / f"ultraplan_{plan.date}.json"
return tool_result(
success=True,
action="create",
date=plan.date,
file_path=str(md_path),
json_path=str(json_path),
plan=plan.to_dict(),
)
if action == "load":
plan_date = date or datetime.now().strftime("%Y%m%d")
plan = load_ultraplan(plan_date, base_dir=resolved_base_dir)
if plan is None:
return tool_error(
f"No Ultraplan found for {plan_date}",
success=False,
action="load",
date=plan_date,
)
return tool_result(
success=True,
action="load",
date=plan.date,
file_path=str(resolved_base_dir / f"ultraplan_{plan.date}.md"),
json_path=str(resolved_base_dir / f"ultraplan_{plan.date}.json"),
plan=plan.to_dict(),
markdown=plan.to_markdown(),
)
if action == "cron_spec":
spec = generate_daily_cron_job_spec()
return tool_result(success=True, action="cron_spec", **spec)
return tool_error(
f"Unknown Ultraplan action: {action}",
success=False,
action=action,
)
except Exception as e:
return tool_error(f"Ultraplan {action or 'tool'} failed: {e}", success=False, action=action)
ULTRAPLAN_SCHEMA = {
"name": "ultraplan",
"description": (
"Create or load daily Ultraplan planning artifacts under ~/.timmy/cron/ and "
"return a reusable cron spec for autonomous planning. Use this when you want "
"a concrete markdown/json plan file with streams, phases, dependencies, and metrics."
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["create", "load", "cron_spec"],
"description": "Operation to perform",
},
"date": {
"type": "string",
"description": "Plan date as YYYYMMDD. Defaults to today for create/load.",
},
"mission": {
"type": "string",
"description": "High-level mission statement for today's plan.",
},
"streams": {
"type": "array",
"description": "Optional work streams with phases/artifacts/dependencies for create.",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"phases": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"description": {"type": "string"},
"artifact": {"type": "string"},
"dependencies": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["name"],
},
},
},
"required": ["name"],
},
},
"metrics": {
"type": "object",
"description": "Optional success metrics to store on the plan.",
"additionalProperties": True,
},
"notes": {
"type": "string",
"description": "Optional free-form notes appended to the saved plan.",
},
"base_dir": {
"type": "string",
"description": "Optional override for the Ultraplan storage directory.",
},
},
"required": ["action"],
},
}
from tools.registry import registry
registry.register(
name="ultraplan",
toolset="todo",
schema=ULTRAPLAN_SCHEMA,
handler=lambda args, **_kw: ultraplan_tool(
action=args.get("action", ""),
date=args.get("date"),
mission=args.get("mission", ""),
streams=args.get("streams"),
metrics=args.get("metrics"),
notes=args.get("notes", ""),
base_dir=args.get("base_dir"),
),
emoji="🗺️",
)

View File

@@ -47,7 +47,7 @@ _HERMES_CORE_TOOLS = [
# Text-to-speech
"text_to_speech",
# Planning & memory
"todo", "ultraplan", "memory",
"todo", "memory",
# Session history search
"session_search",
# Clarifying questions
@@ -157,8 +157,8 @@ TOOLSETS = {
},
"todo": {
"description": "Task planning and tracking for multi-step work, including daily Ultraplan artifacts",
"tools": ["todo", "ultraplan"],
"description": "Task planning and tracking for multi-step work",
"tools": ["todo"],
"includes": []
},