diff --git a/cli.py b/cli.py
index a0ccdf55b..b85edc6b7 100755
--- a/cli.py
+++ b/cli.py
@@ -803,7 +803,7 @@ class HermesCLI:
Args:
model: Model to use (default: from env or claude-sonnet)
toolsets: List of toolsets to enable (default: all)
- provider: Inference provider ("auto", "openrouter", "nous")
+ provider: Inference provider ("auto", "openrouter", "nous", "openai-codex")
api_key: API key (default: from environment)
base_url: API base URL (default: OpenRouter)
max_turns: Maximum tool-calling iterations (default: 60)
@@ -821,26 +821,28 @@ class HermesCLI:
# Configuration - priority: CLI args > env vars > config file
# Model can come from: CLI arg, LLM_MODEL env, OPENAI_MODEL env (custom endpoint), or config
self.model = model or os.getenv("LLM_MODEL") or os.getenv("OPENAI_MODEL") or CLI_CONFIG["model"]["default"]
-
- # Base URL: custom endpoint (OPENAI_BASE_URL) takes precedence over OpenRouter
- self.base_url = base_url or os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
-
- # API key: custom endpoint (OPENAI_API_KEY) takes precedence over OpenRouter
- self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
- # Provider resolution: determines whether to use OAuth credentials or env var keys
- from hermes_cli.auth import resolve_provider
+ self._explicit_api_key = api_key
+ self._explicit_base_url = base_url
+
+ # Provider selection is resolved lazily at use-time via _ensure_runtime_credentials().
self.requested_provider = (
provider
or os.getenv("HERMES_INFERENCE_PROVIDER")
or CLI_CONFIG["model"].get("provider")
or "auto"
)
- self.provider = resolve_provider(
- self.requested_provider,
- explicit_api_key=api_key,
- explicit_base_url=base_url,
+ self._provider_source: Optional[str] = None
+ self.provider = self.requested_provider
+ self.api_mode = "chat_completions"
+ self.base_url = (
+ base_url
+ or os.getenv("OPENAI_BASE_URL")
+ or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
)
+ self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
+    # Max turns priority: CLI arg > env var > config (agent.max_turns / root max_turns) — NOTE(review): contradicts the older comment kept below; confirm intended order
+ if max_turns != 60: # CLI arg was explicitly set
self._nous_key_expires_at: Optional[str] = None
self._nous_key_source: Optional[str] = None
# Max turns priority: CLI arg > config file > env var > default
@@ -903,45 +905,51 @@ class HermesCLI:
def _ensure_runtime_credentials(self) -> bool:
"""
- Ensure OAuth provider credentials are fresh before agent use.
- For Nous Portal: checks agent key TTL, refreshes/re-mints as needed.
- If the key changed, tears down the agent so it rebuilds with new creds.
+ Ensure runtime credentials are resolved before agent use.
+ Re-resolves provider credentials so key rotation and token refresh
+ are picked up without restarting the CLI.
Returns True if credentials are ready, False on auth failure.
"""
- if self.provider != "nous":
- return True
-
- from hermes_cli.auth import format_auth_error, resolve_nous_runtime_credentials
+ from hermes_cli.runtime_provider import (
+ resolve_runtime_provider,
+ format_runtime_provider_error,
+ )
try:
- credentials = resolve_nous_runtime_credentials(
- min_key_ttl_seconds=max(
- 60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))
- ),
- timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
+ runtime = resolve_runtime_provider(
+ requested=self.requested_provider,
+ explicit_api_key=self._explicit_api_key,
+ explicit_base_url=self._explicit_base_url,
)
except Exception as exc:
- message = format_auth_error(exc)
+ message = format_runtime_provider_error(exc)
self.console.print(f"[bold red]{message}[/]")
return False
- api_key = credentials.get("api_key")
- base_url = credentials.get("base_url")
+ api_key = runtime.get("api_key")
+ base_url = runtime.get("base_url")
+ resolved_provider = runtime.get("provider", "openrouter")
+ resolved_api_mode = runtime.get("api_mode", self.api_mode)
if not isinstance(api_key, str) or not api_key:
- self.console.print("[bold red]Nous credential resolver returned an empty API key.[/]")
+ self.console.print("[bold red]Provider resolver returned an empty API key.[/]")
return False
if not isinstance(base_url, str) or not base_url:
- self.console.print("[bold red]Nous credential resolver returned an empty base URL.[/]")
+ self.console.print("[bold red]Provider resolver returned an empty base URL.[/]")
return False
credentials_changed = api_key != self.api_key or base_url != self.base_url
+ routing_changed = (
+ resolved_provider != self.provider
+ or resolved_api_mode != self.api_mode
+ )
+ self.provider = resolved_provider
+ self.api_mode = resolved_api_mode
+ self._provider_source = runtime.get("source")
self.api_key = api_key
self.base_url = base_url
- self._nous_key_expires_at = credentials.get("expires_at")
- self._nous_key_source = credentials.get("source")
# AIAgent/OpenAI client holds auth at init time, so rebuild if key rotated
- if credentials_changed and self.agent is not None:
+ if (credentials_changed or routing_changed) and self.agent is not None:
self.agent = None
return True
@@ -957,7 +965,7 @@ class HermesCLI:
if self.agent is not None:
return True
- if self.provider == "nous" and not self._ensure_runtime_credentials():
+ if not self._ensure_runtime_credentials():
return False
# Initialize SQLite session store for CLI sessions
@@ -1001,6 +1009,8 @@ class HermesCLI:
model=self.model,
api_key=self.api_key,
base_url=self.base_url,
+ provider=self.provider,
+ api_mode=self.api_mode,
max_iterations=self.max_turns,
enabled_toolsets=self.enabled_toolsets,
verbose_logging=self.verbose,
@@ -1093,8 +1103,8 @@ class HermesCLI:
toolsets_info = f" [dim #B8860B]·[/] [#CD7F32]toolsets: {', '.join(self.enabled_toolsets)}[/]"
provider_info = f" [dim #B8860B]·[/] [dim]provider: {self.provider}[/]"
- if self.provider == "nous" and self._nous_key_source:
- provider_info += f" [dim #B8860B]·[/] [dim]key: {self._nous_key_source}[/]"
+ if self._provider_source:
+ provider_info += f" [dim #B8860B]·[/] [dim]auth: {self._provider_source}[/]"
self.console.print(
f" {api_indicator} [#FFBF00]{model_short}[/] "
@@ -1929,8 +1939,8 @@ class HermesCLI:
Returns:
The agent's response, or None on error
"""
- # Refresh OAuth credentials if needed (handles key rotation transparently)
- if self.provider == "nous" and not self._ensure_runtime_credentials():
+ # Refresh provider credentials if needed (handles key rotation transparently)
+ if not self._ensure_runtime_credentials():
return None
# Initialize agent if needed
diff --git a/cron/scheduler.py b/cron/scheduler.py
index 23cf5cd61..df88e56b7 100644
--- a/cron/scheduler.py
+++ b/cron/scheduler.py
@@ -172,10 +172,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
except UnicodeDecodeError:
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")
- model = os.getenv("HERMES_MODEL", "anthropic/claude-opus-4.6")
- # Custom endpoint (OPENAI_*) takes precedence, matching CLI behavior
- api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY", "")
- base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
+ model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
import yaml
@@ -188,24 +185,27 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
model = _model_cfg
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
- base_url = _model_cfg.get("base_url", base_url)
- # Check if provider is nous — resolve OAuth credentials
- provider = _model_cfg.get("provider", "") if isinstance(_model_cfg, dict) else ""
- if provider == "nous":
- try:
- from hermes_cli.auth import resolve_nous_runtime_credentials
- creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
- api_key = creds.get("api_key", api_key)
- base_url = creds.get("base_url", base_url)
- except Exception as nous_err:
- logging.warning("Nous Portal credential resolution failed for cron: %s", nous_err)
except Exception:
pass
+ from hermes_cli.runtime_provider import (
+ resolve_runtime_provider,
+ format_runtime_provider_error,
+ )
+ try:
+ runtime = resolve_runtime_provider(
+ requested=os.getenv("HERMES_INFERENCE_PROVIDER"),
+ )
+ except Exception as exc:
+ message = format_runtime_provider_error(exc)
+ raise RuntimeError(message) from exc
+
agent = AIAgent(
model=model,
- api_key=api_key,
- base_url=base_url,
+ api_key=runtime.get("api_key"),
+ base_url=runtime.get("base_url"),
+ provider=runtime.get("provider"),
+ api_mode=runtime.get("api_mode"),
quiet_mode=True,
session_id=f"cron_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
diff --git a/gateway/run.py b/gateway/run.py
index 8ed487ffe..942c72bbc 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -125,6 +125,28 @@ from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageTyp
logger = logging.getLogger(__name__)
+def _resolve_runtime_agent_kwargs() -> dict:
+ """Resolve provider credentials for gateway-created AIAgent instances."""
+ from hermes_cli.runtime_provider import (
+ resolve_runtime_provider,
+ format_runtime_provider_error,
+ )
+
+ try:
+ runtime = resolve_runtime_provider(
+ requested=os.getenv("HERMES_INFERENCE_PROVIDER"),
+ )
+ except Exception as exc:
+ raise RuntimeError(format_runtime_provider_error(exc)) from exc
+
+ return {
+ "api_key": runtime.get("api_key"),
+ "base_url": runtime.get("base_url"),
+ "provider": runtime.get("provider"),
+ "api_mode": runtime.get("api_mode"),
+ }
+
+
class GatewayRunner:
"""
Main gateway controller.
@@ -958,14 +980,11 @@ class GatewayRunner:
from run_agent import AIAgent
loop = asyncio.get_event_loop()
# Resolve credentials so the flush agent can reach the LLM
- _flush_api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY", "")
- _flush_base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
- _flush_model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL", "anthropic/claude-opus-4.6")
+ _flush_model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
def _do_flush():
tmp_agent = AIAgent(
model=_flush_model,
- api_key=_flush_api_key,
- base_url=_flush_base_url,
+ **_resolve_runtime_agent_kwargs(),
max_iterations=5,
quiet_mode=True,
enabled_toolsets=["memory"],
@@ -1678,7 +1697,7 @@ class GatewayRunner:
combined_ephemeral = context_prompt or ""
if self._ephemeral_system_prompt:
combined_ephemeral = (combined_ephemeral + "\n\n" + self._ephemeral_system_prompt).strip()
-
+
# Re-read .env and config for fresh credentials (gateway is long-lived,
# keys may change without restart).
try:
@@ -1688,9 +1707,6 @@ class GatewayRunner:
except Exception:
pass
- # Custom endpoint (OPENAI_*) takes precedence, matching CLI behavior
- api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY", "")
- base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
try:
@@ -1704,24 +1720,22 @@ class GatewayRunner:
model = _model_cfg
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
- base_url = _model_cfg.get("base_url", base_url)
- # Check if provider is nous — resolve OAuth credentials
- provider = _model_cfg.get("provider", "") if isinstance(_model_cfg, dict) else ""
- if provider == "nous":
- try:
- from hermes_cli.auth import resolve_nous_runtime_credentials
- creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
- api_key = creds.get("api_key", api_key)
- base_url = creds.get("base_url", base_url)
- except Exception as nous_err:
- logger.warning("Nous Portal credential resolution failed: %s", nous_err)
except Exception:
pass
+ try:
+ runtime_kwargs = _resolve_runtime_agent_kwargs()
+ except Exception as exc:
+ return {
+ "final_response": f"⚠️ Provider authentication failed: {exc}",
+ "messages": [],
+ "api_calls": 0,
+ "tools": [],
+ }
+
agent = AIAgent(
model=model,
- api_key=api_key,
- base_url=base_url,
+ **runtime_kwargs,
max_iterations=max_iterations,
quiet_mode=True,
verbose_logging=False,
diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py
index 0941c6d91..864916b32 100644
--- a/hermes_cli/auth.py
+++ b/hermes_cli/auth.py
@@ -18,7 +18,10 @@ from __future__ import annotations
import json
import logging
import os
+import shutil
import stat
+import base64
+import subprocess
import time
import webbrowser
from contextlib import contextmanager
@@ -55,6 +58,10 @@ DEFAULT_NOUS_SCOPE = "inference:mint_agent_key"
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60 # 30 minutes
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 # refresh 2 min before expiry
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1 # poll at most every 1s
+DEFAULT_CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
+CODEX_OAUTH_CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
+CODEX_OAUTH_TOKEN_URL = "https://auth.openai.com/oauth/token"
+CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
# =============================================================================
@@ -84,7 +91,12 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
client_id=DEFAULT_NOUS_CLIENT_ID,
scope=DEFAULT_NOUS_SCOPE,
),
- # Future: "openai_codex", "anthropic", etc.
+ "openai-codex": ProviderConfig(
+ id="openai-codex",
+ name="OpenAI Codex",
+ auth_type="oauth_external",
+ inference_base_url=DEFAULT_CODEX_BASE_URL,
+ ),
}
@@ -298,12 +310,15 @@ def resolve_provider(
"""
normalized = (requested or "auto").strip().lower()
+ if normalized in {"openrouter", "custom"}:
+ return "openrouter"
if normalized in PROVIDER_REGISTRY:
return normalized
- if normalized == "openrouter":
- return "openrouter"
if normalized != "auto":
- return "openrouter"
+ raise AuthError(
+ f"Unknown provider '{normalized}'.",
+ code="invalid_provider",
+ )
# Explicit one-off CLI creds always mean openrouter/custom
if explicit_api_key or explicit_base_url:
@@ -314,8 +329,8 @@ def resolve_provider(
auth_store = _load_auth_store()
active = auth_store.get("active_provider")
if active and active in PROVIDER_REGISTRY:
- state = _load_provider_state(auth_store, active)
- if state and (state.get("access_token") or state.get("refresh_token")):
+ status = get_auth_status(active)
+ if status.get("logged_in"):
return active
except Exception as e:
logger.debug("Could not detect active auth provider: %s", e)
@@ -369,6 +384,27 @@ def _optional_base_url(value: Any) -> Optional[str]:
return cleaned if cleaned else None
+def _decode_jwt_claims(token: Any) -> Dict[str, Any]:
+ if not isinstance(token, str) or token.count(".") != 2:
+ return {}
+ payload = token.split(".")[1]
+ payload += "=" * ((4 - len(payload) % 4) % 4)
+ try:
+ raw = base64.urlsafe_b64decode(payload.encode("utf-8"))
+ claims = json.loads(raw.decode("utf-8"))
+ except Exception:
+ return {}
+ return claims if isinstance(claims, dict) else {}
+
+
+def _codex_access_token_is_expiring(access_token: Any, skew_seconds: int) -> bool:
+ claims = _decode_jwt_claims(access_token)
+ exp = claims.get("exp")
+ if not isinstance(exp, (int, float)):
+ return False
+ return float(exp) <= (time.time() + max(0, int(skew_seconds)))
+
+
# =============================================================================
# SSH / remote session detection
# =============================================================================
@@ -378,6 +414,302 @@ def _is_remote_session() -> bool:
return bool(os.getenv("SSH_CLIENT") or os.getenv("SSH_TTY"))
+# =============================================================================
+# OpenAI Codex auth file helpers
+# =============================================================================
+
+def resolve_codex_home_path() -> Path:
+ """Resolve CODEX_HOME, defaulting to ~/.codex."""
+ codex_home = os.getenv("CODEX_HOME", "").strip()
+ if not codex_home:
+ codex_home = str(Path.home() / ".codex")
+ return Path(codex_home).expanduser()
+
+
+def _codex_auth_file_path() -> Path:
+ return resolve_codex_home_path() / "auth.json"
+
+
+def _codex_auth_lock_path(auth_path: Path) -> Path:
+ return auth_path.with_suffix(auth_path.suffix + ".lock")
+
+
+@contextmanager
+def _codex_auth_file_lock(
+ auth_path: Path,
+ timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS,
+):
+ lock_path = _codex_auth_lock_path(auth_path)
+ lock_path.parent.mkdir(parents=True, exist_ok=True)
+
+ with lock_path.open("a+") as lock_file:
+ if fcntl is None:
+ yield
+ return
+
+ deadline = time.time() + max(1.0, timeout_seconds)
+ while True:
+ try:
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+ break
+ except BlockingIOError:
+ if time.time() >= deadline:
+ raise TimeoutError(f"Timed out waiting for Codex auth lock: {lock_path}")
+ time.sleep(0.05)
+
+ try:
+ yield
+ finally:
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+
+
+def read_codex_auth_file() -> Dict[str, Any]:
+ """Read and validate Codex auth.json shape."""
+ codex_home = resolve_codex_home_path()
+ if not codex_home.exists():
+ raise AuthError(
+ f"Codex home directory not found at {codex_home}.",
+ provider="openai-codex",
+ code="codex_home_missing",
+ relogin_required=True,
+ )
+
+ auth_path = codex_home / "auth.json"
+ if not auth_path.exists():
+ raise AuthError(
+ f"Codex auth file not found at {auth_path}.",
+ provider="openai-codex",
+ code="codex_auth_missing",
+ relogin_required=True,
+ )
+
+ try:
+ payload = json.loads(auth_path.read_text())
+ except Exception as exc:
+ raise AuthError(
+ f"Failed to parse Codex auth file at {auth_path}.",
+ provider="openai-codex",
+ code="codex_auth_invalid_json",
+ relogin_required=True,
+ ) from exc
+
+ tokens = payload.get("tokens")
+ if not isinstance(tokens, dict):
+ raise AuthError(
+ "Codex auth file is missing a valid 'tokens' object.",
+ provider="openai-codex",
+ code="codex_auth_invalid_shape",
+ relogin_required=True,
+ )
+
+ access_token = tokens.get("access_token")
+ refresh_token = tokens.get("refresh_token")
+ if not isinstance(access_token, str) or not access_token.strip():
+ raise AuthError(
+ "Codex auth file is missing tokens.access_token.",
+ provider="openai-codex",
+ code="codex_auth_missing_access_token",
+ relogin_required=True,
+ )
+ if not isinstance(refresh_token, str) or not refresh_token.strip():
+ raise AuthError(
+ "Codex auth file is missing tokens.refresh_token.",
+ provider="openai-codex",
+ code="codex_auth_missing_refresh_token",
+ relogin_required=True,
+ )
+
+ return {
+ "payload": payload,
+ "tokens": tokens,
+ "auth_path": auth_path,
+ "codex_home": codex_home,
+ }
+
+
+def _persist_codex_auth_payload(
+ auth_path: Path,
+ payload: Dict[str, Any],
+ *,
+ lock_held: bool = False,
+) -> None:
+ auth_path.parent.mkdir(parents=True, exist_ok=True)
+
+ def _write() -> None:
+ serialized = json.dumps(payload, indent=2, ensure_ascii=False) + "\n"
+ tmp_path = auth_path.parent / f".{auth_path.name}.{os.getpid()}.{time.time_ns()}.tmp"
+ try:
+ with tmp_path.open("w", encoding="utf-8") as tmp_file:
+ tmp_file.write(serialized)
+ tmp_file.flush()
+ os.fsync(tmp_file.fileno())
+ os.replace(tmp_path, auth_path)
+ finally:
+ if tmp_path.exists():
+ try:
+ tmp_path.unlink()
+ except OSError:
+ pass
+
+ try:
+ auth_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
+ except OSError:
+ pass
+
+ if lock_held:
+ _write()
+ return
+
+ with _codex_auth_file_lock(auth_path):
+ _write()
+
+
+def _refresh_codex_auth_tokens(
+ *,
+ payload: Dict[str, Any],
+ auth_path: Path,
+ timeout_seconds: float,
+ lock_held: bool = False,
+) -> Dict[str, Any]:
+ tokens = payload.get("tokens")
+ if not isinstance(tokens, dict):
+ raise AuthError(
+ "Codex auth file is missing a valid 'tokens' object.",
+ provider="openai-codex",
+ code="codex_auth_invalid_shape",
+ relogin_required=True,
+ )
+
+ refresh_token = tokens.get("refresh_token")
+ if not isinstance(refresh_token, str) or not refresh_token.strip():
+ raise AuthError(
+ "Codex auth file is missing tokens.refresh_token.",
+ provider="openai-codex",
+ code="codex_auth_missing_refresh_token",
+ relogin_required=True,
+ )
+
+ timeout = httpx.Timeout(max(5.0, float(timeout_seconds)))
+ with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}) as client:
+ response = client.post(
+ CODEX_OAUTH_TOKEN_URL,
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ data={
+ "grant_type": "refresh_token",
+ "refresh_token": refresh_token,
+ "client_id": CODEX_OAUTH_CLIENT_ID,
+ },
+ )
+
+ if response.status_code != 200:
+ code = "codex_refresh_failed"
+ message = f"Codex token refresh failed with status {response.status_code}."
+ relogin_required = False
+ try:
+ err = response.json()
+ if isinstance(err, dict):
+ err_code = err.get("error")
+ if isinstance(err_code, str) and err_code.strip():
+ code = err_code.strip()
+ err_desc = err.get("error_description") or err.get("message")
+ if isinstance(err_desc, str) and err_desc.strip():
+ message = f"Codex token refresh failed: {err_desc.strip()}"
+ except Exception:
+ pass
+ if code in {"invalid_grant", "invalid_token", "invalid_request"}:
+ relogin_required = True
+ raise AuthError(
+ message,
+ provider="openai-codex",
+ code=code,
+ relogin_required=relogin_required,
+ )
+
+ try:
+ refresh_payload = response.json()
+ except Exception as exc:
+ raise AuthError(
+ "Codex token refresh returned invalid JSON.",
+ provider="openai-codex",
+ code="codex_refresh_invalid_json",
+ relogin_required=True,
+ ) from exc
+
+ access_token = refresh_payload.get("access_token")
+ if not isinstance(access_token, str) or not access_token.strip():
+ raise AuthError(
+ "Codex token refresh response was missing access_token.",
+ provider="openai-codex",
+ code="codex_refresh_missing_access_token",
+ relogin_required=True,
+ )
+
+ updated_tokens = dict(tokens)
+ updated_tokens["access_token"] = access_token.strip()
+ next_refresh = refresh_payload.get("refresh_token")
+ if isinstance(next_refresh, str) and next_refresh.strip():
+ updated_tokens["refresh_token"] = next_refresh.strip()
+ payload["tokens"] = updated_tokens
+ payload["last_refresh"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
+ _persist_codex_auth_payload(auth_path, payload, lock_held=lock_held)
+ return updated_tokens
+
+
+def resolve_codex_runtime_credentials(
+ *,
+ force_refresh: bool = False,
+ refresh_if_expiring: bool = True,
+ refresh_skew_seconds: int = CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
+) -> Dict[str, Any]:
+ """Resolve runtime credentials from Codex CLI auth state."""
+ data = read_codex_auth_file()
+ payload = data["payload"]
+ tokens = dict(data["tokens"])
+ auth_path = data["auth_path"]
+ access_token = str(tokens.get("access_token", "") or "").strip()
+ refresh_timeout_seconds = float(os.getenv("HERMES_CODEX_REFRESH_TIMEOUT_SECONDS", "20"))
+
+ should_refresh = bool(force_refresh)
+ if (not should_refresh) and refresh_if_expiring:
+ should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
+ if should_refresh:
+ lock_timeout = max(float(AUTH_LOCK_TIMEOUT_SECONDS), refresh_timeout_seconds + 5.0)
+ with _codex_auth_file_lock(auth_path, timeout_seconds=lock_timeout):
+ data = read_codex_auth_file()
+ payload = data["payload"]
+ tokens = dict(data["tokens"])
+ access_token = str(tokens.get("access_token", "") or "").strip()
+
+ should_refresh = bool(force_refresh)
+ if (not should_refresh) and refresh_if_expiring:
+ should_refresh = _codex_access_token_is_expiring(access_token, refresh_skew_seconds)
+
+ if should_refresh:
+ tokens = _refresh_codex_auth_tokens(
+ payload=payload,
+ auth_path=auth_path,
+ timeout_seconds=refresh_timeout_seconds,
+ lock_held=True,
+ )
+ access_token = str(tokens.get("access_token", "") or "").strip()
+
+ base_url = (
+ os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/")
+ or DEFAULT_CODEX_BASE_URL
+ )
+
+ return {
+ "provider": "openai-codex",
+ "base_url": base_url,
+ "api_key": access_token,
+ "source": "codex-auth-json",
+ "last_refresh": payload.get("last_refresh"),
+ "auth_mode": payload.get("auth_mode"),
+ "auth_file": str(auth_path),
+ "codex_home": str(data["codex_home"]),
+ }
+
+
# =============================================================================
# TLS verification helper
# =============================================================================
@@ -806,11 +1138,37 @@ def get_nous_auth_status() -> Dict[str, Any]:
}
+def get_codex_auth_status() -> Dict[str, Any]:
+ """Status snapshot for Codex auth."""
+ state = get_provider_auth_state("openai-codex") or {}
+ auth_file = state.get("auth_file") or str(_codex_auth_file_path())
+ codex_home = state.get("codex_home") or str(resolve_codex_home_path())
+ try:
+ creds = resolve_codex_runtime_credentials()
+ return {
+ "logged_in": True,
+ "auth_file": creds.get("auth_file"),
+ "codex_home": creds.get("codex_home"),
+ "last_refresh": creds.get("last_refresh"),
+ "auth_mode": creds.get("auth_mode"),
+ "source": creds.get("source"),
+ }
+ except AuthError as exc:
+ return {
+ "logged_in": False,
+ "auth_file": auth_file,
+ "codex_home": codex_home,
+ "error": str(exc),
+ }
+
+
def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
"""Generic auth status dispatcher."""
target = provider_id or get_active_provider()
if target == "nous":
return get_nous_auth_status()
+ if target == "openai-codex":
+ return get_codex_auth_status()
return {"logged_in": False}
@@ -982,11 +1340,64 @@ def login_command(args) -> None:
if provider_id == "nous":
_login_nous(args, pconfig)
+ elif provider_id == "openai-codex":
+ _login_openai_codex(args, pconfig)
else:
print(f"Login for provider '{provider_id}' is not yet implemented.")
raise SystemExit(1)
+def _login_openai_codex(args, pconfig: ProviderConfig) -> None:
+ """OpenAI Codex login flow using Codex CLI auth state."""
+ codex_path = shutil.which("codex")
+ if not codex_path:
+ print("Codex CLI was not found in PATH.")
+ print("Install Codex CLI, then retry `hermes login --provider openai-codex`.")
+ raise SystemExit(1)
+
+ print(f"Starting Hermes login via {pconfig.name}...")
+ print(f"Using Codex CLI: {codex_path}")
+ print(f"Codex home: {resolve_codex_home_path()}")
+
+ creds: Dict[str, Any]
+ try:
+ creds = resolve_codex_runtime_credentials()
+ except AuthError:
+ print("No usable Codex auth found. Running `codex login`...")
+ try:
+ subprocess.run(["codex", "login"], check=True)
+ except subprocess.CalledProcessError as exc:
+ print(f"Codex login failed with exit code {exc.returncode}.")
+ raise SystemExit(1)
+ except KeyboardInterrupt:
+ print("\nLogin cancelled.")
+ raise SystemExit(130)
+ try:
+ creds = resolve_codex_runtime_credentials()
+ except AuthError as exc:
+ print(format_auth_error(exc))
+ raise SystemExit(1)
+
+ auth_state = {
+ "auth_file": creds.get("auth_file"),
+ "codex_home": creds.get("codex_home"),
+ "last_refresh": creds.get("last_refresh"),
+ "auth_mode": creds.get("auth_mode"),
+ "source": creds.get("source"),
+ }
+
+ with _auth_store_lock():
+ auth_store = _load_auth_store()
+ _save_provider_state(auth_store, "openai-codex", auth_state)
+ saved_to = _save_auth_store(auth_store)
+
+ config_path = _update_config_for_provider("openai-codex", creds["base_url"])
+ print()
+ print("Login successful!")
+ print(f" Auth state: {saved_to}")
+ print(f" Config updated: {config_path} (model.provider=openai-codex)")
+
+
def _login_nous(args, pconfig: ProviderConfig) -> None:
"""Nous Portal device authorization flow."""
portal_base_url = (
diff --git a/hermes_cli/codex_models.py b/hermes_cli/codex_models.py
new file mode 100644
index 000000000..ed1009c54
--- /dev/null
+++ b/hermes_cli/codex_models.py
@@ -0,0 +1,91 @@
+"""Codex model discovery from local Codex CLI cache/config."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import List, Optional
+
+from hermes_cli.auth import resolve_codex_home_path
+
+DEFAULT_CODEX_MODELS: List[str] = [
+ "gpt-5-codex",
+ "gpt-5.3-codex",
+ "gpt-5.2-codex",
+ "gpt-5.1-codex",
+]
+
+
+def _read_default_model(codex_home: Path) -> Optional[str]:
+ config_path = codex_home / "config.toml"
+ if not config_path.exists():
+ return None
+ try:
+ import tomllib
+ except Exception:
+ return None
+ try:
+ payload = tomllib.loads(config_path.read_text(encoding="utf-8"))
+ except Exception:
+ return None
+ model = payload.get("model") if isinstance(payload, dict) else None
+ if isinstance(model, str) and model.strip():
+ return model.strip()
+ return None
+
+
+def _read_cache_models(codex_home: Path) -> List[str]:
+ cache_path = codex_home / "models_cache.json"
+ if not cache_path.exists():
+ return []
+ try:
+ raw = json.loads(cache_path.read_text(encoding="utf-8"))
+ except Exception:
+ return []
+
+ entries = raw.get("models") if isinstance(raw, dict) else None
+ sortable = []
+ if isinstance(entries, list):
+ for item in entries:
+ if not isinstance(item, dict):
+ continue
+ slug = item.get("slug")
+ if not isinstance(slug, str) or not slug.strip():
+ continue
+ slug = slug.strip()
+ if "codex" not in slug.lower():
+ continue
+ if item.get("supported_in_api") is False:
+ continue
+ visibility = item.get("visibility")
+ if isinstance(visibility, str) and visibility.strip().lower() == "hidden":
+ continue
+ priority = item.get("priority")
+ rank = int(priority) if isinstance(priority, (int, float)) else 10_000
+ sortable.append((rank, slug))
+
+ sortable.sort(key=lambda item: (item[0], item[1]))
+ deduped: List[str] = []
+ for _, slug in sortable:
+ if slug not in deduped:
+ deduped.append(slug)
+ return deduped
+
+
+def get_codex_model_ids() -> List[str]:
+ codex_home = resolve_codex_home_path()
+ ordered: List[str] = []
+
+ default_model = _read_default_model(codex_home)
+ if default_model:
+ ordered.append(default_model)
+
+ for model_id in _read_cache_models(codex_home):
+ if model_id not in ordered:
+ ordered.append(model_id)
+
+ for model_id in DEFAULT_CODEX_MODELS:
+ if model_id not in ordered:
+ ordered.append(model_id)
+
+ return ordered
diff --git a/hermes_cli/doctor.py b/hermes_cli/doctor.py
index 742675d03..031c6eaf8 100644
--- a/hermes_cli/doctor.py
+++ b/hermes_cli/doctor.py
@@ -175,6 +175,36 @@ def run_doctor(args):
else:
check_warn("config.yaml not found", "(using defaults)")
+ # =========================================================================
+ # Check: Auth providers
+ # =========================================================================
+ print()
+ print(color("◆ Auth Providers", Colors.CYAN, Colors.BOLD))
+
+ try:
+ from hermes_cli.auth import get_nous_auth_status, get_codex_auth_status
+
+ nous_status = get_nous_auth_status()
+ if nous_status.get("logged_in"):
+ check_ok("Nous Portal auth", "(logged in)")
+ else:
+ check_warn("Nous Portal auth", "(not logged in)")
+
+ codex_status = get_codex_auth_status()
+ if codex_status.get("logged_in"):
+ check_ok("OpenAI Codex auth", "(logged in)")
+ else:
+ check_warn("OpenAI Codex auth", "(not logged in)")
+ if codex_status.get("error"):
+ check_info(codex_status["error"])
+ except Exception as e:
+ check_warn("Auth provider status", f"(could not check: {e})")
+
+ if shutil.which("codex"):
+ check_ok("codex CLI")
+ else:
+ check_warn("codex CLI not found", "(required for openai-codex login)")
+
# =========================================================================
# Check: Directory structure
# =========================================================================
diff --git a/hermes_cli/main.py b/hermes_cli/main.py
index b232d5b55..b0965e547 100644
--- a/hermes_cli/main.py
+++ b/hermes_cli/main.py
@@ -60,6 +60,7 @@ logger = logging.getLogger(__name__)
def _has_any_provider_configured() -> bool:
"""Check if at least one inference provider is usable."""
from hermes_cli.config import get_env_path, get_hermes_home
+ from hermes_cli.auth import get_auth_status
# Check env vars (may be set by .env or shell).
# OPENAI_BASE_URL alone counts — local models (vLLM, llama.cpp, etc.)
@@ -91,8 +92,8 @@ def _has_any_provider_configured() -> bool:
auth = json.loads(auth_file.read_text())
active = auth.get("active_provider")
if active:
- state = auth.get("providers", {}).get(active, {})
- if state.get("access_token") or state.get("refresh_token"):
+ status = get_auth_status(active)
+ if status.get("logged_in"):
return True
except Exception:
pass
@@ -289,7 +290,7 @@ def cmd_model(args):
resolve_provider, get_provider_auth_state, PROVIDER_REGISTRY,
_prompt_model_selection, _save_model_choice, _update_config_for_provider,
resolve_nous_runtime_credentials, fetch_nous_models, AuthError, format_auth_error,
- _login_nous, ProviderConfig,
+ _login_nous,
)
from hermes_cli.config import load_config, save_config, get_env_value, save_env_value
@@ -312,7 +313,12 @@ def cmd_model(args):
or config_provider
or "auto"
)
- active = resolve_provider(effective_provider)
+ try:
+ active = resolve_provider(effective_provider)
+ except AuthError as exc:
+ warning = format_auth_error(exc)
+ print(f"Warning: {warning} Falling back to auto provider detection.")
+ active = resolve_provider("auto")
# Detect custom endpoint
if active == "openrouter" and get_env_value("OPENAI_BASE_URL"):
@@ -321,6 +327,7 @@ def cmd_model(args):
provider_labels = {
"openrouter": "OpenRouter",
"nous": "Nous Portal",
+ "openai-codex": "OpenAI Codex",
"custom": "Custom endpoint",
}
active_label = provider_labels.get(active, active)
@@ -334,11 +341,12 @@ def cmd_model(args):
providers = [
("openrouter", "OpenRouter (100+ models, pay-per-use)"),
("nous", "Nous Portal (Nous Research subscription)"),
+ ("openai-codex", "OpenAI Codex"),
("custom", "Custom endpoint (self-hosted / VLLM / etc.)"),
]
# Reorder so the active provider is at the top
- active_key = active if active in ("openrouter", "nous") else "custom"
+ active_key = active if active in ("openrouter", "nous", "openai-codex") else "custom"
ordered = []
for key, label in providers:
if key == active_key:
@@ -359,6 +367,8 @@ def cmd_model(args):
_model_flow_openrouter(config, current_model)
elif selected_provider == "nous":
_model_flow_nous(config, current_model)
+ elif selected_provider == "openai-codex":
+ _model_flow_openai_codex(config, current_model)
elif selected_provider == "custom":
_model_flow_custom(config)
@@ -512,6 +522,46 @@ def _model_flow_nous(config, current_model=""):
print("No change.")
+def _model_flow_openai_codex(config, current_model=""):
+ """OpenAI Codex provider: ensure logged in, then pick model."""
+ from hermes_cli.auth import (
+ get_codex_auth_status, _prompt_model_selection, _save_model_choice,
+ _update_config_for_provider, _login_openai_codex,
+ PROVIDER_REGISTRY, DEFAULT_CODEX_BASE_URL,
+ )
+ from hermes_cli.codex_models import get_codex_model_ids
+ from hermes_cli.config import get_env_value, save_env_value
+ import argparse
+
+ status = get_codex_auth_status()
+ if not status.get("logged_in"):
+ print("Not logged into OpenAI Codex. Starting login...")
+ print()
+ try:
+ mock_args = argparse.Namespace()
+ _login_openai_codex(mock_args, PROVIDER_REGISTRY["openai-codex"])
+ except SystemExit:
+ print("Login cancelled or failed.")
+ return
+ except Exception as exc:
+ print(f"Login failed: {exc}")
+ return
+
+ codex_models = get_codex_model_ids()
+
+ selected = _prompt_model_selection(codex_models, current_model=current_model)
+ if selected:
+ _save_model_choice(selected)
+ _update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
+ # Clear custom endpoint env vars that would otherwise override Codex.
+ if get_env_value("OPENAI_BASE_URL"):
+ save_env_value("OPENAI_BASE_URL", "")
+ save_env_value("OPENAI_API_KEY", "")
+ print(f"Default model set to: {selected} (via OpenAI Codex)")
+ else:
+ print("No change.")
+
+
def _model_flow_custom(config):
"""Custom endpoint: collect URL, API key, and model name."""
from hermes_cli.auth import _save_model_choice, deactivate_provider
@@ -857,7 +907,7 @@ For more help on a command:
)
chat_parser.add_argument(
"--provider",
- choices=["auto", "openrouter", "nous"],
+ choices=["auto", "openrouter", "nous", "openai-codex"],
default=None,
help="Inference provider (default: auto)"
)
@@ -966,9 +1016,9 @@ For more help on a command:
)
login_parser.add_argument(
"--provider",
- choices=["nous"],
+ choices=["nous", "openai-codex"],
default=None,
- help="Provider to authenticate with (default: interactive selection)"
+ help="Provider to authenticate with (default: nous)"
)
login_parser.add_argument(
"--portal-url",
@@ -1020,7 +1070,7 @@ For more help on a command:
)
logout_parser.add_argument(
"--provider",
- choices=["nous"],
+ choices=["nous", "openai-codex"],
default=None,
help="Provider to log out from (default: active provider)"
)
diff --git a/hermes_cli/runtime_provider.py b/hermes_cli/runtime_provider.py
new file mode 100644
index 000000000..1f070ac22
--- /dev/null
+++ b/hermes_cli/runtime_provider.py
@@ -0,0 +1,149 @@
+"""Shared runtime provider resolution for CLI, gateway, cron, and helpers."""
+
+from __future__ import annotations
+
+import os
+from typing import Any, Dict, Optional
+
+from hermes_cli.auth import (
+ AuthError,
+ format_auth_error,
+ resolve_provider,
+ resolve_nous_runtime_credentials,
+ resolve_codex_runtime_credentials,
+)
+from hermes_cli.config import load_config
+from hermes_constants import OPENROUTER_BASE_URL
+
+
+def _get_model_config() -> Dict[str, Any]:
+ config = load_config()
+ model_cfg = config.get("model")
+ if isinstance(model_cfg, dict):
+ return dict(model_cfg)
+ if isinstance(model_cfg, str) and model_cfg.strip():
+ return {"default": model_cfg.strip()}
+ return {}
+
+
+def resolve_requested_provider(requested: Optional[str] = None) -> str:
+ """Resolve provider request from explicit arg, env, then config."""
+ if requested and requested.strip():
+ return requested.strip().lower()
+
+ env_provider = os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
+ if env_provider:
+ return env_provider
+
+ model_cfg = _get_model_config()
+ cfg_provider = model_cfg.get("provider")
+ if isinstance(cfg_provider, str) and cfg_provider.strip():
+ return cfg_provider.strip().lower()
+
+ return "auto"
+
+
+def _resolve_openrouter_runtime(
+ *,
+ requested_provider: str,
+ explicit_api_key: Optional[str] = None,
+ explicit_base_url: Optional[str] = None,
+) -> Dict[str, Any]:
+ model_cfg = _get_model_config()
+ cfg_base_url = model_cfg.get("base_url") if isinstance(model_cfg.get("base_url"), str) else ""
+ cfg_provider = model_cfg.get("provider") if isinstance(model_cfg.get("provider"), str) else ""
+ requested_norm = (requested_provider or "").strip().lower()
+ cfg_provider = cfg_provider.strip().lower()
+
+ env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip()
+ env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
+
+ use_config_base_url = False
+ if requested_norm == "auto":
+ if cfg_base_url.strip() and not explicit_base_url and not env_openai_base_url:
+ if not cfg_provider or cfg_provider == "auto":
+ use_config_base_url = True
+
+ base_url = (
+ (explicit_base_url or "").strip()
+ or env_openai_base_url
+ or (cfg_base_url.strip() if use_config_base_url else "")
+ or env_openrouter_base_url
+ or OPENROUTER_BASE_URL
+ ).rstrip("/")
+
+ api_key = (
+ explicit_api_key
+ or os.getenv("OPENAI_API_KEY")
+ or os.getenv("OPENROUTER_API_KEY")
+ or ""
+ )
+
+ source = "explicit" if (explicit_api_key or explicit_base_url) else "env/config"
+
+ return {
+ "provider": "openrouter",
+ "api_mode": "chat_completions",
+ "base_url": base_url,
+ "api_key": api_key,
+ "source": source,
+ }
+
+
+def resolve_runtime_provider(
+ *,
+ requested: Optional[str] = None,
+ explicit_api_key: Optional[str] = None,
+ explicit_base_url: Optional[str] = None,
+) -> Dict[str, Any]:
+ """Resolve runtime provider credentials for agent execution."""
+ requested_provider = resolve_requested_provider(requested)
+
+ provider = resolve_provider(
+ requested_provider,
+ explicit_api_key=explicit_api_key,
+ explicit_base_url=explicit_base_url,
+ )
+
+ if provider == "nous":
+ creds = resolve_nous_runtime_credentials(
+ min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
+ timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
+ )
+ return {
+ "provider": "nous",
+ "api_mode": "chat_completions",
+ "base_url": creds.get("base_url", "").rstrip("/"),
+ "api_key": creds.get("api_key", ""),
+ "source": creds.get("source", "portal"),
+ "expires_at": creds.get("expires_at"),
+ "requested_provider": requested_provider,
+ }
+
+ if provider == "openai-codex":
+ creds = resolve_codex_runtime_credentials()
+ return {
+ "provider": "openai-codex",
+ "api_mode": "codex_responses",
+ "base_url": creds.get("base_url", "").rstrip("/"),
+ "api_key": creds.get("api_key", ""),
+ "source": creds.get("source", "codex-auth-json"),
+ "auth_file": creds.get("auth_file"),
+ "codex_home": creds.get("codex_home"),
+ "last_refresh": creds.get("last_refresh"),
+ "requested_provider": requested_provider,
+ }
+
+ runtime = _resolve_openrouter_runtime(
+ requested_provider=requested_provider,
+ explicit_api_key=explicit_api_key,
+ explicit_base_url=explicit_base_url,
+ )
+ runtime["requested_provider"] = requested_provider
+ return runtime
+
+
+def format_runtime_provider_error(error: Exception) -> str:
+ if isinstance(error, AuthError):
+ return format_auth_error(error)
+ return str(error)
diff --git a/hermes_cli/setup.py b/hermes_cli/setup.py
index 6ed9fb64a..d11cb4b7a 100644
--- a/hermes_cli/setup.py
+++ b/hermes_cli/setup.py
@@ -620,6 +620,7 @@ def run_setup_wizard(args):
get_active_provider, get_provider_auth_state, PROVIDER_REGISTRY,
format_auth_error, AuthError, fetch_nous_models,
resolve_nous_runtime_credentials, _update_config_for_provider,
+ _login_openai_codex, get_codex_auth_status, DEFAULT_CODEX_BASE_URL,
)
existing_custom = get_env_value("OPENAI_BASE_URL")
existing_or = get_env_value("OPENROUTER_API_KEY")
@@ -640,6 +641,7 @@ def run_setup_wizard(args):
provider_choices = [
"Login with Nous Portal (Nous Research subscription)",
+ "Login with OpenAI Codex",
"OpenRouter API key (100+ models, pay-per-use)",
"Custom OpenAI-compatible endpoint (self-hosted / VLLM / etc.)",
]
@@ -647,7 +649,7 @@ def run_setup_wizard(args):
provider_choices.append(keep_label)
# Default to "Keep current" if a provider exists, otherwise OpenRouter (most common)
- default_provider = len(provider_choices) - 1 if has_any_provider else 1
+ default_provider = len(provider_choices) - 1 if has_any_provider else 2
if not has_any_provider:
print_warning("An inference provider is required for Hermes to work.")
@@ -656,7 +658,7 @@ def run_setup_wizard(args):
provider_idx = prompt_choice("Select your inference provider:", provider_choices, default_provider)
# Track which provider was selected for model step
- selected_provider = None # "nous", "openrouter", "custom", or None (keep)
+ selected_provider = None # "nous", "openai-codex", "openrouter", "custom", or None (keep)
nous_models = [] # populated if Nous login succeeds
if provider_idx == 0: # Nous Portal
@@ -699,7 +701,31 @@ def run_setup_wizard(args):
print_info("You can try again later with: hermes login")
selected_provider = None
- elif provider_idx == 1: # OpenRouter
+ elif provider_idx == 1: # OpenAI Codex
+ selected_provider = "openai-codex"
+ print()
+ print_header("OpenAI Codex Login")
+ print()
+
+ try:
+ import argparse
+ mock_args = argparse.Namespace()
+ _login_openai_codex(mock_args, PROVIDER_REGISTRY["openai-codex"])
+ # Clear custom endpoint vars that would override provider routing.
+ if existing_custom:
+ save_env_value("OPENAI_BASE_URL", "")
+ save_env_value("OPENAI_API_KEY", "")
+ _update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
+ except SystemExit:
+ print_warning("OpenAI Codex login was cancelled or failed.")
+ print_info("You can try again later with: hermes login --provider openai-codex")
+ selected_provider = None
+ except Exception as e:
+ print_error(f"Login failed: {e}")
+ print_info("You can try again later with: hermes login --provider openai-codex")
+ selected_provider = None
+
+ elif provider_idx == 2: # OpenRouter
selected_provider = "openrouter"
print()
print_header("OpenRouter API Key")
@@ -726,7 +752,7 @@ def run_setup_wizard(args):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
- elif provider_idx == 2: # Custom endpoint
+ elif provider_idx == 3: # Custom endpoint
selected_provider = "custom"
print()
print_header("Custom OpenAI-Compatible Endpoint")
@@ -753,14 +779,14 @@ def run_setup_wizard(args):
config['model'] = model_name
save_env_value("LLM_MODEL", model_name)
print_success("Custom endpoint configured")
- # else: provider_idx == 3 (Keep current) — only shown when a provider already exists
+ # else: provider_idx == 4 (Keep current) — only shown when a provider already exists
# =========================================================================
# Step 1b: OpenRouter API Key for tools (if not already set)
# =========================================================================
# Tools (vision, web, MoA) use OpenRouter independently of the main provider.
# Prompt for OpenRouter key if not set and a non-OpenRouter provider was chosen.
- if selected_provider in ("nous", "custom") and not get_env_value("OPENROUTER_API_KEY"):
+ if selected_provider in ("nous", "openai-codex", "custom") and not get_env_value("OPENROUTER_API_KEY"):
print()
print_header("OpenRouter API Key (for tools)")
print_info("Tools like vision analysis, web search, and MoA use OpenRouter")
@@ -806,6 +832,25 @@ def run_setup_wizard(args):
config['model'] = custom
save_env_value("LLM_MODEL", custom)
# else: keep current
+ elif selected_provider == "openai-codex":
+ from hermes_cli.codex_models import get_codex_model_ids
+ codex_models = get_codex_model_ids()
+ model_choices = [f"{m}" for m in codex_models]
+ model_choices.append("Custom model")
+ model_choices.append(f"Keep current ({current_model})")
+
+ keep_idx = len(model_choices) - 1
+ model_idx = prompt_choice("Select default model:", model_choices, keep_idx)
+
+ if model_idx < len(codex_models):
+ config['model'] = codex_models[model_idx]
+ save_env_value("LLM_MODEL", codex_models[model_idx])
+ elif model_idx == len(codex_models):
+ custom = prompt("Enter model name")
+ if custom:
+ config['model'] = custom
+ save_env_value("LLM_MODEL", custom)
+ _update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
else:
# Static list for OpenRouter / fallback (from canonical list)
from hermes_cli.models import model_ids, menu_labels
diff --git a/hermes_cli/status.py b/hermes_cli/status.py
index ec50c6d62..81b55cab7 100644
--- a/hermes_cli/status.py
+++ b/hermes_cli/status.py
@@ -101,10 +101,12 @@ def show_status(args):
print(color("◆ Auth Providers", Colors.CYAN, Colors.BOLD))
try:
- from hermes_cli.auth import get_nous_auth_status
+ from hermes_cli.auth import get_nous_auth_status, get_codex_auth_status
nous_status = get_nous_auth_status()
+ codex_status = get_codex_auth_status()
except Exception:
nous_status = {}
+ codex_status = {}
nous_logged_in = bool(nous_status.get("logged_in"))
print(
@@ -121,6 +123,20 @@ def show_status(args):
print(f" Key exp: {key_exp}")
print(f" Refresh: {refresh_label}")
+ codex_logged_in = bool(codex_status.get("logged_in"))
+ print(
+ f" {'OpenAI Codex':<12} {check_mark(codex_logged_in)} "
+ f"{'logged in' if codex_logged_in else 'not logged in (run: hermes login --provider openai-codex)'}"
+ )
+ codex_auth_file = codex_status.get("auth_file")
+ if codex_auth_file:
+ print(f" Auth file: {codex_auth_file}")
+ codex_last_refresh = _format_iso_timestamp(codex_status.get("last_refresh"))
+ if codex_status.get("last_refresh"):
+ print(f" Refreshed: {codex_last_refresh}")
+ if codex_status.get("error") and not codex_logged_in:
+ print(f" Error: {codex_status.get('error')}")
+
# =========================================================================
# Terminal Configuration
# =========================================================================
diff --git a/run_agent.py b/run_agent.py
index 61c9669f7..ec634b7ab 100644
--- a/run_agent.py
+++ b/run_agent.py
@@ -21,6 +21,7 @@ Usage:
"""
import copy
+import hashlib
import json
import logging
logger = logging.getLogger(__name__)
@@ -30,6 +31,7 @@ import re
import sys
import time
import threading
+from types import SimpleNamespace
import uuid
from typing import List, Dict, Any, Optional
from openai import OpenAI
@@ -106,6 +108,8 @@ class AIAgent:
self,
base_url: str = None,
api_key: str = None,
+ provider: str = None,
+ api_mode: str = None,
model: str = "anthropic/claude-opus-4.6", # OpenRouter format
max_iterations: int = 60, # Default tool-calling iterations
tool_delay: float = 1.0,
@@ -140,6 +144,8 @@ class AIAgent:
Args:
base_url (str): Base URL for the model API (optional)
api_key (str): API key for authentication (optional, uses env var if not provided)
+ provider (str): Provider identifier (optional; used for telemetry/routing hints)
+ api_mode (str): API mode override: "chat_completions" or "codex_responses"
model (str): Model name to use (default: "anthropic/claude-opus-4.6")
max_iterations (int): Maximum number of tool calling iterations (default: 60)
tool_delay (float): Delay between tool calls in seconds (default: 1.0)
@@ -187,6 +193,17 @@ class AIAgent:
# Store effective base URL for feature detection (prompt caching, reasoning, etc.)
# When no base_url is provided, the client defaults to OpenRouter, so reflect that here.
self.base_url = base_url or OPENROUTER_BASE_URL
+ provider_name = provider.strip().lower() if isinstance(provider, str) and provider.strip() else None
+ self.provider = provider_name or "openrouter"
+ if api_mode in {"chat_completions", "codex_responses"}:
+ self.api_mode = api_mode
+ elif self.provider == "openai-codex":
+ self.api_mode = "codex_responses"
+ elif (provider_name is None) and "chatgpt.com/backend-api/codex" in self.base_url.lower():
+ self.api_mode = "codex_responses"
+ self.provider = "openai-codex"
+ else:
+ self.api_mode = "chat_completions"
if base_url and "api.anthropic.com" in base_url.strip().lower():
raise ValueError(
"Anthropic's native /v1/messages API is not supported yet (planned for a future release). "
@@ -546,6 +563,77 @@ class AIAgent:
if not content:
return ""
+        return re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
+
+ def _looks_like_codex_intermediate_ack(
+ self,
+ user_message: str,
+ assistant_content: str,
+ messages: List[Dict[str, Any]],
+ ) -> bool:
+ """Detect a planning/ack message that should continue instead of ending the turn."""
+ if any(isinstance(msg, dict) and msg.get("role") == "tool" for msg in messages):
+ return False
+
+ assistant_text = self._strip_think_blocks(assistant_content or "").strip().lower()
+ if not assistant_text:
+ return False
+ if len(assistant_text) > 1200:
+ return False
+
+ has_future_ack = bool(
+ re.search(r"\b(i['’]ll|i will|let me|i can do that|i can help with that)\b", assistant_text)
+ )
+ if not has_future_ack:
+ return False
+
+ action_markers = (
+ "look into",
+ "look at",
+ "inspect",
+ "scan",
+ "check",
+ "analyz",
+ "review",
+ "explore",
+ "read",
+ "open",
+ "run",
+ "test",
+ "fix",
+ "debug",
+ "search",
+ "find",
+ "walkthrough",
+ "report back",
+ "summarize",
+ )
+ workspace_markers = (
+ "directory",
+ "current directory",
+ "current dir",
+ "cwd",
+ "repo",
+ "repository",
+ "codebase",
+ "project",
+ "folder",
+ "filesystem",
+ "file tree",
+ "files",
+ "path",
+ )
+
+ user_text = (user_message or "").strip().lower()
+ user_targets_workspace = (
+ any(marker in user_text for marker in workspace_markers)
+ or "~/" in user_text
+ or "/" in user_text
+ )
+ assistant_mentions_action = any(marker in assistant_text for marker in action_markers)
+ assistant_targets_workspace = any(
+ marker in assistant_text for marker in workspace_markers
+ )
+ return (user_targets_workspace or assistant_targets_workspace) and assistant_mentions_action
def _extract_reasoning(self, assistant_message) -> Optional[str]:
@@ -1261,6 +1349,577 @@ class AIAgent:
if self._memory_store:
self._memory_store.load_from_disk()
+ def _responses_tools(self, tools: Optional[List[Dict[str, Any]]] = None) -> Optional[List[Dict[str, Any]]]:
+ """Convert chat-completions tool schemas to Responses function-tool schemas."""
+ source_tools = tools if tools is not None else self.tools
+ if not source_tools:
+ return None
+
+ converted: List[Dict[str, Any]] = []
+ for item in source_tools:
+ fn = item.get("function", {}) if isinstance(item, dict) else {}
+ name = fn.get("name")
+ if not isinstance(name, str) or not name.strip():
+ continue
+ converted.append({
+ "type": "function",
+ "name": name,
+ "description": fn.get("description", ""),
+ "strict": False,
+ "parameters": fn.get("parameters", {"type": "object", "properties": {}}),
+ })
+ return converted or None
+
+ @staticmethod
+ def _split_responses_tool_id(raw_id: Any) -> tuple[Optional[str], Optional[str]]:
+ """Split a stored tool id into (call_id, response_item_id)."""
+ if not isinstance(raw_id, str):
+ return None, None
+ value = raw_id.strip()
+ if not value:
+ return None, None
+ if "|" in value:
+ call_id, response_item_id = value.split("|", 1)
+ call_id = call_id.strip() or None
+ response_item_id = response_item_id.strip() or None
+ return call_id, response_item_id
+ if value.startswith("fc_"):
+ return None, value
+ return value, None
+
+ def _derive_responses_function_call_id(
+ self,
+ call_id: str,
+ response_item_id: Optional[str] = None,
+ ) -> str:
+ """Build a valid Responses `function_call.id` (must start with `fc_`)."""
+ if isinstance(response_item_id, str):
+ candidate = response_item_id.strip()
+ if candidate.startswith("fc_"):
+ return candidate
+
+ source = (call_id or "").strip()
+ if source.startswith("fc_"):
+ return source
+ if source.startswith("call_") and len(source) > len("call_"):
+ return f"fc_{source[len('call_'):]}"
+
+ sanitized = re.sub(r"[^A-Za-z0-9_-]", "", source)
+ if sanitized.startswith("fc_"):
+ return sanitized
+ if sanitized.startswith("call_") and len(sanitized) > len("call_"):
+ return f"fc_{sanitized[len('call_'):]}"
+ if sanitized:
+ return f"fc_{sanitized[:48]}"
+
+ seed = source or str(response_item_id or "") or uuid.uuid4().hex
+ digest = hashlib.sha1(seed.encode("utf-8")).hexdigest()[:24]
+ return f"fc_{digest}"
+
+ def _chat_messages_to_responses_input(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """Convert internal chat-style messages to Responses input items."""
+ items: List[Dict[str, Any]] = []
+
+ for msg in messages:
+ if not isinstance(msg, dict):
+ continue
+ role = msg.get("role")
+ if role == "system":
+ continue
+
+ if role in {"user", "assistant"}:
+ content = msg.get("content", "")
+ content_text = str(content) if content is not None else ""
+
+ if role == "assistant":
+ if content_text.strip():
+ items.append({"role": "assistant", "content": content_text})
+
+ tool_calls = msg.get("tool_calls")
+ if isinstance(tool_calls, list):
+ for tc in tool_calls:
+ if not isinstance(tc, dict):
+ continue
+ fn = tc.get("function", {})
+ fn_name = fn.get("name")
+ if not isinstance(fn_name, str) or not fn_name.strip():
+ continue
+
+ embedded_call_id, embedded_response_item_id = self._split_responses_tool_id(
+ tc.get("id")
+ )
+ call_id = tc.get("call_id")
+ if not isinstance(call_id, str) or not call_id.strip():
+ call_id = embedded_call_id
+ if not isinstance(call_id, str) or not call_id.strip():
+ if (
+ isinstance(embedded_response_item_id, str)
+ and embedded_response_item_id.startswith("fc_")
+ and len(embedded_response_item_id) > len("fc_")
+ ):
+ call_id = f"call_{embedded_response_item_id[len('fc_'):]}"
+ else:
+ call_id = f"call_{uuid.uuid4().hex[:12]}"
+ call_id = call_id.strip()
+
+ arguments = fn.get("arguments", "{}")
+ if isinstance(arguments, dict):
+ arguments = json.dumps(arguments, ensure_ascii=False)
+ elif not isinstance(arguments, str):
+ arguments = str(arguments)
+ arguments = arguments.strip() or "{}"
+
+ items.append({
+ "type": "function_call",
+ "call_id": call_id,
+ "name": fn_name,
+ "arguments": arguments,
+ })
+ continue
+
+ items.append({"role": role, "content": content_text})
+ continue
+
+ if role == "tool":
+ raw_tool_call_id = msg.get("tool_call_id")
+ call_id, _ = self._split_responses_tool_id(raw_tool_call_id)
+ if not isinstance(call_id, str) or not call_id.strip():
+ if isinstance(raw_tool_call_id, str) and raw_tool_call_id.strip():
+ call_id = raw_tool_call_id.strip()
+ if not isinstance(call_id, str) or not call_id.strip():
+ continue
+ items.append({
+ "type": "function_call_output",
+ "call_id": call_id,
+ "output": str(msg.get("content", "") or ""),
+ })
+
+ return items
+
+ def _preflight_codex_input_items(self, raw_items: Any) -> List[Dict[str, Any]]:
+ if not isinstance(raw_items, list):
+ raise ValueError("Codex Responses input must be a list of input items.")
+
+ normalized: List[Dict[str, Any]] = []
+ for idx, item in enumerate(raw_items):
+ if not isinstance(item, dict):
+ raise ValueError(f"Codex Responses input[{idx}] must be an object.")
+
+ item_type = item.get("type")
+ if item_type == "function_call":
+ call_id = item.get("call_id")
+ name = item.get("name")
+ if not isinstance(call_id, str) or not call_id.strip():
+ raise ValueError(f"Codex Responses input[{idx}] function_call is missing call_id.")
+ if not isinstance(name, str) or not name.strip():
+ raise ValueError(f"Codex Responses input[{idx}] function_call is missing name.")
+
+ arguments = item.get("arguments", "{}")
+ if isinstance(arguments, dict):
+ arguments = json.dumps(arguments, ensure_ascii=False)
+ elif not isinstance(arguments, str):
+ arguments = str(arguments)
+ arguments = arguments.strip() or "{}"
+
+ normalized.append(
+ {
+ "type": "function_call",
+ "call_id": call_id.strip(),
+ "name": name.strip(),
+ "arguments": arguments,
+ }
+ )
+ continue
+
+ if item_type == "function_call_output":
+ call_id = item.get("call_id")
+ if not isinstance(call_id, str) or not call_id.strip():
+ raise ValueError(f"Codex Responses input[{idx}] function_call_output is missing call_id.")
+ output = item.get("output", "")
+ if output is None:
+ output = ""
+ if not isinstance(output, str):
+ output = str(output)
+
+ normalized.append(
+ {
+ "type": "function_call_output",
+ "call_id": call_id.strip(),
+ "output": output,
+ }
+ )
+ continue
+
+ role = item.get("role")
+ if role in {"user", "assistant"}:
+ content = item.get("content", "")
+ if content is None:
+ content = ""
+ if not isinstance(content, str):
+ content = str(content)
+
+ normalized.append({"role": role, "content": content})
+ continue
+
+ raise ValueError(
+ f"Codex Responses input[{idx}] has unsupported item shape (type={item_type!r}, role={role!r})."
+ )
+
+ return normalized
+
+ def _preflight_codex_api_kwargs(
+ self,
+ api_kwargs: Any,
+ *,
+ allow_stream: bool = False,
+ ) -> Dict[str, Any]:
+ if not isinstance(api_kwargs, dict):
+ raise ValueError("Codex Responses request must be a dict.")
+
+ required = {"model", "instructions", "input"}
+ missing = [key for key in required if key not in api_kwargs]
+ if missing:
+ raise ValueError(f"Codex Responses request missing required field(s): {', '.join(sorted(missing))}.")
+
+ model = api_kwargs.get("model")
+ if not isinstance(model, str) or not model.strip():
+ raise ValueError("Codex Responses request 'model' must be a non-empty string.")
+ model = model.strip()
+
+ instructions = api_kwargs.get("instructions")
+ if instructions is None:
+ instructions = ""
+ if not isinstance(instructions, str):
+ instructions = str(instructions)
+ instructions = instructions.strip() or DEFAULT_AGENT_IDENTITY
+
+ normalized_input = self._preflight_codex_input_items(api_kwargs.get("input"))
+
+ tools = api_kwargs.get("tools")
+ normalized_tools = None
+ if tools is not None:
+ if not isinstance(tools, list):
+ raise ValueError("Codex Responses request 'tools' must be a list when provided.")
+ normalized_tools = []
+ for idx, tool in enumerate(tools):
+ if not isinstance(tool, dict):
+ raise ValueError(f"Codex Responses tools[{idx}] must be an object.")
+ if tool.get("type") != "function":
+ raise ValueError(f"Codex Responses tools[{idx}] has unsupported type {tool.get('type')!r}.")
+
+ name = tool.get("name")
+ parameters = tool.get("parameters")
+ if not isinstance(name, str) or not name.strip():
+ raise ValueError(f"Codex Responses tools[{idx}] is missing a valid name.")
+ if not isinstance(parameters, dict):
+ raise ValueError(f"Codex Responses tools[{idx}] is missing valid parameters.")
+
+ description = tool.get("description", "")
+ if description is None:
+ description = ""
+ if not isinstance(description, str):
+ description = str(description)
+
+ strict = tool.get("strict", False)
+ if not isinstance(strict, bool):
+ strict = bool(strict)
+
+ normalized_tools.append(
+ {
+ "type": "function",
+ "name": name.strip(),
+ "description": description,
+ "strict": strict,
+ "parameters": parameters,
+ }
+ )
+
+ store = api_kwargs.get("store", False)
+ if store is not False:
+ raise ValueError("Codex Responses contract requires 'store' to be false.")
+
+ allowed_keys = {"model", "instructions", "input", "tools", "store"}
+ normalized: Dict[str, Any] = {
+ "model": model,
+ "instructions": instructions,
+ "input": normalized_input,
+ "tools": normalized_tools,
+ "store": False,
+ }
+
+ if allow_stream:
+ stream = api_kwargs.get("stream")
+ if stream is not None and stream is not True:
+ raise ValueError("Codex Responses 'stream' must be true when set.")
+ if stream is True:
+ normalized["stream"] = True
+ allowed_keys.add("stream")
+ elif "stream" in api_kwargs:
+ raise ValueError("Codex Responses stream flag is only allowed in fallback streaming requests.")
+
+ unexpected = sorted(key for key in api_kwargs.keys() if key not in allowed_keys)
+ if unexpected:
+ raise ValueError(
+ f"Codex Responses request has unsupported field(s): {', '.join(unexpected)}."
+ )
+
+ return normalized
+
+ def _extract_responses_message_text(self, item: Any) -> str:
+ """Extract assistant text from a Responses message output item."""
+ content = getattr(item, "content", None)
+ if not isinstance(content, list):
+ return ""
+
+ chunks: List[str] = []
+ for part in content:
+ ptype = getattr(part, "type", None)
+ if ptype not in {"output_text", "text"}:
+ continue
+ text = getattr(part, "text", None)
+ if isinstance(text, str) and text:
+ chunks.append(text)
+ return "".join(chunks).strip()
+
+ def _extract_responses_reasoning_text(self, item: Any) -> str:
+ """Extract a compact reasoning text from a Responses reasoning item."""
+ summary = getattr(item, "summary", None)
+ if isinstance(summary, list):
+ chunks: List[str] = []
+ for part in summary:
+ text = getattr(part, "text", None)
+ if isinstance(text, str) and text:
+ chunks.append(text)
+ if chunks:
+ return "\n".join(chunks).strip()
+ text = getattr(item, "text", None)
+ if isinstance(text, str) and text:
+ return text.strip()
+ return ""
+
+ def _normalize_codex_response(self, response: Any) -> tuple[Any, str]:
+ """Normalize a Responses API object to an assistant_message-like object."""
+ output = getattr(response, "output", None)
+ if not isinstance(output, list) or not output:
+ raise RuntimeError("Responses API returned no output items")
+
+ response_status = getattr(response, "status", None)
+ if isinstance(response_status, str):
+ response_status = response_status.strip().lower()
+ else:
+ response_status = None
+
+ if response_status in {"failed", "cancelled"}:
+ error_obj = getattr(response, "error", None)
+ if isinstance(error_obj, dict):
+ error_msg = error_obj.get("message") or str(error_obj)
+ else:
+ error_msg = str(error_obj) if error_obj else f"Responses API returned status '{response_status}'"
+ raise RuntimeError(error_msg)
+
+ content_parts: List[str] = []
+ reasoning_parts: List[str] = []
+ tool_calls: List[Any] = []
+ has_incomplete_items = response_status in {"queued", "in_progress", "incomplete"}
+ saw_commentary_phase = False
+ saw_final_answer_phase = False
+
+ for item in output:
+ item_type = getattr(item, "type", None)
+ item_status = getattr(item, "status", None)
+ if isinstance(item_status, str):
+ item_status = item_status.strip().lower()
+ else:
+ item_status = None
+
+ if item_status in {"queued", "in_progress", "incomplete"}:
+ has_incomplete_items = True
+
+ if item_type == "message":
+ item_phase = getattr(item, "phase", None)
+ if isinstance(item_phase, str):
+ normalized_phase = item_phase.strip().lower()
+ if normalized_phase in {"commentary", "analysis"}:
+ saw_commentary_phase = True
+ elif normalized_phase in {"final_answer", "final"}:
+ saw_final_answer_phase = True
+ message_text = self._extract_responses_message_text(item)
+ if message_text:
+ content_parts.append(message_text)
+ elif item_type == "reasoning":
+ reasoning_text = self._extract_responses_reasoning_text(item)
+ if reasoning_text:
+ reasoning_parts.append(reasoning_text)
+ elif item_type == "function_call":
+ if item_status in {"queued", "in_progress", "incomplete"}:
+ continue
+ fn_name = getattr(item, "name", "") or ""
+ arguments = getattr(item, "arguments", "{}")
+ if not isinstance(arguments, str):
+ arguments = str(arguments)
+ raw_call_id = getattr(item, "call_id", None)
+ raw_item_id = getattr(item, "id", None)
+ embedded_call_id, _ = self._split_responses_tool_id(raw_item_id)
+ call_id = raw_call_id if isinstance(raw_call_id, str) and raw_call_id.strip() else embedded_call_id
+ if not isinstance(call_id, str) or not call_id.strip():
+ call_id = f"call_{uuid.uuid4().hex[:12]}"
+ call_id = call_id.strip()
+ response_item_id = raw_item_id if isinstance(raw_item_id, str) else None
+ response_item_id = self._derive_responses_function_call_id(call_id, response_item_id)
+ tool_calls.append(SimpleNamespace(
+ id=call_id,
+ call_id=call_id,
+ response_item_id=response_item_id,
+ type="function",
+ function=SimpleNamespace(name=fn_name, arguments=arguments),
+ ))
+ elif item_type == "custom_tool_call":
+ fn_name = getattr(item, "name", "") or ""
+ arguments = getattr(item, "input", "{}")
+ if not isinstance(arguments, str):
+ arguments = str(arguments)
+ raw_call_id = getattr(item, "call_id", None)
+ raw_item_id = getattr(item, "id", None)
+ embedded_call_id, _ = self._split_responses_tool_id(raw_item_id)
+ call_id = raw_call_id if isinstance(raw_call_id, str) and raw_call_id.strip() else embedded_call_id
+ if not isinstance(call_id, str) or not call_id.strip():
+ call_id = f"call_{uuid.uuid4().hex[:12]}"
+ call_id = call_id.strip()
+ response_item_id = raw_item_id if isinstance(raw_item_id, str) else None
+ response_item_id = self._derive_responses_function_call_id(call_id, response_item_id)
+ tool_calls.append(SimpleNamespace(
+ id=call_id,
+ call_id=call_id,
+ response_item_id=response_item_id,
+ type="function",
+ function=SimpleNamespace(name=fn_name, arguments=arguments),
+ ))
+
+ final_text = "\n".join([p for p in content_parts if p]).strip()
+ if not final_text and hasattr(response, "output_text"):
+ out_text = getattr(response, "output_text", "")
+ if isinstance(out_text, str):
+ final_text = out_text.strip()
+
+ assistant_message = SimpleNamespace(
+ content=final_text,
+ tool_calls=tool_calls,
+ reasoning="\n\n".join(reasoning_parts).strip() if reasoning_parts else None,
+ reasoning_content=None,
+ reasoning_details=None,
+ )
+
+ if tool_calls:
+ finish_reason = "tool_calls"
+ elif has_incomplete_items or (saw_commentary_phase and not saw_final_answer_phase):
+ finish_reason = "incomplete"
+ else:
+ finish_reason = "stop"
+ return assistant_message, finish_reason
+
+ def _run_codex_stream(self, api_kwargs: dict):
+ """Execute one streaming Responses API request and return the final response."""
+ api_kwargs = self._preflight_codex_api_kwargs(api_kwargs, allow_stream=False)
+ max_stream_retries = 1
+ for attempt in range(max_stream_retries + 1):
+ try:
+ with self.client.responses.stream(**api_kwargs) as stream:
+ for _ in stream:
+ pass
+ return stream.get_final_response()
+ except RuntimeError as exc:
+ err_text = str(exc)
+ missing_completed = "response.completed" in err_text
+ if missing_completed and attempt < max_stream_retries:
+ logger.debug(
+ "Responses stream closed before completion (attempt %s/%s); retrying.",
+ attempt + 1,
+ max_stream_retries + 1,
+ )
+ continue
+ if missing_completed:
+ logger.debug(
+ "Responses stream did not emit response.completed; falling back to create(stream=True)."
+ )
+ return self._run_codex_create_stream_fallback(api_kwargs)
+ raise
+
    def _run_codex_create_stream_fallback(self, api_kwargs: dict):
        """Fallback path for stream completion edge cases on Codex-style Responses backends.

        Issues a raw ``create(stream=True)`` request and scans the event stream
        for a terminal event (completed/incomplete/failed), returning that
        event's response payload.

        Raises:
            RuntimeError: when the stream ends without any terminal event.
        """
        fallback_kwargs = dict(api_kwargs)
        fallback_kwargs["stream"] = True
        fallback_kwargs = self._preflight_codex_api_kwargs(fallback_kwargs, allow_stream=True)
        stream_or_response = self.client.responses.create(**fallback_kwargs)

        # Compatibility shim for mocks or providers that still return a concrete response.
        if hasattr(stream_or_response, "output"):
            return stream_or_response
        if not hasattr(stream_or_response, "__iter__"):
            return stream_or_response

        terminal_response = None
        try:
            for event in stream_or_response:
                # Events may be SDK objects (attribute access) or plain dicts.
                event_type = getattr(event, "type", None)
                if not event_type and isinstance(event, dict):
                    event_type = event.get("type")
                if event_type not in {"response.completed", "response.incomplete", "response.failed"}:
                    continue

                terminal_response = getattr(event, "response", None)
                if terminal_response is None and isinstance(event, dict):
                    terminal_response = event.get("response")
                if terminal_response is not None:
                    return terminal_response
        finally:
            # Always release the underlying stream, even when returning early above.
            close_fn = getattr(stream_or_response, "close", None)
            if callable(close_fn):
                try:
                    close_fn()
                except Exception:
                    pass

        if terminal_response is not None:
            return terminal_response
        raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.")
+
+ def _try_refresh_codex_client_credentials(self, *, force: bool = True) -> bool:
+ if self.api_mode != "codex_responses" or self.provider != "openai-codex":
+ return False
+
+ try:
+ from hermes_cli.auth import resolve_codex_runtime_credentials
+
+ creds = resolve_codex_runtime_credentials(force_refresh=force)
+ except Exception as exc:
+ logger.debug("Codex credential refresh failed: %s", exc)
+ return False
+
+ api_key = creds.get("api_key")
+ base_url = creds.get("base_url")
+ if not isinstance(api_key, str) or not api_key.strip():
+ return False
+ if not isinstance(base_url, str) or not base_url.strip():
+ return False
+
+ self.api_key = api_key.strip()
+ self.base_url = base_url.strip().rstrip("/")
+ self._client_kwargs["api_key"] = self.api_key
+ self._client_kwargs["base_url"] = self.base_url
+
+ try:
+ self.client.close()
+ except Exception:
+ pass
+
+ try:
+ self.client = OpenAI(**self._client_kwargs)
+ except Exception as exc:
+ logger.warning("Failed to rebuild OpenAI client after Codex refresh: %s", exc)
+ return False
+
+ return True
+
def _interruptible_api_call(self, api_kwargs: dict):
"""
Run the API call in a background thread so the main conversation loop
@@ -1274,7 +1933,10 @@ class AIAgent:
def _call():
try:
- result["response"] = self.client.chat.completions.create(**api_kwargs)
+ if self.api_mode == "codex_responses":
+ result["response"] = self._run_codex_stream(api_kwargs)
+ else:
+ result["response"] = self.client.chat.completions.create(**api_kwargs)
except Exception as e:
result["error"] = e
@@ -1299,7 +1961,24 @@ class AIAgent:
return result["response"]
def _build_api_kwargs(self, api_messages: list) -> dict:
- """Build the keyword arguments dict for the chat completions API call."""
+ """Build the keyword arguments dict for the active API mode."""
+ if self.api_mode == "codex_responses":
+ instructions = ""
+ payload_messages = api_messages
+ if api_messages and api_messages[0].get("role") == "system":
+ instructions = str(api_messages[0].get("content") or "").strip()
+ payload_messages = api_messages[1:]
+ if not instructions:
+ instructions = DEFAULT_AGENT_IDENTITY
+
+ return {
+ "model": self.model,
+ "instructions": instructions,
+ "input": self._chat_messages_to_responses_input(payload_messages),
+ "tools": self._responses_tools(),
+ "store": False,
+ }
+
provider_preferences = {}
if self.providers_allowed:
provider_preferences["only"] = self.providers_allowed
@@ -1373,17 +2052,42 @@ class AIAgent:
]
if assistant_message.tool_calls:
- msg["tool_calls"] = [
- {
- "id": tool_call.id,
+ tool_calls = []
+ for tool_call in assistant_message.tool_calls:
+ raw_id = getattr(tool_call, "id", None)
+ call_id = getattr(tool_call, "call_id", None)
+ if not isinstance(call_id, str) or not call_id.strip():
+ embedded_call_id, _ = self._split_responses_tool_id(raw_id)
+ call_id = embedded_call_id
+ if not isinstance(call_id, str) or not call_id.strip():
+ if isinstance(raw_id, str) and raw_id.strip():
+ call_id = raw_id.strip()
+ else:
+ call_id = f"call_{uuid.uuid4().hex[:12]}"
+ call_id = call_id.strip()
+
+ response_item_id = getattr(tool_call, "response_item_id", None)
+ if not isinstance(response_item_id, str) or not response_item_id.strip():
+ _, embedded_response_item_id = self._split_responses_tool_id(raw_id)
+ response_item_id = embedded_response_item_id
+
+ response_item_id = self._derive_responses_function_call_id(
+ call_id,
+ response_item_id if isinstance(response_item_id, str) else None,
+ )
+
+ tool_calls.append({
+ "id": call_id,
+ "call_id": call_id,
+ "response_item_id": response_item_id,
"type": tool_call.type,
"function": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments
- }
+ },
}
- for tool_call in assistant_message.tool_calls
- ]
+ )
+ msg["tool_calls"] = tool_calls
return msg
@@ -1804,9 +2508,29 @@ class AIAgent:
final_response = summary_response.choices[0].message.content
if "" in final_response:
final_response = re.sub(r'.*?\s*', '', final_response, flags=re.DOTALL).strip()
- messages.append({"role": "assistant", "content": final_response})
+ if final_response:
+ messages.append({"role": "assistant", "content": final_response})
+ else:
+ final_response = "I reached the iteration limit and couldn't generate a summary."
else:
- final_response = "I reached the iteration limit and couldn't generate a summary."
+ summary_kwargs = {
+ "model": self.model,
+ "messages": api_messages,
+ }
+ if self.max_tokens is not None:
+ summary_kwargs["max_tokens"] = self.max_tokens
+ if summary_extra_body:
+ summary_kwargs["extra_body"] = summary_extra_body
+
+ summary_response = self.client.chat.completions.create(**summary_kwargs)
+
+ if summary_response.choices and summary_response.choices[0].message.content:
+ final_response = summary_response.choices[0].message.content
+ if "" in final_response:
+ final_response = re.sub(r'.*?\s*', '', final_response, flags=re.DOTALL).strip()
+ messages.append({"role": "assistant", "content": final_response})
+ else:
+ final_response = "I reached the iteration limit and couldn't generate a summary."
except Exception as e:
logging.warning(f"Failed to get summary response: {e}")
@@ -1924,6 +2648,7 @@ class AIAgent:
api_call_count = 0
final_response = None
interrupted = False
+ codex_ack_continuations = 0
# Clear any stale interrupt state at start
self.clear_interrupt()
@@ -2038,10 +2763,15 @@ class AIAgent:
api_start_time = time.time()
retry_count = 0
max_retries = 6 # Increased to allow longer backoff periods
+ codex_auth_retry_attempted = False
+
+ finish_reason = "stop"
while retry_count < max_retries:
try:
api_kwargs = self._build_api_kwargs(api_messages)
+ if self.api_mode == "codex_responses":
+ api_kwargs = self._preflight_codex_api_kwargs(api_kwargs, allow_stream=False)
if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}:
self._dump_api_request_debug(api_kwargs, reason="preflight")
@@ -2064,8 +2794,33 @@ class AIAgent:
resp_model = getattr(response, 'model', 'N/A') if response else 'N/A'
logging.debug(f"API Response received - Model: {resp_model}, Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}")
- # Validate response has valid choices before proceeding
- if response is None or not hasattr(response, 'choices') or response.choices is None or len(response.choices) == 0:
+ # Validate response shape before proceeding
+ response_invalid = False
+ error_details = []
+ if self.api_mode == "codex_responses":
+ output_items = getattr(response, "output", None) if response is not None else None
+ if response is None:
+ response_invalid = True
+ error_details.append("response is None")
+ elif not isinstance(output_items, list):
+ response_invalid = True
+ error_details.append("response.output is not a list")
+ elif len(output_items) == 0:
+ response_invalid = True
+ error_details.append("response.output is empty")
+ else:
+ if response is None or not hasattr(response, 'choices') or response.choices is None or len(response.choices) == 0:
+ response_invalid = True
+ if response is None:
+ error_details.append("response is None")
+ elif not hasattr(response, 'choices'):
+ error_details.append("response has no 'choices' attribute")
+ elif response.choices is None:
+ error_details.append("response.choices is None")
+ else:
+ error_details.append("response.choices is empty")
+
+ if response_invalid:
# Stop spinner before printing error messages
if thinking_spinner:
thinking_spinner.stop(f"(´;ω;`) oops, retrying...")
@@ -2073,15 +2828,6 @@ class AIAgent:
# This is often rate limiting or provider returning malformed response
retry_count += 1
- error_details = []
- if response is None:
- error_details.append("response is None")
- elif not hasattr(response, 'choices'):
- error_details.append("response has no 'choices' attribute")
- elif response.choices is None:
- error_details.append("response.choices is None")
- else:
- error_details.append("response.choices is empty")
# Check for error field in response (some providers include this)
error_msg = "Unknown"
@@ -2118,7 +2864,7 @@ class AIAgent:
"messages": messages,
"completed": False,
"api_calls": api_call_count,
- "error": f"Invalid API response (choices is None/empty). Likely rate limited by provider.",
+ "error": "Invalid API response shape. Likely rate limited or malformed provider response.",
"failed": True # Mark as failure for filtering
}
@@ -2145,7 +2891,20 @@ class AIAgent:
continue # Retry the API call
# Check finish_reason before proceeding
- finish_reason = response.choices[0].finish_reason
+ if self.api_mode == "codex_responses":
+ status = getattr(response, "status", None)
+ incomplete_details = getattr(response, "incomplete_details", None)
+ incomplete_reason = None
+ if isinstance(incomplete_details, dict):
+ incomplete_reason = incomplete_details.get("reason")
+ else:
+ incomplete_reason = getattr(incomplete_details, "reason", None)
+ if status == "incomplete" and incomplete_reason in {"max_output_tokens", "length"}:
+ finish_reason = "length"
+ else:
+ finish_reason = "stop"
+ else:
+ finish_reason = response.choices[0].finish_reason
# Handle "length" finish_reason - response was truncated
if finish_reason == "length":
@@ -2182,10 +2941,21 @@ class AIAgent:
# Track actual token usage from response for context management
if hasattr(response, 'usage') and response.usage:
+ if self.api_mode == "codex_responses":
+ prompt_tokens = getattr(response.usage, 'input_tokens', 0) or 0
+ completion_tokens = getattr(response.usage, 'output_tokens', 0) or 0
+ total_tokens = (
+ getattr(response.usage, 'total_tokens', None)
+ or (prompt_tokens + completion_tokens)
+ )
+ else:
+ prompt_tokens = getattr(response.usage, 'prompt_tokens', 0) or 0
+ completion_tokens = getattr(response.usage, 'completion_tokens', 0) or 0
+ total_tokens = getattr(response.usage, 'total_tokens', 0) or 0
usage_dict = {
- "prompt_tokens": getattr(response.usage, 'prompt_tokens', 0),
- "completion_tokens": getattr(response.usage, 'completion_tokens', 0),
- "total_tokens": getattr(response.usage, 'total_tokens', 0),
+ "prompt_tokens": prompt_tokens,
+ "completion_tokens": completion_tokens,
+ "total_tokens": total_tokens,
}
self.context_compressor.update_from_response(usage_dict)
@@ -2219,6 +2989,18 @@ class AIAgent:
if thinking_spinner:
thinking_spinner.stop(f"(╥_╥) error, retrying...")
thinking_spinner = None
+
+ status_code = getattr(api_error, "status_code", None)
+ if (
+ self.api_mode == "codex_responses"
+ and self.provider == "openai-codex"
+ and status_code == 401
+ and not codex_auth_retry_attempted
+ ):
+ codex_auth_retry_attempted = True
+ if self._try_refresh_codex_client_credentials(force=True):
+ print(f"{self.log_prefix}🔐 Codex auth refreshed after 401. Retrying request...")
+ continue
retry_count += 1
elapsed_time = time.time() - api_start_time
@@ -2375,7 +3157,10 @@ class AIAgent:
break
try:
- assistant_message = response.choices[0].message
+ if self.api_mode == "codex_responses":
+ assistant_message, finish_reason = self._normalize_codex_response(response)
+ else:
+ assistant_message = response.choices[0].message
# Handle assistant response
if assistant_message.content and not self.quiet_mode:
@@ -2415,6 +3200,48 @@ class AIAgent:
# Reset incomplete scratchpad counter on clean response
if hasattr(self, '_incomplete_scratchpad_retries'):
self._incomplete_scratchpad_retries = 0
+
+ if self.api_mode == "codex_responses" and finish_reason == "incomplete":
+ if not hasattr(self, "_codex_incomplete_retries"):
+ self._codex_incomplete_retries = 0
+ self._codex_incomplete_retries += 1
+
+ interim_msg = self._build_assistant_message(assistant_message, finish_reason)
+ interim_has_content = bool(interim_msg.get("content", "").strip())
+ interim_has_reasoning = bool(interim_msg.get("reasoning", "").strip()) if isinstance(interim_msg.get("reasoning"), str) else False
+
+ if interim_has_content or interim_has_reasoning:
+ last_msg = messages[-1] if messages else None
+ duplicate_interim = (
+ isinstance(last_msg, dict)
+ and last_msg.get("role") == "assistant"
+ and last_msg.get("finish_reason") == "incomplete"
+ and (last_msg.get("content") or "") == (interim_msg.get("content") or "")
+ and (last_msg.get("reasoning") or "") == (interim_msg.get("reasoning") or "")
+ )
+ if not duplicate_interim:
+ messages.append(interim_msg)
+ self._log_msg_to_db(interim_msg)
+
+ if self._codex_incomplete_retries < 3:
+ if not self.quiet_mode:
+ print(f"{self.log_prefix}↻ Codex response incomplete; continuing turn ({self._codex_incomplete_retries}/3)")
+ self._session_messages = messages
+ self._save_session_log(messages)
+ continue
+
+ self._codex_incomplete_retries = 0
+ self._persist_session(messages, conversation_history)
+ return {
+ "final_response": None,
+ "messages": messages,
+ "api_calls": api_call_count,
+ "completed": False,
+ "partial": True,
+ "error": "Codex response remained incomplete after 3 continuation attempts",
+ }
+ elif hasattr(self, "_codex_incomplete_retries"):
+ self._codex_incomplete_retries = 0
# Check for tool calls
if assistant_message.tool_calls:
@@ -2616,6 +3443,36 @@ class AIAgent:
# Reset retry counter on successful content
if hasattr(self, '_empty_content_retries'):
self._empty_content_retries = 0
+
+ if (
+ self.api_mode == "codex_responses"
+ and self.valid_tool_names
+ and codex_ack_continuations < 2
+ and self._looks_like_codex_intermediate_ack(
+ user_message=user_message,
+ assistant_content=final_response,
+ messages=messages,
+ )
+ ):
+ codex_ack_continuations += 1
+ interim_msg = self._build_assistant_message(assistant_message, "incomplete")
+ messages.append(interim_msg)
+ self._log_msg_to_db(interim_msg)
+
+ continue_msg = {
+ "role": "user",
+ "content": (
+ "[System: Continue now. Execute the required tool calls and only "
+ "send your final answer after completing the task.]"
+ ),
+ }
+ messages.append(continue_msg)
+ self._log_msg_to_db(continue_msg)
+ self._session_messages = messages
+ self._save_session_log(messages)
+ continue
+
+ codex_ack_continuations = 0
# Strip blocks from user-facing response (keep raw in messages for trajectory)
final_response = self._strip_think_blocks(final_response).strip()
diff --git a/tests/test_auth_codex_provider.py b/tests/test_auth_codex_provider.py
new file mode 100644
index 000000000..de490754c
--- /dev/null
+++ b/tests/test_auth_codex_provider.py
@@ -0,0 +1,219 @@
+import json
+import time
+import base64
+from contextlib import contextmanager
+from pathlib import Path
+from types import SimpleNamespace
+
+import pytest
+import yaml
+
+from hermes_cli.auth import (
+ AuthError,
+ DEFAULT_CODEX_BASE_URL,
+ PROVIDER_REGISTRY,
+ _persist_codex_auth_payload,
+ _login_openai_codex,
+ login_command,
+ get_codex_auth_status,
+ get_provider_auth_state,
+ read_codex_auth_file,
+ resolve_codex_runtime_credentials,
+ resolve_provider,
+)
+
+
+def _write_codex_auth(codex_home: Path, *, access_token: str = "access", refresh_token: str = "refresh") -> Path:
+ codex_home.mkdir(parents=True, exist_ok=True)
+ auth_file = codex_home / "auth.json"
+ auth_file.write_text(
+ json.dumps(
+ {
+ "auth_mode": "oauth",
+ "last_refresh": "2026-02-26T00:00:00Z",
+ "tokens": {
+ "access_token": access_token,
+ "refresh_token": refresh_token,
+ },
+ }
+ )
+ )
+ return auth_file
+
+
+def _jwt_with_exp(exp_epoch: int) -> str:
+ payload = {"exp": exp_epoch}
+ encoded = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8")).rstrip(b"=").decode("utf-8")
+ return f"h.{encoded}.s"
+
+
def test_read_codex_auth_file_success(tmp_path, monkeypatch):
    """read_codex_auth_file honors CODEX_HOME and returns the path plus parsed tokens."""
    codex_home = tmp_path / "codex-home"
    auth_file = _write_codex_auth(codex_home)
    monkeypatch.setenv("CODEX_HOME", str(codex_home))

    payload = read_codex_auth_file()

    assert payload["auth_path"] == auth_file
    assert payload["tokens"]["access_token"] == "access"
    assert payload["tokens"]["refresh_token"] == "refresh"
+
+
def test_resolve_codex_runtime_credentials_missing_access_token(tmp_path, monkeypatch):
    """An empty access token raises AuthError flagged as requiring re-login."""
    codex_home = tmp_path / "codex-home"
    _write_codex_auth(codex_home, access_token="")
    monkeypatch.setenv("CODEX_HOME", str(codex_home))

    with pytest.raises(AuthError) as exc:
        resolve_codex_runtime_credentials()

    assert exc.value.code == "codex_auth_missing_access_token"
    assert exc.value.relogin_required is True
+
+
def test_resolve_codex_runtime_credentials_refreshes_expiring_token(tmp_path, monkeypatch):
    """An already-expired JWT access token triggers exactly one locked refresh."""
    codex_home = tmp_path / "codex-home"
    # exp 10 seconds in the past forces the refresh-if-expiring path.
    expiring_token = _jwt_with_exp(int(time.time()) - 10)
    _write_codex_auth(codex_home, access_token=expiring_token, refresh_token="refresh-old")
    monkeypatch.setenv("CODEX_HOME", str(codex_home))

    called = {"count": 0}

    def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
        called["count"] += 1
        assert auth_path == codex_home / "auth.json"
        # The resolver must hold the auth-file lock while refreshing.
        assert lock_held is True
        return {"access_token": "access-new", "refresh_token": "refresh-new"}

    monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)

    resolved = resolve_codex_runtime_credentials()

    assert called["count"] == 1
    assert resolved["api_key"] == "access-new"
+
+
def test_resolve_codex_runtime_credentials_force_refresh(tmp_path, monkeypatch):
    """force_refresh=True refreshes even with refresh_if_expiring disabled and a current token."""
    codex_home = tmp_path / "codex-home"
    _write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old")
    monkeypatch.setenv("CODEX_HOME", str(codex_home))

    called = {"count": 0}

    def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
        called["count"] += 1
        # Refresh must run while holding the auth-file lock.
        assert lock_held is True
        return {"access_token": "access-forced", "refresh_token": "refresh-new"}

    monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)

    resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False)

    assert called["count"] == 1
    assert resolved["api_key"] == "access-forced"
+
+
def test_resolve_codex_runtime_credentials_uses_file_lock_on_refresh(tmp_path, monkeypatch):
    """The auth-file lock is entered exactly once around the refresh and always released."""
    codex_home = tmp_path / "codex-home"
    _write_codex_auth(codex_home, access_token="access-current", refresh_token="refresh-old")
    monkeypatch.setenv("CODEX_HOME", str(codex_home))

    lock_calls = {"enter": 0, "exit": 0}

    @contextmanager
    def _fake_lock(auth_path, timeout_seconds=15.0):
        assert auth_path == codex_home / "auth.json"
        lock_calls["enter"] += 1
        try:
            yield
        finally:
            lock_calls["exit"] += 1

    refresh_calls = {"count": 0}

    def _fake_refresh(*, payload, auth_path, timeout_seconds, lock_held=False):
        refresh_calls["count"] += 1
        # lock_held=True confirms the refresh ran inside _fake_lock.
        assert lock_held is True
        return {"access_token": "access-updated", "refresh_token": "refresh-updated"}

    monkeypatch.setattr("hermes_cli.auth._codex_auth_file_lock", _fake_lock)
    monkeypatch.setattr("hermes_cli.auth._refresh_codex_auth_tokens", _fake_refresh)

    resolved = resolve_codex_runtime_credentials(force_refresh=True, refresh_if_expiring=False)

    assert refresh_calls["count"] == 1
    assert lock_calls["enter"] == 1
    assert lock_calls["exit"] == 1
    assert resolved["api_key"] == "access-updated"
+
+
def test_resolve_provider_explicit_codex_does_not_fallback(monkeypatch):
    """Explicitly requesting openai-codex must not fall back, even with no API keys in env."""
    for env_var in ("OPENAI_API_KEY", "OPENROUTER_API_KEY"):
        monkeypatch.delenv(env_var, raising=False)
    assert resolve_provider("openai-codex") == "openai-codex"
+
+
def test_persist_codex_auth_payload_writes_atomically(tmp_path):
    """Persisting overwrites stale content and leaves no temporary file behind."""
    auth_path = tmp_path / "auth.json"
    auth_path.write_text('{"stale":true}\n')
    payload = {
        "auth_mode": "oauth",
        "tokens": {
            "access_token": "next-access",
            "refresh_token": "next-refresh",
        },
        "last_refresh": "2026-02-26T00:00:00Z",
    }

    _persist_codex_auth_payload(auth_path, payload)

    stored = json.loads(auth_path.read_text())
    assert stored == payload
    # An atomic write must not leave its temp file behind.
    assert list(tmp_path.glob(".auth.json.*.tmp")) == []
+
+
def test_get_codex_auth_status_not_logged_in(tmp_path, monkeypatch):
    """A missing CODEX_HOME yields logged_in=False plus an error entry."""
    missing_home = tmp_path / "missing-codex-home"
    monkeypatch.setenv("CODEX_HOME", str(missing_home))

    status = get_codex_auth_status()

    assert status["logged_in"] is False
    assert "error" in status
+
+
def test_login_openai_codex_persists_provider_state(tmp_path, monkeypatch):
    """A successful codex login records auth state and writes provider/base_url to config.yaml."""
    hermes_home = tmp_path / "hermes-home"
    codex_home = tmp_path / "codex-home"
    _write_codex_auth(codex_home)
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setenv("CODEX_HOME", str(codex_home))
    # Pretend the codex CLI binary exists and its login subprocess succeeds.
    monkeypatch.setattr("hermes_cli.auth.shutil.which", lambda _: "/usr/local/bin/codex")
    monkeypatch.setattr("hermes_cli.auth.subprocess.run", lambda *a, **k: None)

    _login_openai_codex(SimpleNamespace(), PROVIDER_REGISTRY["openai-codex"])

    state = get_provider_auth_state("openai-codex")
    assert state is not None
    assert state["source"] == "codex-auth-json"
    assert state["auth_file"].endswith("auth.json")

    config_path = hermes_home / "config.yaml"
    config = yaml.safe_load(config_path.read_text())
    assert config["model"]["provider"] == "openai-codex"
    assert config["model"]["base_url"] == DEFAULT_CODEX_BASE_URL
+
+
def test_login_command_defaults_to_nous(monkeypatch):
    """login_command with no explicit provider routes to the Nous flow, not Codex."""
    calls = {"nous": 0, "codex": 0}

    def _count(name):
        def _handler(args, pconfig):
            calls[name] += 1
        return _handler

    monkeypatch.setattr("hermes_cli.auth._login_nous", _count("nous"))
    monkeypatch.setattr("hermes_cli.auth._login_openai_codex", _count("codex"))

    login_command(SimpleNamespace())

    assert calls == {"nous": 1, "codex": 0}
diff --git a/tests/test_cli_provider_resolution.py b/tests/test_cli_provider_resolution.py
new file mode 100644
index 000000000..3c8fe14a5
--- /dev/null
+++ b/tests/test_cli_provider_resolution.py
@@ -0,0 +1,187 @@
+import importlib
+import sys
+import types
+from contextlib import nullcontext
+from types import SimpleNamespace
+
+from hermes_cli.auth import AuthError
+from hermes_cli import main as hermes_main
+
+
+def _install_prompt_toolkit_stubs():
+ class _Dummy:
+ def __init__(self, *args, **kwargs):
+ pass
+
+ class _Condition:
+ def __init__(self, func):
+ self.func = func
+
+ def __bool__(self):
+ return bool(self.func())
+
+ class _ANSI(str):
+ pass
+
+ root = types.ModuleType("prompt_toolkit")
+ history = types.ModuleType("prompt_toolkit.history")
+ styles = types.ModuleType("prompt_toolkit.styles")
+ patch_stdout = types.ModuleType("prompt_toolkit.patch_stdout")
+ application = types.ModuleType("prompt_toolkit.application")
+ layout = types.ModuleType("prompt_toolkit.layout")
+ processors = types.ModuleType("prompt_toolkit.layout.processors")
+ filters = types.ModuleType("prompt_toolkit.filters")
+ dimension = types.ModuleType("prompt_toolkit.layout.dimension")
+ menus = types.ModuleType("prompt_toolkit.layout.menus")
+ widgets = types.ModuleType("prompt_toolkit.widgets")
+ key_binding = types.ModuleType("prompt_toolkit.key_binding")
+ completion = types.ModuleType("prompt_toolkit.completion")
+ formatted_text = types.ModuleType("prompt_toolkit.formatted_text")
+
+ history.FileHistory = _Dummy
+ styles.Style = _Dummy
+ patch_stdout.patch_stdout = lambda *args, **kwargs: nullcontext()
+ application.Application = _Dummy
+ layout.Layout = _Dummy
+ layout.HSplit = _Dummy
+ layout.Window = _Dummy
+ layout.FormattedTextControl = _Dummy
+ layout.ConditionalContainer = _Dummy
+ processors.Processor = _Dummy
+ processors.Transformation = _Dummy
+ processors.PasswordProcessor = _Dummy
+ processors.ConditionalProcessor = _Dummy
+ filters.Condition = _Condition
+ dimension.Dimension = _Dummy
+ menus.CompletionsMenu = _Dummy
+ widgets.TextArea = _Dummy
+ key_binding.KeyBindings = _Dummy
+ completion.Completer = _Dummy
+ completion.Completion = _Dummy
+ formatted_text.ANSI = _ANSI
+ root.print_formatted_text = lambda *args, **kwargs: None
+
+ sys.modules.setdefault("prompt_toolkit", root)
+ sys.modules.setdefault("prompt_toolkit.history", history)
+ sys.modules.setdefault("prompt_toolkit.styles", styles)
+ sys.modules.setdefault("prompt_toolkit.patch_stdout", patch_stdout)
+ sys.modules.setdefault("prompt_toolkit.application", application)
+ sys.modules.setdefault("prompt_toolkit.layout", layout)
+ sys.modules.setdefault("prompt_toolkit.layout.processors", processors)
+ sys.modules.setdefault("prompt_toolkit.filters", filters)
+ sys.modules.setdefault("prompt_toolkit.layout.dimension", dimension)
+ sys.modules.setdefault("prompt_toolkit.layout.menus", menus)
+ sys.modules.setdefault("prompt_toolkit.widgets", widgets)
+ sys.modules.setdefault("prompt_toolkit.key_binding", key_binding)
+ sys.modules.setdefault("prompt_toolkit.completion", completion)
+ sys.modules.setdefault("prompt_toolkit.formatted_text", formatted_text)
+
+
def _import_cli():
    """Import the top-level `cli` module, installing prompt_toolkit stubs first if it is missing."""
    try:
        importlib.import_module("prompt_toolkit")
    except ModuleNotFoundError:
        _install_prompt_toolkit_stubs()
    return importlib.import_module("cli")
+
+
def test_hermes_cli_init_does_not_eagerly_resolve_runtime_provider(monkeypatch):
    """HermesCLI.__init__ must defer provider resolution; resolving there would break startup."""
    cli = _import_cli()
    calls = {"count": 0}

    def _unexpected_runtime_resolve(**kwargs):
        calls["count"] += 1
        raise AssertionError("resolve_runtime_provider should not be called in HermesCLI.__init__")

    monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _unexpected_runtime_resolve)
    monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))

    shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)

    assert shell is not None
    assert calls["count"] == 0
+
+
def test_runtime_resolution_failure_is_not_sticky(monkeypatch):
    """A failed resolution is not cached: the next _init_agent call retries and succeeds."""
    cli = _import_cli()
    calls = {"count": 0}

    def _runtime_resolve(**kwargs):
        calls["count"] += 1
        # First call simulates a transient auth failure; subsequent calls succeed.
        if calls["count"] == 1:
            raise RuntimeError("temporary auth failure")
        return {
            "provider": "openrouter",
            "api_mode": "chat_completions",
            "base_url": "https://openrouter.ai/api/v1",
            "api_key": "test-key",
            "source": "env/config",
        }

    class _DummyAgent:
        def __init__(self, *args, **kwargs):
            self.kwargs = kwargs

    monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _runtime_resolve)
    monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
    monkeypatch.setattr(cli, "AIAgent", _DummyAgent)

    shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)

    assert shell._init_agent() is False
    assert shell._init_agent() is True
    assert calls["count"] == 2
    assert shell.agent is not None
+
+
def test_runtime_resolution_rebuilds_agent_on_routing_change(monkeypatch):
    """A provider/api_mode switch drops the cached agent even when base_url and key are unchanged."""
    cli = _import_cli()

    def _runtime_resolve(**kwargs):
        return {
            "provider": "openai-codex",
            "api_mode": "codex_responses",
            "base_url": "https://same-endpoint.example/v1",
            "api_key": "same-key",
            "source": "env/config",
        }

    monkeypatch.setattr("hermes_cli.runtime_provider.resolve_runtime_provider", _runtime_resolve)
    monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))

    shell = cli.HermesCLI(model="gpt-5", compact=True, max_turns=1)
    # Seed state that matches the resolved endpoint/key but differs in routing.
    shell.provider = "openrouter"
    shell.api_mode = "chat_completions"
    shell.base_url = "https://same-endpoint.example/v1"
    shell.api_key = "same-key"
    shell.agent = object()

    assert shell._ensure_runtime_credentials() is True
    assert shell.agent is None
    assert shell.provider == "openai-codex"
    assert shell.api_mode == "codex_responses"
+
+
def test_cmd_model_falls_back_to_auto_on_invalid_provider(monkeypatch, capsys):
    """An invalid configured provider warns and falls back to auto detection without saving."""
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"model": {"default": "gpt-5", "provider": "invalid-provider"}},
    )
    monkeypatch.setattr("hermes_cli.config.save_config", lambda cfg: None)
    monkeypatch.setattr("hermes_cli.config.get_env_value", lambda key: "")
    monkeypatch.setattr("hermes_cli.config.save_env_value", lambda key, value: None)

    def _resolve_provider(requested, **kwargs):
        if requested == "invalid-provider":
            raise AuthError("Unknown provider 'invalid-provider'.", code="invalid_provider")
        return "openrouter"

    monkeypatch.setattr("hermes_cli.auth.resolve_provider", _resolve_provider)
    # Pick the last menu entry — presumably the "no change" option, matching the
    # "No change." output asserted below.
    monkeypatch.setattr(hermes_main, "_prompt_provider_choice", lambda choices: len(choices) - 1)

    hermes_main.cmd_model(SimpleNamespace())
    output = capsys.readouterr().out

    assert "Warning:" in output
    assert "falling back to auto provider detection" in output.lower()
    assert "No change." in output
diff --git a/tests/test_codex_execution_paths.py b/tests/test_codex_execution_paths.py
new file mode 100644
index 000000000..13ce5d7ac
--- /dev/null
+++ b/tests/test_codex_execution_paths.py
@@ -0,0 +1,175 @@
+import asyncio
+import sys
+import types
+from types import SimpleNamespace
+
+
+sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
+sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
+sys.modules.setdefault("fal_client", types.SimpleNamespace())
+
+import cron.scheduler as cron_scheduler
+import gateway.run as gateway_run
+import run_agent
+from gateway.config import Platform
+from gateway.session import SessionSource
+
+
+def _patch_agent_bootstrap(monkeypatch):
+ monkeypatch.setattr(
+ run_agent,
+ "get_tool_definitions",
+ lambda **kwargs: [
+ {
+ "type": "function",
+ "function": {
+ "name": "terminal",
+ "description": "Run shell commands.",
+ "parameters": {"type": "object", "properties": {}},
+ },
+ }
+ ],
+ )
+ monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
+
+
+def _codex_message_response(text: str):
+ return SimpleNamespace(
+ output=[
+ SimpleNamespace(
+ type="message",
+ content=[SimpleNamespace(type="output_text", text=text)],
+ )
+ ],
+ usage=SimpleNamespace(input_tokens=5, output_tokens=3, total_tokens=8),
+ status="completed",
+ model="gpt-5-codex",
+ )
+
+
+class _UnauthorizedError(RuntimeError):
+ def __init__(self):
+ super().__init__("Error code: 401 - unauthorized")
+ self.status_code = 401
+
+
+class _FakeOpenAI:
+ def __init__(self, **kwargs):
+ self.kwargs = kwargs
+
+ def close(self):
+ return None
+
+
+class _Codex401ThenSuccessAgent(run_agent.AIAgent):
+ refresh_attempts = 0
+ last_init = {}
+
+ def __init__(self, *args, **kwargs):
+ kwargs.setdefault("skip_context_files", True)
+ kwargs.setdefault("skip_memory", True)
+ kwargs.setdefault("max_iterations", 4)
+ type(self).last_init = dict(kwargs)
+ super().__init__(*args, **kwargs)
+ self._cleanup_task_resources = lambda task_id: None
+ self._persist_session = lambda messages, history=None: None
+ self._save_trajectory = lambda messages, user_message, completed: None
+ self._save_session_log = lambda messages: None
+
+ def _try_refresh_codex_client_credentials(self, *, force: bool = True) -> bool:
+ type(self).refresh_attempts += 1
+ return True
+
+ def run_conversation(self, user_message: str, conversation_history=None):
+ calls = {"api": 0}
+
+ def _fake_api_call(api_kwargs):
+ calls["api"] += 1
+ if calls["api"] == 1:
+ raise _UnauthorizedError()
+ return _codex_message_response("Recovered via refresh")
+
+ self._interruptible_api_call = _fake_api_call
+ return super().run_conversation(user_message, conversation_history=conversation_history)
+
+
+def test_cron_run_job_codex_path_handles_internal_401_refresh(monkeypatch):
+ _patch_agent_bootstrap(monkeypatch)
+ monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
+ monkeypatch.setattr(run_agent, "AIAgent", _Codex401ThenSuccessAgent)
+ monkeypatch.setattr(
+ "hermes_cli.runtime_provider.resolve_runtime_provider",
+ lambda requested=None: {
+ "provider": "openai-codex",
+ "api_mode": "codex_responses",
+ "base_url": "https://chatgpt.com/backend-api/codex",
+ "api_key": "codex-token",
+ },
+ )
+ monkeypatch.setattr("hermes_cli.runtime_provider.format_runtime_provider_error", lambda exc: str(exc))
+
+ _Codex401ThenSuccessAgent.refresh_attempts = 0
+ _Codex401ThenSuccessAgent.last_init = {}
+
+ success, output, final_response, error = cron_scheduler.run_job(
+ {"id": "job-1", "name": "Codex Refresh Test", "prompt": "ping"}
+ )
+
+ assert success is True
+ assert error is None
+ assert final_response == "Recovered via refresh"
+ assert "Recovered via refresh" in output
+ assert _Codex401ThenSuccessAgent.refresh_attempts == 1
+ assert _Codex401ThenSuccessAgent.last_init["provider"] == "openai-codex"
+ assert _Codex401ThenSuccessAgent.last_init["api_mode"] == "codex_responses"
+
+
+def test_gateway_run_agent_codex_path_handles_internal_401_refresh(monkeypatch):
+ _patch_agent_bootstrap(monkeypatch)
+ monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
+ monkeypatch.setattr(run_agent, "AIAgent", _Codex401ThenSuccessAgent)
+ monkeypatch.setattr(
+ gateway_run,
+ "_resolve_runtime_agent_kwargs",
+ lambda: {
+ "provider": "openai-codex",
+ "api_mode": "codex_responses",
+ "base_url": "https://chatgpt.com/backend-api/codex",
+ "api_key": "codex-token",
+ },
+ )
+ monkeypatch.setenv("HERMES_TOOL_PROGRESS", "false")
+
+ _Codex401ThenSuccessAgent.refresh_attempts = 0
+ _Codex401ThenSuccessAgent.last_init = {}
+
+ runner = gateway_run.GatewayRunner.__new__(gateway_run.GatewayRunner)
+ runner.adapters = {}
+ runner._ephemeral_system_prompt = ""
+ runner._prefill_messages = []
+ runner._reasoning_config = None
+ runner._running_agents = {}
+
+ source = SessionSource(
+ platform=Platform.LOCAL,
+ chat_id="cli",
+ chat_name="CLI",
+ chat_type="dm",
+ user_id="user-1",
+ )
+
+ result = asyncio.run(
+ runner._run_agent(
+ message="ping",
+ context_prompt="",
+ history=[],
+ source=source,
+ session_id="session-1",
+ session_key="agent:main:local:dm",
+ )
+ )
+
+ assert result["final_response"] == "Recovered via refresh"
+ assert _Codex401ThenSuccessAgent.refresh_attempts == 1
+ assert _Codex401ThenSuccessAgent.last_init["provider"] == "openai-codex"
+ assert _Codex401ThenSuccessAgent.last_init["api_mode"] == "codex_responses"
diff --git a/tests/test_codex_models.py b/tests/test_codex_models.py
new file mode 100644
index 000000000..e6cc2fdec
--- /dev/null
+++ b/tests/test_codex_models.py
@@ -0,0 +1,40 @@
+import json
+
+from hermes_cli.codex_models import DEFAULT_CODEX_MODELS, get_codex_model_ids
+
+
+def test_get_codex_model_ids_prioritizes_default_and_cache(tmp_path, monkeypatch):
+ codex_home = tmp_path / "codex-home"
+ codex_home.mkdir(parents=True, exist_ok=True)
+ (codex_home / "config.toml").write_text('model = "gpt-5.2-codex"\n')
+ (codex_home / "models_cache.json").write_text(
+ json.dumps(
+ {
+ "models": [
+ {"slug": "gpt-5.3-codex", "priority": 20, "supported_in_api": True},
+ {"slug": "gpt-5.1-codex", "priority": 5, "supported_in_api": True},
+ {"slug": "gpt-4o", "priority": 1, "supported_in_api": True},
+ {"slug": "gpt-5-hidden-codex", "priority": 2, "visibility": "hidden"},
+ ]
+ }
+ )
+ )
+ monkeypatch.setenv("CODEX_HOME", str(codex_home))
+
+ models = get_codex_model_ids()
+
+ assert models[0] == "gpt-5.2-codex"
+ assert "gpt-5.1-codex" in models
+ assert "gpt-5.3-codex" in models
+ assert "gpt-4o" not in models
+ assert "gpt-5-hidden-codex" not in models
+
+
+def test_get_codex_model_ids_falls_back_to_curated_defaults(tmp_path, monkeypatch):
+ codex_home = tmp_path / "codex-home"
+ codex_home.mkdir(parents=True, exist_ok=True)
+ monkeypatch.setenv("CODEX_HOME", str(codex_home))
+
+ models = get_codex_model_ids()
+
+ assert models[: len(DEFAULT_CODEX_MODELS)] == DEFAULT_CODEX_MODELS
diff --git a/tests/test_run_agent_codex_responses.py b/tests/test_run_agent_codex_responses.py
new file mode 100644
index 000000000..b3d3f552f
--- /dev/null
+++ b/tests/test_run_agent_codex_responses.py
@@ -0,0 +1,733 @@
+import sys
+import types
+from types import SimpleNamespace
+
+import pytest
+
+
+sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
+sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
+sys.modules.setdefault("fal_client", types.SimpleNamespace())
+
+import run_agent
+
+
+def _patch_agent_bootstrap(monkeypatch):
+ monkeypatch.setattr(
+ run_agent,
+ "get_tool_definitions",
+ lambda **kwargs: [
+ {
+ "type": "function",
+ "function": {
+ "name": "terminal",
+ "description": "Run shell commands.",
+ "parameters": {"type": "object", "properties": {}},
+ },
+ }
+ ],
+ )
+ monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
+
+
+def _build_agent(monkeypatch):
+ _patch_agent_bootstrap(monkeypatch)
+
+ agent = run_agent.AIAgent(
+ model="gpt-5-codex",
+ base_url="https://chatgpt.com/backend-api/codex",
+ api_key="codex-token",
+ quiet_mode=True,
+ max_iterations=4,
+ skip_context_files=True,
+ skip_memory=True,
+ )
+ agent._cleanup_task_resources = lambda task_id: None
+ agent._persist_session = lambda messages, history=None: None
+ agent._save_trajectory = lambda messages, user_message, completed: None
+ agent._save_session_log = lambda messages: None
+ return agent
+
+
+def _codex_message_response(text: str):
+ return SimpleNamespace(
+ output=[
+ SimpleNamespace(
+ type="message",
+ content=[SimpleNamespace(type="output_text", text=text)],
+ )
+ ],
+ usage=SimpleNamespace(input_tokens=5, output_tokens=3, total_tokens=8),
+ status="completed",
+ model="gpt-5-codex",
+ )
+
+
+def _codex_tool_call_response():
+ return SimpleNamespace(
+ output=[
+ SimpleNamespace(
+ type="function_call",
+ id="fc_1",
+ call_id="call_1",
+ name="terminal",
+ arguments="{}",
+ )
+ ],
+ usage=SimpleNamespace(input_tokens=12, output_tokens=4, total_tokens=16),
+ status="completed",
+ model="gpt-5-codex",
+ )
+
+
+def _codex_incomplete_message_response(text: str):
+ return SimpleNamespace(
+ output=[
+ SimpleNamespace(
+ type="message",
+ status="in_progress",
+ content=[SimpleNamespace(type="output_text", text=text)],
+ )
+ ],
+ usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
+ status="in_progress",
+ model="gpt-5-codex",
+ )
+
+
+def _codex_commentary_message_response(text: str):
+ return SimpleNamespace(
+ output=[
+ SimpleNamespace(
+ type="message",
+ phase="commentary",
+ status="completed",
+ content=[SimpleNamespace(type="output_text", text=text)],
+ )
+ ],
+ usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
+ status="completed",
+ model="gpt-5-codex",
+ )
+
+
+def _codex_ack_message_response(text: str):
+ return SimpleNamespace(
+ output=[
+ SimpleNamespace(
+ type="message",
+ status="completed",
+ content=[SimpleNamespace(type="output_text", text=text)],
+ )
+ ],
+ usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6),
+ status="completed",
+ model="gpt-5-codex",
+ )
+
+
+class _FakeResponsesStream:
+ def __init__(self, *, final_response=None, final_error=None):
+ self._final_response = final_response
+ self._final_error = final_error
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc, tb):
+ return False
+
+ def __iter__(self):
+ return iter(())
+
+ def get_final_response(self):
+ if self._final_error is not None:
+ raise self._final_error
+ return self._final_response
+
+
+class _FakeCreateStream:
+ def __init__(self, events):
+ self._events = list(events)
+ self.closed = False
+
+ def __iter__(self):
+ return iter(self._events)
+
+ def close(self):
+ self.closed = True
+
+
+def _codex_request_kwargs():
+ return {
+ "model": "gpt-5-codex",
+ "instructions": "You are Hermes.",
+ "input": [{"role": "user", "content": "Ping"}],
+ "tools": None,
+ "store": False,
+ }
+
+
+def test_api_mode_uses_explicit_provider_when_codex(monkeypatch):
+ _patch_agent_bootstrap(monkeypatch)
+ agent = run_agent.AIAgent(
+ model="gpt-5-codex",
+ base_url="https://openrouter.ai/api/v1",
+ provider="openai-codex",
+ api_key="codex-token",
+ quiet_mode=True,
+ max_iterations=1,
+ skip_context_files=True,
+ skip_memory=True,
+ )
+ assert agent.api_mode == "codex_responses"
+ assert agent.provider == "openai-codex"
+
+
+def test_api_mode_normalizes_provider_case(monkeypatch):
+ _patch_agent_bootstrap(monkeypatch)
+ agent = run_agent.AIAgent(
+ model="gpt-5-codex",
+ base_url="https://openrouter.ai/api/v1",
+ provider="OpenAI-Codex",
+ api_key="codex-token",
+ quiet_mode=True,
+ max_iterations=1,
+ skip_context_files=True,
+ skip_memory=True,
+ )
+ assert agent.provider == "openai-codex"
+ assert agent.api_mode == "codex_responses"
+
+
+def test_api_mode_respects_explicit_openrouter_provider_over_codex_url(monkeypatch):
+ _patch_agent_bootstrap(monkeypatch)
+ agent = run_agent.AIAgent(
+ model="gpt-5-codex",
+ base_url="https://chatgpt.com/backend-api/codex",
+ provider="openrouter",
+ api_key="test-token",
+ quiet_mode=True,
+ max_iterations=1,
+ skip_context_files=True,
+ skip_memory=True,
+ )
+ assert agent.api_mode == "chat_completions"
+ assert agent.provider == "openrouter"
+
+
+def test_build_api_kwargs_codex(monkeypatch):
+    agent = _build_agent(monkeypatch)
+    kwargs = agent._build_api_kwargs(
+        [
+            {"role": "system", "content": "You are Hermes."},
+            {"role": "user", "content": "Ping"},
+        ]
+    )
+
+    assert kwargs["model"] == "gpt-5-codex"
+    assert kwargs["instructions"] == "You are Hermes."
+    assert kwargs["store"] is False
+    assert isinstance(kwargs["input"], list)
+    assert kwargs["input"][0]["role"] == "user"
+    assert kwargs["tools"][0]["type"] == "function"
+    assert kwargs["tools"][0]["name"] == "terminal"
+    assert kwargs["tools"][0]["strict"] is False
+    assert "function" not in kwargs["tools"][0]
+    # Chat Completions-only fields must not leak into a Codex Responses request.
+    assert "timeout" not in kwargs
+    assert "max_tokens" not in kwargs
+    assert "extra_body" not in kwargs
+
+
+def test_run_codex_stream_retries_when_completed_event_missing(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ calls = {"stream": 0}
+
+ def _fake_stream(**kwargs):
+ calls["stream"] += 1
+ if calls["stream"] == 1:
+ return _FakeResponsesStream(
+ final_error=RuntimeError("Didn't receive a `response.completed` event.")
+ )
+ return _FakeResponsesStream(final_response=_codex_message_response("stream ok"))
+
+ agent.client = SimpleNamespace(
+ responses=SimpleNamespace(
+ stream=_fake_stream,
+ create=lambda **kwargs: _codex_message_response("fallback"),
+ )
+ )
+
+ response = agent._run_codex_stream(_codex_request_kwargs())
+ assert calls["stream"] == 2
+ assert response.output[0].content[0].text == "stream ok"
+
+
+def test_run_codex_stream_falls_back_to_create_after_stream_completion_error(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ calls = {"stream": 0, "create": 0}
+
+ def _fake_stream(**kwargs):
+ calls["stream"] += 1
+ return _FakeResponsesStream(
+ final_error=RuntimeError("Didn't receive a `response.completed` event.")
+ )
+
+ def _fake_create(**kwargs):
+ calls["create"] += 1
+ return _codex_message_response("create fallback ok")
+
+ agent.client = SimpleNamespace(
+ responses=SimpleNamespace(
+ stream=_fake_stream,
+ create=_fake_create,
+ )
+ )
+
+ response = agent._run_codex_stream(_codex_request_kwargs())
+ assert calls["stream"] == 2
+ assert calls["create"] == 1
+ assert response.output[0].content[0].text == "create fallback ok"
+
+
+def test_run_codex_stream_fallback_parses_create_stream_events(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ calls = {"stream": 0, "create": 0}
+ create_stream = _FakeCreateStream(
+ [
+ SimpleNamespace(type="response.created"),
+ SimpleNamespace(type="response.in_progress"),
+ SimpleNamespace(type="response.completed", response=_codex_message_response("streamed create ok")),
+ ]
+ )
+
+ def _fake_stream(**kwargs):
+ calls["stream"] += 1
+ return _FakeResponsesStream(
+ final_error=RuntimeError("Didn't receive a `response.completed` event.")
+ )
+
+ def _fake_create(**kwargs):
+ calls["create"] += 1
+ assert kwargs.get("stream") is True
+ return create_stream
+
+ agent.client = SimpleNamespace(
+ responses=SimpleNamespace(
+ stream=_fake_stream,
+ create=_fake_create,
+ )
+ )
+
+ response = agent._run_codex_stream(_codex_request_kwargs())
+ assert calls["stream"] == 2
+ assert calls["create"] == 1
+ assert create_stream.closed is True
+ assert response.output[0].content[0].text == "streamed create ok"
+
+
+def test_run_conversation_codex_plain_text(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))
+
+ result = agent.run_conversation("Say OK")
+
+ assert result["completed"] is True
+ assert result["final_response"] == "OK"
+ assert result["messages"][-1]["role"] == "assistant"
+ assert result["messages"][-1]["content"] == "OK"
+
+
+def test_run_conversation_codex_refreshes_after_401_and_retries(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ calls = {"api": 0, "refresh": 0}
+
+ class _UnauthorizedError(RuntimeError):
+ def __init__(self):
+ super().__init__("Error code: 401 - unauthorized")
+ self.status_code = 401
+
+ def _fake_api_call(api_kwargs):
+ calls["api"] += 1
+ if calls["api"] == 1:
+ raise _UnauthorizedError()
+ return _codex_message_response("Recovered after refresh")
+
+ def _fake_refresh(*, force=True):
+ calls["refresh"] += 1
+ assert force is True
+ return True
+
+ monkeypatch.setattr(agent, "_interruptible_api_call", _fake_api_call)
+ monkeypatch.setattr(agent, "_try_refresh_codex_client_credentials", _fake_refresh)
+
+ result = agent.run_conversation("Say OK")
+
+ assert calls["api"] == 2
+ assert calls["refresh"] == 1
+ assert result["completed"] is True
+ assert result["final_response"] == "Recovered after refresh"
+
+
+def test_try_refresh_codex_client_credentials_rebuilds_client(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ closed = {"value": False}
+ rebuilt = {"kwargs": None}
+
+ class _ExistingClient:
+ def close(self):
+ closed["value"] = True
+
+ class _RebuiltClient:
+ pass
+
+ def _fake_openai(**kwargs):
+ rebuilt["kwargs"] = kwargs
+ return _RebuiltClient()
+
+ monkeypatch.setattr(
+ "hermes_cli.auth.resolve_codex_runtime_credentials",
+ lambda force_refresh=True: {
+ "api_key": "new-codex-token",
+ "base_url": "https://chatgpt.com/backend-api/codex",
+ },
+ )
+ monkeypatch.setattr(run_agent, "OpenAI", _fake_openai)
+
+ agent.client = _ExistingClient()
+ ok = agent._try_refresh_codex_client_credentials(force=True)
+
+ assert ok is True
+ assert closed["value"] is True
+ assert rebuilt["kwargs"]["api_key"] == "new-codex-token"
+ assert rebuilt["kwargs"]["base_url"] == "https://chatgpt.com/backend-api/codex"
+ assert isinstance(agent.client, _RebuiltClient)
+
+
+def test_run_conversation_codex_tool_round_trip(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ responses = [_codex_tool_call_response(), _codex_message_response("done")]
+ monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
+
+ def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
+ for call in assistant_message.tool_calls:
+ messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": call.id,
+ "content": '{"ok":true}',
+ }
+ )
+
+ monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
+
+ result = agent.run_conversation("run a command")
+
+ assert result["completed"] is True
+ assert result["final_response"] == "done"
+ assert any(msg.get("tool_calls") for msg in result["messages"] if msg.get("role") == "assistant")
+ assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
+
+
+def test_chat_messages_to_responses_input_uses_call_id_for_function_call(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ items = agent._chat_messages_to_responses_input(
+ [
+ {"role": "user", "content": "Run terminal"},
+ {
+ "role": "assistant",
+ "content": "",
+ "tool_calls": [
+ {
+ "id": "call_abc123",
+ "type": "function",
+ "function": {"name": "terminal", "arguments": "{}"},
+ }
+ ],
+ },
+ {"role": "tool", "tool_call_id": "call_abc123", "content": '{"ok":true}'},
+ ]
+ )
+
+ function_call = next(item for item in items if item.get("type") == "function_call")
+ function_output = next(item for item in items if item.get("type") == "function_call_output")
+
+ assert function_call["call_id"] == "call_abc123"
+ assert "id" not in function_call
+ assert function_output["call_id"] == "call_abc123"
+
+
+def test_chat_messages_to_responses_input_accepts_call_pipe_fc_ids(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ items = agent._chat_messages_to_responses_input(
+ [
+ {"role": "user", "content": "Run terminal"},
+ {
+ "role": "assistant",
+ "content": "",
+ "tool_calls": [
+ {
+ "id": "call_pair123|fc_pair123",
+ "type": "function",
+ "function": {"name": "terminal", "arguments": "{}"},
+ }
+ ],
+ },
+ {"role": "tool", "tool_call_id": "call_pair123|fc_pair123", "content": '{"ok":true}'},
+ ]
+ )
+
+ function_call = next(item for item in items if item.get("type") == "function_call")
+ function_output = next(item for item in items if item.get("type") == "function_call_output")
+
+ assert function_call["call_id"] == "call_pair123"
+ assert "id" not in function_call
+ assert function_output["call_id"] == "call_pair123"
+
+
+def test_preflight_codex_api_kwargs_strips_optional_function_call_id(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ preflight = agent._preflight_codex_api_kwargs(
+ {
+ "model": "gpt-5-codex",
+ "instructions": "You are Hermes.",
+ "input": [
+ {"role": "user", "content": "hi"},
+ {
+ "type": "function_call",
+ "id": "call_bad",
+ "call_id": "call_good",
+ "name": "terminal",
+ "arguments": "{}",
+ },
+ ],
+ "tools": [],
+ "store": False,
+ }
+ )
+
+ fn_call = next(item for item in preflight["input"] if item.get("type") == "function_call")
+ assert fn_call["call_id"] == "call_good"
+ assert "id" not in fn_call
+
+
+def test_preflight_codex_api_kwargs_rejects_function_call_output_without_call_id(monkeypatch):
+ agent = _build_agent(monkeypatch)
+
+ with pytest.raises(ValueError, match="function_call_output is missing call_id"):
+ agent._preflight_codex_api_kwargs(
+ {
+ "model": "gpt-5-codex",
+ "instructions": "You are Hermes.",
+ "input": [{"type": "function_call_output", "output": "{}"}],
+ "tools": [],
+ "store": False,
+ }
+ )
+
+
+def test_preflight_codex_api_kwargs_rejects_unsupported_request_fields(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ kwargs = _codex_request_kwargs()
+ kwargs["temperature"] = 0
+
+ with pytest.raises(ValueError, match="unsupported field"):
+ agent._preflight_codex_api_kwargs(kwargs)
+
+
+def test_run_conversation_codex_replay_payload_keeps_call_id(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ responses = [_codex_tool_call_response(), _codex_message_response("done")]
+ requests = []
+
+ def _fake_api_call(api_kwargs):
+ requests.append(api_kwargs)
+ return responses.pop(0)
+
+ monkeypatch.setattr(agent, "_interruptible_api_call", _fake_api_call)
+
+ def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
+ for call in assistant_message.tool_calls:
+ messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": call.id,
+ "content": '{"ok":true}',
+ }
+ )
+
+ monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
+
+ result = agent.run_conversation("run a command")
+
+ assert result["completed"] is True
+ assert result["final_response"] == "done"
+ assert len(requests) >= 2
+
+ replay_input = requests[1]["input"]
+ function_call = next(item for item in replay_input if item.get("type") == "function_call")
+ function_output = next(item for item in replay_input if item.get("type") == "function_call_output")
+ assert function_call["call_id"] == "call_1"
+ assert "id" not in function_call
+ assert function_output["call_id"] == "call_1"
+
+
+def test_run_conversation_codex_continues_after_incomplete_interim_message(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ responses = [
+ _codex_incomplete_message_response("I'll inspect the repo structure first."),
+ _codex_tool_call_response(),
+ _codex_message_response("Architecture summary complete."),
+ ]
+ monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
+
+ def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
+ for call in assistant_message.tool_calls:
+ messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": call.id,
+ "content": '{"ok":true}',
+ }
+ )
+
+ monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
+
+ result = agent.run_conversation("analyze repo")
+
+ assert result["completed"] is True
+ assert result["final_response"] == "Architecture summary complete."
+ assert any(
+ msg.get("role") == "assistant"
+ and msg.get("finish_reason") == "incomplete"
+ and "inspect the repo structure" in (msg.get("content") or "")
+ for msg in result["messages"]
+ )
+ assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
+
+
+def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ assistant_message, finish_reason = agent._normalize_codex_response(
+ _codex_commentary_message_response("I'll inspect the repository first.")
+ )
+
+ assert finish_reason == "incomplete"
+ assert "inspect the repository" in (assistant_message.content or "")
+
+
+def test_run_conversation_codex_continues_after_commentary_phase_message(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ responses = [
+ _codex_commentary_message_response("I'll inspect the repo structure first."),
+ _codex_tool_call_response(),
+ _codex_message_response("Architecture summary complete."),
+ ]
+ monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
+
+ def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
+ for call in assistant_message.tool_calls:
+ messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": call.id,
+ "content": '{"ok":true}',
+ }
+ )
+
+ monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
+
+ result = agent.run_conversation("analyze repo")
+
+ assert result["completed"] is True
+ assert result["final_response"] == "Architecture summary complete."
+ assert any(
+ msg.get("role") == "assistant"
+ and msg.get("finish_reason") == "incomplete"
+ and "inspect the repo structure" in (msg.get("content") or "")
+ for msg in result["messages"]
+ )
+ assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
+
+
+def test_run_conversation_codex_continues_after_ack_stop_message(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ responses = [
+ _codex_ack_message_response(
+ "Absolutely — I can do that. I'll inspect ~/openclaw-studio and report back with a walkthrough."
+ ),
+ _codex_tool_call_response(),
+ _codex_message_response("Architecture summary complete."),
+ ]
+ monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
+
+ def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
+ for call in assistant_message.tool_calls:
+ messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": call.id,
+ "content": '{"ok":true}',
+ }
+ )
+
+ monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
+
+ result = agent.run_conversation("look into ~/openclaw-studio and tell me how it works")
+
+ assert result["completed"] is True
+ assert result["final_response"] == "Architecture summary complete."
+ assert any(
+ msg.get("role") == "assistant"
+ and msg.get("finish_reason") == "incomplete"
+ and "inspect ~/openclaw-studio" in (msg.get("content") or "")
+ for msg in result["messages"]
+ )
+ assert any(
+ msg.get("role") == "user"
+ and "Continue now. Execute the required tool calls" in (msg.get("content") or "")
+ for msg in result["messages"]
+ )
+ assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
+
+
+def test_run_conversation_codex_continues_after_ack_for_directory_listing_prompt(monkeypatch):
+ agent = _build_agent(monkeypatch)
+ responses = [
+ _codex_ack_message_response(
+ "I'll check what's in the current directory and call out 3 notable items."
+ ),
+ _codex_tool_call_response(),
+ _codex_message_response("Directory summary complete."),
+ ]
+ monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0))
+
+ def _fake_execute_tool_calls(assistant_message, messages, effective_task_id):
+ for call in assistant_message.tool_calls:
+ messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": call.id,
+ "content": '{"ok":true}',
+ }
+ )
+
+ monkeypatch.setattr(agent, "_execute_tool_calls", _fake_execute_tool_calls)
+
+ result = agent.run_conversation("look at current directory and list 3 notable things")
+
+ assert result["completed"] is True
+ assert result["final_response"] == "Directory summary complete."
+ assert any(
+ msg.get("role") == "assistant"
+ and msg.get("finish_reason") == "incomplete"
+ and "current directory" in (msg.get("content") or "")
+ for msg in result["messages"]
+ )
+ assert any(
+ msg.get("role") == "user"
+ and "Continue now. Execute the required tool calls" in (msg.get("content") or "")
+ for msg in result["messages"]
+ )
+ assert any(msg.get("role") == "tool" and msg.get("tool_call_id") == "call_1" for msg in result["messages"])
diff --git a/tests/test_runtime_provider_resolution.py b/tests/test_runtime_provider_resolution.py
new file mode 100644
index 000000000..af6914092
--- /dev/null
+++ b/tests/test_runtime_provider_resolution.py
@@ -0,0 +1,95 @@
+from hermes_cli import runtime_provider as rp
+
+
+def test_resolve_runtime_provider_codex(monkeypatch):
+ monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
+ monkeypatch.setattr(
+ rp,
+ "resolve_codex_runtime_credentials",
+ lambda: {
+ "provider": "openai-codex",
+ "base_url": "https://chatgpt.com/backend-api/codex",
+ "api_key": "codex-token",
+ "source": "codex-auth-json",
+ "auth_file": "/tmp/auth.json",
+ "codex_home": "/tmp/codex",
+ "last_refresh": "2026-02-26T00:00:00Z",
+ },
+ )
+
+ resolved = rp.resolve_runtime_provider(requested="openai-codex")
+
+ assert resolved["provider"] == "openai-codex"
+ assert resolved["api_mode"] == "codex_responses"
+ assert resolved["base_url"] == "https://chatgpt.com/backend-api/codex"
+ assert resolved["api_key"] == "codex-token"
+ assert resolved["requested_provider"] == "openai-codex"
+
+
+def test_resolve_runtime_provider_openrouter_explicit(monkeypatch):
+ monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
+ monkeypatch.setattr(rp, "_get_model_config", lambda: {})
+ monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
+ monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
+ monkeypatch.delenv("OPENAI_API_KEY", raising=False)
+ monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
+
+ resolved = rp.resolve_runtime_provider(
+ requested="openrouter",
+ explicit_api_key="test-key",
+ explicit_base_url="https://example.com/v1/",
+ )
+
+ assert resolved["provider"] == "openrouter"
+ assert resolved["api_mode"] == "chat_completions"
+ assert resolved["api_key"] == "test-key"
+ assert resolved["base_url"] == "https://example.com/v1"
+ assert resolved["source"] == "explicit"
+
+
+def test_resolve_runtime_provider_openrouter_ignores_codex_config_base_url(monkeypatch):
+ monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
+ monkeypatch.setattr(
+ rp,
+ "_get_model_config",
+ lambda: {
+ "provider": "openai-codex",
+ "base_url": "https://chatgpt.com/backend-api/codex",
+ },
+ )
+ monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
+ monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
+ monkeypatch.delenv("OPENAI_API_KEY", raising=False)
+ monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
+
+ resolved = rp.resolve_runtime_provider(requested="openrouter")
+
+ assert resolved["provider"] == "openrouter"
+ assert resolved["base_url"] == rp.OPENROUTER_BASE_URL
+
+
+def test_resolve_runtime_provider_auto_uses_custom_config_base_url(monkeypatch):
+ monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
+ monkeypatch.setattr(
+ rp,
+ "_get_model_config",
+ lambda: {
+ "provider": "auto",
+ "base_url": "https://custom.example/v1/",
+ },
+ )
+ monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
+ monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
+ monkeypatch.delenv("OPENAI_API_KEY", raising=False)
+ monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
+
+ resolved = rp.resolve_runtime_provider(requested="auto")
+
+ assert resolved["provider"] == "openrouter"
+ assert resolved["base_url"] == "https://custom.example/v1"
+
+
+def test_resolve_requested_provider_precedence(monkeypatch):
+ monkeypatch.setenv("HERMES_INFERENCE_PROVIDER", "nous")
+ monkeypatch.setattr(rp, "_get_model_config", lambda: {"provider": "openai-codex"})
+ assert rp.resolve_requested_provider("openrouter") == "openrouter"
diff --git a/tests/tools/test_delegate.py b/tests/tools/test_delegate.py
index 5d5bb2c7c..948af4d0f 100644
--- a/tests/tools/test_delegate.py
+++ b/tests/tools/test_delegate.py
@@ -30,6 +30,9 @@ def _make_mock_parent(depth=0):
"""Create a mock parent agent with the fields delegate_task expects."""
parent = MagicMock()
parent.base_url = "https://openrouter.ai/api/v1"
+ parent.api_key = "parent-key"
+ parent.provider = "openrouter"
+ parent.api_mode = "chat_completions"
parent.model = "anthropic/claude-sonnet-4"
parent.platform = "cli"
parent.providers_allowed = None
@@ -218,6 +221,30 @@ class TestDelegateTask(unittest.TestCase):
delegate_task(goal="Test tracking", parent_agent=parent)
self.assertEqual(len(parent._active_children), 0)
+ def test_child_inherits_runtime_credentials(self):
+ parent = _make_mock_parent(depth=0)
+ parent.base_url = "https://chatgpt.com/backend-api/codex"
+ parent.api_key = "codex-token"
+ parent.provider = "openai-codex"
+ parent.api_mode = "codex_responses"
+
+ with patch("run_agent.AIAgent") as MockAgent:
+ mock_child = MagicMock()
+ mock_child.run_conversation.return_value = {
+ "final_response": "ok",
+ "completed": True,
+ "api_calls": 1,
+ }
+ MockAgent.return_value = mock_child
+
+ delegate_task(goal="Test runtime inheritance", parent_agent=parent)
+
+ _, kwargs = MockAgent.call_args
+ self.assertEqual(kwargs["base_url"], parent.base_url)
+ self.assertEqual(kwargs["api_key"], parent.api_key)
+ self.assertEqual(kwargs["provider"], parent.provider)
+ self.assertEqual(kwargs["api_mode"], parent.api_mode)
+
class TestBlockedTools(unittest.TestCase):
def test_blocked_tools_constant(self):
diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py
index ad308c2e4..77659d3c2 100644
--- a/tools/delegate_tool.py
+++ b/tools/delegate_tool.py
@@ -120,15 +120,17 @@ def _run_single_child(
pass
try:
- # Extract parent's API key so subagents inherit auth (e.g. Nous Portal)
- parent_api_key = None
- if hasattr(parent_agent, '_client_kwargs'):
+ # Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
+ parent_api_key = getattr(parent_agent, "api_key", None)
+ if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
parent_api_key = parent_agent._client_kwargs.get("api_key")
child = AIAgent(
base_url=parent_agent.base_url,
api_key=parent_api_key,
model=model or parent_agent.model,
+ provider=getattr(parent_agent, "provider", None),
+ api_mode=getattr(parent_agent, "api_mode", None),
max_iterations=max_iterations,
enabled_toolsets=child_toolsets,
quiet_mode=True,