feat: restore ACP server implementation from PR #949 (#1254)

Restore the ACP editor-integration implementation that was present on the
original PR branch but did not actually land in main.

Includes:
- acp_adapter/ server, session manager, event bridge, auth, permissions,
  and tool helpers
- hermes acp subcommand and hermes-acp entry point
- hermes-acp curated toolset
- ACP registry manifest, setup guide, and ACP test suite
- jupyter-live-kernel data science skill from the original branch

Also updates the revived ACP code for current main by:
- resolving runtime providers through the modern shared provider router
- binding ACP sessions to per-session cwd task overrides
- tracking duplicate same-name tool calls with FIFO IDs
- restoring terminal approval callbacks after prompts
- normalizing supporting docs/skill metadata

Validated with tests/acp and the full pytest suite (-n0).
This commit is contained in:
Teknium
2026-03-14 00:09:05 -07:00
committed by GitHub
parent 2fe853bcc9
commit 25481d4286
24 changed files with 2625 additions and 6 deletions

1
acp_adapter/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""ACP (Agent Communication Protocol) adapter for hermes-agent."""

5
acp_adapter/__main__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Allow running the ACP adapter as ``python -m acp_adapter``."""
from .entry import main
main()

24
acp_adapter/auth.py Normal file
View File

@@ -0,0 +1,24 @@
"""ACP auth helpers — detect the currently configured Hermes provider."""
from __future__ import annotations
from typing import Optional
def detect_provider() -> Optional[str]:
"""Resolve the active Hermes runtime provider, or None if unavailable."""
try:
from hermes_cli.runtime_provider import resolve_runtime_provider
runtime = resolve_runtime_provider()
api_key = runtime.get("api_key")
provider = runtime.get("provider")
if isinstance(api_key, str) and api_key.strip() and isinstance(provider, str) and provider.strip():
return provider.strip().lower()
except Exception:
return None
return None
def has_provider() -> bool:
"""Return True if Hermes can resolve any runtime provider credentials."""
return detect_provider() is not None

88
acp_adapter/entry.py Normal file
View File

@@ -0,0 +1,88 @@
"""CLI entry point for the hermes-agent ACP adapter.
Loads environment variables from ``~/.hermes/.env``, configures logging
to write to stderr (so stdout is reserved for ACP JSON-RPC transport),
and starts the ACP agent server.
Usage::
python -m acp_adapter.entry
# or
hermes acp
# or
hermes-acp
"""
import asyncio
import logging
import os
import sys
from pathlib import Path
def _setup_logging() -> None:
"""Route all logging to stderr so stdout stays clean for ACP stdio."""
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(logging.INFO)
# Quiet down noisy libraries
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("openai").setLevel(logging.WARNING)
def _load_env() -> None:
"""Load .env from HERMES_HOME (default ``~/.hermes``)."""
from dotenv import load_dotenv
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
env_file = hermes_home / ".env"
if env_file.exists():
try:
load_dotenv(dotenv_path=env_file, encoding="utf-8")
except UnicodeDecodeError:
load_dotenv(dotenv_path=env_file, encoding="latin-1")
logging.getLogger(__name__).info("Loaded env from %s", env_file)
else:
logging.getLogger(__name__).info(
"No .env found at %s, using system env", env_file
)
def main() -> None:
"""Entry point: load env, configure logging, run the ACP agent."""
_setup_logging()
_load_env()
logger = logging.getLogger(__name__)
logger.info("Starting hermes-agent ACP adapter")
# Ensure the project root is on sys.path so ``from run_agent import AIAgent`` works
project_root = str(Path(__file__).resolve().parent.parent)
if project_root not in sys.path:
sys.path.insert(0, project_root)
import acp
from .server import HermesACPAgent
agent = HermesACPAgent()
try:
asyncio.run(acp.run_agent(agent))
except KeyboardInterrupt:
logger.info("Shutting down (KeyboardInterrupt)")
except Exception:
logger.exception("ACP agent crashed")
sys.exit(1)
if __name__ == "__main__":
main()

171
acp_adapter/events.py Normal file
View File

@@ -0,0 +1,171 @@
"""Callback factories for bridging AIAgent events to ACP notifications.
Each factory returns a callable with the signature that AIAgent expects
for its callbacks. Internally, the callbacks push ACP session updates
to the client via ``conn.session_update()`` using
``asyncio.run_coroutine_threadsafe()`` (since AIAgent runs in a worker
thread while the event loop lives on the main thread).
"""
import asyncio
import json
import logging
from collections import defaultdict, deque
from typing import Any, Callable, Deque, Dict
import acp
from .tools import (
build_tool_complete,
build_tool_start,
make_tool_call_id,
)
logger = logging.getLogger(__name__)
def _send_update(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
update: Any,
) -> None:
"""Fire-and-forget an ACP session update from a worker thread."""
try:
future = asyncio.run_coroutine_threadsafe(
conn.session_update(session_id, update), loop
)
future.result(timeout=5)
except Exception:
logger.debug("Failed to send ACP update", exc_info=True)
# ------------------------------------------------------------------
# Tool progress callback
# ------------------------------------------------------------------
def make_tool_progress_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
tool_call_ids: Dict[str, Deque[str]],
) -> Callable:
"""Create a ``tool_progress_callback`` for AIAgent.
Signature expected by AIAgent::
tool_progress_callback(name: str, preview: str, args: dict)
Emits ``ToolCallStart`` for each tool invocation and tracks IDs in a FIFO
queue per tool name so duplicate/parallel same-name calls still complete
against the correct ACP tool call.
"""
def _tool_progress(name: str, preview: str, args: Any = None) -> None:
if isinstance(args, str):
try:
args = json.loads(args)
except (json.JSONDecodeError, TypeError):
args = {"raw": args}
if not isinstance(args, dict):
args = {}
tc_id = make_tool_call_id()
queue = tool_call_ids.get(name)
if queue is None:
queue = deque()
tool_call_ids[name] = queue
elif isinstance(queue, str):
queue = deque([queue])
tool_call_ids[name] = queue
queue.append(tc_id)
update = build_tool_start(tc_id, name, args)
_send_update(conn, session_id, loop, update)
return _tool_progress
# ------------------------------------------------------------------
# Thinking callback
# ------------------------------------------------------------------
def make_thinking_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
) -> Callable:
"""Create a ``thinking_callback`` for AIAgent."""
def _thinking(text: str) -> None:
if not text:
return
update = acp.update_agent_thought_text(text)
_send_update(conn, session_id, loop, update)
return _thinking
# ------------------------------------------------------------------
# Step callback
# ------------------------------------------------------------------
def make_step_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
tool_call_ids: Dict[str, Deque[str]],
) -> Callable:
"""Create a ``step_callback`` for AIAgent.
Signature expected by AIAgent::
step_callback(api_call_count: int, prev_tools: list)
"""
def _step(api_call_count: int, prev_tools: Any = None) -> None:
if prev_tools and isinstance(prev_tools, list):
for tool_info in prev_tools:
tool_name = None
result = None
if isinstance(tool_info, dict):
tool_name = tool_info.get("name") or tool_info.get("function_name")
result = tool_info.get("result") or tool_info.get("output")
elif isinstance(tool_info, str):
tool_name = tool_info
queue = tool_call_ids.get(tool_name or "")
if isinstance(queue, str):
queue = deque([queue])
tool_call_ids[tool_name] = queue
if tool_name and queue:
tc_id = queue.popleft()
update = build_tool_complete(
tc_id, tool_name, result=str(result) if result is not None else None
)
_send_update(conn, session_id, loop, update)
if not queue:
tool_call_ids.pop(tool_name, None)
return _step
# ------------------------------------------------------------------
# Agent message callback
# ------------------------------------------------------------------
def make_message_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
) -> Callable:
"""Create a callback that streams agent response text to the editor."""
def _message(text: str) -> None:
if not text:
return
update = acp.update_agent_message_text(text)
_send_update(conn, session_id, loop, update)
return _message

View File

@@ -0,0 +1,80 @@
"""ACP permission bridging — maps ACP approval requests to hermes approval callbacks."""
from __future__ import annotations
import asyncio
import logging
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Callable, Optional
from acp.schema import (
AllowedOutcome,
DeniedOutcome,
PermissionOption,
RequestPermissionRequest,
SelectedPermissionOutcome,
)
logger = logging.getLogger(__name__)
# Maps ACP PermissionOptionKind -> hermes approval result strings
_KIND_TO_HERMES = {
"allow_once": "once",
"allow_always": "always",
"reject_once": "deny",
"reject_always": "deny",
}
def make_approval_callback(
request_permission_fn: Callable,
loop: asyncio.AbstractEventLoop,
session_id: str,
timeout: float = 60.0,
) -> Callable[[str, str], str]:
"""
Return a hermes-compatible ``approval_callback(command, description) -> str``
that bridges to the ACP client's ``request_permission`` call.
Args:
request_permission_fn: The ACP connection's ``request_permission`` coroutine.
loop: The event loop on which the ACP connection lives.
session_id: Current ACP session id.
timeout: Seconds to wait for a response before auto-denying.
"""
def _callback(command: str, description: str) -> str:
options = [
PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"),
PermissionOption(option_id="allow_always", kind="allow_always", name="Allow always"),
PermissionOption(option_id="deny", kind="reject_once", name="Deny"),
]
import acp as _acp
tool_call = _acp.start_tool_call("perm-check", command, kind="execute")
coro = request_permission_fn(
session_id=session_id,
tool_call=tool_call,
options=options,
)
try:
future = asyncio.run_coroutine_threadsafe(coro, loop)
response = future.result(timeout=timeout)
except (FutureTimeout, Exception) as exc:
logger.warning("Permission request timed out or failed: %s", exc)
return "deny"
outcome = response.outcome
if isinstance(outcome, AllowedOutcome):
option_id = outcome.option_id
# Look up the kind from our options list
for opt in options:
if opt.option_id == option_id:
return _KIND_TO_HERMES.get(opt.kind, "deny")
return "once" # fallback for unknown option_id
else:
return "deny"
return _callback

333
acp_adapter/server.py Normal file
View File

@@ -0,0 +1,333 @@
"""ACP agent server — exposes Hermes Agent via the Agent Client Protocol."""
from __future__ import annotations
import asyncio
import logging
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Deque, Optional
import acp
from acp.schema import (
AgentCapabilities,
AuthenticateResponse,
AuthMethod,
ClientCapabilities,
EmbeddedResourceContentBlock,
ForkSessionResponse,
ImageContentBlock,
AudioContentBlock,
Implementation,
InitializeResponse,
ListSessionsResponse,
LoadSessionResponse,
NewSessionResponse,
PromptResponse,
ResumeSessionResponse,
ResourceContentBlock,
SessionCapabilities,
SessionForkCapabilities,
SessionListCapabilities,
SessionInfo,
TextContentBlock,
Usage,
)
from acp_adapter.auth import detect_provider, has_provider
from acp_adapter.events import (
make_message_cb,
make_step_cb,
make_thinking_cb,
make_tool_progress_cb,
)
from acp_adapter.permissions import make_approval_callback
from acp_adapter.session import SessionManager
logger = logging.getLogger(__name__)
try:
from hermes_cli import __version__ as HERMES_VERSION
except Exception:
HERMES_VERSION = "0.0.0"
# Thread pool for running AIAgent (synchronous) in parallel.
_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="acp-agent")
def _extract_text(
prompt: list[
TextContentBlock
| ImageContentBlock
| AudioContentBlock
| ResourceContentBlock
| EmbeddedResourceContentBlock
],
) -> str:
"""Extract plain text from ACP content blocks."""
parts: list[str] = []
for block in prompt:
if isinstance(block, TextContentBlock):
parts.append(block.text)
elif hasattr(block, "text"):
parts.append(str(block.text))
# Non-text blocks are ignored for now.
return "\n".join(parts)
class HermesACPAgent(acp.Agent):
"""ACP Agent implementation wrapping Hermes AIAgent."""
def __init__(self, session_manager: SessionManager | None = None):
super().__init__()
self.session_manager = session_manager or SessionManager()
self._conn: Optional[acp.Client] = None
# ---- Connection lifecycle -----------------------------------------------
def on_connect(self, conn: acp.Client) -> None:
"""Store the client connection for sending session updates."""
self._conn = conn
logger.info("ACP client connected")
# ---- ACP lifecycle ------------------------------------------------------
async def initialize(
self,
protocol_version: int,
client_capabilities: ClientCapabilities | None = None,
client_info: Implementation | None = None,
**kwargs: Any,
) -> InitializeResponse:
provider = detect_provider()
auth_methods = None
if provider:
auth_methods = [
AuthMethod(
id=provider,
name=f"{provider} runtime credentials",
description=f"Authenticate Hermes using the currently configured {provider} runtime credentials.",
)
]
client_name = client_info.name if client_info else "unknown"
logger.info("Initialize from %s (protocol v%s)", client_name, protocol_version)
return InitializeResponse(
protocol_version=acp.PROTOCOL_VERSION,
agent_info=Implementation(name="hermes-agent", version=HERMES_VERSION),
agent_capabilities=AgentCapabilities(
session_capabilities=SessionCapabilities(
fork=SessionForkCapabilities(),
list=SessionListCapabilities(),
),
),
auth_methods=auth_methods,
)
async def authenticate(self, method_id: str, **kwargs: Any) -> AuthenticateResponse | None:
if has_provider():
return AuthenticateResponse()
return None
# ---- Session management -------------------------------------------------
async def new_session(
self,
cwd: str,
mcp_servers: list | None = None,
**kwargs: Any,
) -> NewSessionResponse:
state = self.session_manager.create_session(cwd=cwd)
logger.info("New session %s (cwd=%s)", state.session_id, cwd)
return NewSessionResponse(session_id=state.session_id)
async def load_session(
self,
cwd: str,
session_id: str,
mcp_servers: list | None = None,
**kwargs: Any,
) -> LoadSessionResponse | None:
state = self.session_manager.update_cwd(session_id, cwd)
if state is None:
logger.warning("load_session: session %s not found", session_id)
return None
logger.info("Loaded session %s", session_id)
return LoadSessionResponse()
async def resume_session(
self,
cwd: str,
session_id: str,
mcp_servers: list | None = None,
**kwargs: Any,
) -> ResumeSessionResponse:
state = self.session_manager.update_cwd(session_id, cwd)
if state is None:
logger.warning("resume_session: session %s not found, creating new", session_id)
state = self.session_manager.create_session(cwd=cwd)
logger.info("Resumed session %s", state.session_id)
return ResumeSessionResponse()
async def cancel(self, session_id: str, **kwargs: Any) -> None:
state = self.session_manager.get_session(session_id)
if state and state.cancel_event:
state.cancel_event.set()
try:
if getattr(state, "agent", None) and hasattr(state.agent, "interrupt"):
state.agent.interrupt()
except Exception:
logger.debug("Failed to interrupt ACP session %s", session_id, exc_info=True)
logger.info("Cancelled session %s", session_id)
async def fork_session(
self,
cwd: str,
session_id: str,
mcp_servers: list | None = None,
**kwargs: Any,
) -> ForkSessionResponse:
state = self.session_manager.fork_session(session_id, cwd=cwd)
new_id = state.session_id if state else ""
logger.info("Forked session %s -> %s", session_id, new_id)
return ForkSessionResponse(session_id=new_id)
async def list_sessions(
self,
cursor: str | None = None,
cwd: str | None = None,
**kwargs: Any,
) -> ListSessionsResponse:
infos = self.session_manager.list_sessions()
sessions = [
SessionInfo(session_id=s["session_id"], cwd=s["cwd"])
for s in infos
]
return ListSessionsResponse(sessions=sessions)
# ---- Prompt (core) ------------------------------------------------------
async def prompt(
self,
prompt: list[
TextContentBlock
| ImageContentBlock
| AudioContentBlock
| ResourceContentBlock
| EmbeddedResourceContentBlock
],
session_id: str,
**kwargs: Any,
) -> PromptResponse:
"""Run Hermes on the user's prompt and stream events back to the editor."""
state = self.session_manager.get_session(session_id)
if state is None:
logger.error("prompt: session %s not found", session_id)
return PromptResponse(stop_reason="refusal")
user_text = _extract_text(prompt)
if not user_text.strip():
return PromptResponse(stop_reason="end_turn")
logger.info("Prompt on session %s: %s", session_id, user_text[:100])
conn = self._conn
loop = asyncio.get_running_loop()
if state.cancel_event:
state.cancel_event.clear()
tool_call_ids: dict[str, Deque[str]] = defaultdict(deque)
previous_approval_cb = None
if conn:
tool_progress_cb = make_tool_progress_cb(conn, session_id, loop, tool_call_ids)
thinking_cb = make_thinking_cb(conn, session_id, loop)
step_cb = make_step_cb(conn, session_id, loop, tool_call_ids)
message_cb = make_message_cb(conn, session_id, loop)
approval_cb = make_approval_callback(conn.request_permission, loop, session_id)
else:
tool_progress_cb = None
thinking_cb = None
step_cb = None
message_cb = None
approval_cb = None
agent = state.agent
agent.tool_progress_callback = tool_progress_cb
agent.thinking_callback = thinking_cb
agent.step_callback = step_cb
agent.message_callback = message_cb
if approval_cb:
try:
from tools import terminal_tool as _terminal_tool
previous_approval_cb = getattr(_terminal_tool, "_approval_callback", None)
_terminal_tool.set_approval_callback(approval_cb)
except Exception:
logger.debug("Could not set ACP approval callback", exc_info=True)
def _run_agent() -> dict:
try:
result = agent.run_conversation(
user_message=user_text,
conversation_history=state.history,
task_id=session_id,
)
return result
except Exception as e:
logger.exception("Agent error in session %s", session_id)
return {"final_response": f"Error: {e}", "messages": state.history}
finally:
if approval_cb:
try:
from tools import terminal_tool as _terminal_tool
_terminal_tool.set_approval_callback(previous_approval_cb)
except Exception:
logger.debug("Could not restore approval callback", exc_info=True)
try:
result = await loop.run_in_executor(_executor, _run_agent)
except Exception:
logger.exception("Executor error for session %s", session_id)
return PromptResponse(stop_reason="end_turn")
if result.get("messages"):
state.history = result["messages"]
final_response = result.get("final_response", "")
if final_response and conn:
update = acp.update_agent_message_text(final_response)
await conn.session_update(session_id, update)
usage = None
usage_data = result.get("usage")
if usage_data and isinstance(usage_data, dict):
usage = Usage(
input_tokens=usage_data.get("prompt_tokens", 0),
output_tokens=usage_data.get("completion_tokens", 0),
total_tokens=usage_data.get("total_tokens", 0),
thought_tokens=usage_data.get("reasoning_tokens"),
cached_read_tokens=usage_data.get("cached_tokens"),
)
stop_reason = "cancelled" if state.cancel_event and state.cancel_event.is_set() else "end_turn"
return PromptResponse(stop_reason=stop_reason, usage=usage)
# ---- Model switching ----------------------------------------------------
async def set_session_model(
self, model_id: str, session_id: str, **kwargs: Any
):
"""Switch the model for a session."""
state = self.session_manager.get_session(session_id)
if state:
state.model = model_id
state.agent = self.session_manager._make_agent(
session_id=session_id,
cwd=state.cwd,
model=model_id,
)
logger.info("Session %s: model switched to %s", session_id, model_id)
return None

203
acp_adapter/session.py Normal file
View File

@@ -0,0 +1,203 @@
"""ACP session manager — maps ACP sessions to Hermes AIAgent instances."""
from __future__ import annotations
import copy
import logging
import uuid
from dataclasses import dataclass, field
from threading import Lock
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
def _register_task_cwd(task_id: str, cwd: str) -> None:
"""Bind a task/session id to the editor's working directory for tools."""
if not task_id:
return
try:
from tools.terminal_tool import register_task_env_overrides
register_task_env_overrides(task_id, {"cwd": cwd})
except Exception:
logger.debug("Failed to register ACP task cwd override", exc_info=True)
def _clear_task_cwd(task_id: str) -> None:
"""Remove task-specific cwd overrides for an ACP session."""
if not task_id:
return
try:
from tools.terminal_tool import clear_task_env_overrides
clear_task_env_overrides(task_id)
except Exception:
logger.debug("Failed to clear ACP task cwd override", exc_info=True)
@dataclass
class SessionState:
"""Tracks per-session state for an ACP-managed Hermes agent."""
session_id: str
agent: Any # AIAgent instance
cwd: str = "."
model: str = ""
history: List[Dict[str, Any]] = field(default_factory=list)
cancel_event: Any = None # threading.Event
class SessionManager:
"""Thread-safe manager for ACP sessions backed by Hermes AIAgent instances."""
def __init__(self, agent_factory=None):
"""
Args:
agent_factory: Optional callable that creates an AIAgent-like object.
Used by tests. When omitted, a real AIAgent is created
using the current Hermes runtime provider configuration.
"""
self._sessions: Dict[str, SessionState] = {}
self._lock = Lock()
self._agent_factory = agent_factory
# ---- public API ---------------------------------------------------------
def create_session(self, cwd: str = ".") -> SessionState:
"""Create a new session with a unique ID and a fresh AIAgent."""
import threading
session_id = str(uuid.uuid4())
agent = self._make_agent(session_id=session_id, cwd=cwd)
state = SessionState(
session_id=session_id,
agent=agent,
cwd=cwd,
model=getattr(agent, "model", "") or "",
cancel_event=threading.Event(),
)
with self._lock:
self._sessions[session_id] = state
_register_task_cwd(session_id, cwd)
logger.info("Created ACP session %s (cwd=%s)", session_id, cwd)
return state
def get_session(self, session_id: str) -> Optional[SessionState]:
"""Return the session for *session_id*, or ``None``."""
with self._lock:
return self._sessions.get(session_id)
def remove_session(self, session_id: str) -> bool:
"""Remove a session. Returns True if it existed."""
with self._lock:
existed = self._sessions.pop(session_id, None) is not None
if existed:
_clear_task_cwd(session_id)
return existed
def fork_session(self, session_id: str, cwd: str = ".") -> Optional[SessionState]:
"""Deep-copy a session's history into a new session."""
import threading
with self._lock:
original = self._sessions.get(session_id)
if original is None:
return None
new_id = str(uuid.uuid4())
agent = self._make_agent(
session_id=new_id,
cwd=cwd,
model=original.model or None,
)
state = SessionState(
session_id=new_id,
agent=agent,
cwd=cwd,
model=getattr(agent, "model", original.model) or original.model,
history=copy.deepcopy(original.history),
cancel_event=threading.Event(),
)
self._sessions[new_id] = state
_register_task_cwd(new_id, cwd)
logger.info("Forked ACP session %s -> %s", session_id, new_id)
return state
def list_sessions(self) -> List[Dict[str, Any]]:
"""Return lightweight info dicts for all sessions."""
with self._lock:
return [
{
"session_id": s.session_id,
"cwd": s.cwd,
"model": s.model,
"history_len": len(s.history),
}
for s in self._sessions.values()
]
def update_cwd(self, session_id: str, cwd: str) -> Optional[SessionState]:
"""Update the working directory for a session and its tool overrides."""
with self._lock:
state = self._sessions.get(session_id)
if state is None:
return None
state.cwd = cwd
_register_task_cwd(session_id, cwd)
return state
def cleanup(self) -> None:
"""Remove all sessions and clear task-specific cwd overrides."""
with self._lock:
session_ids = list(self._sessions.keys())
self._sessions.clear()
for session_id in session_ids:
_clear_task_cwd(session_id)
# ---- internal -----------------------------------------------------------
def _make_agent(
self,
*,
session_id: str,
cwd: str,
model: str | None = None,
):
if self._agent_factory is not None:
return self._agent_factory()
from run_agent import AIAgent
from hermes_cli.config import load_config
from hermes_cli.runtime_provider import resolve_runtime_provider
config = load_config()
model_cfg = config.get("model")
default_model = "anthropic/claude-opus-4.6"
requested_provider = None
if isinstance(model_cfg, dict):
default_model = str(model_cfg.get("default") or default_model)
requested_provider = model_cfg.get("provider")
elif isinstance(model_cfg, str) and model_cfg.strip():
default_model = model_cfg.strip()
kwargs = {
"platform": "acp",
"enabled_toolsets": ["hermes-acp"],
"quiet_mode": True,
"session_id": session_id,
"model": model or default_model,
}
try:
runtime = resolve_runtime_provider(requested=requested_provider)
kwargs.update(
{
"provider": runtime.get("provider"),
"api_mode": runtime.get("api_mode"),
"base_url": runtime.get("base_url"),
"api_key": runtime.get("api_key"),
}
)
except Exception:
logger.debug("ACP session falling back to default provider resolution", exc_info=True)
_register_task_cwd(session_id, cwd)
return AIAgent(**kwargs)

215
acp_adapter/tools.py Normal file
View File

@@ -0,0 +1,215 @@
"""ACP tool-call helpers for mapping hermes tools to ACP ToolKind and building content."""
from __future__ import annotations
import uuid
from typing import Any, Dict, List, Optional
import acp
from acp.schema import (
ToolCallLocation,
ToolCallStart,
ToolCallProgress,
ToolKind,
)
# ---------------------------------------------------------------------------
# Map hermes tool names -> ACP ToolKind
# ---------------------------------------------------------------------------
TOOL_KIND_MAP: Dict[str, ToolKind] = {
# File operations
"read_file": "read",
"write_file": "edit",
"patch": "edit",
"search_files": "search",
# Terminal / execution
"terminal": "execute",
"process": "execute",
"execute_code": "execute",
# Web / fetch
"web_search": "fetch",
"web_extract": "fetch",
# Browser
"browser_navigate": "fetch",
"browser_click": "execute",
"browser_type": "execute",
"browser_snapshot": "read",
"browser_vision": "read",
"browser_scroll": "execute",
"browser_press": "execute",
"browser_back": "execute",
"browser_close": "execute",
"browser_get_images": "read",
# Agent internals
"delegate_task": "execute",
"vision_analyze": "read",
"image_generate": "execute",
"text_to_speech": "execute",
# Thinking / meta
"_thinking": "think",
}
def get_tool_kind(tool_name: str) -> ToolKind:
"""Return the ACP ToolKind for a hermes tool, defaulting to 'other'."""
return TOOL_KIND_MAP.get(tool_name, "other")
def make_tool_call_id() -> str:
"""Generate a unique tool call ID."""
return f"tc-{uuid.uuid4().hex[:12]}"
def build_tool_title(tool_name: str, args: Dict[str, Any]) -> str:
"""Build a human-readable title for a tool call."""
if tool_name == "terminal":
cmd = args.get("command", "")
if len(cmd) > 80:
cmd = cmd[:77] + "..."
return f"terminal: {cmd}"
if tool_name == "read_file":
return f"read: {args.get('path', '?')}"
if tool_name == "write_file":
return f"write: {args.get('path', '?')}"
if tool_name == "patch":
mode = args.get("mode", "replace")
path = args.get("path", "?")
return f"patch ({mode}): {path}"
if tool_name == "search_files":
return f"search: {args.get('pattern', '?')}"
if tool_name == "web_search":
return f"web search: {args.get('query', '?')}"
if tool_name == "web_extract":
urls = args.get("urls", [])
if urls:
return f"extract: {urls[0]}" + (f" (+{len(urls)-1})" if len(urls) > 1 else "")
return "web extract"
if tool_name == "delegate_task":
goal = args.get("goal", "")
if goal and len(goal) > 60:
goal = goal[:57] + "..."
return f"delegate: {goal}" if goal else "delegate task"
if tool_name == "execute_code":
return "execute code"
if tool_name == "vision_analyze":
return f"analyze image: {args.get('question', '?')[:50]}"
return tool_name
# ---------------------------------------------------------------------------
# Build ACP content objects for tool-call events
# ---------------------------------------------------------------------------
def build_tool_start(
tool_call_id: str,
tool_name: str,
arguments: Dict[str, Any],
) -> ToolCallStart:
"""Create a ToolCallStart event for the given hermes tool invocation."""
kind = get_tool_kind(tool_name)
title = build_tool_title(tool_name, arguments)
locations = extract_locations(arguments)
if tool_name == "patch":
mode = arguments.get("mode", "replace")
if mode == "replace":
path = arguments.get("path", "")
old = arguments.get("old_string", "")
new = arguments.get("new_string", "")
content = [acp.tool_diff_content(path=path, new_text=new, old_text=old)]
else:
# Patch mode — show the patch content as text
patch_text = arguments.get("patch", "")
content = [acp.tool_content(acp.text_block(patch_text))]
return acp.start_tool_call(
tool_call_id, title, kind=kind, content=content, locations=locations,
raw_input=arguments,
)
if tool_name == "write_file":
path = arguments.get("path", "")
file_content = arguments.get("content", "")
content = [acp.tool_diff_content(path=path, new_text=file_content)]
return acp.start_tool_call(
tool_call_id, title, kind=kind, content=content, locations=locations,
raw_input=arguments,
)
if tool_name == "terminal":
command = arguments.get("command", "")
content = [acp.tool_content(acp.text_block(f"$ {command}"))]
return acp.start_tool_call(
tool_call_id, title, kind=kind, content=content, locations=locations,
raw_input=arguments,
)
if tool_name == "read_file":
path = arguments.get("path", "")
content = [acp.tool_content(acp.text_block(f"Reading {path}"))]
return acp.start_tool_call(
tool_call_id, title, kind=kind, content=content, locations=locations,
raw_input=arguments,
)
if tool_name == "search_files":
pattern = arguments.get("pattern", "")
target = arguments.get("target", "content")
content = [acp.tool_content(acp.text_block(f"Searching for '{pattern}' ({target})"))]
return acp.start_tool_call(
tool_call_id, title, kind=kind, content=content, locations=locations,
raw_input=arguments,
)
# Generic fallback
import json
try:
args_text = json.dumps(arguments, indent=2, default=str)
except (TypeError, ValueError):
args_text = str(arguments)
content = [acp.tool_content(acp.text_block(args_text))]
return acp.start_tool_call(
tool_call_id, title, kind=kind, content=content, locations=locations,
raw_input=arguments,
)
def build_tool_complete(
tool_call_id: str,
tool_name: str,
result: Optional[str] = None,
) -> ToolCallProgress:
"""Create a ToolCallUpdate (progress) event for a completed tool call."""
kind = get_tool_kind(tool_name)
# Truncate very large results for the UI
display_result = result or ""
if len(display_result) > 5000:
display_result = display_result[:4900] + f"\n... ({len(result)} chars total, truncated)"
content = [acp.tool_content(acp.text_block(display_result))]
return acp.update_tool_call(
tool_call_id,
kind=kind,
status="completed",
content=content,
raw_output=result,
)
# ---------------------------------------------------------------------------
# Location extraction
# ---------------------------------------------------------------------------
def extract_locations(
arguments: Dict[str, Any],
) -> List[ToolCallLocation]:
"""Extract file-system locations from tool arguments."""
locations: List[ToolCallLocation] = []
path = arguments.get("path")
if path:
line = arguments.get("offset") or arguments.get("line")
locations.append(ToolCallLocation(path=path, line=line))
return locations

12
acp_registry/agent.json Normal file
View File

@@ -0,0 +1,12 @@
{
"schema_version": 1,
"name": "hermes-agent",
"display_name": "Hermes Agent",
"description": "AI agent by Nous Research with 90+ tools, persistent memory, and multi-platform support",
"icon": "icon.svg",
"distribution": {
"type": "command",
"command": "hermes",
"args": ["acp"]
}
}

25
acp_registry/icon.svg Normal file
View File

@@ -0,0 +1,25 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 64 64" width="64" height="64">
<defs>
<linearGradient id="gold" x1="0%" y1="0%" x2="0%" y2="100%">
<stop offset="0%" style="stop-color:#F5C542;stop-opacity:1" />
<stop offset="100%" style="stop-color:#D4961C;stop-opacity:1" />
</linearGradient>
</defs>
<!-- Staff -->
<rect x="30" y="10" width="4" height="46" rx="2" fill="url(#gold)" />
<!-- Wings (left) -->
<path d="M30 18 C24 14, 14 14, 10 18 C14 16, 22 16, 28 20" fill="#F5C542" opacity="0.9" />
<path d="M30 22 C26 19, 18 19, 14 22 C18 20, 24 20, 28 24" fill="#D4961C" opacity="0.8" />
<!-- Wings (right) -->
<path d="M34 18 C40 14, 50 14, 54 18 C50 16, 42 16, 36 20" fill="#F5C542" opacity="0.9" />
<path d="M34 22 C38 19, 46 19, 50 22 C46 20, 40 20, 36 24" fill="#D4961C" opacity="0.8" />
<!-- Left serpent -->
<path d="M32 48 C22 44, 20 38, 26 34 C20 36, 18 42, 24 46 C18 40, 22 30, 30 28 C24 32, 22 38, 28 42"
fill="none" stroke="#F5C542" stroke-width="2.5" stroke-linecap="round" />
<!-- Right serpent -->
<path d="M32 48 C42 44, 44 38, 38 34 C44 36, 46 42, 40 46 C46 40, 42 30, 34 28 C40 32, 42 38, 36 42"
fill="none" stroke="#D4961C" stroke-width="2.5" stroke-linecap="round" />
<!-- Orb at top -->
<circle cx="32" cy="10" r="4" fill="#F5C542" />
<circle cx="32" cy="10" r="2" fill="#FFF8E1" opacity="0.7" />
</svg>

After

Width:  |  Height:  |  Size: 1.4 KiB

229
docs/acp-setup.md Normal file
View File

@@ -0,0 +1,229 @@
# Hermes Agent — ACP (Agent Client Protocol) Setup Guide
Hermes Agent supports the **Agent Client Protocol (ACP)**, allowing it to run as
a coding agent inside your editor. ACP lets your IDE send tasks to Hermes, and
Hermes responds with file edits, terminal commands, and explanations — all shown
natively in the editor UI.
---
## Prerequisites
- Hermes Agent installed and configured (`hermes setup` completed)
- An API key / provider set up in `~/.hermes/.env` or via `hermes login`
- Python 3.11+
Install the ACP extra:
```bash
pip install -e ".[acp]"
```
---
## VS Code Setup
### 1. Install the ACP Client extension
Open VS Code and install **ACP Client** from the marketplace:
- Press `Ctrl+Shift+X` (or `Cmd+Shift+X` on macOS)
- Search for **"ACP Client"**
- Click **Install**
Or install from the command line:
```bash
code --install-extension anysphere.acp-client
```
### 2. Configure settings.json
Open your VS Code settings (`Ctrl+,` → click the `{}` icon for JSON) and add:
```json
{
"acpClient.agents": [
{
"name": "hermes-agent",
"registryDir": "/path/to/hermes-agent/acp_registry"
}
]
}
```
Replace `/path/to/hermes-agent` with the actual path to your Hermes Agent
installation (e.g. `~/.hermes/hermes-agent`).
Alternatively, if `hermes` is on your PATH, the ACP Client can discover it
automatically via the registry directory.
### 3. Restart VS Code
After configuring, restart VS Code. You should see **Hermes Agent** appear in
the ACP agent picker in the chat/agent panel.
---
## Zed Setup
Zed has built-in ACP support.
### 1. Configure Zed settings
Open Zed settings (`Cmd+,` on macOS or `Ctrl+,` on Linux) and add to your
`settings.json`:
```json
{
"acp": {
"agents": [
{
"name": "hermes-agent",
"registry_dir": "/path/to/hermes-agent/acp_registry"
}
]
}
}
```
### 2. Restart Zed
Hermes Agent will appear in the agent panel. Select it and start a conversation.
---
## JetBrains Setup (IntelliJ, PyCharm, WebStorm, etc.)
### 1. Install the ACP plugin
- Open **Settings****Plugins****Marketplace**
- Search for **"ACP"** or **"Agent Client Protocol"**
- Install and restart the IDE
### 2. Configure the agent
- Open **Settings****Tools****ACP Agents**
- Click **+** to add a new agent
- Set the registry directory to your `acp_registry/` folder:
`/path/to/hermes-agent/acp_registry`
- Click **OK**
### 3. Use the agent
Open the ACP panel (usually in the right sidebar) and select **Hermes Agent**.
---
## What You Will See
Once connected, your editor provides a native interface to Hermes Agent:
### Chat Panel
A conversational interface where you can describe tasks, ask questions, and
give instructions. Hermes responds with explanations and actions.
### File Diffs
When Hermes edits files, you see standard diffs in the editor. You can:
- **Accept** individual changes
- **Reject** changes you don't want
- **Review** the full diff before applying
### Terminal Commands
When Hermes needs to run shell commands (builds, tests, installs), the editor
shows them in an integrated terminal. Depending on your settings:
- Commands may run automatically
- Or you may be prompted to **approve** each command
### Approval Flow
For potentially destructive operations, the editor will prompt you for
approval before Hermes proceeds. This includes:
- File deletions
- Shell commands
- Git operations
---
## Configuration
Hermes Agent under ACP uses the **same configuration** as the CLI:
- **API keys / providers**: `~/.hermes/.env`
- **Agent config**: `~/.hermes/config.yaml`
- **Skills**: `~/.hermes/skills/`
- **Sessions**: `~/.hermes/state.db`
You can run `hermes setup` to configure providers, or edit `~/.hermes/.env`
directly.
### Changing the model
Edit `~/.hermes/config.yaml`:
```yaml
model: openrouter/nous/hermes-3-llama-3.1-70b
```
Or set the `HERMES_MODEL` environment variable.
### Toolsets
ACP sessions use the curated `hermes-acp` toolset by default. It is designed for editor workflows and intentionally excludes things like messaging delivery, cronjob management, and audio-first UX features.
---
## Troubleshooting
### Agent doesn't appear in the editor
1. **Check the registry path** — make sure the `acp_registry/` directory path
in your editor settings is correct and contains `agent.json`.
2. **Check `hermes` is on PATH** — run `which hermes` in a terminal. If not
found, you may need to activate your virtualenv or add it to PATH.
3. **Restart the editor** after changing settings.
### Agent starts but errors immediately
1. Run `hermes doctor` to check your configuration.
2. Check that you have a valid API key: `hermes status`
3. Try running `hermes acp` directly in a terminal to see error output.
### "Module not found" errors
Make sure you installed the ACP extra:
```bash
pip install -e ".[acp]"
```
### Slow responses
- ACP streams responses, so you should see incremental output. If the agent
appears stuck, check your network connection and API provider status.
- Some providers have rate limits. Try switching to a different model/provider.
### Permission denied for terminal commands
If the editor blocks terminal commands, check your ACP Client extension
settings for auto-approval or manual-approval preferences.
### Logs
Hermes logs are written to stderr when running in ACP mode. Check:
- VS Code: **Output** panel → select **ACP Client** or **Hermes Agent**
- Zed: **View****Toggle Terminal** and check the process output
- JetBrains: **Event Log** or the ACP tool window
You can also enable verbose logging:
```bash
HERMES_LOG_LEVEL=DEBUG hermes acp
```
---
## Further Reading
- [ACP Specification](https://github.com/anysphere/acp)
- [Hermes Agent Documentation](https://github.com/NousResearch/hermes-agent)
- Run `hermes --help` for all CLI options

View File

@@ -34,11 +34,12 @@ Usage:
hermes honcho identity # Show AI peer identity representation
hermes honcho identity <file> # Seed AI peer identity from a file (SOUL.md etc.)
hermes honcho migrate # Step-by-step migration guide: OpenClaw native → Hermes + Honcho
hermes version # Show version
hermes update # Update to latest version
hermes uninstall # Uninstall Hermes Agent
hermes sessions browse # Interactive session picker with search
hermes claw migrate # Migrate from OpenClaw to Hermes
hermes version Show version
hermes update Update to latest version
hermes uninstall Uninstall Hermes Agent
hermes acp Run as an ACP server for editor integration
hermes sessions browse Interactive session picker with search
hermes claw migrate --dry-run # Preview migration without changes
"""
@@ -3102,6 +3103,27 @@ For more help on a command:
help="Skip confirmation prompts"
)
uninstall_parser.set_defaults(func=cmd_uninstall)
# =========================================================================
# acp command
# =========================================================================
acp_parser = subparsers.add_parser(
"acp",
help="Run Hermes Agent as an ACP (Agent Client Protocol) server",
description="Start Hermes Agent in ACP mode for editor integration (VS Code, Zed, JetBrains)",
)
def cmd_acp(args):
"""Launch Hermes Agent as an ACP server."""
try:
from acp_adapter.entry import main as acp_main
acp_main()
except ImportError:
print("ACP dependencies not installed.")
print("Install them with: pip install -e '.[acp]'")
sys.exit(1)
acp_parser.set_defaults(func=cmd_acp)
# =========================================================================
# Parse and execute

View File

@@ -55,6 +55,7 @@ pty = [
honcho = ["honcho-ai>=2.0.1"]
mcp = ["mcp>=1.2.0"]
homeassistant = ["aiohttp>=3.9.0"]
acp = ["agent-client-protocol>=0.8.1,<1.0"]
rl = [
"atroposlib @ git+https://github.com/NousResearch/atropos.git",
"tinker @ git+https://github.com/thinking-machines-lab/tinker.git",
@@ -76,17 +77,19 @@ all = [
"hermes-agent[honcho]",
"hermes-agent[mcp]",
"hermes-agent[homeassistant]",
"hermes-agent[acp]",
]
[project.scripts]
hermes = "hermes_cli.main:main"
hermes-agent = "run_agent:main"
hermes-acp = "acp_adapter.entry:main"
[tool.setuptools]
py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_constants", "hermes_state", "hermes_time", "mini_swe_runner", "rl_cli", "utils"]
[tool.setuptools.packages.find]
include = ["agent", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "cron", "honcho_integration"]
include = ["agent", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "cron", "honcho_integration", "acp_adapter"]
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@@ -0,0 +1,3 @@
---
description: Skills for data science workflows — interactive exploration, Jupyter notebooks, data analysis, and visualization.
---

View File

@@ -0,0 +1,171 @@
---
name: jupyter-live-kernel
description: >
Use a live Jupyter kernel for stateful, iterative Python execution via hamelnb.
Load this skill when the task involves exploration, iteration, or inspecting
intermediate results — data science, ML experimentation, API exploration, or
building up complex code step-by-step. Uses terminal to run CLI commands against
a live Jupyter kernel. No new tools required.
version: 1.0.0
author: Hermes Agent
license: MIT
metadata:
hermes:
tags: [jupyter, notebook, repl, data-science, exploration, iterative]
category: data-science
---
# Jupyter Live Kernel (hamelnb)
Gives you a **stateful Python REPL** via a live Jupyter kernel. Variables persist
across executions. Use this instead of `execute_code` when you need to build up
state incrementally, explore APIs, inspect DataFrames, or iterate on complex code.
## When to Use This vs Other Tools
| Tool | Use When |
|------|----------|
| **This skill** | Iterative exploration, state across steps, data science, ML, "let me try this and check" |
| `execute_code` | One-shot scripts needing hermes tool access (web_search, file ops). Stateless. |
| `terminal` | Shell commands, builds, installs, git, process management |
**Rule of thumb:** If you'd want a Jupyter notebook for the task, use this skill.
## Prerequisites
1. **uv** must be installed (check: `which uv`)
2. **JupyterLab** must be installed: `uv tool install jupyterlab`
3. A Jupyter server must be running (see Setup below)
## Setup
The hamelnb script location:
```
SCRIPT="$HOME/.agent-skills/hamelnb/skills/jupyter-live-kernel/scripts/jupyter_live_kernel.py"
```
If not cloned yet:
```
git clone https://github.com/hamelsmu/hamelnb.git ~/.agent-skills/hamelnb
```
### Starting JupyterLab
Check if a server is already running:
```
uv run "$SCRIPT" servers
```
If no servers found, start one:
```
jupyter-lab --no-browser --port=8888 --notebook-dir=$HOME/notebooks \
--IdentityProvider.token='' --ServerApp.password='' > /tmp/jupyter.log 2>&1 &
sleep 3
```
Note: Token/password disabled for local agent access. The server runs headless.
### Creating a Notebook for REPL Use
If you just need a REPL (no existing notebook), create a minimal notebook file:
```
mkdir -p ~/notebooks
```
Write a minimal .ipynb JSON file with one empty code cell, then start a kernel
session via the Jupyter REST API:
```
curl -s -X POST http://127.0.0.1:8888/api/sessions \
-H "Content-Type: application/json" \
-d '{"path":"scratch.ipynb","type":"notebook","name":"scratch.ipynb","kernel":{"name":"python3"}}'
```
## Core Workflow
All commands return structured JSON. Always use `--compact` to save tokens.
### 1. Discover servers and notebooks
```
uv run "$SCRIPT" servers --compact
uv run "$SCRIPT" notebooks --compact
```
### 2. Execute code (primary operation)
```
uv run "$SCRIPT" execute --path <notebook.ipynb> --code '<python code>' --compact
```
State persists across execute calls. Variables, imports, objects all survive.
Multi-line code works with $'...' quoting:
```
uv run "$SCRIPT" execute --path scratch.ipynb --code $'import os\nfiles = os.listdir(".")\nprint(f"Found {len(files)} files")' --compact
```
### 3. Inspect live variables
```
uv run "$SCRIPT" variables --path <notebook.ipynb> list --compact
uv run "$SCRIPT" variables --path <notebook.ipynb> preview --name <varname> --compact
```
### 4. Edit notebook cells
```
# View current cells
uv run "$SCRIPT" contents --path <notebook.ipynb> --compact
# Insert a new cell
uv run "$SCRIPT" edit --path <notebook.ipynb> insert \
--at-index <N> --cell-type code --source '<code>' --compact
# Replace cell source (use cell-id from contents output)
uv run "$SCRIPT" edit --path <notebook.ipynb> replace-source \
--cell-id <id> --source '<new code>' --compact
# Delete a cell
uv run "$SCRIPT" edit --path <notebook.ipynb> delete --cell-id <id> --compact
```
### 5. Verification (restart + run all)
Only use when the user asks for a clean verification or you need to confirm
the notebook runs top-to-bottom:
```
uv run "$SCRIPT" restart-run-all --path <notebook.ipynb> --save-outputs --compact
```
## Practical Tips from Experience
1. **First execution after server start may timeout** — the kernel needs a moment
to initialize. If you get a timeout, just retry.
2. **The kernel Python is JupyterLab's Python** — packages must be installed in
that environment. If you need additional packages, install them into the
JupyterLab tool environment first.
3. **--compact flag saves significant tokens** — always use it. JSON output can
be very verbose without it.
4. **For pure REPL use**, create a scratch.ipynb and don't bother with cell editing.
Just use `execute` repeatedly.
5. **Argument order matters** — subcommand flags like `--path` go BEFORE the
sub-subcommand. E.g.: `variables --path nb.ipynb list` not `variables list --path nb.ipynb`.
6. **If a session doesn't exist yet**, you need to start one via the REST API
(see Setup section). The tool can't execute without a live kernel session.
7. **Errors are returned as JSON** with traceback — read the `ename` and `evalue`
fields to understand what went wrong.
8. **Occasional websocket timeouts** — some operations may timeout on first try,
especially after a kernel restart. Retry once before escalating.
## Timeout Defaults
The script has a 30-second default timeout per execution. For long-running
operations, pass `--timeout 120`. Use generous timeouts (60+) for initial
setup or heavy computation.

0
tests/acp/__init__.py Normal file
View File

56
tests/acp/test_auth.py Normal file
View File

@@ -0,0 +1,56 @@
"""Tests for acp_adapter.auth — provider detection."""
from acp_adapter.auth import has_provider, detect_provider
class TestHasProvider:
def test_has_provider_with_resolved_runtime(self, monkeypatch):
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda: {"provider": "openrouter", "api_key": "sk-or-test"},
)
assert has_provider() is True
def test_has_no_provider_when_runtime_has_no_key(self, monkeypatch):
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda: {"provider": "openrouter", "api_key": ""},
)
assert has_provider() is False
def test_has_no_provider_when_runtime_resolution_fails(self, monkeypatch):
def _boom():
raise RuntimeError("no provider")
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _boom)
assert has_provider() is False
class TestDetectProvider:
def test_detect_openrouter(self, monkeypatch):
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda: {"provider": "openrouter", "api_key": "sk-or-test"},
)
assert detect_provider() == "openrouter"
def test_detect_anthropic(self, monkeypatch):
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda: {"provider": "anthropic", "api_key": "sk-ant-test"},
)
assert detect_provider() == "anthropic"
def test_detect_none_when_no_key(self, monkeypatch):
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda: {"provider": "kimi-coding", "api_key": ""},
)
assert detect_provider() is None
def test_detect_none_on_resolution_error(self, monkeypatch):
def _boom():
raise RuntimeError("broken")
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _boom)
assert detect_provider() is None

239
tests/acp/test_events.py Normal file
View File

@@ -0,0 +1,239 @@
"""Tests for acp_adapter.events — callback factories for ACP notifications."""
import asyncio
from concurrent.futures import Future
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import acp
from acp.schema import ToolCallStart, ToolCallProgress, AgentThoughtChunk, AgentMessageChunk
from acp_adapter.events import (
make_message_cb,
make_step_cb,
make_thinking_cb,
make_tool_progress_cb,
)
@pytest.fixture()
def mock_conn():
"""Mock ACP Client connection."""
conn = MagicMock(spec=acp.Client)
conn.session_update = AsyncMock()
return conn
@pytest.fixture()
def event_loop_fixture():
"""Create a real event loop for testing threadsafe coroutine submission."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
# ---------------------------------------------------------------------------
# Tool progress callback
# ---------------------------------------------------------------------------
class TestToolProgressCallback:
def test_emits_tool_call_start(self, mock_conn, event_loop_fixture):
"""Tool progress should emit a ToolCallStart update."""
tool_call_ids = {}
loop = event_loop_fixture
cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
# Run callback in the event loop context
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
cb("terminal", "$ ls -la", {"command": "ls -la"})
# Should have tracked the tool call ID
assert "terminal" in tool_call_ids
# Should have called run_coroutine_threadsafe
mock_rcts.assert_called_once()
coro = mock_rcts.call_args[0][0]
# The coroutine should be conn.session_update
assert mock_conn.session_update.called or coro is not None
def test_handles_string_args(self, mock_conn, event_loop_fixture):
"""If args is a JSON string, it should be parsed."""
tool_call_ids = {}
loop = event_loop_fixture
cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
cb("read_file", "Reading /etc/hosts", '{"path": "/etc/hosts"}')
assert "read_file" in tool_call_ids
def test_handles_non_dict_args(self, mock_conn, event_loop_fixture):
"""If args is not a dict, it should be wrapped."""
tool_call_ids = {}
loop = event_loop_fixture
cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
cb("terminal", "$ echo hi", None)
assert "terminal" in tool_call_ids
def test_duplicate_same_name_tool_calls_use_fifo_ids(self, mock_conn, event_loop_fixture):
"""Multiple same-name tool calls should be tracked independently in order."""
tool_call_ids = {}
loop = event_loop_fixture
progress_cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
step_cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
progress_cb("terminal", "$ ls", {"command": "ls"})
progress_cb("terminal", "$ pwd", {"command": "pwd"})
assert len(tool_call_ids["terminal"]) == 2
step_cb(1, [{"name": "terminal", "result": "ok-1"}])
assert len(tool_call_ids["terminal"]) == 1
step_cb(2, [{"name": "terminal", "result": "ok-2"}])
assert "terminal" not in tool_call_ids
# ---------------------------------------------------------------------------
# Thinking callback
# ---------------------------------------------------------------------------
class TestThinkingCallback:
def test_emits_thought_chunk(self, mock_conn, event_loop_fixture):
"""Thinking callback should emit AgentThoughtChunk."""
loop = event_loop_fixture
cb = make_thinking_cb(mock_conn, "session-1", loop)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
cb("Analyzing the code...")
mock_rcts.assert_called_once()
def test_ignores_empty_text(self, mock_conn, event_loop_fixture):
"""Empty text should not emit any update."""
loop = event_loop_fixture
cb = make_thinking_cb(mock_conn, "session-1", loop)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
cb("")
mock_rcts.assert_not_called()
# ---------------------------------------------------------------------------
# Step callback
# ---------------------------------------------------------------------------
class TestStepCallback:
def test_completes_tracked_tool_calls(self, mock_conn, event_loop_fixture):
"""Step callback should mark tracked tools as completed."""
tool_call_ids = {"terminal": "tc-abc123"}
loop = event_loop_fixture
cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
cb(1, [{"name": "terminal", "result": "success"}])
# Tool should have been removed from tracking
assert "terminal" not in tool_call_ids
mock_rcts.assert_called_once()
def test_ignores_untracked_tools(self, mock_conn, event_loop_fixture):
"""Tools not in tool_call_ids should be silently ignored."""
tool_call_ids = {}
loop = event_loop_fixture
cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
cb(1, [{"name": "unknown_tool", "result": "ok"}])
mock_rcts.assert_not_called()
def test_handles_string_tool_info(self, mock_conn, event_loop_fixture):
"""Tool info as a string (just the name) should work."""
tool_call_ids = {"read_file": "tc-def456"}
loop = event_loop_fixture
cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
cb(2, ["read_file"])
assert "read_file" not in tool_call_ids
mock_rcts.assert_called_once()
# ---------------------------------------------------------------------------
# Message callback
# ---------------------------------------------------------------------------
class TestMessageCallback:
def test_emits_agent_message_chunk(self, mock_conn, event_loop_fixture):
"""Message callback should emit AgentMessageChunk."""
loop = event_loop_fixture
cb = make_message_cb(mock_conn, "session-1", loop)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
future = MagicMock(spec=Future)
future.result.return_value = None
mock_rcts.return_value = future
cb("Here is your answer.")
mock_rcts.assert_called_once()
def test_ignores_empty_message(self, mock_conn, event_loop_fixture):
"""Empty text should not emit any update."""
loop = event_loop_fixture
cb = make_message_cb(mock_conn, "session-1", loop)
with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
cb("")
mock_rcts.assert_not_called()

View File

@@ -0,0 +1,75 @@
"""Tests for acp_adapter.permissions — ACP approval bridging."""
import asyncio
from concurrent.futures import Future
from unittest.mock import MagicMock, patch
import pytest
from acp.schema import (
AllowedOutcome,
DeniedOutcome,
RequestPermissionResponse,
)
from acp_adapter.permissions import make_approval_callback
def _make_response(outcome):
"""Helper to build a RequestPermissionResponse with the given outcome."""
return RequestPermissionResponse(outcome=outcome)
def _setup_callback(outcome, timeout=60.0):
"""
Create a callback wired to a mock request_permission coroutine
that resolves to the given outcome.
Returns:
(callback, mock_request_permission_fn)
"""
loop = MagicMock(spec=asyncio.AbstractEventLoop)
mock_rp = MagicMock(name="request_permission")
response = _make_response(outcome)
# Patch asyncio.run_coroutine_threadsafe so it returns a future
# that immediately yields the response.
future = MagicMock(spec=Future)
future.result.return_value = response
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future):
cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=timeout)
result = cb("rm -rf /", "dangerous command")
return result
class TestApprovalMapping:
def test_approval_allow_once_maps_correctly(self):
outcome = AllowedOutcome(option_id="allow_once", outcome="selected")
result = _setup_callback(outcome)
assert result == "once"
def test_approval_allow_always_maps_correctly(self):
outcome = AllowedOutcome(option_id="allow_always", outcome="selected")
result = _setup_callback(outcome)
assert result == "always"
def test_approval_deny_maps_correctly(self):
outcome = DeniedOutcome(outcome="cancelled")
result = _setup_callback(outcome)
assert result == "deny"
def test_approval_timeout_returns_deny(self):
"""When the future times out, the callback should return 'deny'."""
loop = MagicMock(spec=asyncio.AbstractEventLoop)
mock_rp = MagicMock(name="request_permission")
future = MagicMock(spec=Future)
future.result.side_effect = TimeoutError("timed out")
with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future):
cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=0.01)
result = cb("rm -rf /", "dangerous")
assert result == "deny"

297
tests/acp/test_server.py Normal file
View File

@@ -0,0 +1,297 @@
"""Tests for acp_adapter.server — HermesACPAgent ACP server."""
import asyncio
import os
from unittest.mock import MagicMock, AsyncMock, patch
import pytest
import acp
from acp.schema import (
AgentCapabilities,
AuthenticateResponse,
Implementation,
InitializeResponse,
ListSessionsResponse,
LoadSessionResponse,
NewSessionResponse,
PromptResponse,
ResumeSessionResponse,
SessionInfo,
TextContentBlock,
Usage,
)
from acp_adapter.server import HermesACPAgent, HERMES_VERSION
from acp_adapter.session import SessionManager
@pytest.fixture()
def mock_manager():
"""SessionManager with a mock agent factory."""
return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent"))
@pytest.fixture()
def agent(mock_manager):
"""HermesACPAgent backed by a mock session manager."""
return HermesACPAgent(session_manager=mock_manager)
# ---------------------------------------------------------------------------
# initialize
# ---------------------------------------------------------------------------
class TestInitialize:
@pytest.mark.asyncio
async def test_initialize_returns_correct_protocol_version(self, agent):
resp = await agent.initialize(protocol_version=1)
assert isinstance(resp, InitializeResponse)
assert resp.protocol_version == acp.PROTOCOL_VERSION
@pytest.mark.asyncio
async def test_initialize_returns_agent_info(self, agent):
resp = await agent.initialize(protocol_version=1)
assert resp.agent_info is not None
assert isinstance(resp.agent_info, Implementation)
assert resp.agent_info.name == "hermes-agent"
assert resp.agent_info.version == HERMES_VERSION
@pytest.mark.asyncio
async def test_initialize_returns_capabilities(self, agent):
resp = await agent.initialize(protocol_version=1)
caps = resp.agent_capabilities
assert isinstance(caps, AgentCapabilities)
assert caps.session_capabilities is not None
assert caps.session_capabilities.fork is not None
assert caps.session_capabilities.list is not None
# ---------------------------------------------------------------------------
# authenticate
# ---------------------------------------------------------------------------
class TestAuthenticate:
@pytest.mark.asyncio
async def test_authenticate_with_provider_configured(self, agent, monkeypatch):
monkeypatch.setattr(
"acp_adapter.server.has_provider",
lambda: True,
)
resp = await agent.authenticate(method_id="openrouter")
assert isinstance(resp, AuthenticateResponse)
@pytest.mark.asyncio
async def test_authenticate_without_provider(self, agent, monkeypatch):
monkeypatch.setattr(
"acp_adapter.server.has_provider",
lambda: False,
)
resp = await agent.authenticate(method_id="openrouter")
assert resp is None
# ---------------------------------------------------------------------------
# new_session / cancel / load / resume
# ---------------------------------------------------------------------------
class TestSessionOps:
@pytest.mark.asyncio
async def test_new_session_creates_session(self, agent):
resp = await agent.new_session(cwd="/home/user/project")
assert isinstance(resp, NewSessionResponse)
assert resp.session_id
# Session should be retrievable from the manager
state = agent.session_manager.get_session(resp.session_id)
assert state is not None
assert state.cwd == "/home/user/project"
@pytest.mark.asyncio
async def test_cancel_sets_event(self, agent):
resp = await agent.new_session(cwd=".")
state = agent.session_manager.get_session(resp.session_id)
assert not state.cancel_event.is_set()
await agent.cancel(session_id=resp.session_id)
assert state.cancel_event.is_set()
@pytest.mark.asyncio
async def test_cancel_nonexistent_session_is_noop(self, agent):
# Should not raise
await agent.cancel(session_id="does-not-exist")
@pytest.mark.asyncio
async def test_load_session_returns_response(self, agent):
resp = await agent.new_session(cwd="/tmp")
load_resp = await agent.load_session(cwd="/tmp", session_id=resp.session_id)
assert isinstance(load_resp, LoadSessionResponse)
@pytest.mark.asyncio
async def test_load_session_not_found_returns_none(self, agent):
resp = await agent.load_session(cwd="/tmp", session_id="bogus")
assert resp is None
@pytest.mark.asyncio
async def test_resume_session_returns_response(self, agent):
resp = await agent.new_session(cwd="/tmp")
resume_resp = await agent.resume_session(cwd="/tmp", session_id=resp.session_id)
assert isinstance(resume_resp, ResumeSessionResponse)
@pytest.mark.asyncio
async def test_resume_session_creates_new_if_missing(self, agent):
resume_resp = await agent.resume_session(cwd="/tmp", session_id="nonexistent")
assert isinstance(resume_resp, ResumeSessionResponse)
# ---------------------------------------------------------------------------
# list / fork
# ---------------------------------------------------------------------------
class TestListAndFork:
@pytest.mark.asyncio
async def test_list_sessions(self, agent):
await agent.new_session(cwd="/a")
await agent.new_session(cwd="/b")
resp = await agent.list_sessions()
assert isinstance(resp, ListSessionsResponse)
assert len(resp.sessions) == 2
@pytest.mark.asyncio
async def test_fork_session(self, agent):
new_resp = await agent.new_session(cwd="/original")
fork_resp = await agent.fork_session(cwd="/forked", session_id=new_resp.session_id)
assert fork_resp.session_id
assert fork_resp.session_id != new_resp.session_id
# ---------------------------------------------------------------------------
# prompt
# ---------------------------------------------------------------------------
class TestPrompt:
@pytest.mark.asyncio
async def test_prompt_returns_refusal_for_unknown_session(self, agent):
prompt = [TextContentBlock(type="text", text="hello")]
resp = await agent.prompt(prompt=prompt, session_id="nonexistent")
assert isinstance(resp, PromptResponse)
assert resp.stop_reason == "refusal"
@pytest.mark.asyncio
async def test_prompt_returns_end_turn_for_empty_message(self, agent):
new_resp = await agent.new_session(cwd=".")
prompt = [TextContentBlock(type="text", text=" ")]
resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
assert resp.stop_reason == "end_turn"
@pytest.mark.asyncio
async def test_prompt_runs_agent(self, agent):
"""The prompt method should call run_conversation on the agent."""
new_resp = await agent.new_session(cwd=".")
state = agent.session_manager.get_session(new_resp.session_id)
# Mock the agent's run_conversation
state.agent.run_conversation = MagicMock(return_value={
"final_response": "Hello! How can I help?",
"messages": [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "Hello! How can I help?"},
],
})
# Set up a mock connection
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
prompt = [TextContentBlock(type="text", text="hello")]
resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
assert isinstance(resp, PromptResponse)
assert resp.stop_reason == "end_turn"
state.agent.run_conversation.assert_called_once()
@pytest.mark.asyncio
async def test_prompt_updates_history(self, agent):
"""After a prompt, session history should be updated."""
new_resp = await agent.new_session(cwd=".")
state = agent.session_manager.get_session(new_resp.session_id)
expected_history = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hey"},
]
state.agent.run_conversation = MagicMock(return_value={
"final_response": "hey",
"messages": expected_history,
})
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
prompt = [TextContentBlock(type="text", text="hi")]
await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
assert state.history == expected_history
@pytest.mark.asyncio
async def test_prompt_sends_final_message_update(self, agent):
"""The final response should be sent as an AgentMessageChunk."""
new_resp = await agent.new_session(cwd=".")
state = agent.session_manager.get_session(new_resp.session_id)
state.agent.run_conversation = MagicMock(return_value={
"final_response": "I can help with that!",
"messages": [],
})
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
prompt = [TextContentBlock(type="text", text="help me")]
await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
# session_update should have been called with the final message
mock_conn.session_update.assert_called()
# Get the last call's update argument
last_call = mock_conn.session_update.call_args_list[-1]
update = last_call[1].get("update") or last_call[0][1]
assert update.session_update == "agent_message_chunk"
@pytest.mark.asyncio
async def test_prompt_cancelled_returns_cancelled_stop_reason(self, agent):
"""If cancel is called during prompt, stop_reason should be 'cancelled'."""
new_resp = await agent.new_session(cwd=".")
state = agent.session_manager.get_session(new_resp.session_id)
def mock_run(*args, **kwargs):
# Simulate cancel being set during execution
state.cancel_event.set()
return {"final_response": "interrupted", "messages": []}
state.agent.run_conversation = mock_run
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
prompt = [TextContentBlock(type="text", text="do something")]
resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
assert resp.stop_reason == "cancelled"
# ---------------------------------------------------------------------------
# on_connect
# ---------------------------------------------------------------------------
class TestOnConnect:
def test_on_connect_stores_client(self, agent):
mock_conn = MagicMock(spec=acp.Client)
agent.on_connect(mock_conn)
assert agent._conn is mock_conn

112
tests/acp/test_session.py Normal file
View File

@@ -0,0 +1,112 @@
"""Tests for acp_adapter.session — SessionManager and SessionState."""
import pytest
from unittest.mock import MagicMock
from acp_adapter.session import SessionManager, SessionState
@pytest.fixture()
def manager():
"""SessionManager with a mock agent factory (avoids needing API keys)."""
return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent"))
# ---------------------------------------------------------------------------
# create / get
# ---------------------------------------------------------------------------
class TestCreateSession:
def test_create_session_returns_state(self, manager):
state = manager.create_session(cwd="/tmp/work")
assert isinstance(state, SessionState)
assert state.cwd == "/tmp/work"
assert state.session_id
assert state.history == []
assert state.agent is not None
def test_create_session_registers_task_cwd(self, manager, monkeypatch):
calls = []
monkeypatch.setattr("acp_adapter.session._register_task_cwd", lambda task_id, cwd: calls.append((task_id, cwd)))
state = manager.create_session(cwd="/tmp/work")
assert calls == [(state.session_id, "/tmp/work")]
def test_session_ids_are_unique(self, manager):
s1 = manager.create_session()
s2 = manager.create_session()
assert s1.session_id != s2.session_id
def test_get_session(self, manager):
state = manager.create_session()
fetched = manager.get_session(state.session_id)
assert fetched is state
def test_get_nonexistent_session_returns_none(self, manager):
assert manager.get_session("does-not-exist") is None
# ---------------------------------------------------------------------------
# fork
# ---------------------------------------------------------------------------
class TestForkSession:
def test_fork_session_deep_copies_history(self, manager):
original = manager.create_session()
original.history.append({"role": "user", "content": "hello"})
original.history.append({"role": "assistant", "content": "hi"})
forked = manager.fork_session(original.session_id, cwd="/new")
assert forked is not None
# History should be equal in content
assert len(forked.history) == 2
assert forked.history[0]["content"] == "hello"
# But a deep copy — mutating one doesn't affect the other
forked.history.append({"role": "user", "content": "extra"})
assert len(original.history) == 2
assert len(forked.history) == 3
def test_fork_session_has_new_id(self, manager):
original = manager.create_session()
forked = manager.fork_session(original.session_id)
assert forked is not None
assert forked.session_id != original.session_id
def test_fork_nonexistent_returns_none(self, manager):
assert manager.fork_session("bogus-id") is None
# ---------------------------------------------------------------------------
# list / cleanup / remove
# ---------------------------------------------------------------------------
class TestListAndCleanup:
def test_list_sessions_empty(self, manager):
assert manager.list_sessions() == []
def test_list_sessions_returns_created(self, manager):
s1 = manager.create_session(cwd="/a")
s2 = manager.create_session(cwd="/b")
listing = manager.list_sessions()
ids = {s["session_id"] for s in listing}
assert s1.session_id in ids
assert s2.session_id in ids
assert len(listing) == 2
def test_cleanup_clears_all(self, manager):
manager.create_session()
manager.create_session()
assert len(manager.list_sessions()) == 2
manager.cleanup()
assert manager.list_sessions() == []
def test_remove_session(self, manager):
state = manager.create_session()
assert manager.remove_session(state.session_id) is True
assert manager.get_session(state.session_id) is None
# Removing again returns False
assert manager.remove_session(state.session_id) is False

236
tests/acp/test_tools.py Normal file
View File

@@ -0,0 +1,236 @@
"""Tests for acp_adapter.tools — tool kind mapping and ACP content building."""
import pytest
from acp_adapter.tools import (
TOOL_KIND_MAP,
build_tool_complete,
build_tool_start,
build_tool_title,
extract_locations,
get_tool_kind,
make_tool_call_id,
)
from acp.schema import (
FileEditToolCallContent,
ContentToolCallContent,
ToolCallLocation,
ToolCallStart,
ToolCallProgress,
)
# ---------------------------------------------------------------------------
# TOOL_KIND_MAP coverage
# ---------------------------------------------------------------------------
COMMON_HERMES_TOOLS = ["read_file", "search_files", "terminal", "patch", "write_file", "process"]
class TestToolKindMap:
def test_all_hermes_tools_have_kind(self):
"""Every common hermes tool should appear in TOOL_KIND_MAP."""
for tool in COMMON_HERMES_TOOLS:
assert tool in TOOL_KIND_MAP, f"{tool} missing from TOOL_KIND_MAP"
def test_tool_kind_read_file(self):
assert get_tool_kind("read_file") == "read"
def test_tool_kind_terminal(self):
assert get_tool_kind("terminal") == "execute"
def test_tool_kind_patch(self):
assert get_tool_kind("patch") == "edit"
def test_tool_kind_write_file(self):
assert get_tool_kind("write_file") == "edit"
def test_tool_kind_web_search(self):
assert get_tool_kind("web_search") == "fetch"
def test_tool_kind_execute_code(self):
assert get_tool_kind("execute_code") == "execute"
def test_tool_kind_browser_navigate(self):
assert get_tool_kind("browser_navigate") == "fetch"
def test_unknown_tool_returns_other_kind(self):
assert get_tool_kind("nonexistent_tool_xyz") == "other"
# ---------------------------------------------------------------------------
# make_tool_call_id
# ---------------------------------------------------------------------------
class TestMakeToolCallId:
def test_returns_string(self):
tc_id = make_tool_call_id()
assert isinstance(tc_id, str)
def test_starts_with_tc_prefix(self):
tc_id = make_tool_call_id()
assert tc_id.startswith("tc-")
def test_ids_are_unique(self):
ids = {make_tool_call_id() for _ in range(100)}
assert len(ids) == 100
# ---------------------------------------------------------------------------
# build_tool_title
# ---------------------------------------------------------------------------
class TestBuildToolTitle:
def test_terminal_title_includes_command(self):
title = build_tool_title("terminal", {"command": "ls -la /tmp"})
assert "ls -la /tmp" in title
def test_terminal_title_truncates_long_command(self):
long_cmd = "x" * 200
title = build_tool_title("terminal", {"command": long_cmd})
assert len(title) < 120
assert "..." in title
def test_read_file_title(self):
title = build_tool_title("read_file", {"path": "/etc/hosts"})
assert "/etc/hosts" in title
def test_patch_title(self):
title = build_tool_title("patch", {"path": "main.py", "mode": "replace"})
assert "main.py" in title
def test_search_title(self):
title = build_tool_title("search_files", {"pattern": "TODO"})
assert "TODO" in title
def test_web_search_title(self):
title = build_tool_title("web_search", {"query": "python asyncio"})
assert "python asyncio" in title
def test_unknown_tool_uses_name(self):
title = build_tool_title("some_new_tool", {"foo": "bar"})
assert title == "some_new_tool"
# ---------------------------------------------------------------------------
# build_tool_start
# ---------------------------------------------------------------------------
class TestBuildToolStart:
def test_build_tool_start_for_patch(self):
"""patch should produce a FileEditToolCallContent (diff)."""
args = {
"path": "src/main.py",
"old_string": "print('hello')",
"new_string": "print('world')",
}
result = build_tool_start("tc-1", "patch", args)
assert isinstance(result, ToolCallStart)
assert result.kind == "edit"
# The first content item should be a diff
assert len(result.content) >= 1
diff_item = result.content[0]
assert isinstance(diff_item, FileEditToolCallContent)
assert diff_item.path == "src/main.py"
assert diff_item.new_text == "print('world')"
assert diff_item.old_text == "print('hello')"
def test_build_tool_start_for_write_file(self):
"""write_file should produce a FileEditToolCallContent (diff)."""
args = {"path": "new_file.py", "content": "print('hello')"}
result = build_tool_start("tc-w1", "write_file", args)
assert isinstance(result, ToolCallStart)
assert result.kind == "edit"
assert len(result.content) >= 1
diff_item = result.content[0]
assert isinstance(diff_item, FileEditToolCallContent)
assert diff_item.path == "new_file.py"
def test_build_tool_start_for_terminal(self):
"""terminal should produce text content with the command."""
args = {"command": "ls -la /tmp"}
result = build_tool_start("tc-2", "terminal", args)
assert isinstance(result, ToolCallStart)
assert result.kind == "execute"
assert len(result.content) >= 1
content_item = result.content[0]
assert isinstance(content_item, ContentToolCallContent)
# The wrapped text block should contain the command
text = content_item.content.text
assert "ls -la /tmp" in text
def test_build_tool_start_for_read_file(self):
"""read_file should include the path in content."""
args = {"path": "/etc/hosts", "offset": 1, "limit": 50}
result = build_tool_start("tc-3", "read_file", args)
assert isinstance(result, ToolCallStart)
assert result.kind == "read"
assert len(result.content) >= 1
content_item = result.content[0]
assert isinstance(content_item, ContentToolCallContent)
assert "/etc/hosts" in content_item.content.text
def test_build_tool_start_for_search(self):
"""search_files should include pattern in content."""
args = {"pattern": "TODO", "target": "content"}
result = build_tool_start("tc-4", "search_files", args)
assert isinstance(result, ToolCallStart)
assert result.kind == "search"
assert "TODO" in result.content[0].content.text
def test_build_tool_start_generic_fallback(self):
"""Unknown tools should get a generic text representation."""
args = {"foo": "bar", "baz": 42}
result = build_tool_start("tc-5", "some_tool", args)
assert isinstance(result, ToolCallStart)
assert result.kind == "other"
# ---------------------------------------------------------------------------
# build_tool_complete
# ---------------------------------------------------------------------------
class TestBuildToolComplete:
def test_build_tool_complete_for_terminal(self):
"""Completed terminal call should include output text."""
result = build_tool_complete("tc-2", "terminal", "total 42\ndrwxr-xr-x 2 root root 4096 ...")
assert isinstance(result, ToolCallProgress)
assert result.status == "completed"
assert len(result.content) >= 1
content_item = result.content[0]
assert isinstance(content_item, ContentToolCallContent)
assert "total 42" in content_item.content.text
def test_build_tool_complete_truncates_large_output(self):
"""Very large outputs should be truncated."""
big_output = "x" * 10000
result = build_tool_complete("tc-6", "read_file", big_output)
assert isinstance(result, ToolCallProgress)
display_text = result.content[0].content.text
assert len(display_text) < 6000
assert "truncated" in display_text
# ---------------------------------------------------------------------------
# extract_locations
# ---------------------------------------------------------------------------
class TestExtractLocations:
def test_extract_locations_with_path(self):
args = {"path": "src/app.py", "offset": 42}
locs = extract_locations(args)
assert len(locs) == 1
assert isinstance(locs[0], ToolCallLocation)
assert locs[0].path == "src/app.py"
assert locs[0].line == 42
def test_extract_locations_without_path(self):
args = {"command": "echo hi"}
locs = extract_locations(args)
assert locs == []

View File

@@ -224,6 +224,25 @@ TOOLSETS = {
# All platforms share the same core tools (including send_message,
# which is gated on gateway running via its check_fn).
# ==========================================================================
"hermes-acp": {
"description": "Editor integration (VS Code, Zed, JetBrains) — coding-focused tools without messaging, audio, or clarify UI",
"tools": [
"web_search", "web_extract",
"terminal", "process",
"read_file", "write_file", "patch", "search_files",
"vision_analyze",
"skills_list", "skill_view", "skill_manage",
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision",
"todo", "memory",
"session_search",
"execute_code", "delegate_task",
],
"includes": []
},
"hermes-cli": {
"description": "Full interactive CLI toolset - all default tools plus cronjob management",