Merge pull request #43 from grp06/codex/align-codex-provider-conventions-mainrepo

Enable ChatGPT subscription Codex support end-to-end
This commit is contained in:
Teknium
2026-02-28 18:13:58 -08:00
committed by GitHub
19 changed files with 3280 additions and 129 deletions

86
cli.py
View File

@@ -803,7 +803,7 @@ class HermesCLI:
Args:
model: Model to use (default: from env or claude-sonnet)
toolsets: List of toolsets to enable (default: all)
provider: Inference provider ("auto", "openrouter", "nous")
provider: Inference provider ("auto", "openrouter", "nous", "openai-codex")
api_key: API key (default: from environment)
base_url: API base URL (default: OpenRouter)
max_turns: Maximum tool-calling iterations (default: 60)
@@ -821,26 +821,28 @@ class HermesCLI:
# Configuration - priority: CLI args > env vars > config file
# Model can come from: CLI arg, LLM_MODEL env, OPENAI_MODEL env (custom endpoint), or config
self.model = model or os.getenv("LLM_MODEL") or os.getenv("OPENAI_MODEL") or CLI_CONFIG["model"]["default"]
# Base URL: custom endpoint (OPENAI_BASE_URL) takes precedence over OpenRouter
self.base_url = base_url or os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
# API key: custom endpoint (OPENAI_API_KEY) takes precedence over OpenRouter
self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
# Provider resolution: determines whether to use OAuth credentials or env var keys
from hermes_cli.auth import resolve_provider
self._explicit_api_key = api_key
self._explicit_base_url = base_url
# Provider selection is resolved lazily at use-time via _ensure_runtime_credentials().
self.requested_provider = (
provider
or os.getenv("HERMES_INFERENCE_PROVIDER")
or CLI_CONFIG["model"].get("provider")
or "auto"
)
self.provider = resolve_provider(
self.requested_provider,
explicit_api_key=api_key,
explicit_base_url=base_url,
self._provider_source: Optional[str] = None
self.provider = self.requested_provider
self.api_mode = "chat_completions"
self.base_url = (
base_url
or os.getenv("OPENAI_BASE_URL")
or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
)
self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
# Max turns priority: CLI arg > env var > config file (agent.max_turns or root max_turns) > default
if max_turns != 60: # CLI arg was explicitly set
self._nous_key_expires_at: Optional[str] = None
self._nous_key_source: Optional[str] = None
# Max turns priority: CLI arg > config file > env var > default
@@ -903,45 +905,51 @@ class HermesCLI:
def _ensure_runtime_credentials(self) -> bool:
"""
Ensure OAuth provider credentials are fresh before agent use.
For Nous Portal: checks agent key TTL, refreshes/re-mints as needed.
If the key changed, tears down the agent so it rebuilds with new creds.
Ensure runtime credentials are resolved before agent use.
Re-resolves provider credentials so key rotation and token refresh
are picked up without restarting the CLI.
Returns True if credentials are ready, False on auth failure.
"""
if self.provider != "nous":
return True
from hermes_cli.auth import format_auth_error, resolve_nous_runtime_credentials
from hermes_cli.runtime_provider import (
resolve_runtime_provider,
format_runtime_provider_error,
)
try:
credentials = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(
60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))
),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
runtime = resolve_runtime_provider(
requested=self.requested_provider,
explicit_api_key=self._explicit_api_key,
explicit_base_url=self._explicit_base_url,
)
except Exception as exc:
message = format_auth_error(exc)
message = format_runtime_provider_error(exc)
self.console.print(f"[bold red]{message}[/]")
return False
api_key = credentials.get("api_key")
base_url = credentials.get("base_url")
api_key = runtime.get("api_key")
base_url = runtime.get("base_url")
resolved_provider = runtime.get("provider", "openrouter")
resolved_api_mode = runtime.get("api_mode", self.api_mode)
if not isinstance(api_key, str) or not api_key:
self.console.print("[bold red]Nous credential resolver returned an empty API key.[/]")
self.console.print("[bold red]Provider resolver returned an empty API key.[/]")
return False
if not isinstance(base_url, str) or not base_url:
self.console.print("[bold red]Nous credential resolver returned an empty base URL.[/]")
self.console.print("[bold red]Provider resolver returned an empty base URL.[/]")
return False
credentials_changed = api_key != self.api_key or base_url != self.base_url
routing_changed = (
resolved_provider != self.provider
or resolved_api_mode != self.api_mode
)
self.provider = resolved_provider
self.api_mode = resolved_api_mode
self._provider_source = runtime.get("source")
self.api_key = api_key
self.base_url = base_url
self._nous_key_expires_at = credentials.get("expires_at")
self._nous_key_source = credentials.get("source")
# AIAgent/OpenAI client holds auth at init time, so rebuild if key rotated
if credentials_changed and self.agent is not None:
if (credentials_changed or routing_changed) and self.agent is not None:
self.agent = None
return True
@@ -957,7 +965,7 @@ class HermesCLI:
if self.agent is not None:
return True
if self.provider == "nous" and not self._ensure_runtime_credentials():
if not self._ensure_runtime_credentials():
return False
# Initialize SQLite session store for CLI sessions
@@ -1001,6 +1009,8 @@ class HermesCLI:
model=self.model,
api_key=self.api_key,
base_url=self.base_url,
provider=self.provider,
api_mode=self.api_mode,
max_iterations=self.max_turns,
enabled_toolsets=self.enabled_toolsets,
verbose_logging=self.verbose,
@@ -1093,8 +1103,8 @@ class HermesCLI:
toolsets_info = f" [dim #B8860B]·[/] [#CD7F32]toolsets: {', '.join(self.enabled_toolsets)}[/]"
provider_info = f" [dim #B8860B]·[/] [dim]provider: {self.provider}[/]"
if self.provider == "nous" and self._nous_key_source:
provider_info += f" [dim #B8860B]·[/] [dim]key: {self._nous_key_source}[/]"
if self._provider_source:
provider_info += f" [dim #B8860B]·[/] [dim]auth: {self._provider_source}[/]"
self.console.print(
f" {api_indicator} [#FFBF00]{model_short}[/] "
@@ -1929,8 +1939,8 @@ class HermesCLI:
Returns:
The agent's response, or None on error
"""
# Refresh OAuth credentials if needed (handles key rotation transparently)
if self.provider == "nous" and not self._ensure_runtime_credentials():
# Refresh provider credentials if needed (handles key rotation transparently)
if not self._ensure_runtime_credentials():
return None
# Initialize agent if needed

View File

@@ -172,10 +172,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
except UnicodeDecodeError:
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")
model = os.getenv("HERMES_MODEL", "anthropic/claude-opus-4.6")
# Custom endpoint (OPENAI_*) takes precedence, matching CLI behavior
api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY", "")
base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
import yaml
@@ -188,24 +185,27 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
model = _model_cfg
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
base_url = _model_cfg.get("base_url", base_url)
# Check if provider is nous — resolve OAuth credentials
provider = _model_cfg.get("provider", "") if isinstance(_model_cfg, dict) else ""
if provider == "nous":
try:
from hermes_cli.auth import resolve_nous_runtime_credentials
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
api_key = creds.get("api_key", api_key)
base_url = creds.get("base_url", base_url)
except Exception as nous_err:
logging.warning("Nous Portal credential resolution failed for cron: %s", nous_err)
except Exception:
pass
from hermes_cli.runtime_provider import (
resolve_runtime_provider,
format_runtime_provider_error,
)
try:
runtime = resolve_runtime_provider(
requested=os.getenv("HERMES_INFERENCE_PROVIDER"),
)
except Exception as exc:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
agent = AIAgent(
model=model,
api_key=api_key,
base_url=base_url,
api_key=runtime.get("api_key"),
base_url=runtime.get("base_url"),
provider=runtime.get("provider"),
api_mode=runtime.get("api_mode"),
quiet_mode=True,
session_id=f"cron_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)

View File

@@ -125,6 +125,28 @@ from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageTyp
logger = logging.getLogger(__name__)
def _resolve_runtime_agent_kwargs() -> dict:
"""Resolve provider credentials for gateway-created AIAgent instances."""
from hermes_cli.runtime_provider import (
resolve_runtime_provider,
format_runtime_provider_error,
)
try:
runtime = resolve_runtime_provider(
requested=os.getenv("HERMES_INFERENCE_PROVIDER"),
)
except Exception as exc:
raise RuntimeError(format_runtime_provider_error(exc)) from exc
return {
"api_key": runtime.get("api_key"),
"base_url": runtime.get("base_url"),
"provider": runtime.get("provider"),
"api_mode": runtime.get("api_mode"),
}
class GatewayRunner:
"""
Main gateway controller.
@@ -958,14 +980,11 @@ class GatewayRunner:
from run_agent import AIAgent
loop = asyncio.get_event_loop()
# Resolve credentials so the flush agent can reach the LLM
_flush_api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY", "")
_flush_base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
_flush_model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL", "anthropic/claude-opus-4.6")
_flush_model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
def _do_flush():
tmp_agent = AIAgent(
model=_flush_model,
api_key=_flush_api_key,
base_url=_flush_base_url,
**_resolve_runtime_agent_kwargs(),
max_iterations=5,
quiet_mode=True,
enabled_toolsets=["memory"],
@@ -1678,7 +1697,7 @@ class GatewayRunner:
combined_ephemeral = context_prompt or ""
if self._ephemeral_system_prompt:
combined_ephemeral = (combined_ephemeral + "\n\n" + self._ephemeral_system_prompt).strip()
# Re-read .env and config for fresh credentials (gateway is long-lived,
# keys may change without restart).
try:
@@ -1688,9 +1707,6 @@ class GatewayRunner:
except Exception:
pass
# Custom endpoint (OPENAI_*) takes precedence, matching CLI behavior
api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY", "")
base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
@@ -1704,24 +1720,22 @@ class GatewayRunner:
model = _model_cfg
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
base_url = _model_cfg.get("base_url", base_url)
# Check if provider is nous — resolve OAuth credentials
provider = _model_cfg.get("provider", "") if isinstance(_model_cfg, dict) else ""
if provider == "nous":
try:
from hermes_cli.auth import resolve_nous_runtime_credentials
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
api_key = creds.get("api_key", api_key)
base_url = creds.get("base_url", base_url)
except Exception as nous_err:
logger.warning("Nous Portal credential resolution failed: %s", nous_err)
except Exception:
pass
try:
runtime_kwargs = _resolve_runtime_agent_kwargs()
except Exception as exc:
return {
"final_response": f"⚠️ Provider authentication failed: {exc}",
"messages": [],
"api_calls": 0,
"tools": [],
}
agent = AIAgent(
model=model,
api_key=api_key,
base_url=base_url,
**runtime_kwargs,
max_iterations=max_iterations,
quiet_mode=True,
verbose_logging=False,

View File

@@ -18,7 +18,10 @@ from __future__ import annotations
import json
import logging
import os
import shutil
import stat
import base64
import subprocess
import time
import webbrowser
from contextlib import contextmanager
@@ -55,6 +58,10 @@ DEFAULT_NOUS_SCOPE = "inference:mint_agent_key"
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1 # poll at most every 1s
DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
CODEX_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
CODEX_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token"
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
# =============================================================================
@@ -84,7 +91,12 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
client_id=DEFAULT_NOUS_CLIENT_ID,
scope=DEFAULT_NOUS_SCOPE,
),
# Future: "openai_codex", "anthropic", etc.
"openai-codex": ProviderConfig(
id="openai-codex",
name="OpenAI Codex",
auth_type="oauth_external",
inference_base_url=DEFAULT_CODEX_BASE_URL,
),
}
@@ -298,12 +310,15 @@ def resolve_provider(
"""
normalized = (requested or "auto").strip().lower()
if normalized in {"openrouter", "custom"}:
return "openrouter"
if normalized in PROVIDER_REGISTRY:
return normalized
if normalized == "openrouter":
return "openrouter"
if normalized != "auto":
return "openrouter"
raise AuthError(
f"Unknown provider '{normalized}'.",
code="invalid_provider",
)
# Explicit one-off CLI creds always mean openrouter/custom
if explicit_api_key or explicit_base_url:
@@ -314,8 +329,8 @@ def resolve_provider(
auth_store = _load_auth_store()
active = auth_store.get("active_provider")
if active and active in PROVIDER_REGISTRY:
state = _load_provider_state(auth_store, active)
if state and (state.get("access_token") or state.get("refresh_token")):
status = get_auth_status(active)
if status.get("logged_in"):
return active
except Exception as e:
logger.debug("Could not detect active auth provider: %s", e)
@@ -369,6 +384,27 @@ def _optional_base_url(value: Any) -> Optional[str]:
return cleaned if cleaned else None
def _decode_jwt_claims(token: Any) -> Dict[str, Any]:
if not isinstance(token, str) or token.count(".") != 2:
return {}
payload = token.split(".")[1]
payload += "=" * ((4 - len(payload) % 4) % 4)
try:
raw = base64.urlsafe_b64decode(payload.encode("utf-8"))
claims = json.loads(raw.decode("utf-8"))
except Exception:
return {}
return claims if isinstance(claims, dict) else {}
def _codex_access_token_is_expiring(access_token: Any, skew_seconds: int) -> bool:
claims = _decode_jwt_claims(access_token)
exp = claims.get("exp")
if not isinstance(exp, (int, float)):
return False
return float(exp) <= (time.time() + max(0, int(skew_seconds)))
# =============================================================================
# SSH / remote session detection
# =============================================================================
@@ -378,6 +414,302 @@ def _is_remote_session() -> bool:
return bool(os.getenv("SSH_CLIENT") or os.getenv("SSH_TTY"))
# =============================================================================
# OpenAI Codex auth file helpers
# =============================================================================
def resolve_codex_home_path() -> Path:
"""Resolve CODEX_HOME, defaulting to ~/.codex."""
codex_home = os.getenv("CODEX_HOME", "").strip()
if not codex_home:
codex_home = str(Path.home() / ".codex")
return Path(codex_home).expanduser()
def _codex_auth_file_path() -> Path:
return resolve_codex_home_path() / "auth.json"
def _codex_auth_lock_path(auth_path: Path) -> Path:
return auth_path.with_suffix(auth_path.suffix + ".lock")
@contextmanager
def _codex_auth_file_lock(
auth_path: Path,
timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS,
):
lock_path = _codex_auth_lock_path(auth_path)
lock_path.parent.mkdir(parents=True, exist_ok=True)
with lock_path.open("a+") as lock_file:
if fcntl is None:
yield
return
deadline = time.time() + max(1.0, timeout_seconds)
while True:
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() >= deadline:
raise TimeoutError(f"Timed out waiting for Codex auth lock: {lock_path}")
time.sleep(0.05)
try:
yield
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def read_codex_auth_file() -> Dict[str, Any]:
"""Read and validate Codex auth.json shape."""
codex_home = resolve_codex_home_path()
if not codex_home.exists():
raise AuthError(
f"Codex home directory not found at {codex_home}.",
provider="openai-codex",
code="codex_home_missing",
relogin_required=True,
)
auth_path = codex_home / "auth.json"
if not auth_path.exists():
raise AuthError(
f"Codex auth file not found at {auth_path}.",
provider="openai-codex",
code="codex_auth_missing",
relogin_required=True,
)
try:
payload = json.loads(auth_path.read_text())
except Exception as exc:
raise AuthError(
f"Failed to parse Codex auth file at {auth_path}.",
provider="openai-codex",
code="codex_auth_invalid_json",
relogin_required=True,
) from exc
tokens = payload.get("tokens")
if not isinstance(tokens, dict):
raise AuthError(
"Codex auth file is missing a valid 'tokens' object.",
provider="openai-codex",
code="codex_auth_invalid_shape",
relogin_required=True,
)
access_token = tokens.get("access_token")
refresh_token = tokens.get("refresh_token")
if not isinstance(access_token, str) or not access_token.strip():
raise AuthError(
"Codex auth file is missing tokens.access_token.",
provider="openai-codex",
code="codex_auth_missing_access_token",
relogin_required=True,
)
if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError(
"Codex auth file is missing tokens.refresh_token.",
provider="openai-codex",
code="codex_auth_missing_refresh_token",
relogin_required=True,
)
return {
"payload": payload,
"tokens": tokens,
"auth_path": auth_path,
"codex_home": codex_home,
}
def _persist_codex_auth_payload(
auth_path: Path,
payload: Dict[str, Any],
*,
lock_held: bool = False,
) -> None:
auth_path.parent.mkdir(parents=True, exist_ok=True)
def _write() -> None:
serialized = json.dumps(payload, indent=2, ensure_ascii=False) + "\n"
tmp_path = auth_path.parent / f".{auth_path.name}.{os.getpid()}.{time.time_ns()}.tmp"
try:
with tmp_path.open("w", encoding="utf-8") as tmp_file:
tmp_file.write(serialized)
tmp_file.flush()
os.fsync(tmp_file.fileno())
os.replace(tmp_path, auth_path)
finally:
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
try:
auth_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
except OSError:
pass
if lock_held:
_write()
return
with _codex_auth_file_lock(auth_path):
_write()
def _refresh_codex_auth_tokens(
*,
payload: Dict[str, Any],
auth_path: Path,
timeout_seconds: float,
lock_held: bool = False,
) -> Dict[str, Any]:
tokens = payload.get("tokens")
if not isinstance(tokens, dict):
raise AuthError(
"Codex auth file is missing a valid 'tokens' object.",
provider="openai-codex",
code="codex_auth_invalid_shape",
relogin_required=True,
)
refresh_token = tokens.get("refresh_token")
if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError(
"Codex auth file is missing tokens.refresh_token.",
provider="openai-codex",
code="codex_auth_missing_refresh_token",
relogin_required=True,
)
timeout = httpx.Timeout(max(5.0, float(timeout_seconds)))
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}) as client:
response = client.post(
CODEX_OAUTH_TOKEN_URL,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CODEX_OAUTH_CLIENT_ID,
},
)
if response.status_code != 200:
code = "codex_refresh_failed"
message = f"Codex token refresh failed with status {response.status_code}."
relogin_required = False
try:
err = response.json()
if isinstance(err, dict):
err_code = err.get("error")
if isinstance(err_code, str) and err_code.strip():
code = err_code.strip()
err_desc = err.get("error_description") or err.get("message")
if isinstance(err_desc, str) and err_desc.strip():
message = f"Codex token refresh failed: {err_desc.strip()}"
except Exception:
pass
if code in {"invalid_grant", "invalid_token", "invalid_request"}:
relogin_required = True
raise AuthError(
message,
provider="openai-codex",
code=code,
relogin_required=relogin_required,
)
try:
refresh_payload = response.json()
except Exception as exc:
raise AuthError(
"Codex token refresh returned invalid JSON.",
provider="openai-codex",
code="codex_refresh_invalid_json",
relogin_required=True,
) from exc
access_token = refresh_payload.get("access_token")
if not isinstance(access_token, str) or not access_token.strip():
raise AuthError(
"Codex token refresh response was missing access_token.",
provider="openai-codex",
code="codex_refresh_missing_access_token",
relogin_required=True,
)
updated_tokens = dict(tokens)
updated_tokens["access_token"] = access_token.strip()
next_refresh = refresh_payload.get("refresh_token")
if isinstance(next_refresh, str) and next_refresh.strip():
updated_tokens["refresh_token"] = next_refresh.strip()
payload["tokens"] = updated_tokens
payload["last_refresh"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
_persist_codex_auth_payload(auth_path, payload, lock_held=lock_held)
return updated_tokens
def resolve_codex_runtime_credentials(
*,
force_refresh: bool = False,
refresh_if_expiring: bool = True,
refresh_skew_seconds: int = CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
) -> Dict[str, Any]:
"""Resolve runtime credentials from Codex CLI auth state."""
data = read_codex_auth_file()
payload = data["payload"]
tokens = dict(data["tokens"])
auth_path = data["auth_path"]
access_token = str(tokens.get("access_token", "") or "").strip()
refresh_timeout_seconds = float(os.getenv("HERMES_CODEX_REFRESH_TIMEOUT_SECONDS", "20"))
should_refresh = bool(force_refresh)
if (not should_refresh) and refresh_if_expiring:
should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh:
lock_timeout = max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0)
with _codex_auth_file_lock(auth_path, timeout_seconds=lock_timeout):
data = read_codex_auth_file()
payload = data["payload"]
tokens = dict(data["tokens"])
access_token = str(tokens.get("access_token", "") or "").strip()
should_refresh = bool(force_refresh)
if (not should_refresh) and refresh_if_expiring:
should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
if should_refresh:
tokens = _refresh_codex_auth_tokens(
payload=payload,
auth_path=auth_path,
timeout_seconds=refresh_timeout_seconds,
lock_held=True,
)
access_token = str(tokens.get("access_token", "") or "").strip()
base_url = (
os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/")
or DEFAULT_CODEX_BASE_URL
)
return {
"provider": "openai-codex",
"base_url": base_url,
"api_key": access_token,
"source": "codex-auth-json",
"last_refresh": payload.get("last_refresh"),
"auth_mode": payload.get("auth_mode"),
"auth_file": str(auth_path),
"codex_home": str(data["codex_home"]),
}
# =============================================================================
# TLS verification helper
# =============================================================================
@@ -806,11 +1138,37 @@ def get_nous_auth_status() -> Dict[str, Any]:
}
def get_codex_auth_status() -> Dict[str, Any]:
"""Status snapshot for Codex auth."""
state = get_provider_auth_state("openai-codex") or {}
auth_file = state.get("auth_file") or str(_codex_auth_file_path())
codex_home = state.get("codex_home") or str(resolve_codex_home_path())
try:
creds = resolve_codex_runtime_credentials()
return {
"logged_in": True,
"auth_file": creds.get("auth_file"),
"codex_home": creds.get("codex_home"),
"last_refresh": creds.get("last_refresh"),
"auth_mode": creds.get("auth_mode"),
"source": creds.get("source"),
}
except AuthError as exc:
return {
"logged_in": False,
"auth_file": auth_file,
"codex_home": codex_home,
"error": str(exc),
}
def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
"""Generic auth status dispatcher."""
target = provider_id or get_active_provider()
if target == "nous":
return get_nous_auth_status()
if target == "openai-codex":
return get_codex_auth_status()
return {"logged_in": False}
@@ -982,11 +1340,64 @@ def login_command(args) -> None:
if provider_id == "nous":
_login_nous(args, pconfig)
elif provider_id == "openai-codex":
_login_openai_codex(args, pconfig)
else:
print(f"Login for provider '{provider_id}' is not yet implemented.")
raise SystemExit(1)
def _login_openai_codex(args, pconfig: ProviderConfig) -> None:
"""OpenAI Codex login flow using Codex CLI auth state."""
codex_path = shutil.which("codex")
if not codex_path:
print("Codex CLI was not found in PATH.")
print("Install Codex CLI, then retry `hermes login --provider openai-codex`.")
raise SystemExit(1)
print(f"Starting Hermes login via {pconfig.name}...")
print(f"Using Codex CLI: {codex_path}")
print(f"Codex home: {resolve_codex_home_path()}")
creds: Dict[str, Any]
try:
creds = resolve_codex_runtime_credentials()
except AuthError:
print("No usable Codex auth found. Running `codex login`...")
try:
subprocess.run(["codex", "login"], check=True)
except subprocess.CalledProcessError as exc:
print(f"Codex login failed with exit code {exc.returncode}.")
raise SystemExit(1)
except KeyboardInterrupt:
print("\nLogin cancelled.")
raise SystemExit(130)
try:
creds = resolve_codex_runtime_credentials()
except AuthError as exc:
print(format_auth_error(exc))
raise SystemExit(1)
auth_state = {
"auth_file": creds.get("auth_file"),
"codex_home": creds.get("codex_home"),
"last_refresh": creds.get("last_refresh"),
"auth_mode": creds.get("auth_mode"),
"source": creds.get("source"),
}
with _auth_store_lock():
auth_store = _load_auth_store()
_save_provider_state(auth_store, "openai-codex", auth_state)
saved_to = _save_auth_store(auth_store)
config_path = _update_config_for_provider("openai-codex", creds["base_url"])
print()
print("Login successful!")
print(f" Auth state: {saved_to}")
print(f" Config updated: {config_path} (model.provider=openai-codex)")
def _login_nous(args, pconfig: ProviderConfig) -> None:
"""Nous Portal device authorization flow."""
portal_base_url = (

View File

@@ -0,0 +1,91 @@
"""Codex model discovery from local Codex CLI cache/config."""
from __future__ import annotations
import json
from pathlib import Path
from typing import List, Optional
from hermes_cli.auth import resolve_codex_home_path
DEFAULT_CODEX_MODELS: List[str] = [
"gpt-5-codex",
"gpt-5.3-codex",
"gpt-5.2-codex",
"gpt-5.1-codex",
]
def _read_default_model(codex_home: Path) -> Optional[str]:
config_path = codex_home / "config.toml"
if not config_path.exists():
return None
try:
import tomllib
except Exception:
return None
try:
payload = tomllib.loads(config_path.read_text(encoding="utf-8"))
except Exception:
return None
model = payload.get("model") if isinstance(payload, dict) else None
if isinstance(model, str) and model.strip():
return model.strip()
return None
def _read_cache_models(codex_home: Path) -> List[str]:
cache_path = codex_home / "models_cache.json"
if not cache_path.exists():
return []
try:
raw = json.loads(cache_path.read_text(encoding="utf-8"))
except Exception:
return []
entries = raw.get("models") if isinstance(raw, dict) else None
sortable = []
if isinstance(entries, list):
for item in entries:
if not isinstance(item, dict):
continue
slug = item.get("slug")
if not isinstance(slug, str) or not slug.strip():
continue
slug = slug.strip()
if "codex" not in slug.lower():
continue
if item.get("supported_in_api") is False:
continue
visibility = item.get("visibility")
if isinstance(visibility, str) and visibility.strip().lower() == "hidden":
continue
priority = item.get("priority")
rank = int(priority) if isinstance(priority, (int, float)) else 10_000
sortable.append((rank, slug))
sortable.sort(key=lambda item: (item[0], item[1]))
deduped: List[str] = []
for _, slug in sortable:
if slug not in deduped:
deduped.append(slug)
return deduped
def get_codex_model_ids() -> List[str]:
codex_home = resolve_codex_home_path()
ordered: List[str] = []
default_model = _read_default_model(codex_home)
if default_model:
ordered.append(default_model)
for model_id in _read_cache_models(codex_home):
if model_id not in ordered:
ordered.append(model_id)
for model_id in DEFAULT_CODEX_MODELS:
if model_id not in ordered:
ordered.append(model_id)
return ordered

View File

@@ -175,6 +175,36 @@ def run_doctor(args):
else:
check_warn("config.yaml not found", "(using defaults)")
# =========================================================================
# Check: Auth providers
# =========================================================================
print()
print(color("◆ Auth Providers", Colors.CYAN, Colors.BOLD))
try:
from hermes_cli.auth import get_nous_auth_status, get_codex_auth_status
nous_status = get_nous_auth_status()
if nous_status.get("logged_in"):
check_ok("Nous Portal auth", "(logged in)")
else:
check_warn("Nous Portal auth", "(not logged in)")
codex_status = get_codex_auth_status()
if codex_status.get("logged_in"):
check_ok("OpenAI Codex auth", "(logged in)")
else:
check_warn("OpenAI Codex auth", "(not logged in)")
if codex_status.get("error"):
check_info(codex_status["error"])
except Exception as e:
check_warn("Auth provider status", f"(could not check: {e})")
if shutil.which("codex"):
check_ok("codex CLI")
else:
check_warn("codex CLI not found", "(required for openai-codex login)")
# =========================================================================
# Check: Directory structure
# =========================================================================

View File

@@ -60,6 +60,7 @@ logger = logging.getLogger(__name__)
def _has_any_provider_configured() -> bool:
"""Check if at least one inference provider is usable."""
from hermes_cli.config import get_env_path, get_hermes_home
from hermes_cli.auth import get_auth_status
# Check env vars (may be set by .env or shell).
# OPENAI_BASE_URL alone counts — local models (vLLM, llama.cpp, etc.)
@@ -91,8 +92,8 @@ def _has_any_provider_configured() -> bool:
auth = json.loads(auth_file.read_text())
active = auth.get("active_provider")
if active:
state = auth.get("providers", {}).get(active, {})
if state.get("access_token") or state.get("refresh_token"):
status = get_auth_status(active)
if status.get("logged_in"):
return True
except Exception:
pass
@@ -289,7 +290,7 @@ def cmd_model(args):
resolve_provider, get_provider_auth_state, PROVIDER_REGISTRY,
_prompt_model_selection, _save_model_choice, _update_config_for_provider,
resolve_nous_runtime_credentials, fetch_nous_models, AuthError, format_auth_error,
_login_nous, ProviderConfig,
_login_nous,
)
from hermes_cli.config import load_config, save_config, get_env_value, save_env_value
@@ -312,7 +313,12 @@ def cmd_model(args):
or config_provider
or "auto"
)
active = resolve_provider(effective_provider)
try:
active = resolve_provider(effective_provider)
except AuthError as exc:
warning = format_auth_error(exc)
print(f"Warning: {warning} Falling back to auto provider detection.")
active = resolve_provider("auto")
# Detect custom endpoint
if active == "openrouter" and get_env_value("OPENAI_BASE_URL"):
@@ -321,6 +327,7 @@ def cmd_model(args):
provider_labels = {
"openrouter": "OpenRouter",
"nous": "Nous Portal",
"openai-codex": "OpenAI Codex",
"custom": "Custom endpoint",
}
active_label = provider_labels.get(active, active)
@@ -334,11 +341,12 @@ def cmd_model(args):
providers = [
("openrouter", "OpenRouter (100+ models, pay-per-use)"),
("nous", "Nous Portal (Nous Research subscription)"),
("openai-codex", "OpenAI Codex"),
("custom", "Custom endpoint (self-hosted / VLLM / etc.)"),
]
# Reorder so the active provider is at the top
active_key = active if active in ("openrouter", "nous") else "custom"
active_key = active if active in ("openrouter", "nous", "openai-codex") else "custom"
ordered = []
for key, label in providers:
if key == active_key:
@@ -359,6 +367,8 @@ def cmd_model(args):
_model_flow_openrouter(config, current_model)
elif selected_provider == "nous":
_model_flow_nous(config, current_model)
elif selected_provider == "openai-codex":
_model_flow_openai_codex(config, current_model)
elif selected_provider == "custom":
_model_flow_custom(config)
@@ -512,6 +522,46 @@ def _model_flow_nous(config, current_model=""):
print("No change.")
def _model_flow_openai_codex(config, current_model=""):
"""OpenAI Codex provider: ensure logged in, then pick model."""
from hermes_cli.auth import (
get_codex_auth_status, _prompt_model_selection, _save_model_choice,
_update_config_for_provider, _login_openai_codex,
PROVIDER_REGISTRY, DEFAULT_CODEX_BASE_URL,
)
from hermes_cli.codex_models import get_codex_model_ids
from hermes_cli.config import get_env_value, save_env_value
import argparse
status = get_codex_auth_status()
if not status.get("logged_in"):
print("Not logged into OpenAI Codex. Starting login...")
print()
try:
mock_args = argparse.Namespace()
_login_openai_codex(mock_args, PROVIDER_REGISTRY["openai-codex"])
except SystemExit:
print("Login cancelled or failed.")
return
except Exception as exc:
print(f"Login failed: {exc}")
return
codex_models = get_codex_model_ids()
selected = _prompt_model_selection(codex_models, current_model=current_model)
if selected:
_save_model_choice(selected)
_update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
# Clear custom endpoint env vars that would otherwise override Codex.
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
print(f"Default model set to: {selected} (via OpenAI Codex)")
else:
print("No change.")
def _model_flow_custom(config):
"""Custom endpoint: collect URL, API key, and model name."""
from hermes_cli.auth import _save_model_choice, deactivate_provider
@@ -857,7 +907,7 @@ For more help on a command:
)
chat_parser.add_argument(
"--provider",
choices=["auto", "openrouter", "nous"],
choices=["auto", "openrouter", "nous", "openai-codex"],
default=None,
help="Inference provider (default: auto)"
)
@@ -966,9 +1016,9 @@ For more help on a command:
)
login_parser.add_argument(
"--provider",
choices=["nous"],
choices=["nous", "openai-codex"],
default=None,
help="Provider to authenticate with (default: interactive selection)"
help="Provider to authenticate with (default: nous)"
)
login_parser.add_argument(
"--portal-url",
@@ -1020,7 +1070,7 @@ For more help on a command:
)
logout_parser.add_argument(
"--provider",
choices=["nous"],
choices=["nous", "openai-codex"],
default=None,
help="Provider to log out from (default: active provider)"
)

View File

@@ -0,0 +1,149 @@
"""Shared runtime provider resolution for CLI, gateway, cron, and helpers."""
from __future__ import annotations
import os
from typing import Any, Dict, Optional
from hermes_cli.auth import (
AuthError,
format_auth_error,
resolve_provider,
resolve_nous_runtime_credentials,
resolve_codex_runtime_credentials,
)
from hermes_cli.config import load_config
from hermes_constants import OPENROUTER_BASE_URL
def _get_model_config() -> Dict[str, Any]:
config = load_config()
model_cfg = config.get("model")
if isinstance(model_cfg, dict):
return dict(model_cfg)
if isinstance(model_cfg, str) and model_cfg.strip():
return {"default": model_cfg.strip()}
return {}
def resolve_requested_provider(requested: Optional[str] = None) -> str:
"""Resolve provider request from explicit arg, env, then config."""
if requested and requested.strip():
return requested.strip().lower()
env_provider = os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
if env_provider:
return env_provider
model_cfg = _get_model_config()
cfg_provider = model_cfg.get("provider")
if isinstance(cfg_provider, str) and cfg_provider.strip():
return cfg_provider.strip().lower()
return "auto"
def _resolve_openrouter_runtime(
*,
requested_provider: str,
explicit_api_key: Optional[str] = None,
explicit_base_url: Optional[str] = None,
) -> Dict[str, Any]:
model_cfg = _get_model_config()
cfg_base_url = model_cfg.get("base_url") if isinstance(model_cfg.get("base_url"), str) else ""
cfg_provider = model_cfg.get("provider") if isinstance(model_cfg.get("provider"), str) else ""
requested_norm = (requested_provider or "").strip().lower()
cfg_provider = cfg_provider.strip().lower()
env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip()
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
use_config_base_url = False
if requested_norm == "auto":
if cfg_base_url.strip() and not explicit_base_url and not env_openai_base_url:
if not cfg_provider or cfg_provider == "auto":
use_config_base_url = True
base_url = (
(explicit_base_url or "").strip()
or env_openai_base_url
or (cfg_base_url.strip() if use_config_base_url else "")
or env_openrouter_base_url
or OPENROUTER_BASE_URL
).rstrip("/")
api_key = (
explicit_api_key
or os.getenv("OPENAI_API_KEY")
or os.getenv("OPENROUTER_API_KEY")
or ""
)
source = "explicit" if (explicit_api_key or explicit_base_url) else "env/config"
return {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": base_url,
"api_key": api_key,
"source": source,
}
def resolve_runtime_provider(
*,
requested: Optional[str] = None,
explicit_api_key: Optional[str] = None,
explicit_base_url: Optional[str] = None,
) -> Dict[str, Any]:
"""Resolve runtime provider credentials for agent execution."""
requested_provider = resolve_requested_provider(requested)
provider = resolve_provider(
requested_provider,
explicit_api_key=explicit_api_key,
explicit_base_url=explicit_base_url,
)
if provider == "nous":
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
return {
"provider": "nous",
"api_mode": "chat_completions",
"base_url": creds.get("base_url", "").rstrip("/"),
"api_key": creds.get("api_key", ""),
"source": creds.get("source", "portal"),
"expires_at": creds.get("expires_at"),
"requested_provider": requested_provider,
}
if provider == "openai-codex":
creds = resolve_codex_runtime_credentials()
return {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": creds.get("base_url", "").rstrip("/"),
"api_key": creds.get("api_key", ""),
"source": creds.get("source", "codex-auth-json"),
"auth_file": creds.get("auth_file"),
"codex_home": creds.get("codex_home"),
"last_refresh": creds.get("last_refresh"),
"requested_provider": requested_provider,
}
runtime = _resolve_openrouter_runtime(
requested_provider=requested_provider,
explicit_api_key=explicit_api_key,
explicit_base_url=explicit_base_url,
)
runtime["requested_provider"] = requested_provider
return runtime
def format_runtime_provider_error(error: Exception) -> str:
if isinstance(error, AuthError):
return format_auth_error(error)
return str(error)

View File

@@ -620,6 +620,7 @@ def run_setup_wizard(args):
get_active_provider, get_provider_auth_state, PROVIDER_REGISTRY,
format_auth_error, AuthError, fetch_nous_models,
resolve_nous_runtime_credentials, _update_config_for_provider,
_login_openai_codex, get_codex_auth_status, DEFAULT_CODEX_BASE_URL,
)
existing_custom = get_env_value("OPENAI_BASE_URL")
existing_or = get_env_value("OPENROUTER_API_KEY")
@@ -640,6 +641,7 @@ def run_setup_wizard(args):
provider_choices = [
"Login with Nous Portal (Nous Research subscription)",
"Login with OpenAI Codex",
"OpenRouter API key (100+ models, pay-per-use)",
"Custom OpenAI-compatible endpoint (self-hosted / VLLM / etc.)",
]
@@ -647,7 +649,7 @@ def run_setup_wizard(args):
provider_choices.append(keep_label)
# Default to "Keep current" if a provider exists, otherwise OpenRouter (most common)
default_provider = len(provider_choices) - 1 if has_any_provider else 1
default_provider = len(provider_choices) - 1 if has_any_provider else 2
if not has_any_provider:
print_warning("An inference provider is required for Hermes to work.")
@@ -656,7 +658,7 @@ def run_setup_wizard(args):
provider_idx = prompt_choice("Select your inference provider:", provider_choices, default_provider)
# Track which provider was selected for model step
selected_provider = None # "nous", "openrouter", "custom", or None (keep)
selected_provider = None # "nous", "openai-codex", "openrouter", "custom", or None (keep)
nous_models = [] # populated if Nous login succeeds
if provider_idx == 0: # Nous Portal
@@ -699,7 +701,31 @@ def run_setup_wizard(args):
print_info("You can try again later with: hermes login")
selected_provider = None
elif provider_idx == 1: # OpenRouter
elif provider_idx == 1: # OpenAI Codex
selected_provider = "openai-codex"
print()
print_header("OpenAI Codex Login")
print()
try:
import argparse
mock_args = argparse.Namespace()
_login_openai_codex(mock_args, PROVIDER_REGISTRY["openai-codex"])
# Clear custom endpoint vars that would override provider routing.
if existing_custom:
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
_update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
except SystemExit:
print_warning("OpenAI Codex login was cancelled or failed.")
print_info("You can try again later with: hermes login --provider openai-codex")
selected_provider = None
except Exception as e:
print_error(f"Login failed: {e}")
print_info("You can try again later with: hermes login --provider openai-codex")
selected_provider = None
elif provider_idx == 2: # OpenRouter
selected_provider = "openrouter"
print()
print_header("OpenRouter API Key")
@@ -726,7 +752,7 @@ def run_setup_wizard(args):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
elif provider_idx == 2: # Custom endpoint
elif provider_idx == 3: # Custom endpoint
selected_provider = "custom"
print()
print_header("Custom OpenAI-Compatible Endpoint")
@@ -753,14 +779,14 @@ def run_setup_wizard(args):
config['model'] = model_name
save_env_value("LLM_MODEL", model_name)
print_success("Custom endpoint configured")
# else: provider_idx == 3 (Keep current) — only shown when a provider already exists
# else: provider_idx == 4 (Keep current) — only shown when a provider already exists
# =========================================================================
# Step 1b: OpenRouter API Key for tools (if not already set)
# =========================================================================
# Tools (vision, web, MoA) use OpenRouter independently of the main provider.
# Prompt for OpenRouter key if not set and a non-OpenRouter provider was chosen.
if selected_provider in ("nous", "custom") and not get_env_value("OPENROUTER_API_KEY"):
if selected_provider in ("nous", "openai-codex", "custom") and not get_env_value("OPENROUTER_API_KEY"):
print()
print_header("OpenRouter API Key (for tools)")
print_info("Tools like vision analysis, web search, and MoA use OpenRouter")
@@ -806,6 +832,25 @@ def run_setup_wizard(args):
config['model'] = custom
save_env_value("LLM_MODEL", custom)
# else: keep current
elif selected_provider == "openai-codex":
from hermes_cli.codex_models import get_codex_model_ids
codex_models = get_codex_model_ids()
model_choices = [f"{m}" for m in codex_models]
model_choices.append("Custom model")
model_choices.append(f"Keep current ({current_model})")
keep_idx = len(model_choices) - 1
model_idx = prompt_choice("Select default model:", model_choices, keep_idx)
if model_idx < len(codex_models):
config['model'] = codex_models[model_idx]
save_env_value("LLM_MODEL", codex_models[model_idx])
elif model_idx == len(codex_models):
custom = prompt("Enter model name")
if custom:
config['model'] = custom
save_env_value("LLM_MODEL", custom)
_update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
else:
# Static list for OpenRouter / fallback (from canonical list)
from hermes_cli.models import model_ids, menu_labels

View File

@@ -101,10 +101,12 @@ def show_status(args):
print(color("◆ Auth Providers", Colors.CYAN, Colors.BOLD))
try:
from hermes_cli.auth import get_nous_auth_status
from hermes_cli.auth import get_nous_auth_status, get_codex_auth_status
nous_status = get_nous_auth_status()
codex_status = get_codex_auth_status()
except Exception:
nous_status = {}
codex_status = {}
nous_logged_in = bool(nous_status.get("logged_in"))
print(
@@ -121,6 +123,20 @@ def show_status(args):
print(f" Key exp: {key_exp}")
print(f" Refresh: {refresh_label}")
codex_logged_in = bool(codex_status.get("logged_in"))
print(
f" {'OpenAI Codex':<12} {check_mark(codex_logged_in)} "
f"{'logged in' if codex_logged_in else 'not logged in (run: hermes login --provider openai-codex)'}"
)
codex_auth_file = codex_status.get("auth_file")
if codex_auth_file:
print(f" Auth file: {codex_auth_file}")
codex_last_refresh = _format_iso_timestamp(codex_status.get("last_refresh"))
if codex_status.get("last_refresh"):
print(f" Refreshed: {codex_last_refresh}")
if codex_status.get("error") and not codex_logged_in:
print(f" Error: {codex_status.get('error')}")
# =========================================================================
# Terminal Configuration
# =========================================================================

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,219 @@
import json
import time
import base64
from contextlib import contextmanager
from pathlib import Path
from types import SimpleNamespace
import pytest
import yaml
from hermes_cli.auth import (
AuthError,
DEFAULT_CODEX_BASE_URL,
PROVIDER_REGISTRY,
_persist_codex_auth_payload,
_login_openai_codex,
login_command,
get_codex_auth_status,
get_provider_auth_state,
read_codex_auth_file,
resolve_codex_runtime_credentials,
resolve_provider,
)
def _write_codex_auth(codex_home: Path, *, access_token: str = "access", refresh_token: str = "refresh") -> Path:
codex_home.mkdir(parents=True, exist_ok=True)
auth_file = codex_home / "auth.json"
auth_file.write_text(
json.dumps(
{
"auth_mode": "oauth",
"last_refresh": "2026-02-26T00:00:00Z",
"tokens": {
"access_token": access_token,
"refresh_token": refresh_token,
},
}
)
)
return auth_file
def _jwt_with_exp(exp_epoch: int) -> str:
payload = {"exp": exp_epoch}
encoded = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8")).rstrip(b"=").decode("utf-8")
return f"h.{encoded}.s"
def test_read_codex_auth_file_success(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
auth_file = _write_codex_auth(codex_home)
monkeypatch.setenv("CODEX_HOME", str(codex_home))
payload = read_codex_auth_file()
assert payload["auth_path"] == auth_file
assert payload["tokens"]["access_token"] == "access"
assert payload["tokens"]["refresh_token"] == "refresh"
def test_resolve_codex_runtime_credentials_missing_access_token(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home, access_token="")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
with pytest.raises(AuthError) as exc:
resolve_codex_runtime_credentials()
assert exc.value.code == "codex_auth_missing_access_token"
assert exc.value.relogin_required is True
def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
expiring_token = _jwt_with_exp(int(time.time()) - 10)
_write_codex_auth(codex_home, access_token=expiring_token, refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
called = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
called["count"] += 1
assert auth_path == codex_home / "auth.json"
assert lock_held is True
return {"access_token": "access-new", "refresh_token": "refresh-new"}
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
resolved = resolve_codex_runtime_credentials()
assert called["count"] == 1
assert resolved["api_key"] == "access-new"
def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
called = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
called["count"] += 1
assert lock_held is True
return {"access_token": "access-forced", "refresh_token": "refresh-new"}
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False)
assert called["count"] == 1
assert resolved["api_key"] == "access-forced"
def test_resolve_codex_runtime_credentials_uses_file_lock_on_refresh(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old")
monkeypatch.setenv("CODEX_HOME", str(codex_home))
lock_calls = {"enter": 0, "exit": 0}
@contextmanager
def _fake_lock(auth_path, timeout_seconds=15.0):
assert auth_path == codex_home / "auth.json"
lock_calls["enter"] += 1
try:
yield
finally:
lock_calls["exit"] += 1
refresh_calls = {"count": 0}
def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
refresh_calls["count"] += 1
assert lock_held is True
return {"access_token": "access-updated", "refresh_token": "refresh-updated"}
monkeypatch.setattr("hermes_cli.auth._codex_auth_file_lock", _fake_lock)
monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)
resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False)
assert refresh_calls["count"] == 1
assert lock_calls["enter"] == 1
assert lock_calls["exit"] == 1
assert resolved["api_key"] == "access-updated"
def test_resolve_provider_explicit_codex_does_not_fallback(monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
assert resolve_provider("openai-codex") == "openai-codex"
def test_persist_codex_auth_payload_writes_atomically(tmp_path):
auth_path = tmp_path / "auth.json"
auth_path.write_text('{"stale":true}\n')
payload = {
"auth_mode": "oauth",
"tokens": {
"access_token": "next-access",
"refresh_token": "next-refresh",
},
"last_refresh": "2026-02-26T00:00:00Z",
}
_persist_codex_auth_payload(auth_path, payload)
stored = json.loads(auth_path.read_text())
assert stored == payload
assert list(tmp_path.glob(".auth.json.*.tmp")) == []
def test_get_codex_auth_status_not_logged_in(tmp_path, monkeypatch):
monkeypatch.setenv("CODEX_HOME", str(tmp_path / "missing-codex-home"))
status = get_codex_auth_status()
assert status["logged_in"] is False
assert "error" in status
def test_login_openai_codex_persists_provider_state(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes-home"
codex_home = tmp_path / "codex-home"
_write_codex_auth(codex_home)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("CODEX_HOME", str(codex_home))
monkeypatch.setattr("hermes_cli.auth.shutil.which", lambda _: "/usr/local/bin/codex")
monkeypatch.setattr("hermes_cli.auth.subprocess.run", lambda *a, **k: None)
_login_openai_codex(SimpleNamespace(), PROVIDER_REGISTRY["openai-codex"])
state = get_provider_auth_state("openai-codex")
assert state is not None
assert state["source"] == "codex-auth-json"
assert state["auth_file"].endswith("auth.json")
config_path = hermes_home / "config.yaml"
config = yaml.safe_load(config_path.read_text())
assert config["model"]["provider"] == "openai-codex"
assert config["model"]["base_url"] == DEFAULT_CODEX_BASE_URL
def test_login_command_defaults_to_nous(monkeypatch):
calls = {"nous": 0, "codex": 0}
def _fake_nous(args, pconfig):
calls["nous"] += 1
def _fake_codex(args, pconfig):
calls["codex"] += 1
monkeypatch.setattr("hermes_cli.auth._login_nous", _fake_nous)
monkeypatch.setattr("hermes_cli.auth._login_openai_codex", _fake_codex)
login_command(SimpleNamespace())
assert calls["nous"] == 1
assert calls["codex"] == 0

View File

@@ -0,0 +1,187 @@
import importlib
import sys
import types
from contextlib import nullcontext
from types import SimpleNamespace
from hermes_cli.auth import AuthError
from hermes_cli import main as hermes_main
def _install_prompt_toolkit_stubs():
class _Dummy:
def __init__(self, *args, **kwargs):
pass
class _Condition:
def __init__(self, func):
self.func = func
def __bool__(self):
return bool(self.func())
class _ANSI(str):
pass
root = types.ModuleType("prompt_toolkit")
history = types.ModuleType("prompt_toolkit.history")
styles = types.ModuleType("prompt_toolkit.styles")
patch_stdout = types.ModuleType("prompt_toolkit.patch_stdout")
application = types.ModuleType("prompt_toolkit.application")
layout = types.ModuleType("prompt_toolkit.layout")
processors = types.ModuleType("prompt_toolkit.layout.processors")
filters = types.ModuleType("prompt_toolkit.filters")
dimension = types.ModuleType("prompt_toolkit.layout.dimension")
menus = types.ModuleType("prompt_toolkit.layout.menus")
widgets = types.ModuleType("prompt_toolkit.widgets")
key_binding = types.ModuleType("prompt_toolkit.key_binding")
completion = types.ModuleType("prompt_toolkit.completion")
formatted_text = types.ModuleType("prompt_toolkit.formatted_text")
history.FileHistory = _Dummy
styles.Style = _Dummy
patch_stdout.patch_stdout = lambda *args, **kwargs: nullcontext()
application.Application = _Dummy
layout.Layout = _Dummy
layout.HSplit = _Dummy
layout.Window = _Dummy
layout.FormattedTextControl = _Dummy
layout.ConditionalContainer = _Dummy
processors.Processor = _Dummy
processors.Transformation = _Dummy
processors.PasswordProcessor = _Dummy
processors.ConditionalProcessor = _Dummy
filters.Condition = _Condition
dimension.Dimension = _Dummy
menus.CompletionsMenu = _Dummy
widgets.TextArea = _Dummy
key_binding.KeyBindings = _Dummy
completion.Completer = _Dummy
completion.Completion = _Dummy
formatted_text.ANSI = _ANSI
root.print_formatted_text = lambda *args, **kwargs: None
sys.modules.setdefault("prompt_toolkit", root)
sys.modules.setdefault("prompt_toolkit.history", history)
sys.modules.setdefault("prompt_toolkit.styles", styles)
sys.modules.setdefault("prompt_toolkit.patch_stdout", patch_stdout)
sys.modules.setdefault("prompt_toolkit.application", application)
sys.modules.setdefault("prompt_toolkit.layout", layout)
sys.modules.setdefault("prompt_toolkit.layout.processors", processors)
sys.modules.setdefault("prompt_toolkit.filters", filters)
sys.modules.setdefault("prompt_toolkit.layout.dimension", dimension)
sys.modules.setdefault("prompt_toolkit.layout.menus", menus)
sys.modules.setdefault("prompt_toolkit.widgets", widgets)
sys.modules.setdefault("prompt_toolkit.key_binding", key_binding)
sys.modules.setdefault("prompt_toolkit.completion", completion)
sys.modules.setdefault("prompt_toolkit.formatted_text", formatted_text)
def _import_cli():
try:
importlib.import_module("prompt_toolkit")
except ModuleNotFoundError:
_install_prompt_toolkit_stubs()
return importlib.import_module("cli")
def test_hermes_cli_init_does_not_eagerly_resolve_runtime_provider(monkeypatch):
cli = _import_cli()
calls = {"count": 0}
def _unexpected_runtime_resolve(**kwargs):
calls["count"] += 1
raise AssertionError("resolve_runtime_provider should not be called in HermesCLI.__init__")
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _unexpected_runtime_resolve)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)
assert shell is not None
assert calls["count"] == 0
def test_runtime_resolution_failure_is_not_sticky(monkeypatch):
cli = _import_cli()
calls = {"count": 0}
def _runtime_resolve(**kwargs):
calls["count"] += 1
if calls["count"] == 1:
raise RuntimeError("temporary auth failure")
return {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "test-key",
"source": "env/config",
}
class _DummyAgent:
def __init__(self, *args, **kwargs):
self.kwargs = kwargs
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _runtime_resolve)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
monkeypatch.setattr(cli, "AIAgent", _DummyAgent)
shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)
assert shell._init_agent() is False
assert shell._init_agent() is True
assert calls["count"] == 2
assert shell.agent is not None
def test_runtime_resolution_rebuilds_agent_on_routing_change(monkeypatch):
cli = _import_cli()
def _runtime_resolve(**kwargs):
return {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": "https://same-endpoint.example/v1",
"api_key": "same-key",
"source": "env/config",
}
monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _runtime_resolve)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)
shell.provider = "openrouter"
shell.api_mode = "chat_completions"
shell.base_url = "https://same-endpoint.example/v1"
shell.api_key = "same-key"
shell.agent = object()
assert shell._ensure_runtime_credentials() is True
assert shell.agent is None
assert shell.provider == "openai-codex"
assert shell.api_mode == "codex_responses"
def test_cmd_model_falls_back_to_auto_on_invalid_provider(monkeypatch, capsys):
monkeypatch.setattr(
"hermes_cli.config.load_config",
lambda: {"model": {"default": "gpt-5", "provider": "invalid-provider"}},
)
monkeypatch.setattr("hermes_cli.config.save_config", lambda cfg: None)
monkeypatch.setattr("hermes_cli.config.get_env_value", lambda key: "")
monkeypatch.setattr("hermes_cli.config.save_env_value", lambda key, value: None)
def _resolve_provider(requested, **kwargs):
if requested == "invalid-provider":
raise AuthError("Unknown provider 'invalid-provider'.", code="invalid_provider")
return "openrouter"
monkeypatch.setattr("hermes_cli.auth.resolve_provider", _resolve_provider)
monkeypatch.setattr(hermes_main, "_prompt_provider_choice", lambda choices: len(choices) - 1)
hermes_main.cmd_model(SimpleNamespace())
output = capsys.readouterr().out
assert "Warning:" in output
assert "falling back to auto provider detection" in output.lower()
assert "No change." in output

View File

@@ -0,0 +1,175 @@
import asyncio
import sys
import types
from types import SimpleNamespace
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
sys.modules.setdefault("fal_client", types.SimpleNamespace())
import cron.scheduler as cron_scheduler
import gateway.run as gateway_run
import run_agent
from gateway.config import Platform
from gateway.session import SessionSource
def _patch_agent_bootstrap(monkeypatch):
monkeypatch.setattr(
run_agent,
"get_tool_definitions",
lambda **kwargs: [
{
"type": "function",
"function": {
"name": "terminal",
"description": "Run shell commands.",
"parameters": {"type": "object", "properties": {}},
},
}
],
)
monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
def _codex_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=5, output_tokens=3, total_tokens=8),
status="completed",
model="gpt-5-codex",
)
class _UnauthorizedError(RuntimeError):
def __init__(self):
super().__init__("Error code: 401 - unauthorized")
self.status_code = 401
class _FakeOpenAI:
def __init__(self, **kwargs):
self.kwargs = kwargs
def close(self):
return None
class _Codex401ThenSuccessAgent(run_agent.AIAgent):
refresh_attempts = 0
last_init = {}
def __init__(self, *args, **kwargs):
kwargs.setdefault("skip_context_files", True)
kwargs.setdefault("skip_memory", True)
kwargs.setdefault("max_iterations", 4)
type(self).last_init = dict(kwargs)
super().__init__(*args, **kwargs)
self._cleanup_task_resources = lambda task_id: None
self._persist_session = lambda messages, history=None: None
self._save_trajectory = lambda messages, user_message, completed: None
self._save_session_log = lambda messages: None
def _try_refresh_codex_client_credentials(self, *, force: bool = True) -> bool:
type(self).refresh_attempts += 1
return True
def run_conversation(self, user_message: str, conversation_history=None):
calls = {"api": 0}
def _fake_api_call(api_kwargs):
calls["api"] += 1
if calls["api"] == 1:
raise _UnauthorizedError()
return _codex_message_response("Recovered via refresh")
self._interruptible_api_call = _fake_api_call
return super().run_conversation(user_message, conversation_history=conversation_history)
def test_cron_run_job_codex_path_handles_internal_401_refresh(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
monkeypatch.setattr(run_agent, "AIAgent", _Codex401ThenSuccessAgent)
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda requested=None: {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
},
)
monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
_Codex401ThenSuccessAgent.refresh_attempts = 0
_Codex401ThenSuccessAgent.last_init = {}
success, output, final_response, error = cron_scheduler.run_job(
{"id": "job-1", "name": "Codex Refresh Test", "prompt": "ping"}
)
assert success is True
assert error is None
assert final_response == "Recovered via refresh"
assert "Recovered via refresh" in output
assert _Codex401ThenSuccessAgent.refresh_attempts == 1
assert _Codex401ThenSuccessAgent.last_init["provider"] == "openai-codex"
assert _Codex401ThenSuccessAgent.last_init["api_mode"] == "codex_responses"
def test_gateway_run_agent_codex_path_handles_internal_401_refresh(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
monkeypatch.setattr(run_agent, "AIAgent", _Codex401ThenSuccessAgent)
monkeypatch.setattr(
gateway_run,
"_resolve_runtime_agent_kwargs",
lambda: {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
},
)
monkeypatch.setenv("HERMES_TOOL_PROGRESS", "false")
_Codex401ThenSuccessAgent.refresh_attempts = 0
_Codex401ThenSuccessAgent.last_init = {}
runner = gateway_run.GatewayRunner.__new__(gateway_run.GatewayRunner)
runner.adapters = {}
runner._ephemeral_system_prompt = ""
runner._prefill_messages = []
runner._reasoning_config = None
runner._running_agents = {}
source = SessionSource(
platform=Platform.LOCAL,
chat_id="cli",
chat_name="CLI",
chat_type="dm",
user_id="user-1",
)
result = asyncio.run(
runner._run_agent(
message="ping",
context_prompt="",
history=[],
source=source,
session_id="session-1",
session_key="agent:main:local:dm",
)
)
assert result["final_response"] == "Recovered via refresh"
assert _Codex401ThenSuccessAgent.refresh_attempts == 1
assert _Codex401ThenSuccessAgent.last_init["provider"] == "openai-codex"
assert _Codex401ThenSuccessAgent.last_init["api_mode"] == "codex_responses"

View File

@@ -0,0 +1,40 @@
import json
from hermes_cli.codex_models import DEFAULT_CODEX_MODELS, get_codex_model_ids
def test_get_codex_model_ids_prioritizes_default_and_cache(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
codex_home.mkdir(parents=True, exist_ok=True)
(codex_home / "config.toml").write_text('model = "gpt-5.2-codex"\n')
(codex_home / "models_cache.json").write_text(
json.dumps(
{
"models": [
{"slug": "gpt-5.3-codex", "priority": 20, "supported_in_api": True},
{"slug": "gpt-5.1-codex", "priority": 5, "supported_in_api": True},
{"slug": "gpt-4o", "priority": 1, "supported_in_api": True},
{"slug": "gpt-5-hidden-codex", "priority": 2, "visibility": "hidden"},
]
}
)
)
monkeypatch.setenv("CODEX_HOME", str(codex_home))
models = get_codex_model_ids()
assert models[0] == "gpt-5.2-codex"
assert "gpt-5.1-codex" in models
assert "gpt-5.3-codex" in models
assert "gpt-4o" not in models
assert "gpt-5-hidden-codex" not in models
def test_get_codex_model_ids_falls_back_to_curated_defaults(tmp_path, monkeypatch):
codex_home = tmp_path / "codex-home"
codex_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("CODEX_HOME", str(codex_home))
models = get_codex_model_ids()
assert models[: len(DEFAULT_CODEX_MODELS)] == DEFAULT_CODEX_MODELS

View File

@@ -0,0 +1,733 @@
import sys
import types
from types import SimpleNamespace
import pytest
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
sys.modules.setdefault("fal_client", types.SimpleNamespace())
import run_agent
def _patch_agent_bootstrap(monkeypatch):
monkeypatch.setattr(
run_agent,
"get_tool_definitions",
lambda **kwargs: [
{
"type": "function",
"function": {
"name": "terminal",
"description": "Run shell commands.",
"parameters": {"type": "object", "properties": {}},
},
}
],
)
monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
def _build_agent(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://chatgpt.com/backend-api/codex",
api_key="codex-token",
quiet_mode=True,
max_iterations=4,
skip_context_files=True,
skip_memory=True,
)
agent._cleanup_task_resources = lambda task_id: None
agent._persist_session = lambda messages, history=None: None
agent._save_trajectory = lambda messages, user_message, completed: None
agent._save_session_log = lambda messages: None
return agent
def _codex_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=5, output_tokens=3, total_tokens=8),
status="completed",
model="gpt-5-codex",
)
def _codex_tool_call_response():
return SimpleNamespace(
output=[
SimpleNamespace(
type="function_call",
id="fc_1",
call_id="call_1",
name="terminal",
arguments="{}",
)
],
usage=SimpleNamespace(input_tokens=12, output_tokens=4, total_tokens=16),
status="completed",
model="gpt-5-codex",
)
def _codex_incomplete_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
status="in_progress",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
status="in_progress",
model="gpt-5-codex",
)
def _codex_commentary_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
phase="commentary",
status="completed",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
status="completed",
model="gpt-5-codex",
)
def _codex_ack_message_response(text: str):
return SimpleNamespace(
output=[
SimpleNamespace(
type="message",
status="completed",
content=[SimpleNamespace(type="output_text", text=text)],
)
],
usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
status="completed",
model="gpt-5-codex",
)
class _FakeResponsesStream:
def __init__(self, *, final_response=None, final_error=None):
self._final_response = final_response
self._final_error = final_error
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def __iter__(self):
return iter(())
def get_final_response(self):
if self._final_error is not None:
raise self._final_error
return self._final_response
class _FakeCreateStream:
def __init__(self, events):
self._events = list(events)
self.closed = False
def __iter__(self):
return iter(self._events)
def close(self):
self.closed = True
def _codex_request_kwargs():
return {
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
"input": [{"role": "user", "content": "Ping"}],
"tools": None,
"store": False,
}
def test_api_mode_uses_explicit_provider_when_codex(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://openrouter.ai/api/v1",
provider="openai-codex",
api_key="codex-token",
quiet_mode=True,
max_iterations=1,
skip_context_files=True,
skip_memory=True,
)
assert agent.api_mode == "codex_responses"
assert agent.provider == "openai-codex"
def test_api_mode_normalizes_provider_case(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://openrouter.ai/api/v1",
provider="OpenAI-Codex",
api_key="codex-token",
quiet_mode=True,
max_iterations=1,
skip_context_files=True,
skip_memory=True,
)
assert agent.provider == "openai-codex"
assert agent.api_mode == "codex_responses"
def test_api_mode_respects_explicit_openrouter_provider_over_codex_url(monkeypatch):
_patch_agent_bootstrap(monkeypatch)
agent = run_agent.AIAgent(
model="gpt-5-codex",
base_url="https://chatgpt.com/backend-api/codex",
provider="openrouter",
api_key="test-token",
quiet_mode=True,
max_iterations=1,
skip_context_files=True,
skip_memory=True,
)
assert agent.api_mode == "chat_completions"
assert agent.provider == "openrouter"
def test_build_api_kwargs_codex(monkeypatch):
agent = _build_agent(monkeypatch)
kwargs = agent._build_api_kwargs(
[
{"role": "system", "content": "You are Hermes."},
{"role": "user", "content": "Ping"},
]
)
assert kwargs["model"] == "gpt-5-codex"
assert kwargs["instructions"] == "You are Hermes."
assert kwargs["store"] is False
assert isinstance(kwargs["input"], list)
assert kwargs["input"][0]["role"] == "user"
assert kwargs["tools"][0]["type"] == "function"
assert kwargs["tools"][0]["name"] == "terminal"
assert kwargs["tools"][0]["strict"] is False
assert "function" not in kwargs["tools"][0]
assert kwargs["store"] is False
assert "timeout" not in kwargs
assert "max_tokens" not in kwargs
assert "extra_body" not in kwargs
def test_run_codex_stream_retries_when_completed_event_missing(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0}
def _fake_stream(**kwargs):
calls["stream"] += 1
if calls["stream"] == 1:
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
return _FakeResponsesStream(final_response=_codex_message_response("stream ok"))
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=lambda **kwargs: _codex_message_response("fallback"),
)
)
response = agent._run_codex_stream(_codex_request_kwargs())
assert calls["stream"] == 2
assert response.output[0].content[0].text == "stream ok"
def test_run_codex_stream_falls_back_to_create_after_stream_completion_error(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0, "create": 0}
def _fake_stream(**kwargs):
calls["stream"] += 1
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
def _fake_create(**kwargs):
calls["create"] += 1
return _codex_message_response("create fallback ok")
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=_fake_create,
)
)
response = agent._run_codex_stream(_codex_request_kwargs())
assert calls["stream"] == 2
assert calls["create"] == 1
assert response.output[0].content[0].text == "create fallback ok"
def test_run_codex_stream_fallback_parses_create_stream_events(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"stream": 0, "create": 0}
create_stream = _FakeCreateStream(
[
SimpleNamespace(type="response.created"),
SimpleNamespace(type="response.in_progress"),
SimpleNamespace(type="response.completed", response=_codex_message_response("streamed create ok")),
]
)
def _fake_stream(**kwargs):
calls["stream"] += 1
return _FakeResponsesStream(
final_error=RuntimeError("Didn't receive a `response.completed` event.")
)
def _fake_create(**kwargs):
calls["create"] += 1
assert kwargs.get("stream") is True
return create_stream
agent.client = SimpleNamespace(
responses=SimpleNamespace(
stream=_fake_stream,
create=_fake_create,
)
)
response = agent._run_codex_stream(_codex_request_kwargs())
assert calls["stream"] == 2
assert calls["create"] == 1
assert create_stream.closed is True
assert response.output[0].content[0].text == "streamed create ok"
def test_run_conversation_codex_plain_text(monkeypatch):
agent = _build_agent(monkeypatch)
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))
result = agent.run_conversation("Say OK")
assert result["completed"] is True
assert result["final_response"] == "OK"
assert result["messages"][-1]["role"] == "assistant"
assert result["messages"][-1]["content"] == "OK"
def test_run_conversation_codex_refreshes_after_401_and_retries(monkeypatch):
agent = _build_agent(monkeypatch)
calls = {"api": 0, "refresh": 0}
class _UnauthorizedError(RuntimeError):
def __init__(self):
super().__init__("Error code: 401 - unauthorized")
self.status_code = 401
def _fake_api_call(api_kwargs):
calls["api"] += 1
if calls["api"] == 1:
raise _UnauthorizedError()
return _codex_message_response("Recovered after refresh")
def _fake_refresh(*, force=True):
calls["refresh"] += 1
assert force is True
return True
monkeypatch.setattr(agent, "_interruptible_api_call", _fake_api_call)
monkeypatch.setattr(agent, "_try_refresh_codex_client_credentials", _fake_refresh)
result = agent.run_conversation("Say OK")
assert calls["api"] == 2
assert calls["refresh"] == 1
assert result["completed"] is True
assert result["final_response"] == "Recovered after refresh"
def test_try_refresh_codex_client_credentials_rebuilds_client(monkeypatch):
agent = _build_agent(monkeypatch)
closed = {"value": False}
rebuilt = {"kwargs": None}
class _ExistingClient:
def close(self):
closed["value"] = True
class _RebuiltClient:
pass
def _fake_openai(**kwargs):
rebuilt["kwargs"] = kwargs
return _RebuiltClient()
monkeypatch.setattr(
"hermes_cli.auth.resolve_codex_runtime_credentials",
lambda force_refresh=True: {
"api_key": "new-codex-token",
"base_url": "https://chatgpt.com/backend-api/codex",
},
)
monkeypatch.setattr(run_agent, "OpenAI", _fake_openai)
agent.client = _ExistingClient()
ok = agent._try_refresh_codex_client_credentials(force=True)
assert ok is True
assert closed["value"] is True
assert rebuilt["kwargs"]["api_key"] == "new-codex-token"
assert rebuilt["kwargs"]["base_url"] == "https://chatgpt.com/backend-api/codex"
assert isinstance(agent.client, _RebuiltClient)
def test_run_conversation_codex_tool_round_trip(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [_codex_tool_call_response(), _codex_message_response("done")]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("run a command")
assert result["completed"] is True
assert result["final_response"] == "done"
assert any(msg.get("tool_calls") for msg in result["messages"] if msg.get("role") == "assistant")
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_chat_messages_to_responses_input_uses_call_id_for_function_call(monkeypatch):
agent = _build_agent(monkeypatch)
items = agent._chat_messages_to_responses_input(
[
{"role": "user", "content": "Run terminal"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_abc123",
"type": "function",
"function": {"name": "terminal", "arguments": "{}"},
}
],
},
{"role": "tool", "tool_call_id": "call_abc123", "content": '{"ok":true}'},
]
)
function_call = next(item for item in items if item.get("type") == "function_call")
function_output = next(item for item in items if item.get("type") == "function_call_output")
assert function_call["call_id"] == "call_abc123"
assert "id" not in function_call
assert function_output["call_id"] == "call_abc123"
def test_chat_messages_to_responses_input_accepts_call_pipe_fc_ids(monkeypatch):
agent = _build_agent(monkeypatch)
items = agent._chat_messages_to_responses_input(
[
{"role": "user", "content": "Run terminal"},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_pair123|fc_pair123",
"type": "function",
"function": {"name": "terminal", "arguments": "{}"},
}
],
},
{"role": "tool", "tool_call_id": "call_pair123|fc_pair123", "content": '{"ok":true}'},
]
)
function_call = next(item for item in items if item.get("type") == "function_call")
function_output = next(item for item in items if item.get("type") == "function_call_output")
assert function_call["call_id"] == "call_pair123"
assert "id" not in function_call
assert function_output["call_id"] == "call_pair123"
def test_preflight_codex_api_kwargs_strips_optional_function_call_id(monkeypatch):
agent = _build_agent(monkeypatch)
preflight = agent._preflight_codex_api_kwargs(
{
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
"input": [
{"role": "user", "content": "hi"},
{
"type": "function_call",
"id": "call_bad",
"call_id": "call_good",
"name": "terminal",
"arguments": "{}",
},
],
"tools": [],
"store": False,
}
)
fn_call = next(item for item in preflight["input"] if item.get("type") == "function_call")
assert fn_call["call_id"] == "call_good"
assert "id" not in fn_call
def test_preflight_codex_api_kwargs_rejects_function_call_output_without_call_id(monkeypatch):
agent = _build_agent(monkeypatch)
with pytest.raises(ValueError, match="function_call_output is missing call_id"):
agent._preflight_codex_api_kwargs(
{
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
"input": [{"type": "function_call_output", "output": "{}"}],
"tools": [],
"store": False,
}
)
def test_preflight_codex_api_kwargs_rejects_unsupported_request_fields(monkeypatch):
agent = _build_agent(monkeypatch)
kwargs = _codex_request_kwargs()
kwargs["temperature"] = 0
with pytest.raises(ValueError, match="unsupported field"):
agent._preflight_codex_api_kwargs(kwargs)
def test_run_conversation_codex_replay_payload_keeps_call_id(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [_codex_tool_call_response(), _codex_message_response("done")]
requests = []
def _fake_api_call(api_kwargs):
requests.append(api_kwargs)
return responses.pop(0)
monkeypatch.setattr(agent, "_interruptible_api_call", _fake_api_call)
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("run a command")
assert result["completed"] is True
assert result["final_response"] == "done"
assert len(requests) >= 2
replay_input = requests[1]["input"]
function_call = next(item for item in replay_input if item.get("type") == "function_call")
function_output = next(item for item in replay_input if item.get("type") == "function_call_output")
assert function_call["call_id"] == "call_1"
assert "id" not in function_call
assert function_output["call_id"] == "call_1"
def test_run_conversation_codex_continues_after_incomplete_interim_message(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_incomplete_message_response("I'll inspect the repo structure first."),
_codex_tool_call_response(),
_codex_message_response("Architecture summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("analyze repo")
assert result["completed"] is True
assert result["final_response"] == "Architecture summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "inspect the repo structure" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(monkeypatch):
agent = _build_agent(monkeypatch)
assistant_message, finish_reason = agent._normalize_codex_response(
_codex_commentary_message_response("I'll inspect the repository first.")
)
assert finish_reason == "incomplete"
assert "inspect the repository" in (assistant_message.content or "")
def test_run_conversation_codex_continues_after_commentary_phase_message(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_commentary_message_response("I'll inspect the repo structure first."),
_codex_tool_call_response(),
_codex_message_response("Architecture summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("analyze repo")
assert result["completed"] is True
assert result["final_response"] == "Architecture summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "inspect the repo structure" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_run_conversation_codex_continues_after_ack_stop_message(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_ack_message_response(
"Absolutely — I can do that. I'll inspect ~/openclaw-studio and report back with a walkthrough."
),
_codex_tool_call_response(),
_codex_message_response("Architecture summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("look into ~/openclaw-studio and tell me how it works")
assert result["completed"] is True
assert result["final_response"] == "Architecture summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "inspect ~/openclaw-studio" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(
msg.get("role") == "user"
and "Continue now. Execute the required tool calls" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
def test_run_conversation_codex_continues_after_ack_for_directory_listing_prompt(monkeypatch):
agent = _build_agent(monkeypatch)
responses = [
_codex_ack_message_response(
"I'll check what's in the current directory and call out 3 notable items."
),
_codex_tool_call_response(),
_codex_message_response("Directory summary complete."),
]
monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
for call in assistant_message.tool_calls:
messages.append(
{
"role": "tool",
"tool_call_id": call.id,
"content": '{"ok":true}',
}
)
monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
result = agent.run_conversation("look at current directory and list 3 notable things")
assert result["completed"] is True
assert result["final_response"] == "Directory summary complete."
assert any(
msg.get("role") == "assistant"
and msg.get("finish_reason") == "incomplete"
and "current directory" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(
msg.get("role") == "user"
and "Continue now. Execute the required tool calls" in (msg.get("content") or "")
for msg in result["messages"]
)
assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])

View File

@@ -0,0 +1,95 @@
from hermes_cli import runtime_provider as rp
def test_resolve_runtime_provider_codex(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr(
rp,
"resolve_codex_runtime_credentials",
lambda: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
"source": "codex-auth-json",
"auth_file": "/tmp/auth.json",
"codex_home": "/tmp/codex",
"last_refresh": "2026-02-26T00:00:00Z",
},
)
resolved = rp.resolve_runtime_provider(requested="openai-codex")
assert resolved["provider"] == "openai-codex"
assert resolved["api_mode"] == "codex_responses"
assert resolved["base_url"] == "https://chatgpt.com/backend-api/codex"
assert resolved["api_key"] == "codex-token"
assert resolved["requested_provider"] == "openai-codex"
def test_resolve_runtime_provider_openrouter_explicit(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(
requested="openrouter",
explicit_api_key="test-key",
explicit_base_url="https://example.com/v1/",
)
assert resolved["provider"] == "openrouter"
assert resolved["api_mode"] == "chat_completions"
assert resolved["api_key"] == "test-key"
assert resolved["base_url"] == "https://example.com/v1"
assert resolved["source"] == "explicit"
def test_resolve_runtime_provider_openrouter_ignores_codex_config_base_url(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(
rp,
"_get_model_config",
lambda: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
},
)
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(requested="openrouter")
assert resolved["provider"] == "openrouter"
assert resolved["base_url"] == rp.OPENROUTER_BASE_URL
def test_resolve_runtime_provider_auto_uses_custom_config_base_url(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(
rp,
"_get_model_config",
lambda: {
"provider": "auto",
"base_url": "https://custom.example/v1/",
},
)
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(requested="auto")
assert resolved["provider"] == "openrouter"
assert resolved["base_url"] == "https://custom.example/v1"
def test_resolve_requested_provider_precedence(monkeypatch):
monkeypatch.setenv("HERMES_INFERENCE_PROVIDER", "nous")
monkeypatch.setattr(rp, "_get_model_config", lambda: {"provider": "openai-codex"})
assert rp.resolve_requested_provider("openrouter") == "openrouter"

View File

@@ -30,6 +30,9 @@ def _make_mock_parent(depth=0):
"""Create a mock parent agent with the fields delegate_task expects."""
parent = MagicMock()
parent.base_url = "https://openrouter.ai/api/v1"
parent.api_key = "parent-key"
parent.provider = "openrouter"
parent.api_mode = "chat_completions"
parent.model = "anthropic/claude-sonnet-4"
parent.platform = "cli"
parent.providers_allowed = None
@@ -218,6 +221,30 @@ class TestDelegateTask(unittest.TestCase):
delegate_task(goal="Test tracking", parent_agent=parent)
self.assertEqual(len(parent._active_children), 0)
def test_child_inherits_runtime_credentials(self):
parent = _make_mock_parent(depth=0)
parent.base_url = "https://chatgpt.com/backend-api/codex"
parent.api_key = "codex-token"
parent.provider = "openai-codex"
parent.api_mode = "codex_responses"
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "ok",
"completed": True,
"api_calls": 1,
}
MockAgent.return_value = mock_child
delegate_task(goal="Test runtime inheritance", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["base_url"], parent.base_url)
self.assertEqual(kwargs["api_key"], parent.api_key)
self.assertEqual(kwargs["provider"], parent.provider)
self.assertEqual(kwargs["api_mode"], parent.api_mode)
class TestBlockedTools(unittest.TestCase):
def test_blocked_tools_constant(self):

View File

@@ -120,15 +120,17 @@ def _run_single_child(
pass
try:
# Extract parent's API key so subagents inherit auth (e.g. Nous Portal)
parent_api_key = None
if hasattr(parent_agent, '_client_kwargs'):
# Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
parent_api_key = getattr(parent_agent, "api_key", None)
if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
parent_api_key = parent_agent._client_kwargs.get("api_key")
child = AIAgent(
base_url=parent_agent.base_url,
api_key=parent_api_key,
model=model or parent_agent.model,
provider=getattr(parent_agent, "provider", None),
api_mode=getattr(parent_agent, "api_mode", None),
max_iterations=max_iterations,
enabled_toolsets=child_toolsets,
quiet_mode=True,