diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index 4fb879414..c2b3bbfa0 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -268,15 +268,11 @@ def _nous_base_url() -> str: def _read_codex_access_token() -> Optional[str]: - """Read a valid Codex OAuth access token from ~/.codex/auth.json.""" + """Read a valid Codex OAuth access token from Hermes auth store (~/.hermes/auth.json).""" try: - codex_auth = Path.home() / ".codex" / "auth.json" - if not codex_auth.is_file(): - return None - data = json.loads(codex_auth.read_text()) - tokens = data.get("tokens") - if not isinstance(tokens, dict): - return None + from hermes_cli.auth import _read_codex_tokens + data = _read_codex_tokens() + tokens = data.get("tokens", {}) access_token = tokens.get("access_token") if isinstance(access_token, str) and access_token.strip(): return access_token.strip() diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index 098b7620c..34b07b71b 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -415,175 +415,88 @@ def _is_remote_session() -> bool: # ============================================================================= -# OpenAI Codex auth file helpers +# OpenAI Codex auth — tokens stored in ~/.hermes/auth.json (not ~/.codex/) +# +# Hermes maintains its own Codex OAuth session separate from the Codex CLI +# and VS Code extension. This prevents refresh token rotation conflicts +# where one app's refresh invalidates the other's session. # ============================================================================= -def resolve_codex_home_path() -> Path: - """Resolve CODEX_HOME, defaulting to ~/.codex.""" - codex_home = os.getenv("CODEX_HOME", "").strip() - if not codex_home: - codex_home = str(Path.home() / ".codex") - return Path(codex_home).expanduser() - - -def _codex_auth_file_path() -> Path: - return resolve_codex_home_path() / "auth.json" - - -def _codex_auth_lock_path(auth_path: Path) -> Path: - return auth_path.with_suffix(auth_path.suffix + ".lock") - - -@contextmanager -def _codex_auth_file_lock( - auth_path: Path, - timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS, -): - lock_path = _codex_auth_lock_path(auth_path) - lock_path.parent.mkdir(parents=True, exist_ok=True) - - with lock_path.open("a+") as lock_file: - if fcntl is None: - yield - return - - deadline = time.time() + max(1.0, timeout_seconds) - while True: - try: - fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) - break - except BlockingIOError: - if time.time() >= deadline: - raise TimeoutError(f"Timed out waiting for Codex auth lock: {lock_path}") - time.sleep(0.05) - - try: - yield - finally: - fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) - - -def read_codex_auth_file() -> Dict[str, Any]: - """Read and validate Codex auth.json shape.""" - codex_home = resolve_codex_home_path() - if not codex_home.exists(): +def _read_codex_tokens(*, _lock: bool = True) -> Dict[str, Any]: + """Read Codex OAuth tokens from Hermes auth store (~/.hermes/auth.json). + + Returns dict with 'tokens' (access_token, refresh_token) and 'last_refresh'. + Raises AuthError if no Codex tokens are stored. + """ + if _lock: + with _auth_store_lock(): + auth_store = _load_auth_store() + else: + auth_store = _load_auth_store() + state = _load_provider_state(auth_store, "openai-codex") + if not state: raise AuthError( - f"Codex home directory not found at {codex_home}.", - provider="openai-codex", - code="codex_home_missing", - relogin_required=True, - ) - - auth_path = codex_home / "auth.json" - if not auth_path.exists(): - raise AuthError( - f"Codex auth file not found at {auth_path}.", + "No Codex credentials stored. Run `hermes login` to authenticate.", provider="openai-codex", code="codex_auth_missing", relogin_required=True, ) - - try: - payload = json.loads(auth_path.read_text()) - except Exception as exc: - raise AuthError( - f"Failed to parse Codex auth file at {auth_path}.", - provider="openai-codex", - code="codex_auth_invalid_json", - relogin_required=True, - ) from exc - - tokens = payload.get("tokens") + tokens = state.get("tokens") if not isinstance(tokens, dict): raise AuthError( - "Codex auth file is missing a valid 'tokens' object.", + "Codex auth state is missing tokens. Run `hermes login` to re-authenticate.", provider="openai-codex", code="codex_auth_invalid_shape", relogin_required=True, ) - access_token = tokens.get("access_token") refresh_token = tokens.get("refresh_token") if not isinstance(access_token, str) or not access_token.strip(): raise AuthError( - "Codex auth file is missing tokens.access_token.", + "Codex auth is missing access_token. Run `hermes login` to re-authenticate.", provider="openai-codex", code="codex_auth_missing_access_token", relogin_required=True, ) if not isinstance(refresh_token, str) or not refresh_token.strip(): raise AuthError( - "Codex auth file is missing tokens.refresh_token.", + "Codex auth is missing refresh_token. Run `hermes login` to re-authenticate.", provider="openai-codex", code="codex_auth_missing_refresh_token", relogin_required=True, ) - return { - "payload": payload, "tokens": tokens, - "auth_path": auth_path, - "codex_home": codex_home, + "last_refresh": state.get("last_refresh"), } -def _persist_codex_auth_payload( - auth_path: Path, - payload: Dict[str, Any], - *, - lock_held: bool = False, -) -> None: - auth_path.parent.mkdir(parents=True, exist_ok=True) - - def _write() -> None: - serialized = json.dumps(payload, indent=2, ensure_ascii=False) + "\n" - tmp_path = auth_path.parent / f".{auth_path.name}.{os.getpid()}.{time.time_ns()}.tmp" - try: - with tmp_path.open("w", encoding="utf-8") as tmp_file: - tmp_file.write(serialized) - tmp_file.flush() - os.fsync(tmp_file.fileno()) - os.replace(tmp_path, auth_path) - finally: - if tmp_path.exists(): - try: - tmp_path.unlink() - except OSError: - pass - - try: - auth_path.chmod(stat.S_IRUSR | stat.S_IWUSR) - except OSError: - pass - - if lock_held: - _write() - return - - with _codex_auth_file_lock(auth_path): - _write() +def _save_codex_tokens(tokens: Dict[str, str], last_refresh: str = None) -> None: + """Save Codex OAuth tokens to Hermes auth store (~/.hermes/auth.json).""" + if last_refresh is None: + last_refresh = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + with _auth_store_lock(): + auth_store = _load_auth_store() + state = _load_provider_state(auth_store, "openai-codex") or {} + state["tokens"] = tokens + state["last_refresh"] = last_refresh + state["auth_mode"] = "chatgpt" + _save_provider_state(auth_store, "openai-codex", state) + _save_auth_store(auth_store) def _refresh_codex_auth_tokens( - *, - payload: Dict[str, Any], - auth_path: Path, + tokens: Dict[str, str], timeout_seconds: float, - lock_held: bool = False, -) -> Dict[str, Any]: - tokens = payload.get("tokens") - if not isinstance(tokens, dict): - raise AuthError( - "Codex auth file is missing a valid 'tokens' object.", - provider="openai-codex", - code="codex_auth_invalid_shape", - relogin_required=True, - ) - +) -> Dict[str, str]: + """Refresh Codex access token using the refresh token. + + Saves the new tokens to Hermes auth store automatically. + """ refresh_token = tokens.get("refresh_token") if not isinstance(refresh_token, str) or not refresh_token.strip(): raise AuthError( - "Codex auth file is missing tokens.refresh_token.", + "Codex auth is missing refresh_token. Run `hermes login` to re-authenticate.", provider="openai-codex", code="codex_auth_missing_refresh_token", relogin_required=True, @@ -649,23 +562,61 @@ def _refresh_codex_auth_tokens( next_refresh = refresh_payload.get("refresh_token") if isinstance(next_refresh, str) and next_refresh.strip(): updated_tokens["refresh_token"] = next_refresh.strip() - payload["tokens"] = updated_tokens - payload["last_refresh"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") - _persist_codex_auth_payload(auth_path, payload, lock_held=lock_held) + + _save_codex_tokens(updated_tokens) return updated_tokens +def _import_codex_cli_tokens() -> Optional[Dict[str, str]]: + """Try to read tokens from ~/.codex/auth.json (Codex CLI shared file). + + Returns tokens dict if valid, None otherwise. Does NOT write to the shared file. + """ + codex_home = os.getenv("CODEX_HOME", "").strip() + if not codex_home: + codex_home = str(Path.home() / ".codex") + auth_path = Path(codex_home).expanduser() / "auth.json" + if not auth_path.is_file(): + return None + try: + payload = json.loads(auth_path.read_text()) + tokens = payload.get("tokens") + if not isinstance(tokens, dict): + return None + if not tokens.get("access_token") or not tokens.get("refresh_token"): + return None + return dict(tokens) + except Exception: + return None + + def resolve_codex_runtime_credentials( *, force_refresh: bool = False, refresh_if_expiring: bool = True, refresh_skew_seconds: int = CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS, ) -> Dict[str, Any]: - """Resolve runtime credentials from Codex CLI auth state.""" - data = read_codex_auth_file() - payload = data["payload"] + """Resolve runtime credentials from Hermes's own Codex token store.""" + try: + data = _read_codex_tokens() + except AuthError as orig_err: + # Only attempt migration when there are NO tokens stored at all + # (code == "codex_auth_missing"), not when tokens exist but are invalid. + if orig_err.code != "codex_auth_missing": + raise + + # Migration: user had Codex as active provider with old storage (~/.codex/). + cli_tokens = _import_codex_cli_tokens() + if cli_tokens: + logger.info("Migrating Codex credentials from ~/.codex/ to Hermes auth store") + print("⚠️ Migrating Codex credentials to Hermes's own auth store.") + print(" This avoids conflicts with Codex CLI and VS Code.") + print(" Run `hermes login` to create a fully independent session.\n") + _save_codex_tokens(cli_tokens) + data = _read_codex_tokens() + else: + raise tokens = dict(data["tokens"]) - auth_path = data["auth_path"] access_token = str(tokens.get("access_token", "") or "").strip() refresh_timeout_seconds = float(os.getenv("HERMES_CODEX_REFRESH_TIMEOUT_SECONDS", "20")) @@ -673,10 +624,9 @@ def resolve_codex_runtime_credentials( if (not should_refresh) and refresh_if_expiring: should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds) if should_refresh: - lock_timeout = max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0) - with _codex_auth_file_lock(auth_path, timeout_seconds=lock_timeout): - data = read_codex_auth_file() - payload = data["payload"] + # Re-read under lock to avoid racing with other Hermes processes + with _auth_store_lock(timeout_seconds=max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0)): + data = _read_codex_tokens(_lock=False) tokens = dict(data["tokens"]) access_token = str(tokens.get("access_token", "") or "").strip() @@ -685,12 +635,7 @@ def resolve_codex_runtime_credentials( should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds) if should_refresh: - tokens = _refresh_codex_auth_tokens( - payload=payload, - auth_path=auth_path, - timeout_seconds=refresh_timeout_seconds, - lock_held=True, - ) + tokens = _refresh_codex_auth_tokens(tokens, refresh_timeout_seconds) access_token = str(tokens.get("access_token", "") or "").strip() base_url = ( @@ -702,11 +647,9 @@ def resolve_codex_runtime_credentials( "provider": "openai-codex", "base_url": base_url, "api_key": access_token, - "source": "codex-auth-json", - "last_refresh": payload.get("last_refresh"), - "auth_mode": payload.get("auth_mode"), - "auth_file": str(auth_path), - "codex_home": str(data["codex_home"]), + "source": "hermes-auth-store", + "last_refresh": data.get("last_refresh"), + "auth_mode": "chatgpt", } @@ -1140,15 +1083,11 @@ def get_nous_auth_status() -> Dict[str, Any]: def get_codex_auth_status() -> Dict[str, Any]: """Status snapshot for Codex auth.""" - state = get_provider_auth_state("openai-codex") or {} - auth_file = state.get("auth_file") or str(_codex_auth_file_path()) - codex_home = state.get("codex_home") or str(resolve_codex_home_path()) try: creds = resolve_codex_runtime_credentials() return { "logged_in": True, - "auth_file": creds.get("auth_file"), - "codex_home": creds.get("codex_home"), + "auth_store": str(_auth_file_path()), "last_refresh": creds.get("last_refresh"), "auth_mode": creds.get("auth_mode"), "source": creds.get("source"), @@ -1156,8 +1095,7 @@ def get_codex_auth_status() -> Dict[str, Any]: except AuthError as exc: return { "logged_in": False, - "auth_file": auth_file, - "codex_home": codex_home, + "auth_store": str(_auth_file_path()), "error": str(exc), } @@ -1186,21 +1124,15 @@ def detect_external_credentials() -> List[Dict[str, Any]]: """ found: List[Dict[str, Any]] = [] - # Codex CLI: ~/.codex/auth.json (or $CODEX_HOME/auth.json) - try: - codex_home = resolve_codex_home_path() - codex_auth = codex_home / "auth.json" - if codex_auth.is_file(): - data = json.loads(codex_auth.read_text()) - tokens = data.get("tokens", {}) - if isinstance(tokens, dict) and tokens.get("access_token"): - found.append({ - "provider": "openai-codex", - "path": str(codex_auth), - "label": f"Codex CLI credentials found ({codex_auth})", - }) - except Exception: - pass + # Codex CLI: ~/.codex/auth.json (importable, not shared) + cli_tokens = _import_codex_cli_tokens() + if cli_tokens: + codex_path = Path.home() / ".codex" / "auth.json" + found.append({ + "provider": "openai-codex", + "path": str(codex_path), + "label": f"Codex CLI credentials found ({codex_path}) — run `hermes login` to create a separate session", + }) return found @@ -1369,52 +1301,58 @@ def login_command(args) -> None: def _login_openai_codex(args, pconfig: ProviderConfig) -> None: - """OpenAI Codex login via device code flow (no Codex CLI required).""" - codex_home = resolve_codex_home_path() + """OpenAI Codex login via device code flow. Tokens stored in ~/.hermes/auth.json.""" - # Check for existing valid credentials first + # Check for existing Hermes-owned credentials try: existing = resolve_codex_runtime_credentials() - print(f"Existing Codex credentials found at {codex_home / 'auth.json'}") + print("Existing Codex credentials found in Hermes auth store.") try: reuse = input("Use existing credentials? [Y/n]: ").strip().lower() except (EOFError, KeyboardInterrupt): reuse = "y" if reuse in ("", "y", "yes"): - creds = existing - _save_codex_provider_state(creds) + config_path = _update_config_for_provider("openai-codex", existing.get("base_url", DEFAULT_CODEX_BASE_URL)) + print() + print("Login successful!") + print(f" Config updated: {config_path} (model.provider=openai-codex)") return except AuthError: pass - # No existing creds (or user declined) -- run device code flow + # Check for existing Codex CLI tokens we can import + cli_tokens = _import_codex_cli_tokens() + if cli_tokens: + print("Found existing Codex CLI credentials at ~/.codex/auth.json") + print("Hermes will create its own session to avoid conflicts with Codex CLI / VS Code.") + try: + do_import = input("Import these credentials? (a separate login is recommended) [y/N]: ").strip().lower() + except (EOFError, KeyboardInterrupt): + do_import = "n" + if do_import in ("y", "yes"): + _save_codex_tokens(cli_tokens) + base_url = os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/") or DEFAULT_CODEX_BASE_URL + config_path = _update_config_for_provider("openai-codex", base_url) + print() + print("Credentials imported. Note: if Codex CLI refreshes its token,") + print("Hermes will keep working independently with its own session.") + print(f" Config updated: {config_path} (model.provider=openai-codex)") + return + + # Run a fresh device code flow — Hermes gets its own OAuth session print() print("Signing in to OpenAI Codex...") + print("(Hermes creates its own session — won't affect Codex CLI or VS Code)") print() creds = _codex_device_code_login() - _save_codex_provider_state(creds) - - -def _save_codex_provider_state(creds: Dict[str, Any]) -> None: - """Persist Codex provider state to auth store and config.""" - auth_state = { - "auth_file": creds.get("auth_file"), - "codex_home": creds.get("codex_home"), - "last_refresh": creds.get("last_refresh"), - "auth_mode": creds.get("auth_mode"), - "source": creds.get("source"), - } - - with _auth_store_lock(): - auth_store = _load_auth_store() - _save_provider_state(auth_store, "openai-codex", auth_state) - saved_to = _save_auth_store(auth_store) + # Save tokens to Hermes auth store + _save_codex_tokens(creds["tokens"], creds.get("last_refresh")) config_path = _update_config_for_provider("openai-codex", creds.get("base_url", DEFAULT_CODEX_BASE_URL)) print() print("Login successful!") - print(f" Auth state: {saved_to}") + print(f" Auth state: ~/.hermes/auth.json") print(f" Config updated: {config_path} (model.provider=openai-codex)") @@ -1545,31 +1483,19 @@ def _codex_device_code_login() -> Dict[str, Any]: provider="openai-codex", code="token_exchange_no_access_token", ) - # Step 5: Persist tokens to ~/.codex/auth.json - codex_home = resolve_codex_home_path() - codex_home.mkdir(parents=True, exist_ok=True) - auth_path = codex_home / "auth.json" - - payload = { - "tokens": { - "access_token": access_token, - "refresh_token": refresh_token, - }, - "last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), - } - _persist_codex_auth_payload(auth_path, payload, lock_held=False) - + # Return tokens for the caller to persist (no longer writes to ~/.codex/) base_url = ( os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/") or DEFAULT_CODEX_BASE_URL ) return { - "api_key": access_token, + "tokens": { + "access_token": access_token, + "refresh_token": refresh_token, + }, "base_url": base_url, - "auth_file": str(auth_path), - "codex_home": str(codex_home), - "last_refresh": payload["last_refresh"], + "last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), "auth_mode": "chatgpt", "source": "device-code", } diff --git a/hermes_cli/codex_models.py b/hermes_cli/codex_models.py index 75559396f..416c76add 100644 --- a/hermes_cli/codex_models.py +++ b/hermes_cli/codex_models.py @@ -7,7 +7,7 @@ import logging from pathlib import Path from typing import List, Optional -from hermes_cli.auth import resolve_codex_home_path +import os logger = logging.getLogger(__name__) @@ -119,7 +119,8 @@ def get_codex_model_ids(access_token: Optional[str] = None) -> List[str]: Resolution order: API (live, if token provided) > config.toml default > local cache > hardcoded defaults. """ - codex_home = resolve_codex_home_path() + codex_home_str = os.getenv("CODEX_HOME", "").strip() or str(Path.home() / ".codex") + codex_home = Path(codex_home_str).expanduser() ordered: List[str] = [] # Try live API if we have a token diff --git a/hermes_cli/runtime_provider.py b/hermes_cli/runtime_provider.py index 1f070ac22..51de8d366 100644 --- a/hermes_cli/runtime_provider.py +++ b/hermes_cli/runtime_provider.py @@ -127,9 +127,7 @@ def resolve_runtime_provider( "api_mode": "codex_responses", "base_url": creds.get("base_url", "").rstrip("/"), "api_key": creds.get("api_key", ""), - "source": creds.get("source", "codex-auth-json"), - "auth_file": creds.get("auth_file"), - "codex_home": creds.get("codex_home"), + "source": creds.get("source", "hermes-auth-store"), "last_refresh": creds.get("last_refresh"), "requested_provider": requested_provider, } diff --git a/tests/agent/test_auxiliary_client.py b/tests/agent/test_auxiliary_client.py index efcbce29f..a8f797fe2 100644 --- a/tests/agent/test_auxiliary_client.py +++ b/tests/agent/test_auxiliary_client.py @@ -45,29 +45,42 @@ def codex_auth_dir(tmp_path, monkeypatch): class TestReadCodexAccessToken: - def test_valid_auth_file(self, tmp_path): - codex_dir = tmp_path / ".codex" - codex_dir.mkdir() - auth = codex_dir / "auth.json" - auth.write_text(json.dumps({ - "tokens": {"access_token": "tok-123", "refresh_token": "r-456"} + def test_valid_auth_store(self, tmp_path, monkeypatch): + hermes_home = tmp_path / "hermes" + hermes_home.mkdir(parents=True, exist_ok=True) + (hermes_home / "auth.json").write_text(json.dumps({ + "version": 1, + "providers": { + "openai-codex": { + "tokens": {"access_token": "tok-123", "refresh_token": "r-456"}, + }, + }, })) - with patch("agent.auxiliary_client.Path.home", return_value=tmp_path): - result = _read_codex_access_token() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + result = _read_codex_access_token() assert result == "tok-123" - def test_missing_file_returns_none(self, tmp_path): - with patch("agent.auxiliary_client.Path.home", return_value=tmp_path): - result = _read_codex_access_token() + def test_missing_returns_none(self, tmp_path, monkeypatch): + hermes_home = tmp_path / "hermes" + hermes_home.mkdir(parents=True, exist_ok=True) + (hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}})) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + result = _read_codex_access_token() assert result is None - def test_empty_token_returns_none(self, tmp_path): - codex_dir = tmp_path / ".codex" - codex_dir.mkdir() - auth = codex_dir / "auth.json" - auth.write_text(json.dumps({"tokens": {"access_token": " "}})) - with patch("agent.auxiliary_client.Path.home", return_value=tmp_path): - result = _read_codex_access_token() + def test_empty_token_returns_none(self, tmp_path, monkeypatch): + hermes_home = tmp_path / "hermes" + hermes_home.mkdir(parents=True, exist_ok=True) + (hermes_home / "auth.json").write_text(json.dumps({ + "version": 1, + "providers": { + "openai-codex": { + "tokens": {"access_token": " ", "refresh_token": "r"}, + }, + }, + })) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + result = _read_codex_access_token() assert result is None def test_malformed_json_returns_none(self, tmp_path): diff --git a/tests/test_auth_codex_provider.py b/tests/test_auth_codex_provider.py index 7d3076807..4119126e6 100644 --- a/tests/test_auth_codex_provider.py +++ b/tests/test_auth_codex_provider.py @@ -1,9 +1,9 @@ +"""Tests for Codex auth — tokens stored in Hermes auth store (~/.hermes/auth.json).""" + import json import time import base64 -from contextlib import contextmanager from pathlib import Path -from types import SimpleNamespace import pytest import yaml @@ -12,32 +12,35 @@ from hermes_cli.auth import ( AuthError, DEFAULT_CODEX_BASE_URL, PROVIDER_REGISTRY, - _persist_codex_auth_payload, - _login_openai_codex, - login_command, + _read_codex_tokens, + _save_codex_tokens, + _import_codex_cli_tokens, get_codex_auth_status, get_provider_auth_state, - read_codex_auth_file, resolve_codex_runtime_credentials, resolve_provider, ) -def _write_codex_auth(codex_home: Path, *, access_token: str = "access", refresh_token: str = "refresh") -> Path: - codex_home.mkdir(parents=True, exist_ok=True) - auth_file = codex_home / "auth.json" - auth_file.write_text( - json.dumps( - { - "auth_mode": "oauth", - "last_refresh": "2026-02-26T00:00:00Z", +def _setup_hermes_auth(hermes_home: Path, *, access_token: str = "access", refresh_token: str = "refresh"): + """Write Codex tokens into the Hermes auth store.""" + hermes_home.mkdir(parents=True, exist_ok=True) + auth_store = { + "version": 1, + "active_provider": "openai-codex", + "providers": { + "openai-codex": { "tokens": { "access_token": access_token, "refresh_token": refresh_token, }, - } - ) - ) + "last_refresh": "2026-02-26T00:00:00Z", + "auth_mode": "chatgpt", + }, + }, + } + auth_file = hermes_home / "auth.json" + auth_file.write_text(json.dumps(auth_store, indent=2)) return auth_file @@ -47,42 +50,49 @@ def _jwt_with_exp(exp_epoch: int) -> str: return f"h.{encoded}.s" -def test_read_codex_auth_file_success(tmp_path, monkeypatch): - codex_home = tmp_path / "codex-home" - auth_file = _write_codex_auth(codex_home) - monkeypatch.setenv("CODEX_HOME", str(codex_home)) +def test_read_codex_tokens_success(tmp_path, monkeypatch): + hermes_home = tmp_path / "hermes" + _setup_hermes_auth(hermes_home) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) - payload = read_codex_auth_file() + data = _read_codex_tokens() + assert data["tokens"]["access_token"] == "access" + assert data["tokens"]["refresh_token"] == "refresh" - assert payload["auth_path"] == auth_file - assert payload["tokens"]["access_token"] == "access" - assert payload["tokens"]["refresh_token"] == "refresh" + +def test_read_codex_tokens_missing(tmp_path, monkeypatch): + hermes_home = tmp_path / "hermes" + hermes_home.mkdir(parents=True, exist_ok=True) + # Empty auth store + (hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}})) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + with pytest.raises(AuthError) as exc: + _read_codex_tokens() + assert exc.value.code == "codex_auth_missing" def test_resolve_codex_runtime_credentials_missing_access_token(tmp_path, monkeypatch): - codex_home = tmp_path / "codex-home" - _write_codex_auth(codex_home, access_token="") - monkeypatch.setenv("CODEX_HOME", str(codex_home)) + hermes_home = tmp_path / "hermes" + _setup_hermes_auth(hermes_home, access_token="") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) with pytest.raises(AuthError) as exc: resolve_codex_runtime_credentials() - assert exc.value.code == "codex_auth_missing_access_token" assert exc.value.relogin_required is True def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, monkeypatch): - codex_home = tmp_path / "codex-home" + hermes_home = tmp_path / "hermes" expiring_token = _jwt_with_exp(int(time.time()) - 10) - _write_codex_auth(codex_home, access_token=expiring_token, refresh_token="refresh-old") - monkeypatch.setenv("CODEX_HOME", str(codex_home)) + _setup_hermes_auth(hermes_home, access_token=expiring_token, refresh_token="refresh-old") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) called = {"count": 0} - def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False): + def _fake_refresh(tokens, timeout_seconds): called["count"] += 1 - assert auth_path == codex_home / "auth.json" - assert lock_held is True return {"access_token": "access-new", "refresh_token": "refresh-new"} monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh) @@ -94,15 +104,14 @@ def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, mo def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch): - codex_home = tmp_path / "codex-home" - _write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old") - monkeypatch.setenv("CODEX_HOME", str(codex_home)) + hermes_home = tmp_path / "hermes" + _setup_hermes_auth(hermes_home, access_token="access-current", refresh_token="refresh-old") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) called = {"count": 0} - def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False): + def _fake_refresh(tokens, timeout_seconds): called["count"] += 1 - assert lock_held is True return {"access_token": "access-forced", "refresh_token": "refresh-new"} monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh) @@ -113,98 +122,71 @@ def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch): assert resolved["api_key"] == "access-forced" -def test_resolve_codex_runtime_credentials_uses_file_lock_on_refresh(tmp_path, monkeypatch): - codex_home = tmp_path / "codex-home" - _write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old") - monkeypatch.setenv("CODEX_HOME", str(codex_home)) - - lock_calls = {"enter": 0, "exit": 0} - - @contextmanager - def _fake_lock(auth_path, timeout_seconds=15.0): - assert auth_path == codex_home / "auth.json" - lock_calls["enter"] += 1 - try: - yield - finally: - lock_calls["exit"] += 1 - - refresh_calls = {"count": 0} - - def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False): - refresh_calls["count"] += 1 - assert lock_held is True - return {"access_token": "access-updated", "refresh_token": "refresh-updated"} - - monkeypatch.setattr("hermes_cli.auth._codex_auth_file_lock", _fake_lock) - monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh) - - resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False) - - assert refresh_calls["count"] == 1 - assert lock_calls["enter"] == 1 - assert lock_calls["exit"] == 1 - assert resolved["api_key"] == "access-updated" - - def test_resolve_provider_explicit_codex_does_not_fallback(monkeypatch): monkeypatch.delenv("OPENAI_API_KEY", raising=False) monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) assert resolve_provider("openai-codex") == "openai-codex" -def test_persist_codex_auth_payload_writes_atomically(tmp_path): - auth_path = tmp_path / "auth.json" - auth_path.write_text('{"stale":true}\n') - payload = { - "auth_mode": "oauth", - "tokens": { - "access_token": "next-access", - "refresh_token": "next-refresh", - }, - "last_refresh": "2026-02-26T00:00:00Z", - } +def test_save_codex_tokens_roundtrip(tmp_path, monkeypatch): + hermes_home = tmp_path / "hermes" + hermes_home.mkdir(parents=True, exist_ok=True) + (hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}})) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) - _persist_codex_auth_payload(auth_path, payload) + _save_codex_tokens({"access_token": "at123", "refresh_token": "rt456"}) + data = _read_codex_tokens() - stored = json.loads(auth_path.read_text()) - assert stored == payload - assert list(tmp_path.glob(".auth.json.*.tmp")) == [] + assert data["tokens"]["access_token"] == "at123" + assert data["tokens"]["refresh_token"] == "rt456" -def test_get_codex_auth_status_not_logged_in(tmp_path, monkeypatch): - monkeypatch.setenv("CODEX_HOME", str(tmp_path / "missing-codex-home")) - status = get_codex_auth_status() - assert status["logged_in"] is False - assert "error" in status +def test_import_codex_cli_tokens(tmp_path, monkeypatch): + codex_home = tmp_path / "codex-cli" + codex_home.mkdir(parents=True, exist_ok=True) + (codex_home / "auth.json").write_text(json.dumps({ + "tokens": {"access_token": "cli-at", "refresh_token": "cli-rt"}, + })) + monkeypatch.setenv("CODEX_HOME", str(codex_home)) + + tokens = _import_codex_cli_tokens() + assert tokens is not None + assert tokens["access_token"] == "cli-at" + assert tokens["refresh_token"] == "cli-rt" -def test_login_openai_codex_persists_provider_state(tmp_path, monkeypatch): - hermes_home = tmp_path / "hermes-home" - codex_home = tmp_path / "codex-home" - _write_codex_auth(codex_home) +def test_import_codex_cli_tokens_missing(tmp_path, monkeypatch): + monkeypatch.setenv("CODEX_HOME", str(tmp_path / "nonexistent")) + assert _import_codex_cli_tokens() is None + + +def test_codex_tokens_not_written_to_shared_file(tmp_path, monkeypatch): + """Verify Hermes never writes to ~/.codex/auth.json.""" + hermes_home = tmp_path / "hermes" + codex_home = tmp_path / "codex-cli" + hermes_home.mkdir(parents=True, exist_ok=True) + codex_home.mkdir(parents=True, exist_ok=True) + + (hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}})) monkeypatch.setenv("HERMES_HOME", str(hermes_home)) monkeypatch.setenv("CODEX_HOME", str(codex_home)) - # Mock input() to accept existing credentials - monkeypatch.setattr("builtins.input", lambda _: "y") - _login_openai_codex(SimpleNamespace(), PROVIDER_REGISTRY["openai-codex"]) + _save_codex_tokens({"access_token": "hermes-at", "refresh_token": "hermes-rt"}) - state = get_provider_auth_state("openai-codex") - assert state is not None - assert state["source"] == "codex-auth-json" - assert state["auth_file"].endswith("auth.json") + # ~/.codex/auth.json should NOT exist + assert not (codex_home / "auth.json").exists() - config_path = hermes_home / "config.yaml" - config = yaml.safe_load(config_path.read_text()) - assert config["model"]["provider"] == "openai-codex" - assert config["model"]["base_url"] == DEFAULT_CODEX_BASE_URL + # Hermes auth store should have the tokens + data = _read_codex_tokens() + assert data["tokens"]["access_token"] == "hermes-at" -def test_login_command_shows_deprecation(monkeypatch, capsys): - """login_command is deprecated and directs users to hermes model.""" - with pytest.raises(SystemExit) as exc_info: - login_command(SimpleNamespace()) - assert exc_info.value.code == 0 - captured = capsys.readouterr() - assert "hermes model" in captured.out +def test_resolve_returns_hermes_auth_store_source(tmp_path, monkeypatch): + hermes_home = tmp_path / "hermes" + _setup_hermes_auth(hermes_home) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + creds = resolve_codex_runtime_credentials() + assert creds["source"] == "hermes-auth-store" + assert creds["provider"] == "openai-codex" + assert creds["base_url"] == DEFAULT_CODEX_BASE_URL diff --git a/tests/test_external_credential_detection.py b/tests/test_external_credential_detection.py index a1fe2a2f9..4028a0de5 100644 --- a/tests/test_external_credential_detection.py +++ b/tests/test_external_credential_detection.py @@ -10,42 +10,41 @@ from hermes_cli.auth import detect_external_credentials class TestDetectCodexCLI: - def test_detects_valid_codex_auth(self, tmp_path): + def test_detects_valid_codex_auth(self, tmp_path, monkeypatch): codex_dir = tmp_path / ".codex" codex_dir.mkdir() auth = codex_dir / "auth.json" auth.write_text(json.dumps({ "tokens": {"access_token": "tok-123", "refresh_token": "ref-456"} })) - with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir): - result = detect_external_credentials() + monkeypatch.setenv("CODEX_HOME", str(codex_dir)) + result = detect_external_credentials() codex_hits = [c for c in result if c["provider"] == "openai-codex"] assert len(codex_hits) == 1 assert "Codex CLI" in codex_hits[0]["label"] - assert str(auth) == codex_hits[0]["path"] - def test_skips_codex_without_access_token(self, tmp_path): + def test_skips_codex_without_access_token(self, tmp_path, monkeypatch): codex_dir = tmp_path / ".codex" codex_dir.mkdir() (codex_dir / "auth.json").write_text(json.dumps({"tokens": {}})) - with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir): - result = detect_external_credentials() + monkeypatch.setenv("CODEX_HOME", str(codex_dir)) + result = detect_external_credentials() assert not any(c["provider"] == "openai-codex" for c in result) - def test_skips_missing_codex_dir(self, tmp_path): - with patch("hermes_cli.auth.resolve_codex_home_path", return_value=tmp_path / "nonexistent"): - result = detect_external_credentials() + def test_skips_missing_codex_dir(self, tmp_path, monkeypatch): + monkeypatch.setenv("CODEX_HOME", str(tmp_path / "nonexistent")) + result = detect_external_credentials() assert not any(c["provider"] == "openai-codex" for c in result) - def test_skips_malformed_codex_auth(self, tmp_path): + def test_skips_malformed_codex_auth(self, tmp_path, monkeypatch): codex_dir = tmp_path / ".codex" codex_dir.mkdir() (codex_dir / "auth.json").write_text("{bad json") - with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir): - result = detect_external_credentials() + monkeypatch.setenv("CODEX_HOME", str(codex_dir)) + result = detect_external_credentials() assert not any(c["provider"] == "openai-codex" for c in result) - def test_returns_empty_when_nothing_found(self, tmp_path): - with patch("hermes_cli.auth.resolve_codex_home_path", return_value=tmp_path / ".codex"): - result = detect_external_credentials() + def test_returns_empty_when_nothing_found(self, tmp_path, monkeypatch): + monkeypatch.setenv("CODEX_HOME", str(tmp_path / "nonexistent")) + result = detect_external_credentials() assert result == []