refactor(auth): transition Codex OAuth tokens to Hermes auth store

Updated the authentication mechanism to store Codex OAuth tokens in the Hermes auth store located at ~/.hermes/auth.json instead of the previous ~/.codex/auth.json. This change includes refactoring related functions for reading and saving tokens, ensuring better management of authentication states and preventing conflicts between different applications. Adjusted tests to reflect the new storage structure and improved error handling for missing or malformed tokens.
This commit is contained in:
teknium1
2026-03-01 19:59:24 -08:00
parent 8bc2de4ab6
commit 5e598a588f
7 changed files with 295 additions and 380 deletions

View File

@@ -268,15 +268,11 @@ def _nous_base_url() -> str:
def _read_codex_access_token() -> Optional[str]: def _read_codex_access_token() -> Optional[str]:
"""Read a valid Codex OAuth access token from ~/.codex/auth.json.""" """Read a valid Codex OAuth access token from Hermes auth store (~/.hermes/auth.json)."""
try: try:
codex_auth = Path.home() / ".codex" / "auth.json" from hermes_cli.auth import _read_codex_tokens
if not codex_auth.is_file(): data = _read_codex_tokens()
return None tokens = data.get("tokens", {})
data = json.loads(codex_auth.read_text())
tokens = data.get("tokens")
if not isinstance(tokens, dict):
return None
access_token = tokens.get("access_token") access_token = tokens.get("access_token")
if isinstance(access_token, str) and access_token.strip(): if isinstance(access_token, str) and access_token.strip():
return access_token.strip() return access_token.strip()

View File

@@ -415,175 +415,88 @@ def _is_remote_session() -> bool:
# ============================================================================= # =============================================================================
# OpenAI Codex auth file helpers # OpenAI Codex auth — tokens stored in ~/.hermes/auth.json (not ~/.codex/)
#
# Hermes maintains its own Codex OAuth session separate from the Codex CLI
# and VS Code extension. This prevents refresh token rotation conflicts
# where one app's refresh invalidates the other's session.
# ============================================================================= # =============================================================================
def resolve_codex_home_path() -> Path: def _read_codex_tokens(*, _lock: bool = True) -> Dict[str, Any]:
"""Resolve CODEX_HOME, defaulting to ~/.codex.""" """Read Codex OAuth tokens from Hermes auth store (~/.hermes/auth.json).
codex_home = os.getenv("CODEX_HOME", "").strip()
if not codex_home: Returns dict with 'tokens' (access_token, refresh_token) and 'last_refresh'.
codex_home = str(Path.home() / ".codex") Raises AuthError if no Codex tokens are stored.
return Path(codex_home).expanduser() """
if _lock:
with _auth_store_lock():
def _codex_auth_file_path() -> Path: auth_store = _load_auth_store()
return resolve_codex_home_path() / "auth.json" else:
auth_store = _load_auth_store()
state = _load_provider_state(auth_store, "openai-codex")
def _codex_auth_lock_path(auth_path: Path) -> Path: if not state:
return auth_path.with_suffix(auth_path.suffix + ".lock")
@contextmanager
def _codex_auth_file_lock(
auth_path: Path,
timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS,
):
lock_path = _codex_auth_lock_path(auth_path)
lock_path.parent.mkdir(parents=True, exist_ok=True)
with lock_path.open("a+") as lock_file:
if fcntl is None:
yield
return
deadline = time.time() + max(1.0, timeout_seconds)
while True:
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() >= deadline:
raise TimeoutError(f"Timed out waiting for Codex auth lock: {lock_path}")
time.sleep(0.05)
try:
yield
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def read_codex_auth_file() -> Dict[str, Any]:
"""Read and validate Codex auth.json shape."""
codex_home = resolve_codex_home_path()
if not codex_home.exists():
raise AuthError( raise AuthError(
f"Codex home directory not found at {codex_home}.", "No Codex credentials stored. Run `hermes login` to authenticate.",
provider="openai-codex",
code="codex_home_missing",
relogin_required=True,
)
auth_path = codex_home / "auth.json"
if not auth_path.exists():
raise AuthError(
f"Codex auth file not found at {auth_path}.",
provider="openai-codex", provider="openai-codex",
code="codex_auth_missing", code="codex_auth_missing",
relogin_required=True, relogin_required=True,
) )
tokens = state.get("tokens")
try:
payload = json.loads(auth_path.read_text())
except Exception as exc:
raise AuthError(
f"Failed to parse Codex auth file at {auth_path}.",
provider="openai-codex",
code="codex_auth_invalid_json",
relogin_required=True,
) from exc
tokens = payload.get("tokens")
if not isinstance(tokens, dict): if not isinstance(tokens, dict):
raise AuthError( raise AuthError(
"Codex auth file is missing a valid 'tokens' object.", "Codex auth state is missing tokens. Run `hermes login` to re-authenticate.",
provider="openai-codex", provider="openai-codex",
code="codex_auth_invalid_shape", code="codex_auth_invalid_shape",
relogin_required=True, relogin_required=True,
) )
access_token = tokens.get("access_token") access_token = tokens.get("access_token")
refresh_token = tokens.get("refresh_token") refresh_token = tokens.get("refresh_token")
if not isinstance(access_token, str) or not access_token.strip(): if not isinstance(access_token, str) or not access_token.strip():
raise AuthError( raise AuthError(
"Codex auth file is missing tokens.access_token.", "Codex auth is missing access_token. Run `hermes login` to re-authenticate.",
provider="openai-codex", provider="openai-codex",
code="codex_auth_missing_access_token", code="codex_auth_missing_access_token",
relogin_required=True, relogin_required=True,
) )
if not isinstance(refresh_token, str) or not refresh_token.strip(): if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError( raise AuthError(
"Codex auth file is missing tokens.refresh_token.", "Codex auth is missing refresh_token. Run `hermes login` to re-authenticate.",
provider="openai-codex", provider="openai-codex",
code="codex_auth_missing_refresh_token", code="codex_auth_missing_refresh_token",
relogin_required=True, relogin_required=True,
) )
return { return {
"payload": payload,
"tokens": tokens, "tokens": tokens,
"auth_path": auth_path, "last_refresh": state.get("last_refresh"),
"codex_home": codex_home,
} }
def _persist_codex_auth_payload( def _save_codex_tokens(tokens: Dict[str, str], last_refresh: str = None) -> None:
auth_path: Path, """Save Codex OAuth tokens to Hermes auth store (~/.hermes/auth.json)."""
payload: Dict[str, Any], if last_refresh is None:
*, last_refresh = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
lock_held: bool = False, with _auth_store_lock():
) -> None: auth_store = _load_auth_store()
auth_path.parent.mkdir(parents=True, exist_ok=True) state = _load_provider_state(auth_store, "openai-codex") or {}
state["tokens"] = tokens
def _write() -> None: state["last_refresh"] = last_refresh
serialized = json.dumps(payload, indent=2, ensure_ascii=False) + "\n" state["auth_mode"] = "chatgpt"
tmp_path = auth_path.parent / f".{auth_path.name}.{os.getpid()}.{time.time_ns()}.tmp" _save_provider_state(auth_store, "openai-codex", state)
try: _save_auth_store(auth_store)
with tmp_path.open("w", encoding="utf-8") as tmp_file:
tmp_file.write(serialized)
tmp_file.flush()
os.fsync(tmp_file.fileno())
os.replace(tmp_path, auth_path)
finally:
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
try:
auth_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
except OSError:
pass
if lock_held:
_write()
return
with _codex_auth_file_lock(auth_path):
_write()
def _refresh_codex_auth_tokens( def _refresh_codex_auth_tokens(
*, tokens: Dict[str, str],
payload: Dict[str, Any],
auth_path: Path,
timeout_seconds: float, timeout_seconds: float,
lock_held: bool = False, ) -> Dict[str, str]:
) -> Dict[str, Any]: """Refresh Codex access token using the refresh token.
tokens = payload.get("tokens")
if not isinstance(tokens, dict): Saves the new tokens to Hermes auth store automatically.
raise AuthError( """
"Codex auth file is missing a valid 'tokens' object.",
provider="openai-codex",
code="codex_auth_invalid_shape",
relogin_required=True,
)
refresh_token = tokens.get("refresh_token") refresh_token = tokens.get("refresh_token")
if not isinstance(refresh_token, str) or not refresh_token.strip(): if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError( raise AuthError(
"Codex auth file is missing tokens.refresh_token.", "Codex auth is missing refresh_token. Run `hermes login` to re-authenticate.",
provider="openai-codex", provider="openai-codex",
code="codex_auth_missing_refresh_token", code="codex_auth_missing_refresh_token",
relogin_required=True, relogin_required=True,
@@ -649,23 +562,61 @@ def _refresh_codex_auth_tokens(
next_refresh = refresh_payload.get("refresh_token") next_refresh = refresh_payload.get("refresh_token")
if isinstance(next_refresh, str) and next_refresh.strip(): if isinstance(next_refresh, str) and next_refresh.strip():
updated_tokens["refresh_token"] = next_refresh.strip() updated_tokens["refresh_token"] = next_refresh.strip()
payload["tokens"] = updated_tokens
payload["last_refresh"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") _save_codex_tokens(updated_tokens)
_persist_codex_auth_payload(auth_path, payload, lock_held=lock_held)
return updated_tokens return updated_tokens
def _import_codex_cli_tokens() -> Optional[Dict[str, str]]:
"""Try to read tokens from ~/.codex/auth.json (Codex CLI shared file).
Returns tokens dict if valid, None otherwise. Does NOT write to the shared file.
"""
codex_home = os.getenv("CODEX_HOME", "").strip()
if not codex_home:
codex_home = str(Path.home() / ".codex")
auth_path = Path(codex_home).expanduser() / "auth.json"
if not auth_path.is_file():
return None
try:
payload = json.loads(auth_path.read_text())
tokens = payload.get("tokens")
if not isinstance(tokens, dict):
return None
if not tokens.get("access_token") or not tokens.get("refresh_token"):
return None
return dict(tokens)
except Exception:
return None
def resolve_codex_runtime_credentials( def resolve_codex_runtime_credentials(
*, *,
force_refresh: bool = False, force_refresh: bool = False,
refresh_if_expiring: bool = True, refresh_if_expiring: bool = True,
refresh_skew_seconds: int = CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS, refresh_skew_seconds: int = CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Resolve runtime credentials from Codex CLI auth state.""" """Resolve runtime credentials from Hermes's own Codex token store."""
data = read_codex_auth_file() try:
payload = data["payload"] data = _read_codex_tokens()
except AuthError as orig_err:
# Only attempt migration when there are NO tokens stored at all
# (code == "codex_auth_missing"), not when tokens exist but are invalid.
if orig_err.code != "codex_auth_missing":
raise
# Migration: user had Codex as active provider with old storage (~/.codex/).
cli_tokens = _import_codex_cli_tokens()
if cli_tokens:
logger.info("Migrating Codex credentials from ~/.codex/ to Hermes auth store")
print("⚠️ Migrating Codex credentials to Hermes's own auth store.")
print(" This avoids conflicts with Codex CLI and VS Code.")
print(" Run `hermes login` to create a fully independent session.\n")
_save_codex_tokens(cli_tokens)
data = _read_codex_tokens()
else:
raise
tokens = dict(data["tokens"]) tokens = dict(data["tokens"])
auth_path = data["auth_path"]
access_token = str(tokens.get("access_token", "") or "").strip() access_token = str(tokens.get("access_token", "") or "").strip()
refresh_timeout_seconds = float(os.getenv("HERMES_CODEX_REFRESH_TIMEOUT_SECONDS", "20")) refresh_timeout_seconds = float(os.getenv("HERMES_CODEX_REFRESH_TIMEOUT_SECONDS", "20"))
@@ -673,10 +624,9 @@ def resolve_codex_runtime_credentials(
if (not should_refresh) and refresh_if_expiring: if (not should_refresh) and refresh_if_expiring:
should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds) should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh: if should_refresh:
lock_timeout = max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0) # Re-read under lock to avoid racing with other Hermes processes
with _codex_auth_file_lock(auth_path, timeout_seconds=lock_timeout): with _auth_store_lock(timeout_seconds=max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0)):
data = read_codex_auth_file() data = _read_codex_tokens(_lock=False)
payload = data["payload"]
tokens = dict(data["tokens"]) tokens = dict(data["tokens"])
access_token = str(tokens.get("access_token", "") or "").strip() access_token = str(tokens.get("access_token", "") or "").strip()
@@ -685,12 +635,7 @@ def resolve_codex_runtime_credentials(
should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds) should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh: if should_refresh:
tokens = _refresh_codex_auth_tokens( tokens = _refresh_codex_auth_tokens(tokens, refresh_timeout_seconds)
payload=payload,
auth_path=auth_path,
timeout_seconds=refresh_timeout_seconds,
lock_held=True,
)
access_token = str(tokens.get("access_token", "") or "").strip() access_token = str(tokens.get("access_token", "") or "").strip()
base_url = ( base_url = (
@@ -702,11 +647,9 @@ def resolve_codex_runtime_credentials(
"provider": "openai-codex", "provider": "openai-codex",
"base_url": base_url, "base_url": base_url,
"api_key": access_token, "api_key": access_token,
"source": "codex-auth-json", "source": "hermes-auth-store",
"last_refresh": payload.get("last_refresh"), "last_refresh": data.get("last_refresh"),
"auth_mode": payload.get("auth_mode"), "auth_mode": "chatgpt",
"auth_file": str(auth_path),
"codex_home": str(data["codex_home"]),
} }
@@ -1140,15 +1083,11 @@ def get_nous_auth_status() -> Dict[str, Any]:
def get_codex_auth_status() -> Dict[str, Any]: def get_codex_auth_status() -> Dict[str, Any]:
"""Status snapshot for Codex auth.""" """Status snapshot for Codex auth."""
state = get_provider_auth_state("openai-codex") or {}
auth_file = state.get("auth_file") or str(_codex_auth_file_path())
codex_home = state.get("codex_home") or str(resolve_codex_home_path())
try: try:
creds = resolve_codex_runtime_credentials() creds = resolve_codex_runtime_credentials()
return { return {
"logged_in": True, "logged_in": True,
"auth_file": creds.get("auth_file"), "auth_store": str(_auth_file_path()),
"codex_home": creds.get("codex_home"),
"last_refresh": creds.get("last_refresh"), "last_refresh": creds.get("last_refresh"),
"auth_mode": creds.get("auth_mode"), "auth_mode": creds.get("auth_mode"),
"source": creds.get("source"), "source": creds.get("source"),
@@ -1156,8 +1095,7 @@ def get_codex_auth_status() -> Dict[str, Any]:
except AuthError as exc: except AuthError as exc:
return { return {
"logged_in": False, "logged_in": False,
"auth_file": auth_file, "auth_store": str(_auth_file_path()),
"codex_home": codex_home,
"error": str(exc), "error": str(exc),
} }
@@ -1186,21 +1124,15 @@ def detect_external_credentials() -> List[Dict[str, Any]]:
""" """
found: List[Dict[str, Any]] = [] found: List[Dict[str, Any]] = []
# Codex CLI: ~/.codex/auth.json (or $CODEX_HOME/auth.json) # Codex CLI: ~/.codex/auth.json (importable, not shared)
try: cli_tokens = _import_codex_cli_tokens()
codex_home = resolve_codex_home_path() if cli_tokens:
codex_auth = codex_home / "auth.json" codex_path = Path.home() / ".codex" / "auth.json"
if codex_auth.is_file(): found.append({
data = json.loads(codex_auth.read_text()) "provider": "openai-codex",
tokens = data.get("tokens", {}) "path": str(codex_path),
if isinstance(tokens, dict) and tokens.get("access_token"): "label": f"Codex CLI credentials found ({codex_path}) — run `hermes login` to create a separate session",
found.append({ })
"provider": "openai-codex",
"path": str(codex_auth),
"label": f"Codex CLI credentials found ({codex_auth})",
})
except Exception:
pass
return found return found
@@ -1369,52 +1301,58 @@ def login_command(args) -> None:
def _login_openai_codex(args, pconfig: ProviderConfig) -> None: def _login_openai_codex(args, pconfig: ProviderConfig) -> None:
"""OpenAI Codex login via device code flow (no Codex CLI required).""" """OpenAI Codex login via device code flow. Tokens stored in ~/.hermes/auth.json."""
codex_home = resolve_codex_home_path()
# Check for existing valid credentials first # Check for existing Hermes-owned credentials
try: try:
existing = resolve_codex_runtime_credentials() existing = resolve_codex_runtime_credentials()
print(f"Existing Codex credentials found at {codex_home / 'auth.json'}") print("Existing Codex credentials found in Hermes auth store.")
try: try:
reuse = input("Use existing credentials? [Y/n]: ").strip().lower() reuse = input("Use existing credentials? [Y/n]: ").strip().lower()
except (EOFError, KeyboardInterrupt): except (EOFError, KeyboardInterrupt):
reuse = "y" reuse = "y"
if reuse in ("", "y", "yes"): if reuse in ("", "y", "yes"):
creds = existing config_path = _update_config_for_provider("openai-codex", existing.get("base_url", DEFAULT_CODEX_BASE_URL))
_save_codex_provider_state(creds) print()
print("Login successful!")
print(f" Config updated: {config_path} (model.provider=openai-codex)")
return return
except AuthError: except AuthError:
pass pass
# No existing creds (or user declined) -- run device code flow # Check for existing Codex CLI tokens we can import
cli_tokens = _import_codex_cli_tokens()
if cli_tokens:
print("Found existing Codex CLI credentials at ~/.codex/auth.json")
print("Hermes will create its own session to avoid conflicts with Codex CLI / VS Code.")
try:
do_import = input("Import these credentials? (a separate login is recommended) [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
do_import = "n"
if do_import in ("y", "yes"):
_save_codex_tokens(cli_tokens)
base_url = os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/") or DEFAULT_CODEX_BASE_URL
config_path = _update_config_for_provider("openai-codex", base_url)
print()
print("Credentials imported. Note: if Codex CLI refreshes its token,")
print("Hermes will keep working independently with its own session.")
print(f" Config updated: {config_path} (model.provider=openai-codex)")
return
# Run a fresh device code flow — Hermes gets its own OAuth session
print() print()
print("Signing in to OpenAI Codex...") print("Signing in to OpenAI Codex...")
print("(Hermes creates its own session — won't affect Codex CLI or VS Code)")
print() print()
creds = _codex_device_code_login() creds = _codex_device_code_login()
_save_codex_provider_state(creds)
def _save_codex_provider_state(creds: Dict[str, Any]) -> None:
"""Persist Codex provider state to auth store and config."""
auth_state = {
"auth_file": creds.get("auth_file"),
"codex_home": creds.get("codex_home"),
"last_refresh": creds.get("last_refresh"),
"auth_mode": creds.get("auth_mode"),
"source": creds.get("source"),
}
with _auth_store_lock():
auth_store = _load_auth_store()
_save_provider_state(auth_store, "openai-codex", auth_state)
saved_to = _save_auth_store(auth_store)
# Save tokens to Hermes auth store
_save_codex_tokens(creds["tokens"], creds.get("last_refresh"))
config_path = _update_config_for_provider("openai-codex", creds.get("base_url", DEFAULT_CODEX_BASE_URL)) config_path = _update_config_for_provider("openai-codex", creds.get("base_url", DEFAULT_CODEX_BASE_URL))
print() print()
print("Login successful!") print("Login successful!")
print(f" Auth state: {saved_to}") print(f" Auth state: ~/.hermes/auth.json")
print(f" Config updated: {config_path} (model.provider=openai-codex)") print(f" Config updated: {config_path} (model.provider=openai-codex)")
@@ -1545,31 +1483,19 @@ def _codex_device_code_login() -> Dict[str, Any]:
provider="openai-codex", code="token_exchange_no_access_token", provider="openai-codex", code="token_exchange_no_access_token",
) )
# Step 5: Persist tokens to ~/.codex/auth.json # Return tokens for the caller to persist (no longer writes to ~/.codex/)
codex_home = resolve_codex_home_path()
codex_home.mkdir(parents=True, exist_ok=True)
auth_path = codex_home / "auth.json"
payload = {
"tokens": {
"access_token": access_token,
"refresh_token": refresh_token,
},
"last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}
_persist_codex_auth_payload(auth_path, payload, lock_held=False)
base_url = ( base_url = (
os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/") os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/")
or DEFAULT_CODEX_BASE_URL or DEFAULT_CODEX_BASE_URL
) )
return { return {
"api_key": access_token, "tokens": {
"access_token": access_token,
"refresh_token": refresh_token,
},
"base_url": base_url, "base_url": base_url,
"auth_file": str(auth_path), "last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"codex_home": str(codex_home),
"last_refresh": payload["last_refresh"],
"auth_mode": "chatgpt", "auth_mode": "chatgpt",
"source": "device-code", "source": "device-code",
} }

View File

@@ -7,7 +7,7 @@ import logging
from pathlib import Path from pathlib import Path
from typing import List, Optional from typing import List, Optional
from hermes_cli.auth import resolve_codex_home_path import os
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -119,7 +119,8 @@ def get_codex_model_ids(access_token: Optional[str] = None) -> List[str]:
Resolution order: API (live, if token provided) > config.toml default > Resolution order: API (live, if token provided) > config.toml default >
local cache > hardcoded defaults. local cache > hardcoded defaults.
""" """
codex_home = resolve_codex_home_path() codex_home_str = os.getenv("CODEX_HOME", "").strip() or str(Path.home() / ".codex")
codex_home = Path(codex_home_str).expanduser()
ordered: List[str] = [] ordered: List[str] = []
# Try live API if we have a token # Try live API if we have a token

View File

@@ -127,9 +127,7 @@ def resolve_runtime_provider(
"api_mode": "codex_responses", "api_mode": "codex_responses",
"base_url": creds.get("base_url", "").rstrip("/"), "base_url": creds.get("base_url", "").rstrip("/"),
"api_key": creds.get("api_key", ""), "api_key": creds.get("api_key", ""),
"source": creds.get("source", "codex-auth-json"), "source": creds.get("source", "hermes-auth-store"),
"auth_file": creds.get("auth_file"),
"codex_home": creds.get("codex_home"),
"last_refresh": creds.get("last_refresh"), "last_refresh": creds.get("last_refresh"),
"requested_provider": requested_provider, "requested_provider": requested_provider,
} }

View File

@@ -45,29 +45,42 @@ def codex_auth_dir(tmp_path, monkeypatch):
class TestReadCodexAccessToken: class TestReadCodexAccessToken:
def test_valid_auth_file(self, tmp_path): def test_valid_auth_store(self, tmp_path, monkeypatch):
codex_dir = tmp_path / ".codex" hermes_home = tmp_path / "hermes"
codex_dir.mkdir() hermes_home.mkdir(parents=True, exist_ok=True)
auth = codex_dir / "auth.json" (hermes_home / "auth.json").write_text(json.dumps({
auth.write_text(json.dumps({ "version": 1,
"tokens": {"access_token": "tok-123", "refresh_token": "r-456"} "providers": {
"openai-codex": {
"tokens": {"access_token": "tok-123", "refresh_token": "r-456"},
},
},
})) }))
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path): monkeypatch.setenv("HERMES_HOME", str(hermes_home))
result = _read_codex_access_token() result = _read_codex_access_token()
assert result == "tok-123" assert result == "tok-123"
def test_missing_file_returns_none(self, tmp_path): def test_missing_returns_none(self, tmp_path, monkeypatch):
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path): hermes_home = tmp_path / "hermes"
result = _read_codex_access_token() hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
result = _read_codex_access_token()
assert result is None assert result is None
def test_empty_token_returns_none(self, tmp_path): def test_empty_token_returns_none(self, tmp_path, monkeypatch):
codex_dir = tmp_path / ".codex" hermes_home = tmp_path / "hermes"
codex_dir.mkdir() hermes_home.mkdir(parents=True, exist_ok=True)
auth = codex_dir / "auth.json" (hermes_home / "auth.json").write_text(json.dumps({
auth.write_text(json.dumps({"tokens": {"access_token": " "}})) "version": 1,
with patch("agent.auxiliary_client.Path.home", return_value=tmp_path): "providers": {
result = _read_codex_access_token() "openai-codex": {
"tokens": {"access_token": " ", "refresh_token": "r"},
},
},
}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
result = _read_codex_access_token()
assert result is None assert result is None
def test_malformed_json_returns_none(self, tmp_path): def test_malformed_json_returns_none(self, tmp_path):

View File

@@ -1,9 +1,9 @@
"""Tests for Codex auth — tokens stored in Hermes auth store (~/.hermes/auth.json)."""
import json import json
import time import time
import base64 import base64
from contextlib import contextmanager
from pathlib import Path from pathlib import Path
from types import SimpleNamespace
import pytest import pytest
import yaml import yaml
@@ -12,32 +12,35 @@ from hermes_cli.auth import (
AuthError, AuthError,
DEFAULT_CODEX_BASE_URL, DEFAULT_CODEX_BASE_URL,
PROVIDER_REGISTRY, PROVIDER_REGISTRY,
_persist_codex_auth_payload, _read_codex_tokens,
_login_openai_codex, _save_codex_tokens,
login_command, _import_codex_cli_tokens,
get_codex_auth_status, get_codex_auth_status,
get_provider_auth_state, get_provider_auth_state,
read_codex_auth_file,
resolve_codex_runtime_credentials, resolve_codex_runtime_credentials,
resolve_provider, resolve_provider,
) )
def _write_codex_auth(codex_home: Path, *, access_token: str = "access", refresh_token: str = "refresh") -> Path: def _setup_hermes_auth(hermes_home: Path, *, access_token: str = "access", refresh_token: str = "refresh"):
codex_home.mkdir(parents=True, exist_ok=True) """Write Codex tokens into the Hermes auth store."""
auth_file = codex_home / "auth.json" hermes_home.mkdir(parents=True, exist_ok=True)
auth_file.write_text( auth_store = {
json.dumps( "version": 1,
{ "active_provider": "openai-codex",
"auth_mode": "oauth", "providers": {
"last_refresh": "2026-02-26T00:00:00Z", "openai-codex": {
"tokens": { "tokens": {
"access_token": access_token, "access_token": access_token,
"refresh_token": refresh_token, "refresh_token": refresh_token,
}, },
} "last_refresh": "2026-02-26T00:00:00Z",
) "auth_mode": "chatgpt",
) },
},
}
auth_file = hermes_home / "auth.json"
auth_file.write_text(json.dumps(auth_store, indent=2))
return auth_file return auth_file
@@ -47,42 +50,49 @@ def _jwt_with_exp(exp_epoch: int) -> str:
return f"h.{encoded}.s" return f"h.{encoded}.s"
def test_read_codex_auth_file_success(tmp_path, monkeypatch): def test_read_codex_tokens_success(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home" hermes_home = tmp_path / "hermes"
auth_file = _write_codex_auth(codex_home) _setup_hermes_auth(hermes_home)
monkeypatch.setenv("CODEX_HOME", str(codex_home)) monkeypatch.setenv("HERMES_HOME", str(hermes_home))
payload = read_codex_auth_file() data = _read_codex_tokens()
assert data["tokens"]["access_token"] == "access"
assert data["tokens"]["refresh_token"] == "refresh"
assert payload["auth_path"] == auth_file
assert payload["tokens"]["access_token"] == "access" def test_read_codex_tokens_missing(tmp_path, monkeypatch):
assert payload["tokens"]["refresh_token"] == "refresh" hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
# Empty auth store
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
with pytest.raises(AuthError) as exc:
_read_codex_tokens()
assert exc.value.code == "codex_auth_missing"
def test_resolve_codex_runtime_credentials_missing_access_token(tmp_path, monkeypatch): def test_resolve_codex_runtime_credentials_missing_access_token(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home" hermes_home = tmp_path / "hermes"
_write_codex_auth(codex_home, access_token="") _setup_hermes_auth(hermes_home, access_token="")
monkeypatch.setenv("CODEX_HOME", str(codex_home)) monkeypatch.setenv("HERMES_HOME", str(hermes_home))
with pytest.raises(AuthError) as exc: with pytest.raises(AuthError) as exc:
resolve_codex_runtime_credentials() resolve_codex_runtime_credentials()
assert exc.value.code == "codex_auth_missing_access_token" assert exc.value.code == "codex_auth_missing_access_token"
assert exc.value.relogin_required is True assert exc.value.relogin_required is True
def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, monkeypatch): def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home" hermes_home = tmp_path / "hermes"
expiring_token = _jwt_with_exp(int(time.time()) - 10) expiring_token = _jwt_with_exp(int(time.time()) - 10)
_write_codex_auth(codex_home, access_token=expiring_token, refresh_token="refresh-old") _setup_hermes_auth(hermes_home, access_token=expiring_token, refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home)) monkeypatch.setenv("HERMES_HOME", str(hermes_home))
called = {"count": 0} called = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False): def _fake_refresh(tokens, timeout_seconds):
called["count"] += 1 called["count"] += 1
assert auth_path == codex_home / "auth.json"
assert lock_held is True
return {"access_token": "access-new", "refresh_token": "refresh-new"} return {"access_token": "access-new", "refresh_token": "refresh-new"}
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh) monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
@@ -94,15 +104,14 @@ def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, mo
def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch): def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home" hermes_home = tmp_path / "hermes"
_write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old") _setup_hermes_auth(hermes_home, access_token="access-current", refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home)) monkeypatch.setenv("HERMES_HOME", str(hermes_home))
called = {"count": 0} called = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False): def _fake_refresh(tokens, timeout_seconds):
called["count"] += 1 called["count"] += 1
assert lock_held is True
return {"access_token": "access-forced", "refresh_token": "refresh-new"} return {"access_token": "access-forced", "refresh_token": "refresh-new"}
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh) monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
@@ -113,98 +122,71 @@ def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch):
assert resolved["api_key"] == "access-forced" assert resolved["api_key"] == "access-forced"
def test_resolve_codex_runtime_credentials_uses_file_lock_on_refresh(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
lock_calls = {"enter": 0, "exit": 0}
@contextmanager
def _fake_lock(auth_path, timeout_seconds=15.0):
assert auth_path == codex_home / "auth.json"
lock_calls["enter"] += 1
try:
yield
finally:
lock_calls["exit"] += 1
refresh_calls = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
refresh_calls["count"] += 1
assert lock_held is True
return {"access_token": "access-updated", "refresh_token": "refresh-updated"}
monkeypatch.setattr("hermes_cli.auth._codex_auth_file_lock", _fake_lock)
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False)
assert refresh_calls["count"] == 1
assert lock_calls["enter"] == 1
assert lock_calls["exit"] == 1
assert resolved["api_key"] == "access-updated"
def test_resolve_provider_explicit_codex_does_not_fallback(monkeypatch): def test_resolve_provider_explicit_codex_does_not_fallback(monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False) monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
assert resolve_provider("openai-codex") == "openai-codex" assert resolve_provider("openai-codex") == "openai-codex"
def test_persist_codex_auth_payload_writes_atomically(tmp_path): def test_save_codex_tokens_roundtrip(tmp_path, monkeypatch):
auth_path = tmp_path / "auth.json" hermes_home = tmp_path / "hermes"
auth_path.write_text('{"stale":true}\n') hermes_home.mkdir(parents=True, exist_ok=True)
payload = { (hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
"auth_mode": "oauth", monkeypatch.setenv("HERMES_HOME", str(hermes_home))
"tokens": {
"access_token": "next-access",
"refresh_token": "next-refresh",
},
"last_refresh": "2026-02-26T00:00:00Z",
}
_persist_codex_auth_payload(auth_path, payload) _save_codex_tokens({"access_token": "at123", "refresh_token": "rt456"})
data = _read_codex_tokens()
stored = json.loads(auth_path.read_text()) assert data["tokens"]["access_token"] == "at123"
assert stored == payload assert data["tokens"]["refresh_token"] == "rt456"
assert list(tmp_path.glob(".auth.json.*.tmp")) == []
def test_get_codex_auth_status_not_logged_in(tmp_path, monkeypatch): def test_import_codex_cli_tokens(tmp_path, monkeypatch):
monkeypatch.setenv("CODEX_HOME", str(tmp_path / "missing-codex-home")) codex_home = tmp_path / "codex-cli"
status = get_codex_auth_status() codex_home.mkdir(parents=True, exist_ok=True)
assert status["logged_in"] is False (codex_home / "auth.json").write_text(json.dumps({
assert "error" in status "tokens": {"access_token": "cli-at", "refresh_token": "cli-rt"},
}))
monkeypatch.setenv("CODEX_HOME", str(codex_home))
tokens = _import_codex_cli_tokens()
assert tokens is not None
assert tokens["access_token"] == "cli-at"
assert tokens["refresh_token"] == "cli-rt"
def test_login_openai_codex_persists_provider_state(tmp_path, monkeypatch): def test_import_codex_cli_tokens_missing(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes-home" monkeypatch.setenv("CODEX_HOME", str(tmp_path / "nonexistent"))
codex_home = tmp_path / "codex-home" assert _import_codex_cli_tokens() is None
_write_codex_auth(codex_home)
def test_codex_tokens_not_written_to_shared_file(tmp_path, monkeypatch):
"""Verify Hermes never writes to ~/.codex/auth.json."""
hermes_home = tmp_path / "hermes"
codex_home = tmp_path / "codex-cli"
hermes_home.mkdir(parents=True, exist_ok=True)
codex_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home)) monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("CODEX_HOME", str(codex_home)) monkeypatch.setenv("CODEX_HOME", str(codex_home))
# Mock input() to accept existing credentials
monkeypatch.setattr("builtins.input", lambda _: "y")
_login_openai_codex(SimpleNamespace(), PROVIDER_REGISTRY["openai-codex"]) _save_codex_tokens({"access_token": "hermes-at", "refresh_token": "hermes-rt"})
state = get_provider_auth_state("openai-codex") # ~/.codex/auth.json should NOT exist
assert state is not None assert not (codex_home / "auth.json").exists()
assert state["source"] == "codex-auth-json"
assert state["auth_file"].endswith("auth.json")
config_path = hermes_home / "config.yaml" # Hermes auth store should have the tokens
config = yaml.safe_load(config_path.read_text()) data = _read_codex_tokens()
assert config["model"]["provider"] == "openai-codex" assert data["tokens"]["access_token"] == "hermes-at"
assert config["model"]["base_url"] == DEFAULT_CODEX_BASE_URL
def test_login_command_shows_deprecation(monkeypatch, capsys): def test_resolve_returns_hermes_auth_store_source(tmp_path, monkeypatch):
"""login_command is deprecated and directs users to hermes model.""" hermes_home = tmp_path / "hermes"
with pytest.raises(SystemExit) as exc_info: _setup_hermes_auth(hermes_home)
login_command(SimpleNamespace()) monkeypatch.setenv("HERMES_HOME", str(hermes_home))
assert exc_info.value.code == 0
captured = capsys.readouterr() creds = resolve_codex_runtime_credentials()
assert "hermes model" in captured.out assert creds["source"] == "hermes-auth-store"
assert creds["provider"] == "openai-codex"
assert creds["base_url"] == DEFAULT_CODEX_BASE_URL

View File

@@ -10,42 +10,41 @@ from hermes_cli.auth import detect_external_credentials
class TestDetectCodexCLI: class TestDetectCodexCLI:
def test_detects_valid_codex_auth(self, tmp_path): def test_detects_valid_codex_auth(self, tmp_path, monkeypatch):
codex_dir = tmp_path / ".codex" codex_dir = tmp_path / ".codex"
codex_dir.mkdir() codex_dir.mkdir()
auth = codex_dir / "auth.json" auth = codex_dir / "auth.json"
auth.write_text(json.dumps({ auth.write_text(json.dumps({
"tokens": {"access_token": "tok-123", "refresh_token": "ref-456"} "tokens": {"access_token": "tok-123", "refresh_token": "ref-456"}
})) }))
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir): monkeypatch.setenv("CODEX_HOME", str(codex_dir))
result = detect_external_credentials() result = detect_external_credentials()
codex_hits = [c for c in result if c["provider"] == "openai-codex"] codex_hits = [c for c in result if c["provider"] == "openai-codex"]
assert len(codex_hits) == 1 assert len(codex_hits) == 1
assert "Codex CLI" in codex_hits[0]["label"] assert "Codex CLI" in codex_hits[0]["label"]
assert str(auth) == codex_hits[0]["path"]
def test_skips_codex_without_access_token(self, tmp_path): def test_skips_codex_without_access_token(self, tmp_path, monkeypatch):
codex_dir = tmp_path / ".codex" codex_dir = tmp_path / ".codex"
codex_dir.mkdir() codex_dir.mkdir()
(codex_dir / "auth.json").write_text(json.dumps({"tokens": {}})) (codex_dir / "auth.json").write_text(json.dumps({"tokens": {}}))
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir): monkeypatch.setenv("CODEX_HOME", str(codex_dir))
result = detect_external_credentials() result = detect_external_credentials()
assert not any(c["provider"] == "openai-codex" for c in result) assert not any(c["provider"] == "openai-codex" for c in result)
def test_skips_missing_codex_dir(self, tmp_path): def test_skips_missing_codex_dir(self, tmp_path, monkeypatch):
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=tmp_path / "nonexistent"): monkeypatch.setenv("CODEX_HOME", str(tmp_path / "nonexistent"))
result = detect_external_credentials() result = detect_external_credentials()
assert not any(c["provider"] == "openai-codex" for c in result) assert not any(c["provider"] == "openai-codex" for c in result)
def test_skips_malformed_codex_auth(self, tmp_path): def test_skips_malformed_codex_auth(self, tmp_path, monkeypatch):
codex_dir = tmp_path / ".codex" codex_dir = tmp_path / ".codex"
codex_dir.mkdir() codex_dir.mkdir()
(codex_dir / "auth.json").write_text("{bad json") (codex_dir / "auth.json").write_text("{bad json")
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=codex_dir): monkeypatch.setenv("CODEX_HOME", str(codex_dir))
result = detect_external_credentials() result = detect_external_credentials()
assert not any(c["provider"] == "openai-codex" for c in result) assert not any(c["provider"] == "openai-codex" for c in result)
def test_returns_empty_when_nothing_found(self, tmp_path): def test_returns_empty_when_nothing_found(self, tmp_path, monkeypatch):
with patch("hermes_cli.auth.resolve_codex_home_path", return_value=tmp_path / ".codex"): monkeypatch.setenv("CODEX_HOME", str(tmp_path / "nonexistent"))
result = detect_external_credentials() result = detect_external_credentials()
assert result == [] assert result == []