diff --git a/agent/anthropic_adapter.py b/agent/anthropic_adapter.py index a59252399..5d2620247 100644 --- a/agent/anthropic_adapter.py +++ b/agent/anthropic_adapter.py @@ -76,32 +76,47 @@ def build_anthropic_client(api_key: str, base_url: str = None): def read_claude_code_credentials() -> Optional[Dict[str, Any]]: - """Read credentials from Claude Code's credential file. + """Read credentials from Claude Code's config files. - Returns dict with {accessToken, refreshToken, expiresAt} or None. + Checks two locations (in order): + 1. ~/.claude.json — top-level primaryApiKey (native binary, v2.x) + 2. ~/.claude/.credentials.json — claudeAiOauth block (npm/legacy installs) + + Returns dict with {accessToken, refreshToken?, expiresAt?} or None. """ + # 1. Native binary (v2.x): ~/.claude.json with top-level primaryApiKey + claude_json = Path.home() / ".claude.json" + if claude_json.exists(): + try: + data = json.loads(claude_json.read_text(encoding="utf-8")) + primary_key = data.get("primaryApiKey", "") + if primary_key: + return { + "accessToken": primary_key, + "refreshToken": "", + "expiresAt": 0, # Managed keys don't have a user-visible expiry + } + except (json.JSONDecodeError, OSError, IOError) as e: + logger.debug("Failed to read ~/.claude.json: %s", e) + + # 2. Legacy/npm installs: ~/.claude/.credentials.json cred_path = Path.home() / ".claude" / ".credentials.json" - if not cred_path.exists(): - return None + if cred_path.exists(): + try: + data = json.loads(cred_path.read_text(encoding="utf-8")) + oauth_data = data.get("claudeAiOauth") + if oauth_data and isinstance(oauth_data, dict): + access_token = oauth_data.get("accessToken", "") + if access_token: + return { + "accessToken": access_token, + "refreshToken": oauth_data.get("refreshToken", ""), + "expiresAt": oauth_data.get("expiresAt", 0), + } + except (json.JSONDecodeError, OSError, IOError) as e: + logger.debug("Failed to read ~/.claude/.credentials.json: %s", e) - try: - data = json.loads(cred_path.read_text(encoding="utf-8")) - oauth_data = data.get("claudeAiOauth") - if not oauth_data or not isinstance(oauth_data, dict): - return None - - access_token = oauth_data.get("accessToken", "") - if not access_token: - return None - - return { - "accessToken": access_token, - "refreshToken": oauth_data.get("refreshToken", ""), - "expiresAt": oauth_data.get("expiresAt", 0), - } - except (json.JSONDecodeError, OSError, IOError) as e: - logger.debug("Failed to read Claude Code credentials: %s", e) - return None + return None def is_claude_code_token_valid(creds: Dict[str, Any]) -> bool: @@ -110,6 +125,7 @@ def is_claude_code_token_valid(creds: Dict[str, Any]) -> bool: expires_at = creds.get("expiresAt", 0) if not expires_at: + # No expiry set (managed keys) — valid if token is present return bool(creds.get("accessToken")) # expiresAt is in milliseconds since epoch