From a2f0d14f2925ad52c2a5b485a14af0ba46a091a3 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 16 Mar 2026 05:19:36 -0700 Subject: [PATCH] feat(acp): support slash commands in ACP adapter (#1532) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds /help, /model, /tools, /context, /reset, /compact, /version to the ACP adapter (VS Code, Zed, JetBrains). Commands are handled directly in the server without instantiating the TUI — each command queries agent/session state and returns plain text. Unrecognized /commands fall through to the LLM as normal messages. /model uses detect_provider_for_model() for auto-detection when switching models, matching the CLI and gateway behavior. Fixes #1402 --- acp_adapter/server.py | 156 +++++++++++++++++++++++++++++++++++++-- tests/acp/test_server.py | 94 +++++++++++++++++++++++ 2 files changed, 245 insertions(+), 5 deletions(-) diff --git a/acp_adapter/server.py b/acp_adapter/server.py index 6e8ec3b4..1081104e 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -42,7 +42,7 @@ from acp_adapter.events import ( make_tool_progress_cb, ) from acp_adapter.permissions import make_approval_callback -from acp_adapter.session import SessionManager +from acp_adapter.session import SessionManager, SessionState logger = logging.getLogger(__name__) @@ -226,10 +226,19 @@ class HermesACPAgent(acp.Agent): logger.error("prompt: session %s not found", session_id) return PromptResponse(stop_reason="refusal") - user_text = _extract_text(prompt) - if not user_text.strip(): + user_text = _extract_text(prompt).strip() + if not user_text: return PromptResponse(stop_reason="end_turn") + # Intercept slash commands — handle locally without calling the LLM + if user_text.startswith("/"): + response_text = self._handle_slash_command(user_text, state) + if response_text is not None: + if self._conn: + update = acp.update_agent_message_text(response_text) + await self._conn.session_update(session_id, update) + return PromptResponse(stop_reason="end_turn") + logger.info("Prompt on session %s: %s", session_id, user_text[:100]) conn = self._conn @@ -315,12 +324,149 @@ class HermesACPAgent(acp.Agent): stop_reason = "cancelled" if state.cancel_event and state.cancel_event.is_set() else "end_turn" return PromptResponse(stop_reason=stop_reason, usage=usage) - # ---- Model switching ---------------------------------------------------- + # ---- Slash commands (headless) ------------------------------------------- + + _SLASH_COMMANDS = { + "help": "Show available commands", + "model": "Show or change current model", + "tools": "List available tools", + "context": "Show conversation context info", + "reset": "Clear conversation history", + "compact": "Compress conversation context", + "version": "Show Hermes version", + } + + def _handle_slash_command(self, text: str, state: SessionState) -> str | None: + """Dispatch a slash command and return the response text. + + Returns ``None`` for unrecognized commands so they fall through + to the LLM (the user may have typed ``/something`` as prose). + """ + parts = text.split(maxsplit=1) + cmd = parts[0].lstrip("/").lower() + args = parts[1].strip() if len(parts) > 1 else "" + + handler = { + "help": self._cmd_help, + "model": self._cmd_model, + "tools": self._cmd_tools, + "context": self._cmd_context, + "reset": self._cmd_reset, + "compact": self._cmd_compact, + "version": self._cmd_version, + }.get(cmd) + + if handler is None: + return None # not a known command — let the LLM handle it + + try: + return handler(args, state) + except Exception as e: + logger.error("Slash command /%s error: %s", cmd, e, exc_info=True) + return f"Error executing /{cmd}: {e}" + + def _cmd_help(self, args: str, state: SessionState) -> str: + lines = ["Available commands:", ""] + for cmd, desc in self._SLASH_COMMANDS.items(): + lines.append(f" /{cmd:10s} {desc}") + lines.append("") + lines.append("Unrecognized /commands are sent to the model as normal messages.") + return "\n".join(lines) + + def _cmd_model(self, args: str, state: SessionState) -> str: + if not args: + model = state.model or getattr(state.agent, "model", "unknown") + provider = getattr(state.agent, "provider", None) or "auto" + return f"Current model: {model}\nProvider: {provider}" + + new_model = args.strip() + target_provider = None + + # Auto-detect provider for the requested model + try: + from hermes_cli.models import parse_model_input, detect_provider_for_model + current_provider = getattr(state.agent, "provider", None) or "openrouter" + target_provider, new_model = parse_model_input(new_model, current_provider) + if target_provider == current_provider: + detected = detect_provider_for_model(new_model, current_provider) + if detected: + target_provider, new_model = detected + except Exception: + logger.debug("Provider detection failed, using model as-is", exc_info=True) + + state.model = new_model + state.agent = self.session_manager._make_agent( + session_id=state.session_id, + cwd=state.cwd, + model=new_model, + ) + provider_label = target_provider or getattr(state.agent, "provider", "auto") + logger.info("Session %s: model switched to %s", state.session_id, new_model) + return f"Model switched to: {new_model}\nProvider: {provider_label}" + + def _cmd_tools(self, args: str, state: SessionState) -> str: + try: + from model_tools import get_tool_definitions + toolsets = getattr(state.agent, "enabled_toolsets", None) or ["hermes-acp"] + tools = get_tool_definitions(enabled_toolsets=toolsets, quiet_mode=True) + if not tools: + return "No tools available." + lines = [f"Available tools ({len(tools)}):"] + for t in tools: + name = t.get("function", {}).get("name", "?") + desc = t.get("function", {}).get("description", "") + # Truncate long descriptions + if len(desc) > 80: + desc = desc[:77] + "..." + lines.append(f" {name}: {desc}") + return "\n".join(lines) + except Exception as e: + return f"Could not list tools: {e}" + + def _cmd_context(self, args: str, state: SessionState) -> str: + n_messages = len(state.history) + if n_messages == 0: + return "Conversation is empty (no messages yet)." + # Count by role + roles: dict[str, int] = {} + for msg in state.history: + role = msg.get("role", "unknown") + roles[role] = roles.get(role, 0) + 1 + lines = [ + f"Conversation: {n_messages} messages", + f" user: {roles.get('user', 0)}, assistant: {roles.get('assistant', 0)}, " + f"tool: {roles.get('tool', 0)}, system: {roles.get('system', 0)}", + ] + model = state.model or getattr(state.agent, "model", "") + if model: + lines.append(f"Model: {model}") + return "\n".join(lines) + + def _cmd_reset(self, args: str, state: SessionState) -> str: + state.history.clear() + return "Conversation history cleared." + + def _cmd_compact(self, args: str, state: SessionState) -> str: + if not state.history: + return "Nothing to compress — conversation is empty." + try: + agent = state.agent + if hasattr(agent, "compress_context"): + agent.compress_context(state.history) + return f"Context compressed. Messages: {len(state.history)}" + return "Context compression not available for this agent." + except Exception as e: + return f"Compression failed: {e}" + + def _cmd_version(self, args: str, state: SessionState) -> str: + return f"Hermes Agent v{HERMES_VERSION}" + + # ---- Model switching (ACP protocol method) ------------------------------- async def set_session_model( self, model_id: str, session_id: str, **kwargs: Any ): - """Switch the model for a session.""" + """Switch the model for a session (called by ACP protocol).""" state = self.session_manager.get_session(session_id) if state: state.model = model_id diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py index 96475c67..341f4b75 100644 --- a/tests/acp/test_server.py +++ b/tests/acp/test_server.py @@ -295,3 +295,97 @@ class TestOnConnect: mock_conn = MagicMock(spec=acp.Client) agent.on_connect(mock_conn) assert agent._conn is mock_conn + + +# --------------------------------------------------------------------------- +# Slash commands +# --------------------------------------------------------------------------- + + +class TestSlashCommands: + """Test slash command dispatch in the ACP adapter.""" + + def _make_state(self, mock_manager): + state = mock_manager.create_session(cwd="/tmp") + state.agent.model = "test-model" + state.agent.provider = "openrouter" + state.model = "test-model" + return state + + def test_help_lists_commands(self, agent, mock_manager): + state = self._make_state(mock_manager) + result = agent._handle_slash_command("/help", state) + assert result is not None + assert "/help" in result + assert "/model" in result + assert "/tools" in result + assert "/reset" in result + + def test_model_shows_current(self, agent, mock_manager): + state = self._make_state(mock_manager) + result = agent._handle_slash_command("/model", state) + assert "test-model" in result + + def test_context_empty(self, agent, mock_manager): + state = self._make_state(mock_manager) + state.history = [] + result = agent._handle_slash_command("/context", state) + assert "empty" in result.lower() + + def test_context_with_messages(self, agent, mock_manager): + state = self._make_state(mock_manager) + state.history = [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "hi"}, + ] + result = agent._handle_slash_command("/context", state) + assert "2 messages" in result + assert "user: 1" in result + + def test_reset_clears_history(self, agent, mock_manager): + state = self._make_state(mock_manager) + state.history = [{"role": "user", "content": "hello"}] + result = agent._handle_slash_command("/reset", state) + assert "cleared" in result.lower() + assert len(state.history) == 0 + + def test_version(self, agent, mock_manager): + state = self._make_state(mock_manager) + result = agent._handle_slash_command("/version", state) + assert HERMES_VERSION in result + + def test_unknown_command_returns_none(self, agent, mock_manager): + state = self._make_state(mock_manager) + result = agent._handle_slash_command("/nonexistent", state) + assert result is None + + @pytest.mark.asyncio + async def test_slash_command_intercepted_in_prompt(self, agent, mock_manager): + """Slash commands should be handled without calling the LLM.""" + new_resp = await agent.new_session(cwd="/tmp") + mock_conn = AsyncMock(spec=acp.Client) + agent._conn = mock_conn + + prompt = [TextContentBlock(type="text", text="/help")] + resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id) + + assert resp.stop_reason == "end_turn" + mock_conn.session_update.assert_called_once() + + @pytest.mark.asyncio + async def test_unknown_slash_falls_through_to_llm(self, agent, mock_manager): + """Unknown /commands should be sent to the LLM, not intercepted.""" + new_resp = await agent.new_session(cwd="/tmp") + mock_conn = AsyncMock(spec=acp.Client) + agent._conn = mock_conn + + # Mock run_in_executor to avoid actually running the agent + with patch("asyncio.get_running_loop") as mock_loop: + mock_loop.return_value.run_in_executor = AsyncMock(return_value={ + "final_response": "I processed /foo", + "messages": [], + }) + prompt = [TextContentBlock(type="text", text="/foo bar")] + resp = await agent.prompt(prompt=prompt, session_id=new_resp.session_id) + + assert resp.stop_reason == "end_turn"