diff --git a/agent/anthropic_adapter.py b/agent/anthropic_adapter.py index b600a390..7d43a24d 100644 --- a/agent/anthropic_adapter.py +++ b/agent/anthropic_adapter.py @@ -343,12 +343,24 @@ def resolve_anthropic_token() -> Optional[str]: return preferred return cc_token - # 3. Claude Code credential file + # 3. Hermes-managed OAuth credentials (~/.hermes/.anthropic_oauth.json) + hermes_creds = read_hermes_oauth_credentials() + if hermes_creds: + if is_claude_code_token_valid(hermes_creds): + logger.debug("Using Hermes-managed OAuth credentials") + return hermes_creds["accessToken"] + # Expired — try refresh + logger.debug("Hermes OAuth token expired — attempting refresh") + refreshed = refresh_hermes_oauth_token() + if refreshed: + return refreshed + + # 4. Claude Code credential file resolved_claude_token = _resolve_claude_code_token_from_credentials(creds) if resolved_claude_token: return resolved_claude_token - # 4. Regular API key, or a legacy OAuth token saved in ANTHROPIC_API_KEY. + # 5. Regular API key, or a legacy OAuth token saved in ANTHROPIC_API_KEY. # This remains as a compatibility fallback for pre-migration Hermes configs. api_key = os.getenv("ANTHROPIC_API_KEY", "").strip() if api_key: @@ -397,6 +409,199 @@ def run_oauth_setup_token() -> Optional[str]: return None +# ── Hermes-native PKCE OAuth flow ──────────────────────────────────────── +# Mirrors the flow used by Claude Code, pi-ai, and OpenCode. +# Stores credentials in ~/.hermes/.anthropic_oauth.json (our own file). + +_OAUTH_CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" +_OAUTH_TOKEN_URL = "https://console.anthropic.com/v1/oauth/token" +_OAUTH_REDIRECT_URI = "https://console.anthropic.com/oauth/code/callback" +_OAUTH_SCOPES = "org:create_api_key user:profile user:inference" +_HERMES_OAUTH_FILE = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes"))) / ".anthropic_oauth.json" + + +def _generate_pkce() -> tuple: + """Generate PKCE code_verifier and code_challenge (S256).""" + import base64 + import hashlib + import secrets + + verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode() + challenge = base64.urlsafe_b64encode( + hashlib.sha256(verifier.encode()).digest() + ).rstrip(b"=").decode() + return verifier, challenge + + +def run_hermes_oauth_login() -> Optional[str]: + """Run Hermes-native OAuth PKCE flow for Claude Pro/Max subscription. + + Opens a browser to claude.ai for authorization, prompts for the code, + exchanges it for tokens, and stores them in ~/.hermes/.anthropic_oauth.json. + + Returns the access token on success, None on failure. + """ + import time + import webbrowser + + verifier, challenge = _generate_pkce() + + # Build authorization URL + params = { + "code": "true", + "client_id": _OAUTH_CLIENT_ID, + "response_type": "code", + "redirect_uri": _OAUTH_REDIRECT_URI, + "scope": _OAUTH_SCOPES, + "code_challenge": challenge, + "code_challenge_method": "S256", + "state": verifier, + } + from urllib.parse import urlencode + auth_url = f"https://claude.ai/oauth/authorize?{urlencode(params)}" + + print() + print("Opening browser for Claude Pro/Max authorization...") + print(f"If the browser doesn't open, visit this URL:") + print(f" {auth_url}") + print() + + try: + webbrowser.open(auth_url) + except Exception: + pass # URL printed above as fallback + + try: + auth_code = input("Paste the authorization code here: ").strip() + except (KeyboardInterrupt, EOFError): + return None + + if not auth_code: + print("No code entered.") + return None + + # Split code#state format + splits = auth_code.split("#") + code = splits[0] + state = splits[1] if len(splits) > 1 else "" + + # Exchange code for tokens + try: + import urllib.request + exchange_data = json.dumps({ + "grant_type": "authorization_code", + "client_id": _OAUTH_CLIENT_ID, + "code": code, + "state": state, + "redirect_uri": _OAUTH_REDIRECT_URI, + "code_verifier": verifier, + }).encode() + + req = urllib.request.Request( + _OAUTH_TOKEN_URL, + data=exchange_data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + with urllib.request.urlopen(req, timeout=15) as resp: + result = json.loads(resp.read().decode()) + except Exception as e: + print(f"Token exchange failed: {e}") + return None + + access_token = result.get("access_token", "") + refresh_token = result.get("refresh_token", "") + expires_in = result.get("expires_in", 3600) + + if not access_token: + print("No access token in response.") + return None + + # Store credentials + expires_at_ms = int(time.time() * 1000) + (expires_in * 1000) + _save_hermes_oauth_credentials(access_token, refresh_token, expires_at_ms) + + # Also write to Claude Code's credential file for backward compat + _write_claude_code_credentials(access_token, refresh_token, expires_at_ms) + + print("Authentication successful!") + return access_token + + +def _save_hermes_oauth_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None: + """Save OAuth credentials to ~/.hermes/.anthropic_oauth.json.""" + data = { + "accessToken": access_token, + "refreshToken": refresh_token, + "expiresAt": expires_at_ms, + } + try: + _HERMES_OAUTH_FILE.parent.mkdir(parents=True, exist_ok=True) + _HERMES_OAUTH_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8") + _HERMES_OAUTH_FILE.chmod(0o600) + except (OSError, IOError) as e: + logger.debug("Failed to save Hermes OAuth credentials: %s", e) + + +def read_hermes_oauth_credentials() -> Optional[Dict[str, Any]]: + """Read Hermes-managed OAuth credentials from ~/.hermes/.anthropic_oauth.json.""" + if _HERMES_OAUTH_FILE.exists(): + try: + data = json.loads(_HERMES_OAUTH_FILE.read_text(encoding="utf-8")) + if data.get("accessToken"): + return data + except (json.JSONDecodeError, OSError, IOError) as e: + logger.debug("Failed to read Hermes OAuth credentials: %s", e) + return None + + +def refresh_hermes_oauth_token() -> Optional[str]: + """Refresh the Hermes-managed OAuth token using the stored refresh token. + + Returns the new access token, or None if refresh fails. + """ + import time + import urllib.request + + creds = read_hermes_oauth_credentials() + if not creds or not creds.get("refreshToken"): + return None + + try: + data = json.dumps({ + "grant_type": "refresh_token", + "refresh_token": creds["refreshToken"], + "client_id": _OAUTH_CLIENT_ID, + }).encode() + + req = urllib.request.Request( + _OAUTH_TOKEN_URL, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + with urllib.request.urlopen(req, timeout=10) as resp: + result = json.loads(resp.read().decode()) + + new_access = result.get("access_token", "") + new_refresh = result.get("refresh_token", creds["refreshToken"]) + expires_in = result.get("expires_in", 3600) + + if new_access: + new_expires_ms = int(time.time() * 1000) + (expires_in * 1000) + _save_hermes_oauth_credentials(new_access, new_refresh, new_expires_ms) + # Also update Claude Code's credential file + _write_claude_code_credentials(new_access, new_refresh, new_expires_ms) + logger.debug("Successfully refreshed Hermes OAuth token") + return new_access + except Exception as e: + logger.debug("Failed to refresh Hermes OAuth token: %s", e) + + return None + + # --------------------------------------------------------------------------- # Message / tool / response format conversion # ---------------------------------------------------------------------------