No model, base_url, or provider is assumed when the user hasn't configured one. Previously, the defaults dict in cli.py, the AIAgent constructor args, and several fallback paths all hardcoded anthropic/claude-opus-4.6 and openrouter.ai/api/v1, silently routing unconfigured users to OpenRouter, which returns 404 for anyone using a different provider. Now empty defaults force the setup wizard to run. Existing users who already completed setup are unaffected, because their config.yaml already has the model they chose.

Files changed:
- cli.py: defaults dict, _DEFAULT_CONFIG_MODEL
- run_agent.py: AIAgent.__init__ defaults, main() defaults
- hermes_cli/config.py: DEFAULT_CONFIG
- hermes_cli/runtime_provider.py: is_fallback sentinel
- acp_adapter/session.py: default_model
- tests: updated to reflect empty defaults
699 lines
26 KiB
Python
699 lines
26 KiB
Python
"""Shared runtime provider resolution for CLI, gateway, cron, and helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import Any, Dict, Optional
|
|
|
|
from hermes_cli import auth as auth_mod
|
|
from agent.credential_pool import CredentialPool, PooledCredential, get_custom_provider_pool_key, load_pool
|
|
from hermes_cli.auth import (
|
|
AuthError,
|
|
DEFAULT_CODEX_BASE_URL,
|
|
PROVIDER_REGISTRY,
|
|
format_auth_error,
|
|
resolve_provider,
|
|
resolve_nous_runtime_credentials,
|
|
resolve_codex_runtime_credentials,
|
|
resolve_api_key_provider_credentials,
|
|
resolve_external_process_provider_credentials,
|
|
has_usable_secret,
|
|
)
|
|
from hermes_cli.config import load_config
|
|
from hermes_constants import OPENROUTER_BASE_URL
|
|
|
|
|
|
def _normalize_custom_provider_name(value: str) -> str:
|
|
return value.strip().lower().replace(" ", "-")
|
|
|
|
|
|
def _detect_api_mode_for_url(base_url: str) -> Optional[str]:
|
|
"""Auto-detect api_mode from the resolved base URL.
|
|
|
|
Direct api.openai.com endpoints need the Responses API for GPT-5.x
|
|
tool calls with reasoning (chat/completions returns 400).
|
|
"""
|
|
normalized = (base_url or "").strip().lower().rstrip("/")
|
|
if "api.openai.com" in normalized and "openrouter" not in normalized:
|
|
return "codex_responses"
|
|
return None
|
|
|
|
|
|
def _auto_detect_local_model(base_url: str) -> str:
|
|
"""Query a local server for its model name when only one model is loaded."""
|
|
if not base_url:
|
|
return ""
|
|
try:
|
|
import requests
|
|
url = base_url.rstrip("/")
|
|
if not url.endswith("/v1"):
|
|
url += "/v1"
|
|
resp = requests.get(url + "/models", timeout=5)
|
|
if resp.ok:
|
|
models = resp.json().get("data", [])
|
|
if len(models) == 1:
|
|
model_id = models[0].get("id", "")
|
|
if model_id:
|
|
return model_id
|
|
except Exception:
|
|
pass
|
|
return ""
|
|
|
|
|
|
def _get_model_config() -> Dict[str, Any]:
    """Load the ``model`` section of the config as a normalized dict.

    Returns:
        * ``{"default": <name>}`` when ``model`` is a non-blank string.
        * A shallow copy of ``model`` when it is a mapping, with
          ``default`` filled from the ``model`` alias if it is unset, and,
          when no model name is set and ``base_url`` mentions ``localhost``
          or ``127.0.0.1``, from ``_auto_detect_local_model``.
        * ``{}`` otherwise.
    """
    raw = load_config().get("model")

    if isinstance(raw, str):
        bare_name = raw.strip()
        return {"default": bare_name} if bare_name else {}
    if not isinstance(raw, dict):
        return {}

    settings = dict(raw)
    # Accept "model" as alias for "default" (users intuitively write model.model)
    if settings.get("model") and not settings.get("default"):
        settings["default"] = settings["model"]

    model_name = (settings.get("default") or "").strip()
    endpoint = (settings.get("base_url") or "").strip()
    points_at_loopback = "localhost" in endpoint or "127.0.0.1" in endpoint
    if points_at_loopback and not model_name:
        # No model configured for a local server: ask the server itself.
        discovered = _auto_detect_local_model(endpoint)
        if discovered:
            settings["default"] = discovered
    return settings
|
|
|
|
|
|
def _copilot_runtime_api_mode(model_cfg: Dict[str, Any], api_key: str) -> str:
    """Pick the api_mode to use for the Copilot provider.

    Args:
        model_cfg: Model config; ``api_mode`` and ``default`` are read.
        api_key: Forwarded to ``copilot_model_api_mode`` as ``api_key``.

    Returns:
        The configured ``api_mode`` when it is valid; otherwise the result
        of ``hermes_cli.models.copilot_model_api_mode`` for the configured
        model. Falls back to ``"chat_completions"`` when no model name is
        set or when that lookup raises.
    """
    explicit_mode = _parse_api_mode(model_cfg.get("api_mode"))
    if explicit_mode:
        return explicit_mode

    selected_model = str(model_cfg.get("default") or "").strip()
    if selected_model:
        try:
            from hermes_cli.models import copilot_model_api_mode

            return copilot_model_api_mode(selected_model, api_key=api_key)
        except Exception:
            # Lookup is best-effort; fall through to the default mode.
            pass
    return "chat_completions"
|
|
|
|
|
|
_VALID_API_MODES = {"chat_completions", "codex_responses", "anthropic_messages"}
|
|
|
|
|
|
def _parse_api_mode(raw: Any) -> Optional[str]:
|
|
"""Validate an api_mode value from config. Returns None if invalid."""
|
|
if isinstance(raw, str):
|
|
normalized = raw.strip().lower()
|
|
if normalized in _VALID_API_MODES:
|
|
return normalized
|
|
return None
|
|
|
|
|
|
def _resolve_runtime_from_pool_entry(
|
|
*,
|
|
provider: str,
|
|
entry: PooledCredential,
|
|
requested_provider: str,
|
|
model_cfg: Optional[Dict[str, Any]] = None,
|
|
pool: Optional[CredentialPool] = None,
|
|
) -> Dict[str, Any]:
|
|
model_cfg = model_cfg or _get_model_config()
|
|
base_url = (getattr(entry, "runtime_base_url", None) or getattr(entry, "base_url", None) or "").rstrip("/")
|
|
api_key = getattr(entry, "runtime_api_key", None) or getattr(entry, "access_token", "")
|
|
api_mode = "chat_completions"
|
|
if provider == "openai-codex":
|
|
api_mode = "codex_responses"
|
|
base_url = base_url or DEFAULT_CODEX_BASE_URL
|
|
elif provider == "anthropic":
|
|
api_mode = "anthropic_messages"
|
|
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
|
|
cfg_base_url = ""
|
|
if cfg_provider == "anthropic":
|
|
cfg_base_url = str(model_cfg.get("base_url") or "").strip().rstrip("/")
|
|
base_url = cfg_base_url or base_url or "https://api.anthropic.com"
|
|
elif provider == "openrouter":
|
|
base_url = base_url or OPENROUTER_BASE_URL
|
|
elif provider == "nous":
|
|
api_mode = "chat_completions"
|
|
elif provider == "copilot":
|
|
api_mode = _copilot_runtime_api_mode(model_cfg, getattr(entry, "runtime_api_key", ""))
|
|
else:
|
|
configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
|
|
if configured_mode:
|
|
api_mode = configured_mode
|
|
elif base_url.rstrip("/").endswith("/anthropic"):
|
|
api_mode = "anthropic_messages"
|
|
|
|
return {
|
|
"provider": provider,
|
|
"api_mode": api_mode,
|
|
"base_url": base_url,
|
|
"api_key": api_key,
|
|
"source": getattr(entry, "source", "pool"),
|
|
"credential_pool": pool,
|
|
"requested_provider": requested_provider,
|
|
}
|
|
|
|
|
|
def resolve_requested_provider(requested: Optional[str] = None) -> str:
    """Resolve the provider request from explicit arg, config, then env.

    Args:
        requested: Provider name supplied by the caller, if any.

    Returns:
        The first non-blank value, stripped and lower-cased, among
        ``requested``, the config's ``model.provider``, and the
        ``HERMES_INFERENCE_PROVIDER`` environment variable; ``"auto"``
        when none is set.
    """
    explicit = (requested or "").strip().lower()
    if explicit:
        return explicit

    saved = _get_model_config().get("provider")
    if isinstance(saved, str):
        saved = saved.strip().lower()
        if saved:
            return saved

    # The persisted config selection is consulted before the environment so
    # a stale shell/.env override cannot displace the endpoint the user
    # last saved.
    return os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower() or "auto"
|
|
|
|
|
|
def _try_resolve_from_custom_pool(
    base_url: str,
    provider_label: str,
    api_mode_override: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Build a runtime dict from the credential pool of a custom endpoint.

    Args:
        base_url: Endpoint URL; used to derive the pool key and returned
            unchanged as ``base_url``.
        provider_label: Value stored under ``provider`` in the result.
        api_mode_override: Used as ``api_mode`` when truthy; otherwise the
            mode is detected from the URL, defaulting to
            ``"chat_completions"``.

    Returns:
        A runtime dict (without ``requested_provider``), or ``None`` when
        there is no pool key, the pool has no credentials, no entry is
        selected, the entry carries no key, or loading/selection raises.
    """
    key = get_custom_provider_pool_key(base_url)
    if not key:
        return None
    try:
        credentials = load_pool(key)
        chosen = credentials.select() if credentials.has_credentials() else None
        if chosen is None:
            return None
        token = getattr(chosen, "runtime_api_key", None) or getattr(chosen, "access_token", "")
        if not token:
            return None
        mode = api_mode_override or _detect_api_mode_for_url(base_url) or "chat_completions"
        return {
            "provider": provider_label,
            "api_mode": mode,
            "base_url": base_url,
            "api_key": token,
            "source": f"pool:{key}",
            "credential_pool": credentials,
        }
    except Exception:
        # Pool lookup is optional; callers fall back to other key sources.
        return None
|
|
|
|
|
|
def _get_named_custom_provider(requested_provider: str) -> Optional[Dict[str, Any]]:
    """Look up a saved ``custom_providers`` entry by name or menu key.

    Args:
        requested_provider: Either a raw provider name or a
            ``custom:<name>`` menu key; normalized before matching.

    Returns:
        ``{"name", "base_url", "api_key"}`` (plus ``"api_mode"`` when the
        entry has a valid one) for the first matching entry, or ``None``
        when the request is blank, ``custom`` or ``auto``, resolves as a
        built-in provider, or matches no saved entry.
    """
    wanted = _normalize_custom_provider_name(requested_provider or "")
    if wanted in ("", "custom", "auto"):
        return None

    # Raw names should only map to custom providers when they are not already
    # valid built-in providers or aliases. Explicit menu keys like
    # ``custom:local`` always target the saved custom provider.
    if not wanted.startswith("custom:"):
        is_builtin = True
        try:
            auth_mod.resolve_provider(wanted)
        except AuthError:
            is_builtin = False
        if is_builtin:
            return None

    saved_entries = load_config().get("custom_providers")
    if not isinstance(saved_entries, list):
        return None

    for item in saved_entries:
        if not isinstance(item, dict):
            continue
        label = item.get("name")
        endpoint = item.get("base_url")
        if not (isinstance(label, str) and isinstance(endpoint, str)):
            continue
        slug = _normalize_custom_provider_name(label)
        if wanted != slug and wanted != f"custom:{slug}":
            continue

        match = {
            "name": label.strip(),
            "base_url": endpoint.strip(),
            "api_key": str(item.get("api_key", "") or "").strip(),
        }
        mode = _parse_api_mode(item.get("api_mode"))
        if mode:
            match["api_mode"] = mode
        return match

    return None
|
|
|
|
|
|
def _resolve_named_custom_runtime(
    *,
    requested_provider: str,
    explicit_api_key: Optional[str] = None,
    explicit_base_url: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Resolve a runtime dict for a saved, named custom provider.

    Args:
        requested_provider: Name or ``custom:<name>`` key to look up.
        explicit_api_key: Caller-supplied key; first choice when usable.
        explicit_base_url: Caller-supplied URL; overrides the saved one.

    Returns:
        ``None`` when no saved provider matches or no base URL is known.
        Otherwise the endpoint's credential-pool runtime when one is
        available, else a dict whose key is the first usable secret among
        the explicit key, the saved key, ``OPENAI_API_KEY`` and
        ``OPENROUTER_API_KEY``, or ``"no-key-required"`` if none is usable.
    """
    saved = _get_named_custom_provider(requested_provider)
    if not saved:
        return None

    endpoint = ((explicit_base_url or "").strip() or saved.get("base_url", "")).rstrip("/")
    if not endpoint:
        return None

    # Check if a credential pool exists for this custom endpoint
    pooled = _try_resolve_from_custom_pool(endpoint, "custom", saved.get("api_mode"))
    if pooled:
        return pooled

    candidates = (
        (explicit_api_key or "").strip(),
        str(saved.get("api_key", "") or "").strip(),
        os.getenv("OPENAI_API_KEY", "").strip(),
        os.getenv("OPENROUTER_API_KEY", "").strip(),
    )
    secret = ""
    for candidate in candidates:
        if has_usable_secret(candidate):
            secret = candidate
            break

    mode = saved.get("api_mode") or _detect_api_mode_for_url(endpoint) or "chat_completions"
    return {
        "provider": "custom",
        "api_mode": mode,
        "base_url": endpoint,
        "api_key": secret or "no-key-required",
        "source": f"custom_provider:{saved.get('name', requested_provider)}",
    }
|
|
|
|
|
|
def _resolve_openrouter_runtime(
    *,
    requested_provider: str,
    explicit_api_key: Optional[str] = None,
    explicit_base_url: Optional[str] = None,
) -> Dict[str, Any]:
    """Resolve the runtime for OpenRouter or a generic custom endpoint.

    Args:
        requested_provider: The requested provider name; ``"custom"`` keeps
            the result labelled ``custom``, anything else yields
            ``openrouter``.
        explicit_api_key: Caller-supplied key; always the first candidate.
        explicit_base_url: Caller-supplied URL; always wins when set.

    Returns:
        Dict with ``provider``, ``api_mode``, ``base_url``, ``api_key`` and
        ``source``; or, for ``custom`` requests, the endpoint's
        credential-pool runtime when one is available.
    """
    model_cfg = _get_model_config()

    raw_base_url = model_cfg.get("base_url")
    cfg_base_url = raw_base_url.strip() if isinstance(raw_base_url, str) else ""
    raw_provider = model_cfg.get("provider")
    cfg_provider = raw_provider.strip().lower() if isinstance(raw_provider, str) else ""

    # The config may spell the key either "api_key" or "api".
    cfg_api_key = ""
    for key_name in ("api_key", "api"):
        value = model_cfg.get(key_name)
        if isinstance(value, str) and value.strip():
            cfg_api_key = value.strip()
            break

    requested_norm = (requested_provider or "").strip().lower()
    env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()

    # Use config base_url when available and the provider context matches.
    # OPENAI_BASE_URL env var is no longer consulted — config.yaml is
    # the single source of truth for endpoint URLs.
    use_config_base_url = False
    if cfg_base_url and not explicit_base_url:
        if requested_norm == "auto":
            use_config_base_url = cfg_provider in ("", "auto")
        elif requested_norm == "custom":
            use_config_base_url = cfg_provider == "custom"

    base_url = (
        (explicit_base_url or "").strip()
        or (cfg_base_url if use_config_base_url else "")
        or env_openrouter_base_url
        or OPENROUTER_BASE_URL
    ).rstrip("/")

    # Choose API key based on whether the resolved base_url targets OpenRouter.
    # When hitting OpenRouter, prefer OPENROUTER_API_KEY (issue #289).
    # When hitting a custom endpoint (e.g. Z.ai, local LLM), prefer
    # OPENAI_API_KEY so the OpenRouter key doesn't leak to an unrelated
    # provider (issues #420, #560).
    # Host names are case-insensitive, so compare against the lower-cased
    # URL; otherwise "https://OpenRouter.ai/..." would be treated as a
    # custom endpoint and handed the wrong key.
    _is_openrouter_url = "openrouter.ai" in base_url.lower()
    if _is_openrouter_url:
        api_key_candidates = [
            explicit_api_key,
            os.getenv("OPENROUTER_API_KEY"),
            os.getenv("OPENAI_API_KEY"),
        ]
    else:
        # Custom endpoint: use api_key from config when using config base_url (#1760).
        api_key_candidates = [
            explicit_api_key,
            (cfg_api_key if use_config_base_url else ""),
            os.getenv("OPENAI_API_KEY"),
            os.getenv("OPENROUTER_API_KEY"),
        ]
    api_key = next(
        (str(candidate or "").strip() for candidate in api_key_candidates if has_usable_secret(candidate)),
        "",
    )

    source = "explicit" if (explicit_api_key or explicit_base_url) else "env/config"

    # When "custom" was explicitly requested, preserve that as the provider
    # name instead of silently relabeling to "openrouter" (#2562).
    effective_provider = "custom" if requested_norm == "custom" else "openrouter"
    configured_mode = _parse_api_mode(model_cfg.get("api_mode"))

    if effective_provider == "custom":
        # For custom endpoints, check if a credential pool exists
        if base_url:
            pool_result = _try_resolve_from_custom_pool(base_url, effective_provider, configured_mode)
            if pool_result:
                return pool_result
        # Local servers often need no authentication, but the OpenAI SDK
        # requires a non-empty api_key string, so supply a placeholder.
        if not api_key and not _is_openrouter_url:
            api_key = "no-key-required"

    return {
        "provider": effective_provider,
        "api_mode": configured_mode
        or _detect_api_mode_for_url(base_url)
        or "chat_completions",
        "base_url": base_url,
        "api_key": api_key,
        "source": source,
    }
|
|
|
|
|
|
def _resolve_explicit_runtime(
|
|
*,
|
|
provider: str,
|
|
requested_provider: str,
|
|
model_cfg: Dict[str, Any],
|
|
explicit_api_key: Optional[str] = None,
|
|
explicit_base_url: Optional[str] = None,
|
|
) -> Optional[Dict[str, Any]]:
|
|
explicit_api_key = str(explicit_api_key or "").strip()
|
|
explicit_base_url = str(explicit_base_url or "").strip().rstrip("/")
|
|
if not explicit_api_key and not explicit_base_url:
|
|
return None
|
|
|
|
if provider == "anthropic":
|
|
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
|
|
cfg_base_url = ""
|
|
if cfg_provider == "anthropic":
|
|
cfg_base_url = str(model_cfg.get("base_url") or "").strip().rstrip("/")
|
|
base_url = explicit_base_url or cfg_base_url or "https://api.anthropic.com"
|
|
api_key = explicit_api_key
|
|
if not api_key:
|
|
from agent.anthropic_adapter import resolve_anthropic_token
|
|
|
|
api_key = resolve_anthropic_token()
|
|
if not api_key:
|
|
raise AuthError(
|
|
"No Anthropic credentials found. Set ANTHROPIC_TOKEN or ANTHROPIC_API_KEY, "
|
|
"run 'claude setup-token', or authenticate with 'claude /login'."
|
|
)
|
|
return {
|
|
"provider": "anthropic",
|
|
"api_mode": "anthropic_messages",
|
|
"base_url": base_url,
|
|
"api_key": api_key,
|
|
"source": "explicit",
|
|
"requested_provider": requested_provider,
|
|
}
|
|
|
|
if provider == "openai-codex":
|
|
base_url = explicit_base_url or DEFAULT_CODEX_BASE_URL
|
|
api_key = explicit_api_key
|
|
last_refresh = None
|
|
if not api_key:
|
|
creds = resolve_codex_runtime_credentials()
|
|
api_key = creds.get("api_key", "")
|
|
last_refresh = creds.get("last_refresh")
|
|
if not explicit_base_url:
|
|
base_url = creds.get("base_url", "").rstrip("/") or base_url
|
|
return {
|
|
"provider": "openai-codex",
|
|
"api_mode": "codex_responses",
|
|
"base_url": base_url,
|
|
"api_key": api_key,
|
|
"source": "explicit",
|
|
"last_refresh": last_refresh,
|
|
"requested_provider": requested_provider,
|
|
}
|
|
|
|
if provider == "nous":
|
|
state = auth_mod.get_provider_auth_state("nous") or {}
|
|
base_url = (
|
|
explicit_base_url
|
|
or str(state.get("inference_base_url") or auth_mod.DEFAULT_NOUS_INFERENCE_URL).strip().rstrip("/")
|
|
)
|
|
api_key = explicit_api_key or str(state.get("agent_key") or state.get("access_token") or "").strip()
|
|
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
|
|
if not api_key:
|
|
creds = resolve_nous_runtime_credentials(
|
|
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
|
|
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
|
|
)
|
|
api_key = creds.get("api_key", "")
|
|
expires_at = creds.get("expires_at")
|
|
if not explicit_base_url:
|
|
base_url = creds.get("base_url", "").rstrip("/") or base_url
|
|
return {
|
|
"provider": "nous",
|
|
"api_mode": "chat_completions",
|
|
"base_url": base_url,
|
|
"api_key": api_key,
|
|
"source": "explicit",
|
|
"expires_at": expires_at,
|
|
"requested_provider": requested_provider,
|
|
}
|
|
|
|
pconfig = PROVIDER_REGISTRY.get(provider)
|
|
if pconfig and pconfig.auth_type == "api_key":
|
|
env_url = ""
|
|
if pconfig.base_url_env_var:
|
|
env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
|
|
|
|
base_url = explicit_base_url
|
|
if not base_url:
|
|
if provider == "kimi-coding":
|
|
creds = resolve_api_key_provider_credentials(provider)
|
|
base_url = creds.get("base_url", "").rstrip("/")
|
|
else:
|
|
base_url = env_url or pconfig.inference_base_url
|
|
|
|
api_key = explicit_api_key
|
|
if not api_key:
|
|
creds = resolve_api_key_provider_credentials(provider)
|
|
api_key = creds.get("api_key", "")
|
|
if not base_url:
|
|
base_url = creds.get("base_url", "").rstrip("/")
|
|
|
|
api_mode = "chat_completions"
|
|
if provider == "copilot":
|
|
api_mode = _copilot_runtime_api_mode(model_cfg, api_key)
|
|
else:
|
|
configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
|
|
if configured_mode:
|
|
api_mode = configured_mode
|
|
elif base_url.rstrip("/").endswith("/anthropic"):
|
|
api_mode = "anthropic_messages"
|
|
|
|
return {
|
|
"provider": provider,
|
|
"api_mode": api_mode,
|
|
"base_url": base_url.rstrip("/"),
|
|
"api_key": api_key,
|
|
"source": "explicit",
|
|
"requested_provider": requested_provider,
|
|
}
|
|
|
|
return None
|
|
|
|
|
|
def resolve_runtime_provider(
    *,
    requested: Optional[str] = None,
    explicit_api_key: Optional[str] = None,
    explicit_base_url: Optional[str] = None,
) -> Dict[str, Any]:
    """Resolve runtime provider credentials for agent execution.

    Sources are tried in this order; the first that yields a result wins:

    1. A saved, named custom provider.
    2. Explicit ``explicit_api_key`` / ``explicit_base_url`` overrides.
    3. A credential-pool entry (when pooling applies to the provider).
    4. Provider-specific resolution: nous, openai-codex, copilot-acp,
       anthropic, then any registry provider with ``auth_type == "api_key"``.
    5. The OpenRouter / generic custom-endpoint fallback.

    Args:
        requested: Provider name requested by the caller, if any.
        explicit_api_key: Key override supplied by the caller.
        explicit_base_url: Base-URL override supplied by the caller.

    Returns:
        A runtime dict that always contains ``requested_provider``; the
        remaining keys depend on the branch that produced it.

    Raises:
        AuthError: When the provider is anthropic and no token is found.
            Exceptions raised by the resolvers called here are not caught,
            except for failures while loading the credential pool.
    """
    requested_provider = resolve_requested_provider(requested)

    named_runtime = _resolve_named_custom_runtime(
        requested_provider=requested_provider,
        explicit_api_key=explicit_api_key,
        explicit_base_url=explicit_base_url,
    )
    if named_runtime:
        named_runtime["requested_provider"] = requested_provider
        return named_runtime

    provider = resolve_provider(
        requested_provider,
        explicit_api_key=explicit_api_key,
        explicit_base_url=explicit_base_url,
    )
    model_cfg = _get_model_config()

    override_runtime = _resolve_explicit_runtime(
        provider=provider,
        requested_provider=requested_provider,
        model_cfg=model_cfg,
        explicit_api_key=explicit_api_key,
        explicit_base_url=explicit_base_url,
    )
    if override_runtime:
        return override_runtime

    if provider != "openrouter":
        pooling_allowed = True
    else:
        # The OpenRouter pool is only valid when nothing redirects traffic
        # to another endpoint and no per-call override was supplied.
        # NOTE(review): OPENAI_BASE_URL still disables pooling here, while a
        # comment in _resolve_openrouter_runtime says that variable is no
        # longer consulted for the endpoint URL — confirm this is intended.
        saved_provider = str(model_cfg.get("provider") or "").strip().lower()
        saved_url = str(model_cfg.get("base_url") or "").strip()
        custom_endpoint = bool(
            explicit_base_url
            or os.getenv("OPENAI_BASE_URL", "").strip()
            or os.getenv("OPENROUTER_BASE_URL", "").strip()
        ) or bool(saved_url and saved_provider in {"auto", "custom"})
        overridden = bool(explicit_api_key or explicit_base_url)
        pooling_allowed = (
            requested_provider in {"openrouter", "auto"}
            and not custom_endpoint
            and not overridden
        )

    try:
        pool = load_pool(provider) if pooling_allowed else None
    except Exception:
        # A broken pool must not block the other credential sources.
        pool = None
    if pool and pool.has_credentials():
        picked = pool.select()
        if picked is not None:
            pooled_key = getattr(picked, "runtime_api_key", None) or getattr(picked, "access_token", "")
            if pooled_key:
                return _resolve_runtime_from_pool_entry(
                    provider=provider,
                    entry=picked,
                    requested_provider=requested_provider,
                    model_cfg=model_cfg,
                    pool=pool,
                )

    if provider == "nous":
        nous_creds = resolve_nous_runtime_credentials(
            min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
            timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
        )
        return {
            "provider": "nous",
            "api_mode": "chat_completions",
            "base_url": nous_creds.get("base_url", "").rstrip("/"),
            "api_key": nous_creds.get("api_key", ""),
            "source": nous_creds.get("source", "portal"),
            "expires_at": nous_creds.get("expires_at"),
            "requested_provider": requested_provider,
        }

    if provider == "openai-codex":
        codex_creds = resolve_codex_runtime_credentials()
        return {
            "provider": "openai-codex",
            "api_mode": "codex_responses",
            "base_url": codex_creds.get("base_url", "").rstrip("/"),
            "api_key": codex_creds.get("api_key", ""),
            "source": codex_creds.get("source", "hermes-auth-store"),
            "last_refresh": codex_creds.get("last_refresh"),
            "requested_provider": requested_provider,
        }

    if provider == "copilot-acp":
        process_creds = resolve_external_process_provider_credentials(provider)
        return {
            "provider": "copilot-acp",
            "api_mode": "chat_completions",
            "base_url": process_creds.get("base_url", "").rstrip("/"),
            "api_key": process_creds.get("api_key", ""),
            "command": process_creds.get("command", ""),
            "args": list(process_creds.get("args") or []),
            "source": process_creds.get("source", "process"),
            "requested_provider": requested_provider,
        }

    # Anthropic (native Messages API)
    if provider == "anthropic":
        from agent.anthropic_adapter import resolve_anthropic_token

        token = resolve_anthropic_token()
        if not token:
            raise AuthError(
                "No Anthropic credentials found. Set ANTHROPIC_TOKEN or ANTHROPIC_API_KEY, "
                "run 'claude setup-token', or authenticate with 'claude /login'."
            )
        # Allow base URL override from config.yaml model.base_url, but only
        # when the configured provider is anthropic — otherwise a non-Anthropic
        # base_url (e.g. Codex endpoint) would leak into Anthropic requests.
        anthropic_url = ""
        if str(model_cfg.get("provider") or "").strip().lower() == "anthropic":
            anthropic_url = (model_cfg.get("base_url") or "").strip().rstrip("/")
        return {
            "provider": "anthropic",
            "api_mode": "anthropic_messages",
            "base_url": anthropic_url or "https://api.anthropic.com",
            "api_key": token,
            "source": "env",
            "requested_provider": requested_provider,
        }

    # API-key providers (z.ai/GLM, Kimi, MiniMax, MiniMax-CN)
    registry_entry = PROVIDER_REGISTRY.get(provider)
    if registry_entry and registry_entry.auth_type == "api_key":
        key_creds = resolve_api_key_provider_credentials(provider)
        endpoint = key_creds.get("base_url", "").rstrip("/")
        if provider == "copilot":
            mode = _copilot_runtime_api_mode(model_cfg, key_creds.get("api_key", ""))
        else:
            mode = "chat_completions"
            # Check explicit api_mode from model config first
            configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
            if configured_mode:
                mode = configured_mode
            # Auto-detect Anthropic-compatible endpoints by URL convention
            # (e.g. https://api.minimax.io/anthropic, https://dashscope.../anthropic)
            elif endpoint.rstrip("/").endswith("/anthropic"):
                mode = "anthropic_messages"
        return {
            "provider": provider,
            "api_mode": mode,
            "base_url": endpoint,
            "api_key": key_creds.get("api_key", ""),
            "source": key_creds.get("source", "env"),
            "requested_provider": requested_provider,
        }

    fallback_runtime = _resolve_openrouter_runtime(
        requested_provider=requested_provider,
        explicit_api_key=explicit_api_key,
        explicit_base_url=explicit_base_url,
    )
    fallback_runtime["requested_provider"] = requested_provider
    return fallback_runtime
|
|
|
|
|
|
def format_runtime_provider_error(error: Exception) -> str:
    """Render a provider-resolution error as a message string.

    ``AuthError`` instances are formatted by ``format_auth_error``; every
    other exception is rendered with ``str()``.
    """
    return format_auth_error(error) if isinstance(error, AuthError) else str(error)
|