feat(acp): support slash commands in ACP adapter (#1532)

Adds /help, /model, /tools, /context, /reset, /compact, /version
to the ACP adapter (VS Code, Zed, JetBrains). Commands are handled
directly in the server without instantiating the TUI — each command
queries agent/session state and returns plain text.

Unrecognized /commands fall through to the LLM as normal messages.

/model uses detect_provider_for_model() for auto-detection when
switching models, matching the CLI and gateway behavior.

Fixes #1402
This commit is contained in:
Teknium
2026-03-16 05:19:36 -07:00
committed by GitHub
parent add945e53c
commit a2f0d14f29
2 changed files with 245 additions and 5 deletions

View File

@@ -42,7 +42,7 @@ from acp_adapter.events import (
make_tool_progress_cb,
)
from acp_adapter.permissions import make_approval_callback
from acp_adapter.session import SessionManager
from acp_adapter.session import SessionManager, SessionState
logger = logging.getLogger(__name__)
@@ -226,10 +226,19 @@ class HermesACPAgent(acp.Agent):
logger.error("prompt: session %s not found", session_id)
return PromptResponse(stop_reason="refusal")
user_text = _extract_text(prompt)
if not user_text.strip():
user_text = _extract_text(prompt).strip()
if not user_text:
return PromptResponse(stop_reason="end_turn")
# Intercept slash commands — handle locally without calling the LLM
if user_text.startswith("/"):
response_text = self._handle_slash_command(user_text, state)
if response_text is not None:
if self._conn:
update = acp.update_agent_message_text(response_text)
await self._conn.session_update(session_id, update)
return PromptResponse(stop_reason="end_turn")
logger.info("Prompt on session %s: %s", session_id, user_text[:100])
conn = self._conn
@@ -315,12 +324,149 @@ class HermesACPAgent(acp.Agent):
stop_reason = "cancelled" if state.cancel_event and state.cancel_event.is_set() else "end_turn"
return PromptResponse(stop_reason=stop_reason, usage=usage)
# ---- Model switching ----------------------------------------------------
# ---- Slash commands (headless) -------------------------------------------
_SLASH_COMMANDS = {
"help": "Show available commands",
"model": "Show or change current model",
"tools": "List available tools",
"context": "Show conversation context info",
"reset": "Clear conversation history",
"compact": "Compress conversation context",
"version": "Show Hermes version",
}
def _handle_slash_command(self, text: str, state: SessionState) -> str | None:
"""Dispatch a slash command and return the response text.
Returns ``None`` for unrecognized commands so they fall through
to the LLM (the user may have typed ``/something`` as prose).
"""
parts = text.split(maxsplit=1)
cmd = parts[0].lstrip("/").lower()
args = parts[1].strip() if len(parts) > 1 else ""
handler = {
"help": self._cmd_help,
"model": self._cmd_model,
"tools": self._cmd_tools,
"context": self._cmd_context,
"reset": self._cmd_reset,
"compact": self._cmd_compact,
"version": self._cmd_version,
}.get(cmd)
if handler is None:
return None # not a known command — let the LLM handle it
try:
return handler(args, state)
except Exception as e:
logger.error("Slash command /%s error: %s", cmd, e, exc_info=True)
return f"Error executing /{cmd}: {e}"
def _cmd_help(self, args: str, state: SessionState) -> str:
lines = ["Available commands:", ""]
for cmd, desc in self._SLASH_COMMANDS.items():
lines.append(f" /{cmd:10s} {desc}")
lines.append("")
lines.append("Unrecognized /commands are sent to the model as normal messages.")
return "\n".join(lines)
def _cmd_model(self, args: str, state: SessionState) -> str:
if not args:
model = state.model or getattr(state.agent, "model", "unknown")
provider = getattr(state.agent, "provider", None) or "auto"
return f"Current model: {model}\nProvider: {provider}"
new_model = args.strip()
target_provider = None
# Auto-detect provider for the requested model
try:
from hermes_cli.models import parse_model_input, detect_provider_for_model
current_provider = getattr(state.agent, "provider", None) or "openrouter"
target_provider, new_model = parse_model_input(new_model, current_provider)
if target_provider == current_provider:
detected = detect_provider_for_model(new_model, current_provider)
if detected:
target_provider, new_model = detected
except Exception:
logger.debug("Provider detection failed, using model as-is", exc_info=True)
state.model = new_model
state.agent = self.session_manager._make_agent(
session_id=state.session_id,
cwd=state.cwd,
model=new_model,
)
provider_label = target_provider or getattr(state.agent, "provider", "auto")
logger.info("Session %s: model switched to %s", state.session_id, new_model)
return f"Model switched to: {new_model}\nProvider: {provider_label}"
def _cmd_tools(self, args: str, state: SessionState) -> str:
try:
from model_tools import get_tool_definitions
toolsets = getattr(state.agent, "enabled_toolsets", None) or ["hermes-acp"]
tools = get_tool_definitions(enabled_toolsets=toolsets, quiet_mode=True)
if not tools:
return "No tools available."
lines = [f"Available tools ({len(tools)}):"]
for t in tools:
name = t.get("function", {}).get("name", "?")
desc = t.get("function", {}).get("description", "")
# Truncate long descriptions
if len(desc) > 80:
desc = desc[:77] + "..."
lines.append(f" {name}: {desc}")
return "\n".join(lines)
except Exception as e:
return f"Could not list tools: {e}"
def _cmd_context(self, args: str, state: SessionState) -> str:
n_messages = len(state.history)
if n_messages == 0:
return "Conversation is empty (no messages yet)."
# Count by role
roles: dict[str, int] = {}
for msg in state.history:
role = msg.get("role", "unknown")
roles[role] = roles.get(role, 0) + 1
lines = [
f"Conversation: {n_messages} messages",
f" user: {roles.get('user', 0)}, assistant: {roles.get('assistant', 0)}, "
f"tool: {roles.get('tool', 0)}, system: {roles.get('system', 0)}",
]
model = state.model or getattr(state.agent, "model", "")
if model:
lines.append(f"Model: {model}")
return "\n".join(lines)
def _cmd_reset(self, args: str, state: SessionState) -> str:
state.history.clear()
return "Conversation history cleared."
def _cmd_compact(self, args: str, state: SessionState) -> str:
if not state.history:
return "Nothing to compress — conversation is empty."
try:
agent = state.agent
if hasattr(agent, "compress_context"):
agent.compress_context(state.history)
return f"Context compressed. Messages: {len(state.history)}"
return "Context compression not available for this agent."
except Exception as e:
return f"Compression failed: {e}"
def _cmd_version(self, args: str, state: SessionState) -> str:
return f"Hermes Agent v{HERMES_VERSION}"
# ---- Model switching (ACP protocol method) -------------------------------
async def set_session_model(
self, model_id: str, session_id: str, **kwargs: Any
):
"""Switch the model for a session."""
"""Switch the model for a session (called by ACP protocol)."""
state = self.session_manager.get_session(session_id)
if state:
state.model = model_id