From 25481d42863c7121180330c5abe6e50f7dd480d7 Mon Sep 17 00:00:00 2001
From: Teknium <127238744+teknium1@users.noreply.github.com>
Date: Sat, 14 Mar 2026 00:09:05 -0700
Subject: [PATCH] feat: restore ACP server implementation from PR #949 (#1254)
Restore the ACP editor-integration implementation that was present on the
original PR branch but did not actually land in main.
Includes:
- acp_adapter/ server, session manager, event bridge, auth, permissions,
and tool helpers
- hermes acp subcommand and hermes-acp entry point
- hermes-acp curated toolset
- ACP registry manifest, setup guide, and ACP test suite
- jupyter-live-kernel data science skill from the original branch
Also updates the revived ACP code for current main by:
- resolving runtime providers through the modern shared provider router
- binding ACP sessions to per-session cwd task overrides
- tracking duplicate same-name tool calls with FIFO IDs
- restoring terminal approval callbacks after prompts
- normalizing supporting docs/skill metadata
Validated with tests/acp and the full pytest suite (-n0).
---
acp_adapter/__init__.py | 1 +
acp_adapter/__main__.py | 5 +
acp_adapter/auth.py | 24 ++
acp_adapter/entry.py | 88 +++++
acp_adapter/events.py | 171 +++++++++
acp_adapter/permissions.py | 80 +++++
acp_adapter/server.py | 333 ++++++++++++++++++
acp_adapter/session.py | 203 +++++++++++
acp_adapter/tools.py | 215 +++++++++++
acp_registry/agent.json | 12 +
acp_registry/icon.svg | 25 ++
docs/acp-setup.md | 229 ++++++++++++
hermes_cli/main.py | 32 +-
pyproject.toml | 5 +-
skills/data-science/DESCRIPTION.md | 3 +
.../data-science/jupyter-live-kernel/SKILL.md | 171 +++++++++
tests/acp/__init__.py | 0
tests/acp/test_auth.py | 56 +++
tests/acp/test_events.py | 239 +++++++++++++
tests/acp/test_permissions.py | 75 ++++
tests/acp/test_server.py | 297 ++++++++++++++++
tests/acp/test_session.py | 112 ++++++
tests/acp/test_tools.py | 236 +++++++++++++
toolsets.py | 19 +
24 files changed, 2625 insertions(+), 6 deletions(-)
create mode 100644 acp_adapter/__init__.py
create mode 100644 acp_adapter/__main__.py
create mode 100644 acp_adapter/auth.py
create mode 100644 acp_adapter/entry.py
create mode 100644 acp_adapter/events.py
create mode 100644 acp_adapter/permissions.py
create mode 100644 acp_adapter/server.py
create mode 100644 acp_adapter/session.py
create mode 100644 acp_adapter/tools.py
create mode 100644 acp_registry/agent.json
create mode 100644 acp_registry/icon.svg
create mode 100644 docs/acp-setup.md
create mode 100644 skills/data-science/DESCRIPTION.md
create mode 100644 skills/data-science/jupyter-live-kernel/SKILL.md
create mode 100644 tests/acp/__init__.py
create mode 100644 tests/acp/test_auth.py
create mode 100644 tests/acp/test_events.py
create mode 100644 tests/acp/test_permissions.py
create mode 100644 tests/acp/test_server.py
create mode 100644 tests/acp/test_session.py
create mode 100644 tests/acp/test_tools.py
diff --git a/acp_adapter/__init__.py b/acp_adapter/__init__.py
new file mode 100644
index 00000000..b58a27b6
--- /dev/null
+++ b/acp_adapter/__init__.py
@@ -0,0 +1 @@
+"""ACP (Agent Communication Protocol) adapter for hermes-agent."""
diff --git a/acp_adapter/__main__.py b/acp_adapter/__main__.py
new file mode 100644
index 00000000..a6ccd099
--- /dev/null
+++ b/acp_adapter/__main__.py
@@ -0,0 +1,5 @@
+"""Allow running the ACP adapter as ``python -m acp_adapter``."""
+
+from .entry import main
+
+main()
diff --git a/acp_adapter/auth.py b/acp_adapter/auth.py
new file mode 100644
index 00000000..a33b5a93
--- /dev/null
+++ b/acp_adapter/auth.py
@@ -0,0 +1,24 @@
+"""ACP auth helpers — detect the currently configured Hermes provider."""
+
+from __future__ import annotations
+
+from typing import Optional
+
+
+def detect_provider() -> Optional[str]:
+ """Resolve the active Hermes runtime provider, or None if unavailable."""
+ try:
+ from hermes_cli.runtime_provider import resolve_runtime_provider
+ runtime = resolve_runtime_provider()
+ api_key = runtime.get("api_key")
+ provider = runtime.get("provider")
+ if isinstance(api_key, str) and api_key.strip() and isinstance(provider, str) and provider.strip():
+ return provider.strip().lower()
+ except Exception:
+ return None
+ return None
+
+
+def has_provider() -> bool:
+ """Return True if Hermes can resolve any runtime provider credentials."""
+ return detect_provider() is not None
diff --git a/acp_adapter/entry.py b/acp_adapter/entry.py
new file mode 100644
index 00000000..27948612
--- /dev/null
+++ b/acp_adapter/entry.py
@@ -0,0 +1,88 @@
+"""CLI entry point for the hermes-agent ACP adapter.
+
+Loads environment variables from ``~/.hermes/.env``, configures logging
+to write to stderr (so stdout is reserved for ACP JSON-RPC transport),
+and starts the ACP agent server.
+
+Usage::
+
+ python -m acp_adapter.entry
+ # or
+ hermes acp
+ # or
+ hermes-acp
+"""
+
+import asyncio
+import logging
+import os
+import sys
+from pathlib import Path
+
+
+def _setup_logging() -> None:
+ """Route all logging to stderr so stdout stays clean for ACP stdio."""
+ handler = logging.StreamHandler(sys.stderr)
+ handler.setFormatter(
+ logging.Formatter(
+ "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+ )
+ )
+ root = logging.getLogger()
+ root.handlers.clear()
+ root.addHandler(handler)
+ root.setLevel(logging.INFO)
+
+ # Quiet down noisy libraries
+ logging.getLogger("httpx").setLevel(logging.WARNING)
+ logging.getLogger("httpcore").setLevel(logging.WARNING)
+ logging.getLogger("openai").setLevel(logging.WARNING)
+
+
+def _load_env() -> None:
+ """Load .env from HERMES_HOME (default ``~/.hermes``)."""
+ from dotenv import load_dotenv
+
+ hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
+ env_file = hermes_home / ".env"
+ if env_file.exists():
+ try:
+ load_dotenv(dotenv_path=env_file, encoding="utf-8")
+ except UnicodeDecodeError:
+ load_dotenv(dotenv_path=env_file, encoding="latin-1")
+ logging.getLogger(__name__).info("Loaded env from %s", env_file)
+ else:
+ logging.getLogger(__name__).info(
+ "No .env found at %s, using system env", env_file
+ )
+
+
+def main() -> None:
+ """Entry point: load env, configure logging, run the ACP agent."""
+ _setup_logging()
+ _load_env()
+
+ logger = logging.getLogger(__name__)
+ logger.info("Starting hermes-agent ACP adapter")
+
+ # Ensure the project root is on sys.path so ``from run_agent import AIAgent`` works
+ project_root = str(Path(__file__).resolve().parent.parent)
+ if project_root not in sys.path:
+ sys.path.insert(0, project_root)
+
+ import acp
+ from .server import HermesACPAgent
+
+ agent = HermesACPAgent()
+ try:
+ asyncio.run(acp.run_agent(agent))
+ except KeyboardInterrupt:
+ logger.info("Shutting down (KeyboardInterrupt)")
+ except Exception:
+ logger.exception("ACP agent crashed")
+ sys.exit(1)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/acp_adapter/events.py b/acp_adapter/events.py
new file mode 100644
index 00000000..33b7ce63
--- /dev/null
+++ b/acp_adapter/events.py
@@ -0,0 +1,171 @@
+"""Callback factories for bridging AIAgent events to ACP notifications.
+
+Each factory returns a callable with the signature that AIAgent expects
+for its callbacks. Internally, the callbacks push ACP session updates
+to the client via ``conn.session_update()`` using
+``asyncio.run_coroutine_threadsafe()`` (since AIAgent runs in a worker
+thread while the event loop lives on the main thread).
+"""
+
+import asyncio
+import json
+import logging
+from collections import defaultdict, deque
+from typing import Any, Callable, Deque, Dict
+
+import acp
+
+from .tools import (
+ build_tool_complete,
+ build_tool_start,
+ make_tool_call_id,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def _send_update(
+ conn: acp.Client,
+ session_id: str,
+ loop: asyncio.AbstractEventLoop,
+ update: Any,
+) -> None:
+ """Fire-and-forget an ACP session update from a worker thread."""
+ try:
+ future = asyncio.run_coroutine_threadsafe(
+ conn.session_update(session_id, update), loop
+ )
+ future.result(timeout=5)
+ except Exception:
+ logger.debug("Failed to send ACP update", exc_info=True)
+
+
+# ------------------------------------------------------------------
+# Tool progress callback
+# ------------------------------------------------------------------
+
+def make_tool_progress_cb(
+ conn: acp.Client,
+ session_id: str,
+ loop: asyncio.AbstractEventLoop,
+ tool_call_ids: Dict[str, Deque[str]],
+) -> Callable:
+ """Create a ``tool_progress_callback`` for AIAgent.
+
+ Signature expected by AIAgent::
+
+ tool_progress_callback(name: str, preview: str, args: dict)
+
+ Emits ``ToolCallStart`` for each tool invocation and tracks IDs in a FIFO
+ queue per tool name so duplicate/parallel same-name calls still complete
+ against the correct ACP tool call.
+ """
+
+ def _tool_progress(name: str, preview: str, args: Any = None) -> None:
+ if isinstance(args, str):
+ try:
+ args = json.loads(args)
+ except (json.JSONDecodeError, TypeError):
+ args = {"raw": args}
+ if not isinstance(args, dict):
+ args = {}
+
+ tc_id = make_tool_call_id()
+ queue = tool_call_ids.get(name)
+ if queue is None:
+ queue = deque()
+ tool_call_ids[name] = queue
+ elif isinstance(queue, str):
+ queue = deque([queue])
+ tool_call_ids[name] = queue
+ queue.append(tc_id)
+
+ update = build_tool_start(tc_id, name, args)
+ _send_update(conn, session_id, loop, update)
+
+ return _tool_progress
+
+
+# ------------------------------------------------------------------
+# Thinking callback
+# ------------------------------------------------------------------
+
+def make_thinking_cb(
+ conn: acp.Client,
+ session_id: str,
+ loop: asyncio.AbstractEventLoop,
+) -> Callable:
+ """Create a ``thinking_callback`` for AIAgent."""
+
+ def _thinking(text: str) -> None:
+ if not text:
+ return
+ update = acp.update_agent_thought_text(text)
+ _send_update(conn, session_id, loop, update)
+
+ return _thinking
+
+
+# ------------------------------------------------------------------
+# Step callback
+# ------------------------------------------------------------------
+
+def make_step_cb(
+ conn: acp.Client,
+ session_id: str,
+ loop: asyncio.AbstractEventLoop,
+ tool_call_ids: Dict[str, Deque[str]],
+) -> Callable:
+ """Create a ``step_callback`` for AIAgent.
+
+ Signature expected by AIAgent::
+
+ step_callback(api_call_count: int, prev_tools: list)
+ """
+
+ def _step(api_call_count: int, prev_tools: Any = None) -> None:
+ if prev_tools and isinstance(prev_tools, list):
+ for tool_info in prev_tools:
+ tool_name = None
+ result = None
+
+ if isinstance(tool_info, dict):
+ tool_name = tool_info.get("name") or tool_info.get("function_name")
+ result = tool_info.get("result") or tool_info.get("output")
+ elif isinstance(tool_info, str):
+ tool_name = tool_info
+
+ queue = tool_call_ids.get(tool_name or "")
+ if isinstance(queue, str):
+ queue = deque([queue])
+ tool_call_ids[tool_name] = queue
+ if tool_name and queue:
+ tc_id = queue.popleft()
+ update = build_tool_complete(
+ tc_id, tool_name, result=str(result) if result is not None else None
+ )
+ _send_update(conn, session_id, loop, update)
+ if not queue:
+ tool_call_ids.pop(tool_name, None)
+
+ return _step
+
+
+# ------------------------------------------------------------------
+# Agent message callback
+# ------------------------------------------------------------------
+
+def make_message_cb(
+ conn: acp.Client,
+ session_id: str,
+ loop: asyncio.AbstractEventLoop,
+) -> Callable:
+ """Create a callback that streams agent response text to the editor."""
+
+ def _message(text: str) -> None:
+ if not text:
+ return
+ update = acp.update_agent_message_text(text)
+ _send_update(conn, session_id, loop, update)
+
+ return _message
diff --git a/acp_adapter/permissions.py b/acp_adapter/permissions.py
new file mode 100644
index 00000000..cadd16c6
--- /dev/null
+++ b/acp_adapter/permissions.py
@@ -0,0 +1,80 @@
+"""ACP permission bridging — maps ACP approval requests to hermes approval callbacks."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from concurrent.futures import TimeoutError as FutureTimeout
+from typing import Any, Callable, Optional
+
+from acp.schema import (
+ AllowedOutcome,
+ DeniedOutcome,
+ PermissionOption,
+ RequestPermissionRequest,
+ SelectedPermissionOutcome,
+)
+
+logger = logging.getLogger(__name__)
+
+# Maps ACP PermissionOptionKind -> hermes approval result strings
+_KIND_TO_HERMES = {
+ "allow_once": "once",
+ "allow_always": "always",
+ "reject_once": "deny",
+ "reject_always": "deny",
+}
+
+
+def make_approval_callback(
+ request_permission_fn: Callable,
+ loop: asyncio.AbstractEventLoop,
+ session_id: str,
+ timeout: float = 60.0,
+) -> Callable[[str, str], str]:
+ """
+ Return a hermes-compatible ``approval_callback(command, description) -> str``
+ that bridges to the ACP client's ``request_permission`` call.
+
+ Args:
+ request_permission_fn: The ACP connection's ``request_permission`` coroutine.
+ loop: The event loop on which the ACP connection lives.
+ session_id: Current ACP session id.
+ timeout: Seconds to wait for a response before auto-denying.
+ """
+
+ def _callback(command: str, description: str) -> str:
+ options = [
+ PermissionOption(option_id="allow_once", kind="allow_once", name="Allow once"),
+ PermissionOption(option_id="allow_always", kind="allow_always", name="Allow always"),
+ PermissionOption(option_id="deny", kind="reject_once", name="Deny"),
+ ]
+ import acp as _acp
+
+ tool_call = _acp.start_tool_call("perm-check", command, kind="execute")
+
+ coro = request_permission_fn(
+ session_id=session_id,
+ tool_call=tool_call,
+ options=options,
+ )
+
+ try:
+ future = asyncio.run_coroutine_threadsafe(coro, loop)
+ response = future.result(timeout=timeout)
+ except (FutureTimeout, Exception) as exc:
+ logger.warning("Permission request timed out or failed: %s", exc)
+ return "deny"
+
+ outcome = response.outcome
+ if isinstance(outcome, AllowedOutcome):
+ option_id = outcome.option_id
+ # Look up the kind from our options list
+ for opt in options:
+ if opt.option_id == option_id:
+ return _KIND_TO_HERMES.get(opt.kind, "deny")
+ return "once" # fallback for unknown option_id
+ else:
+ return "deny"
+
+ return _callback
diff --git a/acp_adapter/server.py b/acp_adapter/server.py
new file mode 100644
index 00000000..6e8ec3b4
--- /dev/null
+++ b/acp_adapter/server.py
@@ -0,0 +1,333 @@
+"""ACP agent server — exposes Hermes Agent via the Agent Client Protocol."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from collections import defaultdict, deque
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Deque, Optional
+
+import acp
+from acp.schema import (
+ AgentCapabilities,
+ AuthenticateResponse,
+ AuthMethod,
+ ClientCapabilities,
+ EmbeddedResourceContentBlock,
+ ForkSessionResponse,
+ ImageContentBlock,
+ AudioContentBlock,
+ Implementation,
+ InitializeResponse,
+ ListSessionsResponse,
+ LoadSessionResponse,
+ NewSessionResponse,
+ PromptResponse,
+ ResumeSessionResponse,
+ ResourceContentBlock,
+ SessionCapabilities,
+ SessionForkCapabilities,
+ SessionListCapabilities,
+ SessionInfo,
+ TextContentBlock,
+ Usage,
+)
+
+from acp_adapter.auth import detect_provider, has_provider
+from acp_adapter.events import (
+ make_message_cb,
+ make_step_cb,
+ make_thinking_cb,
+ make_tool_progress_cb,
+)
+from acp_adapter.permissions import make_approval_callback
+from acp_adapter.session import SessionManager
+
+logger = logging.getLogger(__name__)
+
+try:
+ from hermes_cli import __version__ as HERMES_VERSION
+except Exception:
+ HERMES_VERSION = "0.0.0"
+
+# Thread pool for running AIAgent (synchronous) in parallel.
+_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="acp-agent")
+
+
+def _extract_text(
+ prompt: list[
+ TextContentBlock
+ | ImageContentBlock
+ | AudioContentBlock
+ | ResourceContentBlock
+ | EmbeddedResourceContentBlock
+ ],
+) -> str:
+ """Extract plain text from ACP content blocks."""
+ parts: list[str] = []
+ for block in prompt:
+ if isinstance(block, TextContentBlock):
+ parts.append(block.text)
+ elif hasattr(block, "text"):
+ parts.append(str(block.text))
+ # Non-text blocks are ignored for now.
+ return "\n".join(parts)
+
+
+class HermesACPAgent(acp.Agent):
+ """ACP Agent implementation wrapping Hermes AIAgent."""
+
+ def __init__(self, session_manager: SessionManager | None = None):
+ super().__init__()
+ self.session_manager = session_manager or SessionManager()
+ self._conn: Optional[acp.Client] = None
+
+ # ---- Connection lifecycle -----------------------------------------------
+
+ def on_connect(self, conn: acp.Client) -> None:
+ """Store the client connection for sending session updates."""
+ self._conn = conn
+ logger.info("ACP client connected")
+
+ # ---- ACP lifecycle ------------------------------------------------------
+
+ async def initialize(
+ self,
+ protocol_version: int,
+ client_capabilities: ClientCapabilities | None = None,
+ client_info: Implementation | None = None,
+ **kwargs: Any,
+ ) -> InitializeResponse:
+ provider = detect_provider()
+ auth_methods = None
+ if provider:
+ auth_methods = [
+ AuthMethod(
+ id=provider,
+ name=f"{provider} runtime credentials",
+ description=f"Authenticate Hermes using the currently configured {provider} runtime credentials.",
+ )
+ ]
+
+ client_name = client_info.name if client_info else "unknown"
+ logger.info("Initialize from %s (protocol v%s)", client_name, protocol_version)
+
+ return InitializeResponse(
+ protocol_version=acp.PROTOCOL_VERSION,
+ agent_info=Implementation(name="hermes-agent", version=HERMES_VERSION),
+ agent_capabilities=AgentCapabilities(
+ session_capabilities=SessionCapabilities(
+ fork=SessionForkCapabilities(),
+ list=SessionListCapabilities(),
+ ),
+ ),
+ auth_methods=auth_methods,
+ )
+
+ async def authenticate(self, method_id: str, **kwargs: Any) -> AuthenticateResponse | None:
+ if has_provider():
+ return AuthenticateResponse()
+ return None
+
+ # ---- Session management -------------------------------------------------
+
+ async def new_session(
+ self,
+ cwd: str,
+ mcp_servers: list | None = None,
+ **kwargs: Any,
+ ) -> NewSessionResponse:
+ state = self.session_manager.create_session(cwd=cwd)
+ logger.info("New session %s (cwd=%s)", state.session_id, cwd)
+ return NewSessionResponse(session_id=state.session_id)
+
+ async def load_session(
+ self,
+ cwd: str,
+ session_id: str,
+ mcp_servers: list | None = None,
+ **kwargs: Any,
+ ) -> LoadSessionResponse | None:
+ state = self.session_manager.update_cwd(session_id, cwd)
+ if state is None:
+ logger.warning("load_session: session %s not found", session_id)
+ return None
+ logger.info("Loaded session %s", session_id)
+ return LoadSessionResponse()
+
+ async def resume_session(
+ self,
+ cwd: str,
+ session_id: str,
+ mcp_servers: list | None = None,
+ **kwargs: Any,
+ ) -> ResumeSessionResponse:
+ state = self.session_manager.update_cwd(session_id, cwd)
+ if state is None:
+ logger.warning("resume_session: session %s not found, creating new", session_id)
+ state = self.session_manager.create_session(cwd=cwd)
+ logger.info("Resumed session %s", state.session_id)
+ return ResumeSessionResponse()
+
+ async def cancel(self, session_id: str, **kwargs: Any) -> None:
+ state = self.session_manager.get_session(session_id)
+ if state and state.cancel_event:
+ state.cancel_event.set()
+ try:
+ if getattr(state, "agent", None) and hasattr(state.agent, "interrupt"):
+ state.agent.interrupt()
+ except Exception:
+ logger.debug("Failed to interrupt ACP session %s", session_id, exc_info=True)
+ logger.info("Cancelled session %s", session_id)
+
+ async def fork_session(
+ self,
+ cwd: str,
+ session_id: str,
+ mcp_servers: list | None = None,
+ **kwargs: Any,
+ ) -> ForkSessionResponse:
+ state = self.session_manager.fork_session(session_id, cwd=cwd)
+ new_id = state.session_id if state else ""
+ logger.info("Forked session %s -> %s", session_id, new_id)
+ return ForkSessionResponse(session_id=new_id)
+
+ async def list_sessions(
+ self,
+ cursor: str | None = None,
+ cwd: str | None = None,
+ **kwargs: Any,
+ ) -> ListSessionsResponse:
+ infos = self.session_manager.list_sessions()
+ sessions = [
+ SessionInfo(session_id=s["session_id"], cwd=s["cwd"])
+ for s in infos
+ ]
+ return ListSessionsResponse(sessions=sessions)
+
+ # ---- Prompt (core) ------------------------------------------------------
+
+ async def prompt(
+ self,
+ prompt: list[
+ TextContentBlock
+ | ImageContentBlock
+ | AudioContentBlock
+ | ResourceContentBlock
+ | EmbeddedResourceContentBlock
+ ],
+ session_id: str,
+ **kwargs: Any,
+ ) -> PromptResponse:
+ """Run Hermes on the user's prompt and stream events back to the editor."""
+ state = self.session_manager.get_session(session_id)
+ if state is None:
+ logger.error("prompt: session %s not found", session_id)
+ return PromptResponse(stop_reason="refusal")
+
+ user_text = _extract_text(prompt)
+ if not user_text.strip():
+ return PromptResponse(stop_reason="end_turn")
+
+ logger.info("Prompt on session %s: %s", session_id, user_text[:100])
+
+ conn = self._conn
+ loop = asyncio.get_running_loop()
+
+ if state.cancel_event:
+ state.cancel_event.clear()
+
+ tool_call_ids: dict[str, Deque[str]] = defaultdict(deque)
+ previous_approval_cb = None
+
+ if conn:
+ tool_progress_cb = make_tool_progress_cb(conn, session_id, loop, tool_call_ids)
+ thinking_cb = make_thinking_cb(conn, session_id, loop)
+ step_cb = make_step_cb(conn, session_id, loop, tool_call_ids)
+ message_cb = make_message_cb(conn, session_id, loop)
+ approval_cb = make_approval_callback(conn.request_permission, loop, session_id)
+ else:
+ tool_progress_cb = None
+ thinking_cb = None
+ step_cb = None
+ message_cb = None
+ approval_cb = None
+
+ agent = state.agent
+ agent.tool_progress_callback = tool_progress_cb
+ agent.thinking_callback = thinking_cb
+ agent.step_callback = step_cb
+ agent.message_callback = message_cb
+
+ if approval_cb:
+ try:
+ from tools import terminal_tool as _terminal_tool
+ previous_approval_cb = getattr(_terminal_tool, "_approval_callback", None)
+ _terminal_tool.set_approval_callback(approval_cb)
+ except Exception:
+ logger.debug("Could not set ACP approval callback", exc_info=True)
+
+ def _run_agent() -> dict:
+ try:
+ result = agent.run_conversation(
+ user_message=user_text,
+ conversation_history=state.history,
+ task_id=session_id,
+ )
+ return result
+ except Exception as e:
+ logger.exception("Agent error in session %s", session_id)
+ return {"final_response": f"Error: {e}", "messages": state.history}
+ finally:
+ if approval_cb:
+ try:
+ from tools import terminal_tool as _terminal_tool
+ _terminal_tool.set_approval_callback(previous_approval_cb)
+ except Exception:
+ logger.debug("Could not restore approval callback", exc_info=True)
+
+ try:
+ result = await loop.run_in_executor(_executor, _run_agent)
+ except Exception:
+ logger.exception("Executor error for session %s", session_id)
+ return PromptResponse(stop_reason="end_turn")
+
+ if result.get("messages"):
+ state.history = result["messages"]
+
+ final_response = result.get("final_response", "")
+ if final_response and conn:
+ update = acp.update_agent_message_text(final_response)
+ await conn.session_update(session_id, update)
+
+ usage = None
+ usage_data = result.get("usage")
+ if usage_data and isinstance(usage_data, dict):
+ usage = Usage(
+ input_tokens=usage_data.get("prompt_tokens", 0),
+ output_tokens=usage_data.get("completion_tokens", 0),
+ total_tokens=usage_data.get("total_tokens", 0),
+ thought_tokens=usage_data.get("reasoning_tokens"),
+ cached_read_tokens=usage_data.get("cached_tokens"),
+ )
+
+ stop_reason = "cancelled" if state.cancel_event and state.cancel_event.is_set() else "end_turn"
+ return PromptResponse(stop_reason=stop_reason, usage=usage)
+
+ # ---- Model switching ----------------------------------------------------
+
+ async def set_session_model(
+ self, model_id: str, session_id: str, **kwargs: Any
+ ):
+ """Switch the model for a session."""
+ state = self.session_manager.get_session(session_id)
+ if state:
+ state.model = model_id
+ state.agent = self.session_manager._make_agent(
+ session_id=session_id,
+ cwd=state.cwd,
+ model=model_id,
+ )
+ logger.info("Session %s: model switched to %s", session_id, model_id)
+ return None
diff --git a/acp_adapter/session.py b/acp_adapter/session.py
new file mode 100644
index 00000000..8590a62e
--- /dev/null
+++ b/acp_adapter/session.py
@@ -0,0 +1,203 @@
+"""ACP session manager — maps ACP sessions to Hermes AIAgent instances."""
+from __future__ import annotations
+
+import copy
+import logging
+import uuid
+from dataclasses import dataclass, field
+from threading import Lock
+from typing import Any, Dict, List, Optional
+
+logger = logging.getLogger(__name__)
+
+
+def _register_task_cwd(task_id: str, cwd: str) -> None:
+ """Bind a task/session id to the editor's working directory for tools."""
+ if not task_id:
+ return
+ try:
+ from tools.terminal_tool import register_task_env_overrides
+ register_task_env_overrides(task_id, {"cwd": cwd})
+ except Exception:
+ logger.debug("Failed to register ACP task cwd override", exc_info=True)
+
+
+def _clear_task_cwd(task_id: str) -> None:
+ """Remove task-specific cwd overrides for an ACP session."""
+ if not task_id:
+ return
+ try:
+ from tools.terminal_tool import clear_task_env_overrides
+ clear_task_env_overrides(task_id)
+ except Exception:
+ logger.debug("Failed to clear ACP task cwd override", exc_info=True)
+
+
+@dataclass
+class SessionState:
+ """Tracks per-session state for an ACP-managed Hermes agent."""
+
+ session_id: str
+ agent: Any # AIAgent instance
+ cwd: str = "."
+ model: str = ""
+ history: List[Dict[str, Any]] = field(default_factory=list)
+ cancel_event: Any = None # threading.Event
+
+
+class SessionManager:
+ """Thread-safe manager for ACP sessions backed by Hermes AIAgent instances."""
+
+ def __init__(self, agent_factory=None):
+ """
+ Args:
+ agent_factory: Optional callable that creates an AIAgent-like object.
+ Used by tests. When omitted, a real AIAgent is created
+ using the current Hermes runtime provider configuration.
+ """
+ self._sessions: Dict[str, SessionState] = {}
+ self._lock = Lock()
+ self._agent_factory = agent_factory
+
+ # ---- public API ---------------------------------------------------------
+
+ def create_session(self, cwd: str = ".") -> SessionState:
+ """Create a new session with a unique ID and a fresh AIAgent."""
+ import threading
+
+ session_id = str(uuid.uuid4())
+ agent = self._make_agent(session_id=session_id, cwd=cwd)
+ state = SessionState(
+ session_id=session_id,
+ agent=agent,
+ cwd=cwd,
+ model=getattr(agent, "model", "") or "",
+ cancel_event=threading.Event(),
+ )
+ with self._lock:
+ self._sessions[session_id] = state
+ _register_task_cwd(session_id, cwd)
+ logger.info("Created ACP session %s (cwd=%s)", session_id, cwd)
+ return state
+
+ def get_session(self, session_id: str) -> Optional[SessionState]:
+ """Return the session for *session_id*, or ``None``."""
+ with self._lock:
+ return self._sessions.get(session_id)
+
+ def remove_session(self, session_id: str) -> bool:
+ """Remove a session. Returns True if it existed."""
+ with self._lock:
+ existed = self._sessions.pop(session_id, None) is not None
+ if existed:
+ _clear_task_cwd(session_id)
+ return existed
+
+ def fork_session(self, session_id: str, cwd: str = ".") -> Optional[SessionState]:
+ """Deep-copy a session's history into a new session."""
+ import threading
+
+ with self._lock:
+ original = self._sessions.get(session_id)
+ if original is None:
+ return None
+
+ new_id = str(uuid.uuid4())
+ agent = self._make_agent(
+ session_id=new_id,
+ cwd=cwd,
+ model=original.model or None,
+ )
+ state = SessionState(
+ session_id=new_id,
+ agent=agent,
+ cwd=cwd,
+ model=getattr(agent, "model", original.model) or original.model,
+ history=copy.deepcopy(original.history),
+ cancel_event=threading.Event(),
+ )
+ self._sessions[new_id] = state
+ _register_task_cwd(new_id, cwd)
+ logger.info("Forked ACP session %s -> %s", session_id, new_id)
+ return state
+
+ def list_sessions(self) -> List[Dict[str, Any]]:
+ """Return lightweight info dicts for all sessions."""
+ with self._lock:
+ return [
+ {
+ "session_id": s.session_id,
+ "cwd": s.cwd,
+ "model": s.model,
+ "history_len": len(s.history),
+ }
+ for s in self._sessions.values()
+ ]
+
+ def update_cwd(self, session_id: str, cwd: str) -> Optional[SessionState]:
+ """Update the working directory for a session and its tool overrides."""
+ with self._lock:
+ state = self._sessions.get(session_id)
+ if state is None:
+ return None
+ state.cwd = cwd
+ _register_task_cwd(session_id, cwd)
+ return state
+
+ def cleanup(self) -> None:
+ """Remove all sessions and clear task-specific cwd overrides."""
+ with self._lock:
+ session_ids = list(self._sessions.keys())
+ self._sessions.clear()
+ for session_id in session_ids:
+ _clear_task_cwd(session_id)
+
+ # ---- internal -----------------------------------------------------------
+
+ def _make_agent(
+ self,
+ *,
+ session_id: str,
+ cwd: str,
+ model: str | None = None,
+ ):
+ if self._agent_factory is not None:
+ return self._agent_factory()
+
+ from run_agent import AIAgent
+ from hermes_cli.config import load_config
+ from hermes_cli.runtime_provider import resolve_runtime_provider
+
+ config = load_config()
+ model_cfg = config.get("model")
+ default_model = "anthropic/claude-opus-4.6"
+ requested_provider = None
+ if isinstance(model_cfg, dict):
+ default_model = str(model_cfg.get("default") or default_model)
+ requested_provider = model_cfg.get("provider")
+ elif isinstance(model_cfg, str) and model_cfg.strip():
+ default_model = model_cfg.strip()
+
+ kwargs = {
+ "platform": "acp",
+ "enabled_toolsets": ["hermes-acp"],
+ "quiet_mode": True,
+ "session_id": session_id,
+ "model": model or default_model,
+ }
+
+ try:
+ runtime = resolve_runtime_provider(requested=requested_provider)
+ kwargs.update(
+ {
+ "provider": runtime.get("provider"),
+ "api_mode": runtime.get("api_mode"),
+ "base_url": runtime.get("base_url"),
+ "api_key": runtime.get("api_key"),
+ }
+ )
+ except Exception:
+ logger.debug("ACP session falling back to default provider resolution", exc_info=True)
+
+ _register_task_cwd(session_id, cwd)
+ return AIAgent(**kwargs)
diff --git a/acp_adapter/tools.py b/acp_adapter/tools.py
new file mode 100644
index 00000000..8756aa92
--- /dev/null
+++ b/acp_adapter/tools.py
@@ -0,0 +1,215 @@
+"""ACP tool-call helpers for mapping hermes tools to ACP ToolKind and building content."""
+
+from __future__ import annotations
+
+import uuid
+from typing import Any, Dict, List, Optional
+
+import acp
+from acp.schema import (
+ ToolCallLocation,
+ ToolCallStart,
+ ToolCallProgress,
+ ToolKind,
+)
+
+# ---------------------------------------------------------------------------
+# Map hermes tool names -> ACP ToolKind
+# ---------------------------------------------------------------------------
+
+TOOL_KIND_MAP: Dict[str, ToolKind] = {
+ # File operations
+ "read_file": "read",
+ "write_file": "edit",
+ "patch": "edit",
+ "search_files": "search",
+ # Terminal / execution
+ "terminal": "execute",
+ "process": "execute",
+ "execute_code": "execute",
+ # Web / fetch
+ "web_search": "fetch",
+ "web_extract": "fetch",
+ # Browser
+ "browser_navigate": "fetch",
+ "browser_click": "execute",
+ "browser_type": "execute",
+ "browser_snapshot": "read",
+ "browser_vision": "read",
+ "browser_scroll": "execute",
+ "browser_press": "execute",
+ "browser_back": "execute",
+ "browser_close": "execute",
+ "browser_get_images": "read",
+ # Agent internals
+ "delegate_task": "execute",
+ "vision_analyze": "read",
+ "image_generate": "execute",
+ "text_to_speech": "execute",
+ # Thinking / meta
+ "_thinking": "think",
+}
+
+
+def get_tool_kind(tool_name: str) -> ToolKind:
+ """Return the ACP ToolKind for a hermes tool, defaulting to 'other'."""
+ return TOOL_KIND_MAP.get(tool_name, "other")
+
+
+def make_tool_call_id() -> str:
+ """Generate a unique tool call ID."""
+ return f"tc-{uuid.uuid4().hex[:12]}"
+
+
+def build_tool_title(tool_name: str, args: Dict[str, Any]) -> str:
+ """Build a human-readable title for a tool call."""
+ if tool_name == "terminal":
+ cmd = args.get("command", "")
+ if len(cmd) > 80:
+ cmd = cmd[:77] + "..."
+ return f"terminal: {cmd}"
+ if tool_name == "read_file":
+ return f"read: {args.get('path', '?')}"
+ if tool_name == "write_file":
+ return f"write: {args.get('path', '?')}"
+ if tool_name == "patch":
+ mode = args.get("mode", "replace")
+ path = args.get("path", "?")
+ return f"patch ({mode}): {path}"
+ if tool_name == "search_files":
+ return f"search: {args.get('pattern', '?')}"
+ if tool_name == "web_search":
+ return f"web search: {args.get('query', '?')}"
+ if tool_name == "web_extract":
+ urls = args.get("urls", [])
+ if urls:
+ return f"extract: {urls[0]}" + (f" (+{len(urls)-1})" if len(urls) > 1 else "")
+ return "web extract"
+ if tool_name == "delegate_task":
+ goal = args.get("goal", "")
+ if goal and len(goal) > 60:
+ goal = goal[:57] + "..."
+ return f"delegate: {goal}" if goal else "delegate task"
+ if tool_name == "execute_code":
+ return "execute code"
+ if tool_name == "vision_analyze":
+ return f"analyze image: {args.get('question', '?')[:50]}"
+ return tool_name
+
+
+# ---------------------------------------------------------------------------
+# Build ACP content objects for tool-call events
+# ---------------------------------------------------------------------------
+
+
+def build_tool_start(
+ tool_call_id: str,
+ tool_name: str,
+ arguments: Dict[str, Any],
+) -> ToolCallStart:
+ """Create a ToolCallStart event for the given hermes tool invocation."""
+ kind = get_tool_kind(tool_name)
+ title = build_tool_title(tool_name, arguments)
+ locations = extract_locations(arguments)
+
+ if tool_name == "patch":
+ mode = arguments.get("mode", "replace")
+ if mode == "replace":
+ path = arguments.get("path", "")
+ old = arguments.get("old_string", "")
+ new = arguments.get("new_string", "")
+ content = [acp.tool_diff_content(path=path, new_text=new, old_text=old)]
+ else:
+ # Patch mode — show the patch content as text
+ patch_text = arguments.get("patch", "")
+ content = [acp.tool_content(acp.text_block(patch_text))]
+ return acp.start_tool_call(
+ tool_call_id, title, kind=kind, content=content, locations=locations,
+ raw_input=arguments,
+ )
+
+ if tool_name == "write_file":
+ path = arguments.get("path", "")
+ file_content = arguments.get("content", "")
+ content = [acp.tool_diff_content(path=path, new_text=file_content)]
+ return acp.start_tool_call(
+ tool_call_id, title, kind=kind, content=content, locations=locations,
+ raw_input=arguments,
+ )
+
+ if tool_name == "terminal":
+ command = arguments.get("command", "")
+ content = [acp.tool_content(acp.text_block(f"$ {command}"))]
+ return acp.start_tool_call(
+ tool_call_id, title, kind=kind, content=content, locations=locations,
+ raw_input=arguments,
+ )
+
+ if tool_name == "read_file":
+ path = arguments.get("path", "")
+ content = [acp.tool_content(acp.text_block(f"Reading {path}"))]
+ return acp.start_tool_call(
+ tool_call_id, title, kind=kind, content=content, locations=locations,
+ raw_input=arguments,
+ )
+
+ if tool_name == "search_files":
+ pattern = arguments.get("pattern", "")
+ target = arguments.get("target", "content")
+ content = [acp.tool_content(acp.text_block(f"Searching for '{pattern}' ({target})"))]
+ return acp.start_tool_call(
+ tool_call_id, title, kind=kind, content=content, locations=locations,
+ raw_input=arguments,
+ )
+
+ # Generic fallback
+ import json
+ try:
+ args_text = json.dumps(arguments, indent=2, default=str)
+ except (TypeError, ValueError):
+ args_text = str(arguments)
+ content = [acp.tool_content(acp.text_block(args_text))]
+ return acp.start_tool_call(
+ tool_call_id, title, kind=kind, content=content, locations=locations,
+ raw_input=arguments,
+ )
+
+
+def build_tool_complete(
+ tool_call_id: str,
+ tool_name: str,
+ result: Optional[str] = None,
+) -> ToolCallProgress:
+ """Create a ToolCallUpdate (progress) event for a completed tool call."""
+ kind = get_tool_kind(tool_name)
+
+ # Truncate very large results for the UI
+ display_result = result or ""
+ if len(display_result) > 5000:
+ display_result = display_result[:4900] + f"\n... ({len(result)} chars total, truncated)"
+
+ content = [acp.tool_content(acp.text_block(display_result))]
+ return acp.update_tool_call(
+ tool_call_id,
+ kind=kind,
+ status="completed",
+ content=content,
+ raw_output=result,
+ )
+
+
+# ---------------------------------------------------------------------------
+# Location extraction
+# ---------------------------------------------------------------------------
+
+
+def extract_locations(
+ arguments: Dict[str, Any],
+) -> List[ToolCallLocation]:
+ """Extract file-system locations from tool arguments."""
+ locations: List[ToolCallLocation] = []
+ path = arguments.get("path")
+ if path:
+ line = arguments.get("offset") or arguments.get("line")
+ locations.append(ToolCallLocation(path=path, line=line))
+ return locations
diff --git a/acp_registry/agent.json b/acp_registry/agent.json
new file mode 100644
index 00000000..492a8444
--- /dev/null
+++ b/acp_registry/agent.json
@@ -0,0 +1,12 @@
+{
+ "schema_version": 1,
+ "name": "hermes-agent",
+ "display_name": "Hermes Agent",
+ "description": "AI agent by Nous Research with 90+ tools, persistent memory, and multi-platform support",
+ "icon": "icon.svg",
+ "distribution": {
+ "type": "command",
+ "command": "hermes",
+ "args": ["acp"]
+ }
+}
diff --git a/acp_registry/icon.svg b/acp_registry/icon.svg
new file mode 100644
index 00000000..fc08ec05
--- /dev/null
+++ b/acp_registry/icon.svg
@@ -0,0 +1,25 @@
+
diff --git a/docs/acp-setup.md b/docs/acp-setup.md
new file mode 100644
index 00000000..c5f7fec1
--- /dev/null
+++ b/docs/acp-setup.md
@@ -0,0 +1,229 @@
+# Hermes Agent — ACP (Agent Client Protocol) Setup Guide
+
+Hermes Agent supports the **Agent Client Protocol (ACP)**, allowing it to run as
+a coding agent inside your editor. ACP lets your IDE send tasks to Hermes, and
+Hermes responds with file edits, terminal commands, and explanations — all shown
+natively in the editor UI.
+
+---
+
+## Prerequisites
+
+- Hermes Agent installed and configured (`hermes setup` completed)
+- An API key / provider set up in `~/.hermes/.env` or via `hermes login`
+- Python 3.11+
+
+Install the ACP extra:
+
+```bash
+pip install -e ".[acp]"
+```
+
+---
+
+## VS Code Setup
+
+### 1. Install the ACP Client extension
+
+Open VS Code and install **ACP Client** from the marketplace:
+
+- Press `Ctrl+Shift+X` (or `Cmd+Shift+X` on macOS)
+- Search for **"ACP Client"**
+- Click **Install**
+
+Or install from the command line:
+
+```bash
+code --install-extension anysphere.acp-client
+```
+
+### 2. Configure settings.json
+
+Open your VS Code settings (`Ctrl+,` → click the `{}` icon for JSON) and add:
+
+```json
+{
+ "acpClient.agents": [
+ {
+ "name": "hermes-agent",
+ "registryDir": "/path/to/hermes-agent/acp_registry"
+ }
+ ]
+}
+```
+
+Replace `/path/to/hermes-agent` with the actual path to your Hermes Agent
+installation (e.g. `~/.hermes/hermes-agent`).
+
+Alternatively, if `hermes` is on your PATH, the ACP Client can discover it
+automatically via the registry directory.
+
+### 3. Restart VS Code
+
+After configuring, restart VS Code. You should see **Hermes Agent** appear in
+the ACP agent picker in the chat/agent panel.
+
+---
+
+## Zed Setup
+
+Zed has built-in ACP support.
+
+### 1. Configure Zed settings
+
+Open Zed settings (`Cmd+,` on macOS or `Ctrl+,` on Linux) and add to your
+`settings.json`:
+
+```json
+{
+ "acp": {
+ "agents": [
+ {
+ "name": "hermes-agent",
+ "registry_dir": "/path/to/hermes-agent/acp_registry"
+ }
+ ]
+ }
+}
+```
+
+### 2. Restart Zed
+
+Hermes Agent will appear in the agent panel. Select it and start a conversation.
+
+---
+
+## JetBrains Setup (IntelliJ, PyCharm, WebStorm, etc.)
+
+### 1. Install the ACP plugin
+
+- Open **Settings** → **Plugins** → **Marketplace**
+- Search for **"ACP"** or **"Agent Client Protocol"**
+- Install and restart the IDE
+
+### 2. Configure the agent
+
+- Open **Settings** → **Tools** → **ACP Agents**
+- Click **+** to add a new agent
+- Set the registry directory to your `acp_registry/` folder:
+ `/path/to/hermes-agent/acp_registry`
+- Click **OK**
+
+### 3. Use the agent
+
+Open the ACP panel (usually in the right sidebar) and select **Hermes Agent**.
+
+---
+
+## What You Will See
+
+Once connected, your editor provides a native interface to Hermes Agent:
+
+### Chat Panel
+A conversational interface where you can describe tasks, ask questions, and
+give instructions. Hermes responds with explanations and actions.
+
+### File Diffs
+When Hermes edits files, you see standard diffs in the editor. You can:
+- **Accept** individual changes
+- **Reject** changes you don't want
+- **Review** the full diff before applying
+
+### Terminal Commands
+When Hermes needs to run shell commands (builds, tests, installs), the editor
+shows them in an integrated terminal. Depending on your settings:
+- Commands may run automatically
+- Or you may be prompted to **approve** each command
+
+### Approval Flow
+For potentially destructive operations, the editor will prompt you for
+approval before Hermes proceeds. This includes:
+- File deletions
+- Shell commands
+- Git operations
+
+---
+
+## Configuration
+
+Hermes Agent under ACP uses the **same configuration** as the CLI:
+
+- **API keys / providers**: `~/.hermes/.env`
+- **Agent config**: `~/.hermes/config.yaml`
+- **Skills**: `~/.hermes/skills/`
+- **Sessions**: `~/.hermes/state.db`
+
+You can run `hermes setup` to configure providers, or edit `~/.hermes/.env`
+directly.
+
+### Changing the model
+
+Edit `~/.hermes/config.yaml`:
+
+```yaml
+model: openrouter/nous/hermes-3-llama-3.1-70b
+```
+
+Or set the `HERMES_MODEL` environment variable.
+
+### Toolsets
+
+ACP sessions use the curated `hermes-acp` toolset by default. It is designed for editor workflows and intentionally excludes things like messaging delivery, cronjob management, and audio-first UX features.
+
+---
+
+## Troubleshooting
+
+### Agent doesn't appear in the editor
+
+1. **Check the registry path** — make sure the `acp_registry/` directory path
+ in your editor settings is correct and contains `agent.json`.
+2. **Check `hermes` is on PATH** — run `which hermes` in a terminal. If not
+ found, you may need to activate your virtualenv or add it to PATH.
+3. **Restart the editor** after changing settings.
+
+### Agent starts but errors immediately
+
+1. Run `hermes doctor` to check your configuration.
+2. Check that you have a valid API key: `hermes status`
+3. Try running `hermes acp` directly in a terminal to see error output.
+
+### "Module not found" errors
+
+Make sure you installed the ACP extra:
+
+```bash
+pip install -e ".[acp]"
+```
+
+### Slow responses
+
+- ACP streams responses, so you should see incremental output. If the agent
+ appears stuck, check your network connection and API provider status.
+- Some providers have rate limits. Try switching to a different model/provider.
+
+### Permission denied for terminal commands
+
+If the editor blocks terminal commands, check your ACP Client extension
+settings for auto-approval or manual-approval preferences.
+
+### Logs
+
+Hermes logs are written to stderr when running in ACP mode. Check:
+- VS Code: **Output** panel → select **ACP Client** or **Hermes Agent**
+- Zed: **View** → **Toggle Terminal** and check the process output
+- JetBrains: **Event Log** or the ACP tool window
+
+You can also enable verbose logging:
+
+```bash
+HERMES_LOG_LEVEL=DEBUG hermes acp
+```
+
+---
+
+## Further Reading
+
+- [ACP Specification](https://github.com/anysphere/acp)
+- [Hermes Agent Documentation](https://github.com/NousResearch/hermes-agent)
+- Run `hermes --help` for all CLI options
diff --git a/hermes_cli/main.py b/hermes_cli/main.py
index 4f83933d..539c2101 100644
--- a/hermes_cli/main.py
+++ b/hermes_cli/main.py
@@ -34,11 +34,12 @@ Usage:
hermes honcho identity # Show AI peer identity representation
hermes honcho identity # Seed AI peer identity from a file (SOUL.md etc.)
hermes honcho migrate # Step-by-step migration guide: OpenClaw native → Hermes + Honcho
- hermes version # Show version
- hermes update # Update to latest version
- hermes uninstall # Uninstall Hermes Agent
- hermes sessions browse # Interactive session picker with search
- hermes claw migrate # Migrate from OpenClaw to Hermes
+ hermes version Show version
+ hermes update Update to latest version
+ hermes uninstall Uninstall Hermes Agent
+ hermes acp Run as an ACP server for editor integration
+ hermes sessions browse Interactive session picker with search
+
hermes claw migrate --dry-run # Preview migration without changes
"""
@@ -3102,6 +3103,27 @@ For more help on a command:
help="Skip confirmation prompts"
)
uninstall_parser.set_defaults(func=cmd_uninstall)
+
+ # =========================================================================
+ # acp command
+ # =========================================================================
+ acp_parser = subparsers.add_parser(
+ "acp",
+ help="Run Hermes Agent as an ACP (Agent Client Protocol) server",
+ description="Start Hermes Agent in ACP mode for editor integration (VS Code, Zed, JetBrains)",
+ )
+
+ def cmd_acp(args):
+ """Launch Hermes Agent as an ACP server."""
+ try:
+ from acp_adapter.entry import main as acp_main
+ acp_main()
+ except ImportError:
+ print("ACP dependencies not installed.")
+ print("Install them with: pip install -e '.[acp]'")
+ sys.exit(1)
+
+ acp_parser.set_defaults(func=cmd_acp)
# =========================================================================
# Parse and execute
diff --git a/pyproject.toml b/pyproject.toml
index 54e75c95..7e419772 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,7 @@ pty = [
honcho = ["honcho-ai>=2.0.1"]
mcp = ["mcp>=1.2.0"]
homeassistant = ["aiohttp>=3.9.0"]
+acp = ["agent-client-protocol>=0.8.1,<1.0"]
rl = [
"atroposlib @ git+https://github.com/NousResearch/atropos.git",
"tinker @ git+https://github.com/thinking-machines-lab/tinker.git",
@@ -76,17 +77,19 @@ all = [
"hermes-agent[honcho]",
"hermes-agent[mcp]",
"hermes-agent[homeassistant]",
+ "hermes-agent[acp]",
]
[project.scripts]
hermes = "hermes_cli.main:main"
hermes-agent = "run_agent:main"
+hermes-acp = "acp_adapter.entry:main"
[tool.setuptools]
py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_constants", "hermes_state", "hermes_time", "mini_swe_runner", "rl_cli", "utils"]
[tool.setuptools.packages.find]
-include = ["agent", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "cron", "honcho_integration"]
+include = ["agent", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "cron", "honcho_integration", "acp_adapter"]
[tool.pytest.ini_options]
testpaths = ["tests"]
diff --git a/skills/data-science/DESCRIPTION.md b/skills/data-science/DESCRIPTION.md
new file mode 100644
index 00000000..0236b261
--- /dev/null
+++ b/skills/data-science/DESCRIPTION.md
@@ -0,0 +1,3 @@
+---
+description: Skills for data science workflows — interactive exploration, Jupyter notebooks, data analysis, and visualization.
+---
diff --git a/skills/data-science/jupyter-live-kernel/SKILL.md b/skills/data-science/jupyter-live-kernel/SKILL.md
new file mode 100644
index 00000000..984cd9e8
--- /dev/null
+++ b/skills/data-science/jupyter-live-kernel/SKILL.md
@@ -0,0 +1,171 @@
+---
+name: jupyter-live-kernel
+description: >
+ Use a live Jupyter kernel for stateful, iterative Python execution via hamelnb.
+ Load this skill when the task involves exploration, iteration, or inspecting
+ intermediate results — data science, ML experimentation, API exploration, or
+ building up complex code step-by-step. Uses terminal to run CLI commands against
+ a live Jupyter kernel. No new tools required.
+version: 1.0.0
+author: Hermes Agent
+license: MIT
+metadata:
+ hermes:
+ tags: [jupyter, notebook, repl, data-science, exploration, iterative]
+ category: data-science
+---
+
+# Jupyter Live Kernel (hamelnb)
+
+Gives you a **stateful Python REPL** via a live Jupyter kernel. Variables persist
+across executions. Use this instead of `execute_code` when you need to build up
+state incrementally, explore APIs, inspect DataFrames, or iterate on complex code.
+
+## When to Use This vs Other Tools
+
+| Tool | Use When |
+|------|----------|
+| **This skill** | Iterative exploration, state across steps, data science, ML, "let me try this and check" |
+| `execute_code` | One-shot scripts needing hermes tool access (web_search, file ops). Stateless. |
+| `terminal` | Shell commands, builds, installs, git, process management |
+
+**Rule of thumb:** If you'd want a Jupyter notebook for the task, use this skill.
+
+## Prerequisites
+
+1. **uv** must be installed (check: `which uv`)
+2. **JupyterLab** must be installed: `uv tool install jupyterlab`
+3. A Jupyter server must be running (see Setup below)
+
+## Setup
+
+The hamelnb script location:
+```
+SCRIPT="$HOME/.agent-skills/hamelnb/skills/jupyter-live-kernel/scripts/jupyter_live_kernel.py"
+```
+
+If not cloned yet:
+```
+git clone https://github.com/hamelsmu/hamelnb.git ~/.agent-skills/hamelnb
+```
+
+### Starting JupyterLab
+
+Check if a server is already running:
+```
+uv run "$SCRIPT" servers
+```
+
+If no servers found, start one:
+```
+jupyter-lab --no-browser --port=8888 --notebook-dir=$HOME/notebooks \
+ --IdentityProvider.token='' --ServerApp.password='' > /tmp/jupyter.log 2>&1 &
+sleep 3
+```
+
+Note: Token/password disabled for local agent access. The server runs headless.
+
+### Creating a Notebook for REPL Use
+
+If you just need a REPL (no existing notebook), create a minimal notebook file:
+```
+mkdir -p ~/notebooks
+```
+Write a minimal .ipynb JSON file with one empty code cell, then start a kernel
+session via the Jupyter REST API:
+```
+curl -s -X POST http://127.0.0.1:8888/api/sessions \
+ -H "Content-Type: application/json" \
+ -d '{"path":"scratch.ipynb","type":"notebook","name":"scratch.ipynb","kernel":{"name":"python3"}}'
+```
+
+## Core Workflow
+
+All commands return structured JSON. Always use `--compact` to save tokens.
+
+### 1. Discover servers and notebooks
+
+```
+uv run "$SCRIPT" servers --compact
+uv run "$SCRIPT" notebooks --compact
+```
+
+### 2. Execute code (primary operation)
+
+```
+uv run "$SCRIPT" execute --path --code '' --compact
+```
+
+State persists across execute calls. Variables, imports, objects all survive.
+
+Multi-line code works with $'...' quoting:
+```
+uv run "$SCRIPT" execute --path scratch.ipynb --code $'import os\nfiles = os.listdir(".")\nprint(f"Found {len(files)} files")' --compact
+```
+
+### 3. Inspect live variables
+
+```
+uv run "$SCRIPT" variables --path list --compact
+uv run "$SCRIPT" variables --path preview --name --compact
+```
+
+### 4. Edit notebook cells
+
+```
+# View current cells
+uv run "$SCRIPT" contents --path --compact
+
+# Insert a new cell
+uv run "$SCRIPT" edit --path insert \
+ --at-index --cell-type code --source '' --compact
+
+# Replace cell source (use cell-id from contents output)
+uv run "$SCRIPT" edit --path replace-source \
+ --cell-id --source '' --compact
+
+# Delete a cell
+uv run "$SCRIPT" edit --path delete --cell-id --compact
+```
+
+### 5. Verification (restart + run all)
+
+Only use when the user asks for a clean verification or you need to confirm
+the notebook runs top-to-bottom:
+
+```
+uv run "$SCRIPT" restart-run-all --path --save-outputs --compact
+```
+
+## Practical Tips from Experience
+
+1. **First execution after server start may timeout** — the kernel needs a moment
+ to initialize. If you get a timeout, just retry.
+
+2. **The kernel Python is JupyterLab's Python** — packages must be installed in
+ that environment. If you need additional packages, install them into the
+ JupyterLab tool environment first.
+
+3. **--compact flag saves significant tokens** — always use it. JSON output can
+ be very verbose without it.
+
+4. **For pure REPL use**, create a scratch.ipynb and don't bother with cell editing.
+ Just use `execute` repeatedly.
+
+5. **Argument order matters** — subcommand flags like `--path` go BEFORE the
+ sub-subcommand. E.g.: `variables --path nb.ipynb list` not `variables list --path nb.ipynb`.
+
+6. **If a session doesn't exist yet**, you need to start one via the REST API
+ (see Setup section). The tool can't execute without a live kernel session.
+
+7. **Errors are returned as JSON** with traceback — read the `ename` and `evalue`
+ fields to understand what went wrong.
+
+8. **Occasional websocket timeouts** — some operations may timeout on first try,
+ especially after a kernel restart. Retry once before escalating.
+
+## Timeout Defaults
+
+The script has a 30-second default timeout per execution. For long-running
+operations, pass `--timeout 120`. Use generous timeouts (60+) for initial
+setup or heavy computation.
diff --git a/tests/acp/__init__.py b/tests/acp/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/acp/test_auth.py b/tests/acp/test_auth.py
new file mode 100644
index 00000000..ffb07463
--- /dev/null
+++ b/tests/acp/test_auth.py
@@ -0,0 +1,56 @@
+"""Tests for acp_adapter.auth — provider detection."""
+
+from acp_adapter.auth import has_provider, detect_provider
+
+
+class TestHasProvider:
+ def test_has_provider_with_resolved_runtime(self, monkeypatch):
+ monkeypatch.setattr(
+ "hermes_cli.runtime_provider.resolve_runtime_provider",
+ lambda: {"provider": "openrouter", "api_key": "sk-or-test"},
+ )
+ assert has_provider() is True
+
+ def test_has_no_provider_when_runtime_has_no_key(self, monkeypatch):
+ monkeypatch.setattr(
+ "hermes_cli.runtime_provider.resolve_runtime_provider",
+ lambda: {"provider": "openrouter", "api_key": ""},
+ )
+ assert has_provider() is False
+
+ def test_has_no_provider_when_runtime_resolution_fails(self, monkeypatch):
+ def _boom():
+ raise RuntimeError("no provider")
+
+ monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _boom)
+ assert has_provider() is False
+
+
+class TestDetectProvider:
+ def test_detect_openrouter(self, monkeypatch):
+ monkeypatch.setattr(
+ "hermes_cli.runtime_provider.resolve_runtime_provider",
+ lambda: {"provider": "openrouter", "api_key": "sk-or-test"},
+ )
+ assert detect_provider() == "openrouter"
+
+ def test_detect_anthropic(self, monkeypatch):
+ monkeypatch.setattr(
+ "hermes_cli.runtime_provider.resolve_runtime_provider",
+ lambda: {"provider": "anthropic", "api_key": "sk-ant-test"},
+ )
+ assert detect_provider() == "anthropic"
+
+ def test_detect_none_when_no_key(self, monkeypatch):
+ monkeypatch.setattr(
+ "hermes_cli.runtime_provider.resolve_runtime_provider",
+ lambda: {"provider": "kimi-coding", "api_key": ""},
+ )
+ assert detect_provider() is None
+
+ def test_detect_none_on_resolution_error(self, monkeypatch):
+ def _boom():
+ raise RuntimeError("broken")
+
+ monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _boom)
+ assert detect_provider() is None
diff --git a/tests/acp/test_events.py b/tests/acp/test_events.py
new file mode 100644
index 00000000..400ea88e
--- /dev/null
+++ b/tests/acp/test_events.py
@@ -0,0 +1,239 @@
+"""Tests for acp_adapter.events — callback factories for ACP notifications."""
+
+import asyncio
+from concurrent.futures import Future
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+import acp
+from acp.schema import ToolCallStart, ToolCallProgress, AgentThoughtChunk, AgentMessageChunk
+
+from acp_adapter.events import (
+ make_message_cb,
+ make_step_cb,
+ make_thinking_cb,
+ make_tool_progress_cb,
+)
+
+
+@pytest.fixture()
+def mock_conn():
+ """Mock ACP Client connection."""
+ conn = MagicMock(spec=acp.Client)
+ conn.session_update = AsyncMock()
+ return conn
+
+
+@pytest.fixture()
+def event_loop_fixture():
+ """Create a real event loop for testing threadsafe coroutine submission."""
+ loop = asyncio.new_event_loop()
+ yield loop
+ loop.close()
+
+
+# ---------------------------------------------------------------------------
+# Tool progress callback
+# ---------------------------------------------------------------------------
+
+
+class TestToolProgressCallback:
+ def test_emits_tool_call_start(self, mock_conn, event_loop_fixture):
+ """Tool progress should emit a ToolCallStart update."""
+ tool_call_ids = {}
+ loop = event_loop_fixture
+
+ cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
+
+ # Run callback in the event loop context
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ cb("terminal", "$ ls -la", {"command": "ls -la"})
+
+ # Should have tracked the tool call ID
+ assert "terminal" in tool_call_ids
+
+ # Should have called run_coroutine_threadsafe
+ mock_rcts.assert_called_once()
+ coro = mock_rcts.call_args[0][0]
+ # The coroutine should be conn.session_update
+ assert mock_conn.session_update.called or coro is not None
+
+ def test_handles_string_args(self, mock_conn, event_loop_fixture):
+ """If args is a JSON string, it should be parsed."""
+ tool_call_ids = {}
+ loop = event_loop_fixture
+
+ cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ cb("read_file", "Reading /etc/hosts", '{"path": "/etc/hosts"}')
+
+ assert "read_file" in tool_call_ids
+
+ def test_handles_non_dict_args(self, mock_conn, event_loop_fixture):
+ """If args is not a dict, it should be wrapped."""
+ tool_call_ids = {}
+ loop = event_loop_fixture
+
+ cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ cb("terminal", "$ echo hi", None)
+
+ assert "terminal" in tool_call_ids
+
+ def test_duplicate_same_name_tool_calls_use_fifo_ids(self, mock_conn, event_loop_fixture):
+ """Multiple same-name tool calls should be tracked independently in order."""
+ tool_call_ids = {}
+ loop = event_loop_fixture
+
+ progress_cb = make_tool_progress_cb(mock_conn, "session-1", loop, tool_call_ids)
+ step_cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ progress_cb("terminal", "$ ls", {"command": "ls"})
+ progress_cb("terminal", "$ pwd", {"command": "pwd"})
+ assert len(tool_call_ids["terminal"]) == 2
+
+ step_cb(1, [{"name": "terminal", "result": "ok-1"}])
+ assert len(tool_call_ids["terminal"]) == 1
+
+ step_cb(2, [{"name": "terminal", "result": "ok-2"}])
+ assert "terminal" not in tool_call_ids
+
+
+# ---------------------------------------------------------------------------
+# Thinking callback
+# ---------------------------------------------------------------------------
+
+
+class TestThinkingCallback:
+ def test_emits_thought_chunk(self, mock_conn, event_loop_fixture):
+ """Thinking callback should emit AgentThoughtChunk."""
+ loop = event_loop_fixture
+
+ cb = make_thinking_cb(mock_conn, "session-1", loop)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ cb("Analyzing the code...")
+
+ mock_rcts.assert_called_once()
+
+ def test_ignores_empty_text(self, mock_conn, event_loop_fixture):
+ """Empty text should not emit any update."""
+ loop = event_loop_fixture
+
+ cb = make_thinking_cb(mock_conn, "session-1", loop)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ cb("")
+
+ mock_rcts.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# Step callback
+# ---------------------------------------------------------------------------
+
+
+class TestStepCallback:
+ def test_completes_tracked_tool_calls(self, mock_conn, event_loop_fixture):
+ """Step callback should mark tracked tools as completed."""
+ tool_call_ids = {"terminal": "tc-abc123"}
+ loop = event_loop_fixture
+
+ cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ cb(1, [{"name": "terminal", "result": "success"}])
+
+ # Tool should have been removed from tracking
+ assert "terminal" not in tool_call_ids
+ mock_rcts.assert_called_once()
+
+ def test_ignores_untracked_tools(self, mock_conn, event_loop_fixture):
+ """Tools not in tool_call_ids should be silently ignored."""
+ tool_call_ids = {}
+ loop = event_loop_fixture
+
+ cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ cb(1, [{"name": "unknown_tool", "result": "ok"}])
+
+ mock_rcts.assert_not_called()
+
+ def test_handles_string_tool_info(self, mock_conn, event_loop_fixture):
+ """Tool info as a string (just the name) should work."""
+ tool_call_ids = {"read_file": "tc-def456"}
+ loop = event_loop_fixture
+
+ cb = make_step_cb(mock_conn, "session-1", loop, tool_call_ids)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ cb(2, ["read_file"])
+
+ assert "read_file" not in tool_call_ids
+ mock_rcts.assert_called_once()
+
+
+# ---------------------------------------------------------------------------
+# Message callback
+# ---------------------------------------------------------------------------
+
+
+class TestMessageCallback:
+ def test_emits_agent_message_chunk(self, mock_conn, event_loop_fixture):
+ """Message callback should emit AgentMessageChunk."""
+ loop = event_loop_fixture
+
+ cb = make_message_cb(mock_conn, "session-1", loop)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ future = MagicMock(spec=Future)
+ future.result.return_value = None
+ mock_rcts.return_value = future
+
+ cb("Here is your answer.")
+
+ mock_rcts.assert_called_once()
+
+ def test_ignores_empty_message(self, mock_conn, event_loop_fixture):
+ """Empty text should not emit any update."""
+ loop = event_loop_fixture
+
+ cb = make_message_cb(mock_conn, "session-1", loop)
+
+ with patch("acp_adapter.events.asyncio.run_coroutine_threadsafe") as mock_rcts:
+ cb("")
+
+ mock_rcts.assert_not_called()
diff --git a/tests/acp/test_permissions.py b/tests/acp/test_permissions.py
new file mode 100644
index 00000000..de83ebef
--- /dev/null
+++ b/tests/acp/test_permissions.py
@@ -0,0 +1,75 @@
+"""Tests for acp_adapter.permissions — ACP approval bridging."""
+
+import asyncio
+from concurrent.futures import Future
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from acp.schema import (
+ AllowedOutcome,
+ DeniedOutcome,
+ RequestPermissionResponse,
+)
+from acp_adapter.permissions import make_approval_callback
+
+
+def _make_response(outcome):
+ """Helper to build a RequestPermissionResponse with the given outcome."""
+ return RequestPermissionResponse(outcome=outcome)
+
+
+def _setup_callback(outcome, timeout=60.0):
+ """
+ Create a callback wired to a mock request_permission coroutine
+ that resolves to the given outcome.
+
+ Returns:
+ (callback, mock_request_permission_fn)
+ """
+ loop = MagicMock(spec=asyncio.AbstractEventLoop)
+ mock_rp = MagicMock(name="request_permission")
+
+ response = _make_response(outcome)
+
+ # Patch asyncio.run_coroutine_threadsafe so it returns a future
+ # that immediately yields the response.
+ future = MagicMock(spec=Future)
+ future.result.return_value = response
+
+ with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future):
+ cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=timeout)
+ result = cb("rm -rf /", "dangerous command")
+
+ return result
+
+
+class TestApprovalMapping:
+ def test_approval_allow_once_maps_correctly(self):
+ outcome = AllowedOutcome(option_id="allow_once", outcome="selected")
+ result = _setup_callback(outcome)
+ assert result == "once"
+
+ def test_approval_allow_always_maps_correctly(self):
+ outcome = AllowedOutcome(option_id="allow_always", outcome="selected")
+ result = _setup_callback(outcome)
+ assert result == "always"
+
+ def test_approval_deny_maps_correctly(self):
+ outcome = DeniedOutcome(outcome="cancelled")
+ result = _setup_callback(outcome)
+ assert result == "deny"
+
+ def test_approval_timeout_returns_deny(self):
+ """When the future times out, the callback should return 'deny'."""
+ loop = MagicMock(spec=asyncio.AbstractEventLoop)
+ mock_rp = MagicMock(name="request_permission")
+
+ future = MagicMock(spec=Future)
+ future.result.side_effect = TimeoutError("timed out")
+
+ with patch("acp_adapter.permissions.asyncio.run_coroutine_threadsafe", return_value=future):
+ cb = make_approval_callback(mock_rp, loop, session_id="s1", timeout=0.01)
+ result = cb("rm -rf /", "dangerous")
+
+ assert result == "deny"
diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py
new file mode 100644
index 00000000..96475c67
--- /dev/null
+++ b/tests/acp/test_server.py
@@ -0,0 +1,297 @@
+"""Tests for acp_adapter.server — HermesACPAgent ACP server."""
+
+import asyncio
+import os
+from unittest.mock import MagicMock, AsyncMock, patch
+
+import pytest
+
+import acp
+from acp.schema import (
+ AgentCapabilities,
+ AuthenticateResponse,
+ Implementation,
+ InitializeResponse,
+ ListSessionsResponse,
+ LoadSessionResponse,
+ NewSessionResponse,
+ PromptResponse,
+ ResumeSessionResponse,
+ SessionInfo,
+ TextContentBlock,
+ Usage,
+)
+from acp_adapter.server import HermesACPAgent, HERMES_VERSION
+from acp_adapter.session import SessionManager
+
+
+@pytest.fixture()
+def mock_manager():
+ """SessionManager with a mock agent factory."""
+ return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent"))
+
+
+@pytest.fixture()
+def agent(mock_manager):
+ """HermesACPAgent backed by a mock session manager."""
+ return HermesACPAgent(session_manager=mock_manager)
+
+
+# ---------------------------------------------------------------------------
+# initialize
+# ---------------------------------------------------------------------------
+
+
+class TestInitialize:
+ @pytest.mark.asyncio
+ async def test_initialize_returns_correct_protocol_version(self, agent):
+ resp = await agent.initialize(protocol_version=1)
+ assert isinstance(resp, InitializeResponse)
+ assert resp.protocol_version == acp.PROTOCOL_VERSION
+
+ @pytest.mark.asyncio
+ async def test_initialize_returns_agent_info(self, agent):
+ resp = await agent.initialize(protocol_version=1)
+ assert resp.agent_info is not None
+ assert isinstance(resp.agent_info, Implementation)
+ assert resp.agent_info.name == "hermes-agent"
+ assert resp.agent_info.version == HERMES_VERSION
+
+ @pytest.mark.asyncio
+ async def test_initialize_returns_capabilities(self, agent):
+ resp = await agent.initialize(protocol_version=1)
+ caps = resp.agent_capabilities
+ assert isinstance(caps, AgentCapabilities)
+ assert caps.session_capabilities is not None
+ assert caps.session_capabilities.fork is not None
+ assert caps.session_capabilities.list is not None
+
+
+# ---------------------------------------------------------------------------
+# authenticate
+# ---------------------------------------------------------------------------
+
+
+class TestAuthenticate:
+ @pytest.mark.asyncio
+ async def test_authenticate_with_provider_configured(self, agent, monkeypatch):
+ monkeypatch.setattr(
+ "acp_adapter.server.has_provider",
+ lambda: True,
+ )
+ resp = await agent.authenticate(method_id="openrouter")
+ assert isinstance(resp, AuthenticateResponse)
+
+ @pytest.mark.asyncio
+ async def test_authenticate_without_provider(self, agent, monkeypatch):
+ monkeypatch.setattr(
+ "acp_adapter.server.has_provider",
+ lambda: False,
+ )
+ resp = await agent.authenticate(method_id="openrouter")
+ assert resp is None
+
+
+# ---------------------------------------------------------------------------
+# new_session / cancel / load / resume
+# ---------------------------------------------------------------------------
+
+
+class TestSessionOps:
+ @pytest.mark.asyncio
+ async def test_new_session_creates_session(self, agent):
+ resp = await agent.new_session(cwd="/home/user/project")
+ assert isinstance(resp, NewSessionResponse)
+ assert resp.session_id
+ # Session should be retrievable from the manager
+ state = agent.session_manager.get_session(resp.session_id)
+ assert state is not None
+ assert state.cwd == "/home/user/project"
+
+ @pytest.mark.asyncio
+ async def test_cancel_sets_event(self, agent):
+ resp = await agent.new_session(cwd=".")
+ state = agent.session_manager.get_session(resp.session_id)
+ assert not state.cancel_event.is_set()
+ await agent.cancel(session_id=resp.session_id)
+ assert state.cancel_event.is_set()
+
+ @pytest.mark.asyncio
+ async def test_cancel_nonexistent_session_is_noop(self, agent):
+ # Should not raise
+ await agent.cancel(session_id="does-not-exist")
+
+ @pytest.mark.asyncio
+ async def test_load_session_returns_response(self, agent):
+ resp = await agent.new_session(cwd="/tmp")
+ load_resp = await agent.load_session(cwd="/tmp", session_id=resp.session_id)
+ assert isinstance(load_resp, LoadSessionResponse)
+
+ @pytest.mark.asyncio
+ async def test_load_session_not_found_returns_none(self, agent):
+ resp = await agent.load_session(cwd="/tmp", session_id="bogus")
+ assert resp is None
+
+ @pytest.mark.asyncio
+ async def test_resume_session_returns_response(self, agent):
+ resp = await agent.new_session(cwd="/tmp")
+ resume_resp = await agent.resume_session(cwd="/tmp", session_id=resp.session_id)
+ assert isinstance(resume_resp, ResumeSessionResponse)
+
+ @pytest.mark.asyncio
+ async def test_resume_session_creates_new_if_missing(self, agent):
+ resume_resp = await agent.resume_session(cwd="/tmp", session_id="nonexistent")
+ assert isinstance(resume_resp, ResumeSessionResponse)
+
+
+# ---------------------------------------------------------------------------
+# list / fork
+# ---------------------------------------------------------------------------
+
+
+class TestListAndFork:
+ @pytest.mark.asyncio
+ async def test_list_sessions(self, agent):
+ await agent.new_session(cwd="/a")
+ await agent.new_session(cwd="/b")
+ resp = await agent.list_sessions()
+ assert isinstance(resp, ListSessionsResponse)
+ assert len(resp.sessions) == 2
+
+ @pytest.mark.asyncio
+ async def test_fork_session(self, agent):
+ new_resp = await agent.new_session(cwd="/original")
+ fork_resp = await agent.fork_session(cwd="/forked", session_id=new_resp.session_id)
+ assert fork_resp.session_id
+ assert fork_resp.session_id != new_resp.session_id
+
+
+# ---------------------------------------------------------------------------
+# prompt
+# ---------------------------------------------------------------------------
+
+
+class TestPrompt:
+ @pytest.mark.asyncio
+ async def test_prompt_returns_refusal_for_unknown_session(self, agent):
+ prompt = [TextContentBlock(type="text", text="hello")]
+ resp = await agent.prompt(prompt=prompt, session_id="nonexistent")
+ assert isinstance(resp, PromptResponse)
+ assert resp.stop_reason == "refusal"
+
+ @pytest.mark.asyncio
+ async def test_prompt_returns_end_turn_for_empty_message(self, agent):
+ new_resp = await agent.new_session(cwd=".")
+ prompt = [TextContentBlock(type="text", text=" ")]
+ resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
+ assert resp.stop_reason == "end_turn"
+
+ @pytest.mark.asyncio
+ async def test_prompt_runs_agent(self, agent):
+ """The prompt method should call run_conversation on the agent."""
+ new_resp = await agent.new_session(cwd=".")
+ state = agent.session_manager.get_session(new_resp.session_id)
+
+ # Mock the agent's run_conversation
+ state.agent.run_conversation = MagicMock(return_value={
+ "final_response": "Hello! How can I help?",
+ "messages": [
+ {"role": "user", "content": "hello"},
+ {"role": "assistant", "content": "Hello! How can I help?"},
+ ],
+ })
+
+ # Set up a mock connection
+ mock_conn = MagicMock(spec=acp.Client)
+ mock_conn.session_update = AsyncMock()
+ agent._conn = mock_conn
+
+ prompt = [TextContentBlock(type="text", text="hello")]
+ resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
+
+ assert isinstance(resp, PromptResponse)
+ assert resp.stop_reason == "end_turn"
+ state.agent.run_conversation.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_prompt_updates_history(self, agent):
+ """After a prompt, session history should be updated."""
+ new_resp = await agent.new_session(cwd=".")
+ state = agent.session_manager.get_session(new_resp.session_id)
+
+ expected_history = [
+ {"role": "user", "content": "hi"},
+ {"role": "assistant", "content": "hey"},
+ ]
+ state.agent.run_conversation = MagicMock(return_value={
+ "final_response": "hey",
+ "messages": expected_history,
+ })
+
+ mock_conn = MagicMock(spec=acp.Client)
+ mock_conn.session_update = AsyncMock()
+ agent._conn = mock_conn
+
+ prompt = [TextContentBlock(type="text", text="hi")]
+ await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
+
+ assert state.history == expected_history
+
+ @pytest.mark.asyncio
+ async def test_prompt_sends_final_message_update(self, agent):
+ """The final response should be sent as an AgentMessageChunk."""
+ new_resp = await agent.new_session(cwd=".")
+ state = agent.session_manager.get_session(new_resp.session_id)
+
+ state.agent.run_conversation = MagicMock(return_value={
+ "final_response": "I can help with that!",
+ "messages": [],
+ })
+
+ mock_conn = MagicMock(spec=acp.Client)
+ mock_conn.session_update = AsyncMock()
+ agent._conn = mock_conn
+
+ prompt = [TextContentBlock(type="text", text="help me")]
+ await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
+
+ # session_update should have been called with the final message
+ mock_conn.session_update.assert_called()
+ # Get the last call's update argument
+ last_call = mock_conn.session_update.call_args_list[-1]
+ update = last_call[1].get("update") or last_call[0][1]
+ assert update.session_update == "agent_message_chunk"
+
+ @pytest.mark.asyncio
+ async def test_prompt_cancelled_returns_cancelled_stop_reason(self, agent):
+ """If cancel is called during prompt, stop_reason should be 'cancelled'."""
+ new_resp = await agent.new_session(cwd=".")
+ state = agent.session_manager.get_session(new_resp.session_id)
+
+ def mock_run(*args, **kwargs):
+ # Simulate cancel being set during execution
+ state.cancel_event.set()
+ return {"final_response": "interrupted", "messages": []}
+
+ state.agent.run_conversation = mock_run
+
+ mock_conn = MagicMock(spec=acp.Client)
+ mock_conn.session_update = AsyncMock()
+ agent._conn = mock_conn
+
+ prompt = [TextContentBlock(type="text", text="do something")]
+ resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
+
+ assert resp.stop_reason == "cancelled"
+
+
+# ---------------------------------------------------------------------------
+# on_connect
+# ---------------------------------------------------------------------------
+
+
+class TestOnConnect:
+ def test_on_connect_stores_client(self, agent):
+ mock_conn = MagicMock(spec=acp.Client)
+ agent.on_connect(mock_conn)
+ assert agent._conn is mock_conn
diff --git a/tests/acp/test_session.py b/tests/acp/test_session.py
new file mode 100644
index 00000000..79cbcf53
--- /dev/null
+++ b/tests/acp/test_session.py
@@ -0,0 +1,112 @@
+"""Tests for acp_adapter.session — SessionManager and SessionState."""
+
+import pytest
+from unittest.mock import MagicMock
+
+from acp_adapter.session import SessionManager, SessionState
+
+
+@pytest.fixture()
+def manager():
+ """SessionManager with a mock agent factory (avoids needing API keys)."""
+ return SessionManager(agent_factory=lambda: MagicMock(name="MockAIAgent"))
+
+
+# ---------------------------------------------------------------------------
+# create / get
+# ---------------------------------------------------------------------------
+
+
+class TestCreateSession:
+ def test_create_session_returns_state(self, manager):
+ state = manager.create_session(cwd="/tmp/work")
+ assert isinstance(state, SessionState)
+ assert state.cwd == "/tmp/work"
+ assert state.session_id
+ assert state.history == []
+ assert state.agent is not None
+
+ def test_create_session_registers_task_cwd(self, manager, monkeypatch):
+ calls = []
+ monkeypatch.setattr("acp_adapter.session._register_task_cwd", lambda task_id, cwd: calls.append((task_id, cwd)))
+ state = manager.create_session(cwd="/tmp/work")
+ assert calls == [(state.session_id, "/tmp/work")]
+
+ def test_session_ids_are_unique(self, manager):
+ s1 = manager.create_session()
+ s2 = manager.create_session()
+ assert s1.session_id != s2.session_id
+
+ def test_get_session(self, manager):
+ state = manager.create_session()
+ fetched = manager.get_session(state.session_id)
+ assert fetched is state
+
+ def test_get_nonexistent_session_returns_none(self, manager):
+ assert manager.get_session("does-not-exist") is None
+
+
+# ---------------------------------------------------------------------------
+# fork
+# ---------------------------------------------------------------------------
+
+
+class TestForkSession:
+ def test_fork_session_deep_copies_history(self, manager):
+ original = manager.create_session()
+ original.history.append({"role": "user", "content": "hello"})
+ original.history.append({"role": "assistant", "content": "hi"})
+
+ forked = manager.fork_session(original.session_id, cwd="/new")
+ assert forked is not None
+
+ # History should be equal in content
+ assert len(forked.history) == 2
+ assert forked.history[0]["content"] == "hello"
+
+ # But a deep copy — mutating one doesn't affect the other
+ forked.history.append({"role": "user", "content": "extra"})
+ assert len(original.history) == 2
+ assert len(forked.history) == 3
+
+ def test_fork_session_has_new_id(self, manager):
+ original = manager.create_session()
+ forked = manager.fork_session(original.session_id)
+ assert forked is not None
+ assert forked.session_id != original.session_id
+
+ def test_fork_nonexistent_returns_none(self, manager):
+ assert manager.fork_session("bogus-id") is None
+
+
+# ---------------------------------------------------------------------------
+# list / cleanup / remove
+# ---------------------------------------------------------------------------
+
+
+class TestListAndCleanup:
+ def test_list_sessions_empty(self, manager):
+ assert manager.list_sessions() == []
+
+ def test_list_sessions_returns_created(self, manager):
+ s1 = manager.create_session(cwd="/a")
+ s2 = manager.create_session(cwd="/b")
+ listing = manager.list_sessions()
+ ids = {s["session_id"] for s in listing}
+ assert s1.session_id in ids
+ assert s2.session_id in ids
+ assert len(listing) == 2
+
+ def test_cleanup_clears_all(self, manager):
+ manager.create_session()
+ manager.create_session()
+ assert len(manager.list_sessions()) == 2
+ manager.cleanup()
+ assert manager.list_sessions() == []
+
+ def test_remove_session(self, manager):
+ state = manager.create_session()
+ assert manager.remove_session(state.session_id) is True
+ assert manager.get_session(state.session_id) is None
+ # Removing again returns False
+ assert manager.remove_session(state.session_id) is False
diff --git a/tests/acp/test_tools.py b/tests/acp/test_tools.py
new file mode 100644
index 00000000..59401501
--- /dev/null
+++ b/tests/acp/test_tools.py
@@ -0,0 +1,236 @@
+"""Tests for acp_adapter.tools — tool kind mapping and ACP content building."""
+
+import pytest
+
+from acp_adapter.tools import (
+ TOOL_KIND_MAP,
+ build_tool_complete,
+ build_tool_start,
+ build_tool_title,
+ extract_locations,
+ get_tool_kind,
+ make_tool_call_id,
+)
+from acp.schema import (
+ FileEditToolCallContent,
+ ContentToolCallContent,
+ ToolCallLocation,
+ ToolCallStart,
+ ToolCallProgress,
+)
+
+
+# ---------------------------------------------------------------------------
+# TOOL_KIND_MAP coverage
+# ---------------------------------------------------------------------------
+
+
+COMMON_HERMES_TOOLS = ["read_file", "search_files", "terminal", "patch", "write_file", "process"]
+
+
+class TestToolKindMap:
+ def test_all_hermes_tools_have_kind(self):
+ """Every common hermes tool should appear in TOOL_KIND_MAP."""
+ for tool in COMMON_HERMES_TOOLS:
+ assert tool in TOOL_KIND_MAP, f"{tool} missing from TOOL_KIND_MAP"
+
+ def test_tool_kind_read_file(self):
+ assert get_tool_kind("read_file") == "read"
+
+ def test_tool_kind_terminal(self):
+ assert get_tool_kind("terminal") == "execute"
+
+ def test_tool_kind_patch(self):
+ assert get_tool_kind("patch") == "edit"
+
+ def test_tool_kind_write_file(self):
+ assert get_tool_kind("write_file") == "edit"
+
+ def test_tool_kind_web_search(self):
+ assert get_tool_kind("web_search") == "fetch"
+
+ def test_tool_kind_execute_code(self):
+ assert get_tool_kind("execute_code") == "execute"
+
+ def test_tool_kind_browser_navigate(self):
+ assert get_tool_kind("browser_navigate") == "fetch"
+
+ def test_unknown_tool_returns_other_kind(self):
+ assert get_tool_kind("nonexistent_tool_xyz") == "other"
+
+
+# ---------------------------------------------------------------------------
+# make_tool_call_id
+# ---------------------------------------------------------------------------
+
+
+class TestMakeToolCallId:
+ def test_returns_string(self):
+ tc_id = make_tool_call_id()
+ assert isinstance(tc_id, str)
+
+ def test_starts_with_tc_prefix(self):
+ tc_id = make_tool_call_id()
+ assert tc_id.startswith("tc-")
+
+ def test_ids_are_unique(self):
+ ids = {make_tool_call_id() for _ in range(100)}
+ assert len(ids) == 100
+
+
+# ---------------------------------------------------------------------------
+# build_tool_title
+# ---------------------------------------------------------------------------
+
+
+class TestBuildToolTitle:
+ def test_terminal_title_includes_command(self):
+ title = build_tool_title("terminal", {"command": "ls -la /tmp"})
+ assert "ls -la /tmp" in title
+
+ def test_terminal_title_truncates_long_command(self):
+ long_cmd = "x" * 200
+ title = build_tool_title("terminal", {"command": long_cmd})
+ assert len(title) < 120
+ assert "..." in title
+
+ def test_read_file_title(self):
+ title = build_tool_title("read_file", {"path": "/etc/hosts"})
+ assert "/etc/hosts" in title
+
+ def test_patch_title(self):
+ title = build_tool_title("patch", {"path": "main.py", "mode": "replace"})
+ assert "main.py" in title
+
+ def test_search_title(self):
+ title = build_tool_title("search_files", {"pattern": "TODO"})
+ assert "TODO" in title
+
+ def test_web_search_title(self):
+ title = build_tool_title("web_search", {"query": "python asyncio"})
+ assert "python asyncio" in title
+
+ def test_unknown_tool_uses_name(self):
+ title = build_tool_title("some_new_tool", {"foo": "bar"})
+ assert title == "some_new_tool"
+
+
+# ---------------------------------------------------------------------------
+# build_tool_start
+# ---------------------------------------------------------------------------
+
+
+class TestBuildToolStart:
+ def test_build_tool_start_for_patch(self):
+ """patch should produce a FileEditToolCallContent (diff)."""
+ args = {
+ "path": "src/main.py",
+ "old_string": "print('hello')",
+ "new_string": "print('world')",
+ }
+ result = build_tool_start("tc-1", "patch", args)
+ assert isinstance(result, ToolCallStart)
+ assert result.kind == "edit"
+ # The first content item should be a diff
+ assert len(result.content) >= 1
+ diff_item = result.content[0]
+ assert isinstance(diff_item, FileEditToolCallContent)
+ assert diff_item.path == "src/main.py"
+ assert diff_item.new_text == "print('world')"
+ assert diff_item.old_text == "print('hello')"
+
+ def test_build_tool_start_for_write_file(self):
+ """write_file should produce a FileEditToolCallContent (diff)."""
+ args = {"path": "new_file.py", "content": "print('hello')"}
+ result = build_tool_start("tc-w1", "write_file", args)
+ assert isinstance(result, ToolCallStart)
+ assert result.kind == "edit"
+ assert len(result.content) >= 1
+ diff_item = result.content[0]
+ assert isinstance(diff_item, FileEditToolCallContent)
+ assert diff_item.path == "new_file.py"
+
+ def test_build_tool_start_for_terminal(self):
+ """terminal should produce text content with the command."""
+ args = {"command": "ls -la /tmp"}
+ result = build_tool_start("tc-2", "terminal", args)
+ assert isinstance(result, ToolCallStart)
+ assert result.kind == "execute"
+ assert len(result.content) >= 1
+ content_item = result.content[0]
+ assert isinstance(content_item, ContentToolCallContent)
+ # The wrapped text block should contain the command
+ text = content_item.content.text
+ assert "ls -la /tmp" in text
+
+ def test_build_tool_start_for_read_file(self):
+ """read_file should include the path in content."""
+ args = {"path": "/etc/hosts", "offset": 1, "limit": 50}
+ result = build_tool_start("tc-3", "read_file", args)
+ assert isinstance(result, ToolCallStart)
+ assert result.kind == "read"
+ assert len(result.content) >= 1
+ content_item = result.content[0]
+ assert isinstance(content_item, ContentToolCallContent)
+ assert "/etc/hosts" in content_item.content.text
+
+ def test_build_tool_start_for_search(self):
+ """search_files should include pattern in content."""
+ args = {"pattern": "TODO", "target": "content"}
+ result = build_tool_start("tc-4", "search_files", args)
+ assert isinstance(result, ToolCallStart)
+ assert result.kind == "search"
+ assert "TODO" in result.content[0].content.text
+
+ def test_build_tool_start_generic_fallback(self):
+ """Unknown tools should get a generic text representation."""
+ args = {"foo": "bar", "baz": 42}
+ result = build_tool_start("tc-5", "some_tool", args)
+ assert isinstance(result, ToolCallStart)
+ assert result.kind == "other"
+
+
+# ---------------------------------------------------------------------------
+# build_tool_complete
+# ---------------------------------------------------------------------------
+
+
+class TestBuildToolComplete:
+ def test_build_tool_complete_for_terminal(self):
+ """Completed terminal call should include output text."""
+ result = build_tool_complete("tc-2", "terminal", "total 42\ndrwxr-xr-x 2 root root 4096 ...")
+ assert isinstance(result, ToolCallProgress)
+ assert result.status == "completed"
+ assert len(result.content) >= 1
+ content_item = result.content[0]
+ assert isinstance(content_item, ContentToolCallContent)
+ assert "total 42" in content_item.content.text
+
+ def test_build_tool_complete_truncates_large_output(self):
+ """Very large outputs should be truncated."""
+ big_output = "x" * 10000
+ result = build_tool_complete("tc-6", "read_file", big_output)
+ assert isinstance(result, ToolCallProgress)
+ display_text = result.content[0].content.text
+ assert len(display_text) < 6000
+ assert "truncated" in display_text
+
+
+# ---------------------------------------------------------------------------
+# extract_locations
+# ---------------------------------------------------------------------------
+
+
+class TestExtractLocations:
+ def test_extract_locations_with_path(self):
+ args = {"path": "src/app.py", "offset": 42}
+ locs = extract_locations(args)
+ assert len(locs) == 1
+ assert isinstance(locs[0], ToolCallLocation)
+ assert locs[0].path == "src/app.py"
+ assert locs[0].line == 42
+
+ def test_extract_locations_without_path(self):
+ args = {"command": "echo hi"}
+ locs = extract_locations(args)
+ assert locs == []
diff --git a/toolsets.py b/toolsets.py
index 305d6605..221ff2ca 100644
--- a/toolsets.py
+++ b/toolsets.py
@@ -224,6 +224,25 @@ TOOLSETS = {
# All platforms share the same core tools (including send_message,
# which is gated on gateway running via its check_fn).
# ==========================================================================
+
+ "hermes-acp": {
+ "description": "Editor integration (VS Code, Zed, JetBrains) — coding-focused tools without messaging, audio, or clarify UI",
+ "tools": [
+ "web_search", "web_extract",
+ "terminal", "process",
+ "read_file", "write_file", "patch", "search_files",
+ "vision_analyze",
+ "skills_list", "skill_view", "skill_manage",
+ "browser_navigate", "browser_snapshot", "browser_click",
+ "browser_type", "browser_scroll", "browser_back",
+ "browser_press", "browser_close", "browser_get_images",
+ "browser_vision",
+ "todo", "memory",
+ "session_search",
+ "execute_code", "delegate_task",
+ ],
+ "includes": []
+ },
"hermes-cli": {
"description": "Full interactive CLI toolset - all default tools plus cronjob management",