"""Thin OAuth adapter for MCP HTTP servers. Wraps the MCP SDK's built-in ``OAuthClientProvider`` (which implements ``httpx.Auth``) with Hermes-specific token storage and browser-based authorization. The SDK handles all of the heavy lifting: PKCE generation, metadata discovery, dynamic client registration, token exchange, and refresh. Usage in mcp_tool.py:: from tools.mcp_oauth import build_oauth_auth auth = build_oauth_auth(server_name, server_url) # pass ``auth`` as the httpx auth parameter """ from __future__ import annotations import asyncio import json import logging import os import socket import threading import webbrowser from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from typing import Any from urllib.parse import parse_qs, urlparse logger = logging.getLogger(__name__) _TOKEN_DIR_NAME = "mcp-tokens" # --------------------------------------------------------------------------- # Token storage — persists tokens + client info to ~/.hermes/mcp-tokens/ # --------------------------------------------------------------------------- def _sanitize_server_name(name: str) -> str: """Sanitize server name for safe use as a filename.""" import re clean = re.sub(r"[^\w\-]", "-", name.strip().lower()) clean = re.sub(r"-+", "-", clean).strip("-") return clean[:60] or "unnamed" class HermesTokenStorage: """File-backed token storage implementing the MCP SDK's TokenStorage protocol.""" def __init__(self, server_name: str): self._server_name = _sanitize_server_name(server_name) def _base_dir(self) -> Path: home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) d = home / _TOKEN_DIR_NAME d.mkdir(parents=True, exist_ok=True) return d def _tokens_path(self) -> Path: return self._base_dir() / f"{self._server_name}.json" def _client_path(self) -> Path: return self._base_dir() / f"{self._server_name}.client.json" # -- TokenStorage protocol (async) -- async def get_tokens(self): data = self._read_json(self._tokens_path()) if not data: return None try: from mcp.shared.auth import OAuthToken return OAuthToken(**data) except Exception: return None async def set_tokens(self, tokens) -> None: self._write_json(self._tokens_path(), tokens.model_dump(exclude_none=True)) async def get_client_info(self): data = self._read_json(self._client_path()) if not data: return None try: from mcp.shared.auth import OAuthClientInformationFull return OAuthClientInformationFull(**data) except Exception: return None async def set_client_info(self, client_info) -> None: self._write_json(self._client_path(), client_info.model_dump(exclude_none=True)) # -- helpers -- @staticmethod def _read_json(path: Path) -> dict | None: if not path.exists(): return None try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return None @staticmethod def _write_json(path: Path, data: dict) -> None: path.write_text(json.dumps(data, indent=2), encoding="utf-8") try: path.chmod(0o600) except OSError: pass def remove(self) -> None: """Delete stored tokens and client info for this server.""" for p in (self._tokens_path(), self._client_path()): try: p.unlink(missing_ok=True) except OSError: pass # --------------------------------------------------------------------------- # Browser-based callback handler # --------------------------------------------------------------------------- def _find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] def _make_callback_handler(): """Create a callback handler class with instance-scoped result storage.""" result = {"auth_code": None, "state": None} class Handler(BaseHTTPRequestHandler): def do_GET(self): qs = parse_qs(urlparse(self.path).query) result["auth_code"] = (qs.get("code") or [None])[0] result["state"] = (qs.get("state") or [None])[0] self.send_response(200) self.send_header("Content-Type", "text/html") self.end_headers() self.wfile.write(b"