"""OpenAI-compatible shim that forwards Hermes requests to `copilot --acp`. This adapter lets Hermes treat the GitHub Copilot ACP server as a chat-style backend. Each request starts a short-lived ACP session, sends the formatted conversation as a single prompt, collects text chunks, and converts the result back into the minimal shape Hermes expects from an OpenAI client. """ from __future__ import annotations import json import os import queue import shlex import subprocess import threading import time from collections import deque from pathlib import Path from types import SimpleNamespace from typing import Any ACP_MARKER_BASE_URL = "acp://copilot" _DEFAULT_TIMEOUT_SECONDS = 900.0 def _resolve_command() -> str: return ( os.getenv("HERMES_COPILOT_ACP_COMMAND", "").strip() or os.getenv("COPILOT_CLI_PATH", "").strip() or "copilot" ) def _resolve_args() -> list[str]: raw = os.getenv("HERMES_COPILOT_ACP_ARGS", "").strip() if not raw: return ["--acp", "--stdio"] return shlex.split(raw) def _jsonrpc_error(message_id: Any, code: int, message: str) -> dict[str, Any]: return { "jsonrpc": "2.0", "id": message_id, "error": { "code": code, "message": message, }, } def _format_messages_as_prompt(messages: list[dict[str, Any]], model: str | None = None) -> str: sections: list[str] = [ "You are being used as the active ACP agent backend for Hermes.", "Use your own ACP capabilities and respond directly in natural language.", "Do not emit OpenAI tool-call JSON.", ] if model: sections.append(f"Hermes requested model hint: {model}") transcript: list[str] = [] for message in messages: if not isinstance(message, dict): continue role = str(message.get("role") or "unknown").strip().lower() if role == "tool": role = "tool" elif role not in {"system", "user", "assistant"}: role = "context" content = message.get("content") rendered = _render_message_content(content) if not rendered: continue label = { "system": "System", "user": "User", "assistant": "Assistant", "tool": "Tool", "context": "Context", }.get(role, role.title()) transcript.append(f"{label}:\n{rendered}") if transcript: sections.append("Conversation transcript:\n\n" + "\n\n".join(transcript)) sections.append("Continue the conversation from the latest user request.") return "\n\n".join(section.strip() for section in sections if section and section.strip()) def _render_message_content(content: Any) -> str: if content is None: return "" if isinstance(content, str): return content.strip() if isinstance(content, dict): if "text" in content: return str(content.get("text") or "").strip() if "content" in content and isinstance(content.get("content"), str): return str(content.get("content") or "").strip() return json.dumps(content, ensure_ascii=True) if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, str): parts.append(item) elif isinstance(item, dict): text = item.get("text") if isinstance(text, str) and text.strip(): parts.append(text.strip()) return "\n".join(parts).strip() return str(content).strip() def _ensure_path_within_cwd(path_text: str, cwd: str) -> Path: candidate = Path(path_text) if not candidate.is_absolute(): raise PermissionError("ACP file-system paths must be absolute.") resolved = candidate.resolve() root = Path(cwd).resolve() try: resolved.relative_to(root) except ValueError as exc: raise PermissionError(f"Path '{resolved}' is outside the session cwd '{root}'.") from exc return resolved class _ACPChatCompletions: def __init__(self, client: "CopilotACPClient"): self._client = client def create(self, **kwargs: Any) -> Any: return self._client._create_chat_completion(**kwargs) class _ACPChatNamespace: def __init__(self, client: "CopilotACPClient"): self.completions = _ACPChatCompletions(client) class CopilotACPClient: """Minimal OpenAI-client-compatible facade for Copilot ACP.""" def __init__( self, *, api_key: str | None = None, base_url: str | None = None, default_headers: dict[str, str] | None = None, acp_command: str | None = None, acp_args: list[str] | None = None, acp_cwd: str | None = None, command: str | None = None, args: list[str] | None = None, **_: Any, ): self.api_key = api_key or "copilot-acp" self.base_url = base_url or ACP_MARKER_BASE_URL self._default_headers = dict(default_headers or {}) self._acp_command = acp_command or command or _resolve_command() self._acp_args = list(acp_args or args or _resolve_args()) self._acp_cwd = str(Path(acp_cwd or os.getcwd()).resolve()) self.chat = _ACPChatNamespace(self) self.is_closed = False self._active_process: subprocess.Popen[str] | None = None self._active_process_lock = threading.Lock() def close(self) -> None: proc: subprocess.Popen[str] | None with self._active_process_lock: proc = self._active_process self._active_process = None self.is_closed = True if proc is None: return try: proc.terminate() proc.wait(timeout=2) except Exception: try: proc.kill() except Exception: pass def _create_chat_completion( self, *, model: str | None = None, messages: list[dict[str, Any]] | None = None, timeout: float | None = None, **_: Any, ) -> Any: prompt_text = _format_messages_as_prompt(messages or [], model=model) response_text, reasoning_text = self._run_prompt( prompt_text, timeout_seconds=float(timeout or _DEFAULT_TIMEOUT_SECONDS), ) usage = SimpleNamespace( prompt_tokens=0, completion_tokens=0, total_tokens=0, prompt_tokens_details=SimpleNamespace(cached_tokens=0), ) assistant_message = SimpleNamespace( content=response_text, tool_calls=[], reasoning=reasoning_text or None, reasoning_content=reasoning_text or None, reasoning_details=None, ) choice = SimpleNamespace(message=assistant_message, finish_reason="stop") return SimpleNamespace( choices=[choice], usage=usage, model=model or "copilot-acp", ) def _run_prompt(self, prompt_text: str, *, timeout_seconds: float) -> tuple[str, str]: try: proc = subprocess.Popen( [self._acp_command] + self._acp_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, cwd=self._acp_cwd, ) except FileNotFoundError as exc: raise RuntimeError( f"Could not start Copilot ACP command '{self._acp_command}'. " "Install GitHub Copilot CLI or set HERMES_COPILOT_ACP_COMMAND/COPILOT_CLI_PATH." ) from exc if proc.stdin is None or proc.stdout is None: proc.kill() raise RuntimeError("Copilot ACP process did not expose stdin/stdout pipes.") self.is_closed = False with self._active_process_lock: self._active_process = proc inbox: queue.Queue[dict[str, Any]] = queue.Queue() stderr_tail: deque[str] = deque(maxlen=40) def _stdout_reader() -> None: for line in proc.stdout: try: inbox.put(json.loads(line)) except Exception: inbox.put({"raw": line.rstrip("\n")}) def _stderr_reader() -> None: if proc.stderr is None: return for line in proc.stderr: stderr_tail.append(line.rstrip("\n")) out_thread = threading.Thread(target=_stdout_reader, daemon=True) err_thread = threading.Thread(target=_stderr_reader, daemon=True) out_thread.start() err_thread.start() next_id = 0 def _request(method: str, params: dict[str, Any], *, text_parts: list[str] | None = None, reasoning_parts: list[str] | None = None) -> Any: nonlocal next_id next_id += 1 request_id = next_id payload = { "jsonrpc": "2.0", "id": request_id, "method": method, "params": params, } proc.stdin.write(json.dumps(payload) + "\n") proc.stdin.flush() deadline = time.time() + timeout_seconds while time.time() < deadline: if proc.poll() is not None: break try: msg = inbox.get(timeout=0.1) except queue.Empty: continue if self._handle_server_message( msg, process=proc, cwd=self._acp_cwd, text_parts=text_parts, reasoning_parts=reasoning_parts, ): continue if msg.get("id") != request_id: continue if "error" in msg: err = msg.get("error") or {} raise RuntimeError( f"Copilot ACP {method} failed: {err.get('message') or err}" ) return msg.get("result") stderr_text = "\n".join(stderr_tail).strip() if proc.poll() is not None and stderr_text: raise RuntimeError(f"Copilot ACP process exited early: {stderr_text}") raise TimeoutError(f"Timed out waiting for Copilot ACP response to {method}.") try: _request( "initialize", { "protocolVersion": 1, "clientCapabilities": { "fs": { "readTextFile": True, "writeTextFile": True, } }, "clientInfo": { "name": "hermes-agent", "title": "Hermes Agent", "version": "0.0.0", }, }, ) session = _request( "session/new", { "cwd": self._acp_cwd, "mcpServers": [], }, ) or {} session_id = str(session.get("sessionId") or "").strip() if not session_id: raise RuntimeError("Copilot ACP did not return a sessionId.") text_parts: list[str] = [] reasoning_parts: list[str] = [] _request( "session/prompt", { "sessionId": session_id, "prompt": [ { "type": "text", "text": prompt_text, } ], }, text_parts=text_parts, reasoning_parts=reasoning_parts, ) return "".join(text_parts), "".join(reasoning_parts) finally: self.close() def _handle_server_message( self, msg: dict[str, Any], *, process: subprocess.Popen[str], cwd: str, text_parts: list[str] | None, reasoning_parts: list[str] | None, ) -> bool: method = msg.get("method") if not isinstance(method, str): return False if method == "session/update": params = msg.get("params") or {} update = params.get("update") or {} kind = str(update.get("sessionUpdate") or "").strip() content = update.get("content") or {} chunk_text = "" if isinstance(content, dict): chunk_text = str(content.get("text") or "") if kind == "agent_message_chunk" and chunk_text and text_parts is not None: text_parts.append(chunk_text) elif kind == "agent_thought_chunk" and chunk_text and reasoning_parts is not None: reasoning_parts.append(chunk_text) return True if process.stdin is None: return True message_id = msg.get("id") params = msg.get("params") or {} if method == "session/request_permission": response = { "jsonrpc": "2.0", "id": message_id, "result": { "outcome": { "outcome": "allow_once", } }, } elif method == "fs/read_text_file": try: path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd) content = path.read_text() if path.exists() else "" line = params.get("line") limit = params.get("limit") if isinstance(line, int) and line > 1: lines = content.splitlines(keepends=True) start = line - 1 end = start + limit if isinstance(limit, int) and limit > 0 else None content = "".join(lines[start:end]) response = { "jsonrpc": "2.0", "id": message_id, "result": { "content": content, }, } except Exception as exc: response = _jsonrpc_error(message_id, -32602, str(exc)) elif method == "fs/write_text_file": try: path = _ensure_path_within_cwd(str(params.get("path") or ""), cwd) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(str(params.get("content") or "")) response = { "jsonrpc": "2.0", "id": message_id, "result": None, } except Exception as exc: response = _jsonrpc_error(message_id, -32602, str(exc)) else: response = _jsonrpc_error( message_id, -32601, f"ACP client method '{method}' is not supported by Hermes yet.", ) process.stdin.write(json.dumps(response) + "\n") process.stdin.flush() return True