"""Anthropic Messages API adapter for Hermes Agent. Translates between Hermes's internal OpenAI-style message format and Anthropic's Messages API. Follows the same pattern as the codex_responses adapter — all provider-specific logic is isolated here. Auth supports: - Regular API keys (sk-ant-api*) → x-api-key header - OAuth setup-tokens (sk-ant-oat*) → Bearer auth + beta header - Claude Code credentials (~/.claude.json or ~/.claude/.credentials.json) → Bearer auth """ import json import logging import os from pathlib import Path from types import SimpleNamespace from typing import Any, Dict, List, Optional, Tuple try: import anthropic as _anthropic_sdk except ImportError: _anthropic_sdk = None # type: ignore[assignment] logger = logging.getLogger(__name__) THINKING_BUDGET = {"xhigh": 32000, "high": 16000, "medium": 8000, "low": 4000} ADAPTIVE_EFFORT_MAP = { "xhigh": "max", "high": "high", "medium": "medium", "low": "low", "minimal": "low", } def _supports_adaptive_thinking(model: str) -> bool: """Return True for Claude 4.6 models that support adaptive thinking.""" return any(v in model for v in ("4-6", "4.6")) # Beta headers for enhanced features (sent with ALL auth types) _COMMON_BETAS = [ "interleaved-thinking-2025-05-14", "fine-grained-tool-streaming-2025-05-14", ] # Additional beta headers required for OAuth/subscription auth. # Matches what Claude Code (and pi-ai / OpenCode) send. _OAUTH_ONLY_BETAS = [ "claude-code-20250219", "oauth-2025-04-20", ] # Claude Code identity — required for OAuth requests to be routed correctly. # Without these, Anthropic's infrastructure intermittently 500s OAuth traffic. # The version must stay reasonably current — Anthropic rejects OAuth requests # when the spoofed user-agent version is too far behind the actual release. _CLAUDE_CODE_VERSION_FALLBACK = "2.1.74" def _detect_claude_code_version() -> str: """Detect the installed Claude Code version, fall back to a static constant. Anthropic's OAuth infrastructure validates the user-agent version and may reject requests with a version that's too old. Detecting dynamically means users who keep Claude Code updated never hit stale-version 400s. """ import subprocess as _sp for cmd in ("claude", "claude-code"): try: result = _sp.run( [cmd, "--version"], capture_output=True, text=True, timeout=5, ) if result.returncode == 0 and result.stdout.strip(): # Output is like "2.1.74 (Claude Code)" or just "2.1.74" version = result.stdout.strip().split()[0] if version and version[0].isdigit(): return version except Exception: pass return _CLAUDE_CODE_VERSION_FALLBACK _CLAUDE_CODE_VERSION = _detect_claude_code_version() _CLAUDE_CODE_SYSTEM_PREFIX = "You are Claude Code, Anthropic's official CLI for Claude." _MCP_TOOL_PREFIX = "mcp_" def _is_oauth_token(key: str) -> bool: """Check if the key is an OAuth/setup token (not a regular Console API key). Regular API keys start with 'sk-ant-api'. Everything else (setup-tokens starting with 'sk-ant-oat', managed keys, JWTs, etc.) needs Bearer auth. """ if not key: return False # Regular Console API keys use x-api-key header if key.startswith("sk-ant-api"): return False # Everything else (setup-tokens, managed keys, JWTs) uses Bearer auth return True def build_anthropic_client(api_key: str, base_url: str = None): """Create an Anthropic client, auto-detecting setup-tokens vs API keys. Returns an anthropic.Anthropic instance. """ if _anthropic_sdk is None: raise ImportError( "The 'anthropic' package is required for the Anthropic provider. " "Install it with: pip install 'anthropic>=0.39.0'" ) from httpx import Timeout kwargs = { "timeout": Timeout(timeout=900.0, connect=10.0), } if base_url: kwargs["base_url"] = base_url if _is_oauth_token(api_key): # OAuth access token / setup-token → Bearer auth + Claude Code identity. # Anthropic routes OAuth requests based on user-agent and headers; # without Claude Code's fingerprint, requests get intermittent 500s. all_betas = _COMMON_BETAS + _OAUTH_ONLY_BETAS kwargs["auth_token"] = api_key kwargs["default_headers"] = { "anthropic-beta": ",".join(all_betas), "user-agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)", "x-app": "cli", } else: # Regular API key → x-api-key header + common betas kwargs["api_key"] = api_key if _COMMON_BETAS: kwargs["default_headers"] = {"anthropic-beta": ",".join(_COMMON_BETAS)} return _anthropic_sdk.Anthropic(**kwargs) def read_claude_code_credentials() -> Optional[Dict[str, Any]]: """Read refreshable Claude Code OAuth credentials from ~/.claude/.credentials.json. This intentionally excludes ~/.claude.json primaryApiKey. Opencode's subscription flow is OAuth/setup-token based with refreshable credentials, and native direct Anthropic provider usage should follow that path rather than auto-detecting Claude's first-party managed key. Returns dict with {accessToken, refreshToken?, expiresAt?} or None. """ cred_path = Path.home() / ".claude" / ".credentials.json" if cred_path.exists(): try: data = json.loads(cred_path.read_text(encoding="utf-8")) oauth_data = data.get("claudeAiOauth") if oauth_data and isinstance(oauth_data, dict): access_token = oauth_data.get("accessToken", "") if access_token: return { "accessToken": access_token, "refreshToken": oauth_data.get("refreshToken", ""), "expiresAt": oauth_data.get("expiresAt", 0), "source": "claude_code_credentials_file", } except (json.JSONDecodeError, OSError, IOError) as e: logger.debug("Failed to read ~/.claude/.credentials.json: %s", e) return None def read_claude_managed_key() -> Optional[str]: """Read Claude's native managed key from ~/.claude.json for diagnostics only.""" claude_json = Path.home() / ".claude.json" if claude_json.exists(): try: data = json.loads(claude_json.read_text(encoding="utf-8")) primary_key = data.get("primaryApiKey", "") if isinstance(primary_key, str) and primary_key.strip(): return primary_key.strip() except (json.JSONDecodeError, OSError, IOError) as e: logger.debug("Failed to read ~/.claude.json: %s", e) return None def is_claude_code_token_valid(creds: Dict[str, Any]) -> bool: """Check if Claude Code credentials have a non-expired access token.""" import time expires_at = creds.get("expiresAt", 0) if not expires_at: # No expiry set (managed keys) — valid if token is present return bool(creds.get("accessToken")) # expiresAt is in milliseconds since epoch now_ms = int(time.time() * 1000) # Allow 60 seconds of buffer return now_ms < (expires_at - 60_000) def _refresh_oauth_token(creds: Dict[str, Any]) -> Optional[str]: """Attempt to refresh an expired Claude Code OAuth token. Uses the same token endpoint and client_id as Claude Code / OpenCode. Only works for credentials that have a refresh token (from claude /login or claude setup-token with OAuth flow). Returns the new access token, or None if refresh fails. """ import urllib.parse import urllib.request refresh_token = creds.get("refreshToken", "") if not refresh_token: logger.debug("No refresh token available — cannot refresh") return None # Client ID used by Claude Code's OAuth flow CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" data = urllib.parse.urlencode({ "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": CLIENT_ID, }).encode() req = urllib.request.Request( "https://console.anthropic.com/v1/oauth/token", data=data, headers={ "Content-Type": "application/x-www-form-urlencoded", "User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)", }, method="POST", ) try: with urllib.request.urlopen(req, timeout=10) as resp: result = json.loads(resp.read().decode()) new_access = result.get("access_token", "") new_refresh = result.get("refresh_token", refresh_token) expires_in = result.get("expires_in", 3600) # seconds if new_access: import time new_expires_ms = int(time.time() * 1000) + (expires_in * 1000) # Write refreshed credentials back to ~/.claude/.credentials.json _write_claude_code_credentials(new_access, new_refresh, new_expires_ms) logger.debug("Successfully refreshed Claude Code OAuth token") return new_access except Exception as e: logger.debug("Failed to refresh Claude Code token: %s", e) return None def _write_claude_code_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None: """Write refreshed credentials back to ~/.claude/.credentials.json.""" cred_path = Path.home() / ".claude" / ".credentials.json" try: # Read existing file to preserve other fields existing = {} if cred_path.exists(): existing = json.loads(cred_path.read_text(encoding="utf-8")) existing["claudeAiOauth"] = { "accessToken": access_token, "refreshToken": refresh_token, "expiresAt": expires_at_ms, } cred_path.parent.mkdir(parents=True, exist_ok=True) cred_path.write_text(json.dumps(existing, indent=2), encoding="utf-8") # Restrict permissions (credentials file) cred_path.chmod(0o600) except (OSError, IOError) as e: logger.debug("Failed to write refreshed credentials: %s", e) def _resolve_claude_code_token_from_credentials(creds: Optional[Dict[str, Any]] = None) -> Optional[str]: """Resolve a token from Claude Code credential files, refreshing if needed.""" creds = creds or read_claude_code_credentials() if creds and is_claude_code_token_valid(creds): logger.debug("Using Claude Code credentials (auto-detected)") return creds["accessToken"] if creds: logger.debug("Claude Code credentials expired — attempting refresh") refreshed = _refresh_oauth_token(creds) if refreshed: return refreshed logger.debug("Token refresh failed — re-run 'claude setup-token' to reauthenticate") return None def _prefer_refreshable_claude_code_token(env_token: str, creds: Optional[Dict[str, Any]]) -> Optional[str]: """Prefer Claude Code creds when a persisted env OAuth token would shadow refresh. Hermes historically persisted setup tokens into ANTHROPIC_TOKEN. That makes later refresh impossible because the static env token wins before we ever inspect Claude Code's refreshable credential file. If we have a refreshable Claude Code credential record, prefer it over the static env OAuth token. """ if not env_token or not _is_oauth_token(env_token) or not isinstance(creds, dict): return None if not creds.get("refreshToken"): return None resolved = _resolve_claude_code_token_from_credentials(creds) if resolved and resolved != env_token: logger.debug( "Preferring Claude Code credential file over static env OAuth token so refresh can proceed" ) return resolved return None def get_anthropic_token_source(token: Optional[str] = None) -> str: """Best-effort source classification for an Anthropic credential token.""" token = (token or "").strip() if not token: return "none" env_token = os.getenv("ANTHROPIC_TOKEN", "").strip() if env_token and env_token == token: return "anthropic_token_env" cc_env_token = os.getenv("CLAUDE_CODE_OAUTH_TOKEN", "").strip() if cc_env_token and cc_env_token == token: return "claude_code_oauth_token_env" creds = read_claude_code_credentials() if creds and creds.get("accessToken") == token: return str(creds.get("source") or "claude_code_credentials") managed_key = read_claude_managed_key() if managed_key and managed_key == token: return "claude_json_primary_api_key" api_key = os.getenv("ANTHROPIC_API_KEY", "").strip() if api_key and api_key == token: return "anthropic_api_key_env" return "unknown" def resolve_anthropic_token() -> Optional[str]: """Resolve an Anthropic token from all available sources. Priority: 1. ANTHROPIC_TOKEN env var (OAuth/setup token saved by Hermes) 2. CLAUDE_CODE_OAUTH_TOKEN env var 3. Claude Code credentials (~/.claude.json or ~/.claude/.credentials.json) — with automatic refresh if expired and a refresh token is available 4. ANTHROPIC_API_KEY env var (regular API key, or legacy fallback) Returns the token string or None. """ creds = read_claude_code_credentials() # 1. Hermes-managed OAuth/setup token env var token = os.getenv("ANTHROPIC_TOKEN", "").strip() if token: preferred = _prefer_refreshable_claude_code_token(token, creds) if preferred: return preferred return token # 2. CLAUDE_CODE_OAUTH_TOKEN (used by Claude Code for setup-tokens) cc_token = os.getenv("CLAUDE_CODE_OAUTH_TOKEN", "").strip() if cc_token: preferred = _prefer_refreshable_claude_code_token(cc_token, creds) if preferred: return preferred return cc_token # 3. Hermes-managed OAuth credentials (~/.hermes/.anthropic_oauth.json) hermes_creds = read_hermes_oauth_credentials() if hermes_creds: if is_claude_code_token_valid(hermes_creds): logger.debug("Using Hermes-managed OAuth credentials") return hermes_creds["accessToken"] # Expired — try refresh logger.debug("Hermes OAuth token expired — attempting refresh") refreshed = refresh_hermes_oauth_token() if refreshed: return refreshed # 4. Claude Code credential file resolved_claude_token = _resolve_claude_code_token_from_credentials(creds) if resolved_claude_token: return resolved_claude_token # 5. Regular API key, or a legacy OAuth token saved in ANTHROPIC_API_KEY. # This remains as a compatibility fallback for pre-migration Hermes configs. api_key = os.getenv("ANTHROPIC_API_KEY", "").strip() if api_key: return api_key return None def run_oauth_setup_token() -> Optional[str]: """Run 'claude setup-token' interactively and return the resulting token. Checks multiple sources after the subprocess completes: 1. Claude Code credential files (may be written by the subprocess) 2. CLAUDE_CODE_OAUTH_TOKEN / ANTHROPIC_TOKEN env vars Returns the token string, or None if no credentials were obtained. Raises FileNotFoundError if the 'claude' CLI is not installed. """ import shutil import subprocess claude_path = shutil.which("claude") if not claude_path: raise FileNotFoundError( "The 'claude' CLI is not installed. " "Install it with: npm install -g @anthropic-ai/claude-code" ) # Run interactively — stdin/stdout/stderr inherited so user can interact try: subprocess.run([claude_path, "setup-token"]) except (KeyboardInterrupt, EOFError): return None # Check if credentials were saved to Claude Code's config files creds = read_claude_code_credentials() if creds and is_claude_code_token_valid(creds): return creds["accessToken"] # Check env vars that may have been set for env_var in ("CLAUDE_CODE_OAUTH_TOKEN", "ANTHROPIC_TOKEN"): val = os.getenv(env_var, "").strip() if val: return val return None # ── Hermes-native PKCE OAuth flow ──────────────────────────────────────── # Mirrors the flow used by Claude Code, pi-ai, and OpenCode. # Stores credentials in ~/.hermes/.anthropic_oauth.json (our own file). _OAUTH_CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" _OAUTH_TOKEN_URL = "https://console.anthropic.com/v1/oauth/token" _OAUTH_REDIRECT_URI = "https://console.anthropic.com/oauth/code/callback" _OAUTH_SCOPES = "org:create_api_key user:profile user:inference" _HERMES_OAUTH_FILE = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes"))) / ".anthropic_oauth.json" def _generate_pkce() -> tuple: """Generate PKCE code_verifier and code_challenge (S256).""" import base64 import hashlib import secrets verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode() challenge = base64.urlsafe_b64encode( hashlib.sha256(verifier.encode()).digest() ).rstrip(b"=").decode() return verifier, challenge def run_hermes_oauth_login() -> Optional[str]: """Run Hermes-native OAuth PKCE flow for Claude Pro/Max subscription. Opens a browser to claude.ai for authorization, prompts for the code, exchanges it for tokens, and stores them in ~/.hermes/.anthropic_oauth.json. Returns the access token on success, None on failure. """ import time import webbrowser verifier, challenge = _generate_pkce() # Build authorization URL params = { "code": "true", "client_id": _OAUTH_CLIENT_ID, "response_type": "code", "redirect_uri": _OAUTH_REDIRECT_URI, "scope": _OAUTH_SCOPES, "code_challenge": challenge, "code_challenge_method": "S256", "state": verifier, } from urllib.parse import urlencode auth_url = f"https://claude.ai/oauth/authorize?{urlencode(params)}" print() print("Authorize Hermes with your Claude Pro/Max subscription.") print() print("╭─ Claude Pro/Max Authorization ────────────────────╮") print("│ │") print("│ Open this link in your browser: │") print("╰───────────────────────────────────────────────────╯") print() print(f" {auth_url}") print() # Try to open browser automatically (works on desktop, silently fails on headless/SSH) try: webbrowser.open(auth_url) print(" (Browser opened automatically)") except Exception: pass print() print("After authorizing, you'll see a code. Paste it below.") print() try: auth_code = input("Authorization code: ").strip() except (KeyboardInterrupt, EOFError): return None if not auth_code: print("No code entered.") return None # Split code#state format splits = auth_code.split("#") code = splits[0] state = splits[1] if len(splits) > 1 else "" # Exchange code for tokens try: import urllib.request exchange_data = json.dumps({ "grant_type": "authorization_code", "client_id": _OAUTH_CLIENT_ID, "code": code, "state": state, "redirect_uri": _OAUTH_REDIRECT_URI, "code_verifier": verifier, }).encode() req = urllib.request.Request( _OAUTH_TOKEN_URL, data=exchange_data, headers={ "Content-Type": "application/json", "User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)", }, method="POST", ) with urllib.request.urlopen(req, timeout=15) as resp: result = json.loads(resp.read().decode()) except Exception as e: print(f"Token exchange failed: {e}") return None access_token = result.get("access_token", "") refresh_token = result.get("refresh_token", "") expires_in = result.get("expires_in", 3600) if not access_token: print("No access token in response.") return None # Store credentials expires_at_ms = int(time.time() * 1000) + (expires_in * 1000) _save_hermes_oauth_credentials(access_token, refresh_token, expires_at_ms) # Also write to Claude Code's credential file for backward compat _write_claude_code_credentials(access_token, refresh_token, expires_at_ms) print("Authentication successful!") return access_token def _save_hermes_oauth_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None: """Save OAuth credentials to ~/.hermes/.anthropic_oauth.json.""" data = { "accessToken": access_token, "refreshToken": refresh_token, "expiresAt": expires_at_ms, } try: _HERMES_OAUTH_FILE.parent.mkdir(parents=True, exist_ok=True) _HERMES_OAUTH_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8") _HERMES_OAUTH_FILE.chmod(0o600) except (OSError, IOError) as e: logger.debug("Failed to save Hermes OAuth credentials: %s", e) def read_hermes_oauth_credentials() -> Optional[Dict[str, Any]]: """Read Hermes-managed OAuth credentials from ~/.hermes/.anthropic_oauth.json.""" if _HERMES_OAUTH_FILE.exists(): try: data = json.loads(_HERMES_OAUTH_FILE.read_text(encoding="utf-8")) if data.get("accessToken"): return data except (json.JSONDecodeError, OSError, IOError) as e: logger.debug("Failed to read Hermes OAuth credentials: %s", e) return None def refresh_hermes_oauth_token() -> Optional[str]: """Refresh the Hermes-managed OAuth token using the stored refresh token. Returns the new access token, or None if refresh fails. """ import time import urllib.request creds = read_hermes_oauth_credentials() if not creds or not creds.get("refreshToken"): return None try: data = json.dumps({ "grant_type": "refresh_token", "refresh_token": creds["refreshToken"], "client_id": _OAUTH_CLIENT_ID, }).encode() req = urllib.request.Request( _OAUTH_TOKEN_URL, data=data, headers={ "Content-Type": "application/json", "User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)", }, method="POST", ) with urllib.request.urlopen(req, timeout=10) as resp: result = json.loads(resp.read().decode()) new_access = result.get("access_token", "") new_refresh = result.get("refresh_token", creds["refreshToken"]) expires_in = result.get("expires_in", 3600) if new_access: new_expires_ms = int(time.time() * 1000) + (expires_in * 1000) _save_hermes_oauth_credentials(new_access, new_refresh, new_expires_ms) # Also update Claude Code's credential file _write_claude_code_credentials(new_access, new_refresh, new_expires_ms) logger.debug("Successfully refreshed Hermes OAuth token") return new_access except Exception as e: logger.debug("Failed to refresh Hermes OAuth token: %s", e) return None # --------------------------------------------------------------------------- # Message / tool / response format conversion # --------------------------------------------------------------------------- def normalize_model_name(model: str) -> str: """Normalize a model name for the Anthropic API. - Strips 'anthropic/' prefix (OpenRouter format, case-insensitive) - Converts dots to hyphens in version numbers (OpenRouter uses dots, Anthropic uses hyphens: claude-opus-4.6 → claude-opus-4-6) """ lower = model.lower() if lower.startswith("anthropic/"): model = model[len("anthropic/"):] # OpenRouter uses dots for version separators (claude-opus-4.6), # Anthropic uses hyphens (claude-opus-4-6). Convert dots to hyphens. model = model.replace(".", "-") return model def _sanitize_tool_id(tool_id: str) -> str: """Sanitize a tool call ID for the Anthropic API. Anthropic requires IDs matching [a-zA-Z0-9_-]. Replace invalid characters with underscores and ensure non-empty. """ import re if not tool_id: return "tool_0" sanitized = re.sub(r"[^a-zA-Z0-9_-]", "_", tool_id) return sanitized or "tool_0" def _convert_openai_image_part_to_anthropic(part: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Convert an OpenAI-style image block to Anthropic's image source format.""" image_data = part.get("image_url", {}) url = image_data.get("url", "") if isinstance(image_data, dict) else str(image_data) if not isinstance(url, str) or not url.strip(): return None url = url.strip() if url.startswith("data:"): header, sep, data = url.partition(",") if sep and ";base64" in header: media_type = header[5:].split(";", 1)[0] or "image/png" return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": data, }, } if url.startswith("http://") or url.startswith("https://"): return { "type": "image", "source": { "type": "url", "url": url, }, } return None def _convert_user_content_part_to_anthropic(part: Any) -> Optional[Dict[str, Any]]: if isinstance(part, dict): ptype = part.get("type") if ptype == "text": block = {"type": "text", "text": part.get("text", "")} if isinstance(part.get("cache_control"), dict): block["cache_control"] = dict(part["cache_control"]) return block if ptype == "image_url": return _convert_openai_image_part_to_anthropic(part) if ptype == "image" and part.get("source"): return dict(part) if ptype == "image" and part.get("data"): media_type = part.get("mimeType") or part.get("media_type") or "image/png" return { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": part.get("data", ""), }, } if ptype == "tool_result": return dict(part) elif part is not None: return {"type": "text", "text": str(part)} return None def convert_tools_to_anthropic(tools: List[Dict]) -> List[Dict]: """Convert OpenAI tool definitions to Anthropic format.""" if not tools: return [] result = [] for t in tools: fn = t.get("function", {}) result.append({ "name": fn.get("name", ""), "description": fn.get("description", ""), "input_schema": fn.get("parameters", {"type": "object", "properties": {}}), }) return result def _image_source_from_openai_url(url: str) -> Dict[str, str]: """Convert an OpenAI-style image URL/data URL into Anthropic image source.""" url = str(url or "").strip() if not url: return {"type": "url", "url": ""} if url.startswith("data:"): header, _, data = url.partition(",") media_type = "image/jpeg" if header.startswith("data:"): mime_part = header[len("data:"):].split(";", 1)[0].strip() if mime_part.startswith("image/"): media_type = mime_part return { "type": "base64", "media_type": media_type, "data": data, } return {"type": "url", "url": url} def _convert_content_part_to_anthropic(part: Any) -> Optional[Dict[str, Any]]: """Convert a single OpenAI-style content part to Anthropic format.""" if part is None: return None if isinstance(part, str): return {"type": "text", "text": part} if not isinstance(part, dict): return {"type": "text", "text": str(part)} ptype = part.get("type") if ptype == "input_text": block: Dict[str, Any] = {"type": "text", "text": part.get("text", "")} elif ptype in {"image_url", "input_image"}: image_value = part.get("image_url", {}) url = image_value.get("url", "") if isinstance(image_value, dict) else str(image_value or "") block = {"type": "image", "source": _image_source_from_openai_url(url)} else: block = dict(part) if isinstance(part.get("cache_control"), dict) and "cache_control" not in block: block["cache_control"] = dict(part["cache_control"]) return block def _convert_content_to_anthropic(content: Any) -> Any: """Convert OpenAI-style multimodal content arrays to Anthropic blocks.""" if not isinstance(content, list): return content converted = [] for part in content: block = _convert_content_part_to_anthropic(part) if block is not None: converted.append(block) return converted def convert_messages_to_anthropic( messages: List[Dict], ) -> Tuple[Optional[Any], List[Dict]]: """Convert OpenAI-format messages to Anthropic format. Returns (system_prompt, anthropic_messages). System messages are extracted since Anthropic takes them as a separate param. system_prompt is a string or list of content blocks (when cache_control present). """ system = None result = [] for m in messages: role = m.get("role", "user") content = m.get("content", "") if role == "system": if isinstance(content, list): # Preserve cache_control markers on content blocks has_cache = any( p.get("cache_control") for p in content if isinstance(p, dict) ) if has_cache: system = [p for p in content if isinstance(p, dict)] else: system = "\n".join( p["text"] for p in content if p.get("type") == "text" ) else: system = content continue if role == "assistant": blocks = [] if content: if isinstance(content, list): converted_content = _convert_content_to_anthropic(content) if isinstance(converted_content, list): blocks.extend(converted_content) else: blocks.append({"type": "text", "text": str(content)}) for tc in m.get("tool_calls", []): if not tc or not isinstance(tc, dict): continue fn = tc.get("function", {}) args = fn.get("arguments", "{}") try: parsed_args = json.loads(args) if isinstance(args, str) else args except (json.JSONDecodeError, ValueError): parsed_args = {} blocks.append({ "type": "tool_use", "id": _sanitize_tool_id(tc.get("id", "")), "name": fn.get("name", ""), "input": parsed_args, }) # Anthropic rejects empty assistant content effective = blocks or content if not effective or effective == "": effective = [{"type": "text", "text": "(empty)"}] result.append({"role": "assistant", "content": effective}) continue if role == "tool": # Sanitize tool_use_id and ensure non-empty content result_content = content if isinstance(content, str) else json.dumps(content) if not result_content: result_content = "(no output)" tool_result = { "type": "tool_result", "tool_use_id": _sanitize_tool_id(m.get("tool_call_id", "")), "content": result_content, } if isinstance(m.get("cache_control"), dict): tool_result["cache_control"] = dict(m["cache_control"]) # Merge consecutive tool results into one user message if ( result and result[-1]["role"] == "user" and isinstance(result[-1]["content"], list) and result[-1]["content"] and result[-1]["content"][0].get("type") == "tool_result" ): result[-1]["content"].append(tool_result) else: result.append({"role": "user", "content": [tool_result]}) continue # Regular user message if isinstance(content, list): converted_blocks = _convert_content_to_anthropic(content) result.append({ "role": "user", "content": converted_blocks or [{"type": "text", "text": ""}], }) else: result.append({"role": "user", "content": content}) # Strip orphaned tool_use blocks (no matching tool_result follows) tool_result_ids = set() for m in result: if m["role"] == "user" and isinstance(m["content"], list): for block in m["content"]: if block.get("type") == "tool_result": tool_result_ids.add(block.get("tool_use_id")) for m in result: if m["role"] == "assistant" and isinstance(m["content"], list): m["content"] = [ b for b in m["content"] if b.get("type") != "tool_use" or b.get("id") in tool_result_ids ] if not m["content"]: m["content"] = [{"type": "text", "text": "(tool call removed)"}] # Strip orphaned tool_result blocks (no matching tool_use precedes them). # This is the mirror of the above: context compression or session truncation # can remove an assistant message containing a tool_use while leaving the # subsequent tool_result intact. Anthropic rejects these with a 400. tool_use_ids = set() for m in result: if m["role"] == "assistant" and isinstance(m["content"], list): for block in m["content"]: if block.get("type") == "tool_use": tool_use_ids.add(block.get("id")) for m in result: if m["role"] == "user" and isinstance(m["content"], list): m["content"] = [ b for b in m["content"] if b.get("type") != "tool_result" or b.get("tool_use_id") in tool_use_ids ] if not m["content"]: m["content"] = [{"type": "text", "text": "(tool result removed)"}] # Enforce strict role alternation (Anthropic rejects consecutive same-role messages) fixed = [] for m in result: if fixed and fixed[-1]["role"] == m["role"]: if m["role"] == "user": # Merge consecutive user messages prev_content = fixed[-1]["content"] curr_content = m["content"] if isinstance(prev_content, str) and isinstance(curr_content, str): fixed[-1]["content"] = prev_content + "\n" + curr_content elif isinstance(prev_content, list) and isinstance(curr_content, list): fixed[-1]["content"] = prev_content + curr_content else: # Mixed types — wrap string in list if isinstance(prev_content, str): prev_content = [{"type": "text", "text": prev_content}] if isinstance(curr_content, str): curr_content = [{"type": "text", "text": curr_content}] fixed[-1]["content"] = prev_content + curr_content else: # Consecutive assistant messages — merge text content prev_blocks = fixed[-1]["content"] curr_blocks = m["content"] if isinstance(prev_blocks, list) and isinstance(curr_blocks, list): fixed[-1]["content"] = prev_blocks + curr_blocks elif isinstance(prev_blocks, str) and isinstance(curr_blocks, str): fixed[-1]["content"] = prev_blocks + "\n" + curr_blocks else: # Mixed types — normalize both to list and merge if isinstance(prev_blocks, str): prev_blocks = [{"type": "text", "text": prev_blocks}] if isinstance(curr_blocks, str): curr_blocks = [{"type": "text", "text": curr_blocks}] fixed[-1]["content"] = prev_blocks + curr_blocks else: fixed.append(m) result = fixed return system, result def build_anthropic_kwargs( model: str, messages: List[Dict], tools: Optional[List[Dict]], max_tokens: Optional[int], reasoning_config: Optional[Dict[str, Any]], tool_choice: Optional[str] = None, is_oauth: bool = False, ) -> Dict[str, Any]: """Build kwargs for anthropic.messages.create(). When *is_oauth* is True, applies Claude Code compatibility transforms: system prompt prefix, tool name prefixing, and prompt sanitization. """ system, anthropic_messages = convert_messages_to_anthropic(messages) anthropic_tools = convert_tools_to_anthropic(tools) if tools else [] model = normalize_model_name(model) effective_max_tokens = max_tokens or 16384 # ── OAuth: Claude Code identity ────────────────────────────────── if is_oauth: # 1. Prepend Claude Code system prompt identity cc_block = {"type": "text", "text": _CLAUDE_CODE_SYSTEM_PREFIX} if isinstance(system, list): system = [cc_block] + system elif isinstance(system, str) and system: system = [cc_block, {"type": "text", "text": system}] else: system = [cc_block] # 2. Sanitize system prompt — replace product name references # to avoid Anthropic's server-side content filters. for block in system: if isinstance(block, dict) and block.get("type") == "text": text = block.get("text", "") text = text.replace("Hermes Agent", "Claude Code") text = text.replace("Hermes agent", "Claude Code") text = text.replace("hermes-agent", "claude-code") text = text.replace("Nous Research", "Anthropic") block["text"] = text # 3. Prefix tool names with mcp_ (Claude Code convention) if anthropic_tools: for tool in anthropic_tools: if "name" in tool: tool["name"] = _MCP_TOOL_PREFIX + tool["name"] # 4. Prefix tool names in message history (tool_use and tool_result blocks) for msg in anthropic_messages: content = msg.get("content") if isinstance(content, list): for block in content: if isinstance(block, dict): if block.get("type") == "tool_use" and "name" in block: if not block["name"].startswith(_MCP_TOOL_PREFIX): block["name"] = _MCP_TOOL_PREFIX + block["name"] elif block.get("type") == "tool_result" and "tool_use_id" in block: pass # tool_result uses ID, not name kwargs: Dict[str, Any] = { "model": model, "messages": anthropic_messages, "max_tokens": effective_max_tokens, } if system: kwargs["system"] = system if anthropic_tools: kwargs["tools"] = anthropic_tools # Map OpenAI tool_choice to Anthropic format if tool_choice == "auto" or tool_choice is None: kwargs["tool_choice"] = {"type": "auto"} elif tool_choice == "required": kwargs["tool_choice"] = {"type": "any"} elif tool_choice == "none": # Anthropic has no tool_choice "none" — omit tools entirely to prevent use kwargs.pop("tools", None) elif isinstance(tool_choice, str): # Specific tool name kwargs["tool_choice"] = {"type": "tool", "name": tool_choice} # Map reasoning_config to Anthropic's thinking parameter. # Claude 4.6 models use adaptive thinking + output_config.effort. # Older models use manual thinking with budget_tokens. # Haiku models do NOT support extended thinking at all — skip entirely. if reasoning_config and isinstance(reasoning_config, dict): if reasoning_config.get("enabled") is not False and "haiku" not in model.lower(): effort = str(reasoning_config.get("effort", "medium")).lower() budget = THINKING_BUDGET.get(effort, 8000) if _supports_adaptive_thinking(model): kwargs["thinking"] = {"type": "adaptive"} kwargs["output_config"] = { "effort": ADAPTIVE_EFFORT_MAP.get(effort, "medium") } else: kwargs["thinking"] = {"type": "enabled", "budget_tokens": budget} # Anthropic requires temperature=1 when thinking is enabled on older models kwargs["temperature"] = 1 kwargs["max_tokens"] = max(effective_max_tokens, budget + 4096) return kwargs def normalize_anthropic_response( response, strip_tool_prefix: bool = False, ) -> Tuple[SimpleNamespace, str]: """Normalize Anthropic response to match the shape expected by AIAgent. Returns (assistant_message, finish_reason) where assistant_message has .content, .tool_calls, and .reasoning attributes. When *strip_tool_prefix* is True, removes the ``mcp_`` prefix that was added to tool names for OAuth Claude Code compatibility. """ text_parts = [] reasoning_parts = [] tool_calls = [] for block in response.content: if block.type == "text": text_parts.append(block.text) elif block.type == "thinking": reasoning_parts.append(block.thinking) elif block.type == "tool_use": name = block.name if strip_tool_prefix and name.startswith(_MCP_TOOL_PREFIX): name = name[len(_MCP_TOOL_PREFIX):] tool_calls.append( SimpleNamespace( id=block.id, type="function", function=SimpleNamespace( name=name, arguments=json.dumps(block.input), ), ) ) # Map Anthropic stop_reason to OpenAI finish_reason stop_reason_map = { "end_turn": "stop", "tool_use": "tool_calls", "max_tokens": "length", "stop_sequence": "stop", } finish_reason = stop_reason_map.get(response.stop_reason, "stop") return ( SimpleNamespace( content="\n".join(text_parts) if text_parts else None, tool_calls=tool_calls or None, reasoning="\n\n".join(reasoning_parts) if reasoning_parts else None, reasoning_content=None, reasoning_details=None, ), finish_reason, )