Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
8141bf8ba3 feat: verify AI Gateway provider UX and attribution headers (#950)
All checks were successful
Lint / lint (pull_request) Successful in 9s
Closes #950

- promote Vercel AI Gateway near the top of the provider picker
- add dedicated AI Gateway model flow with Vercel API-key deep link and live pricing
- use curated AI Gateway catalog refresh with free Moonshot auto-promotion
- apply AI Gateway attribution headers on runtime clients
- add targeted QA tests for provider UX and attribution headers
2026-04-22 11:40:49 -04:00
Alexander Whitestone
892c4ab70a wip: add failing AI Gateway QA tests (#950)
- add ai-gateway provider UX, pricing, and Moonshot promotion tests
- add attribution-header regression tests for run_agent base-url handling
2026-04-22 11:30:05 -04:00
7 changed files with 550 additions and 335 deletions

View File

@@ -1,4 +1,4 @@
from agent.telemetry_logger import log_token_usage\n"""Shared auxiliary client router for side tasks.
"""Shared auxiliary client router for side tasks.
Provides a single resolution chain so every consumer (context compression,
session search, web extraction, vision analysis, browser vision) picks up
@@ -38,6 +38,7 @@ import json
import logging
import os
import threading
from agent.telemetry_logger import log_token_usage
import time
from pathlib import Path # noqa: F401 — used by test mocks
from types import SimpleNamespace
@@ -122,6 +123,16 @@ _OR_HEADERS = {
"X-OpenRouter-Categories": "productivity,cli-agent",
}
# Vercel AI Gateway app attribution headers. HTTP-Referer maps to
# referrerUrl and X-Title maps to appName in the gateway analytics.
from hermes_cli import __version__ as _HERMES_VERSION
_AI_GATEWAY_HEADERS = {
"HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-Title": "Hermes Agent",
"User-Agent": f"HermesAgent/{_HERMES_VERSION}",
}
# Nous Portal extra_body for product attribution.
# Callers should pass this as extra_body in chat.completions.create()
# when the auxiliary client is backed by Nous Portal.
@@ -396,7 +407,8 @@ class _CodexCompletionsAdapter:
prompt_tokens=getattr(resp_usage, "input_tokens", 0),
completion_tokens=getattr(resp_usage, "output_tokens", 0),
total_tokens=getattr(resp_usage, "total_tokens", 0),
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
except Exception as exc:
logger.debug("Codex auxiliary Responses API call failed: %s", exc)
raise
@@ -529,7 +541,8 @@ class _AnthropicCompletionsAdapter:
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
choice = SimpleNamespace(
index=0,

View File

@@ -168,7 +168,7 @@ import time as _time
from datetime import datetime
from hermes_cli import __version__, __release_date__
from hermes_constants import OPENROUTER_BASE_URL
from hermes_constants import AI_GATEWAY_BASE_URL, OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
@@ -1112,6 +1112,8 @@ def select_provider_and_model(args=None):
# Step 2: Provider-specific setup + model selection
if selected_provider == "openrouter":
_model_flow_openrouter(config, current_model)
elif selected_provider == "ai-gateway":
_model_flow_ai_gateway(config, current_model)
elif selected_provider == "nous":
_model_flow_nous(config, current_model, args=args)
elif selected_provider == "openai-codex":
@@ -1267,6 +1269,55 @@ def _model_flow_openrouter(config, current_model=""):
print("No change.")
def _model_flow_ai_gateway(config, current_model=""):
    """Run the Vercel AI Gateway setup flow: API key first, then model choice.

    If ``AI_GATEWAY_API_KEY`` is not configured, the user is shown the
    key-creation deep link and prompted for a key (hidden input). An empty
    entry, Ctrl-C, or EOF aborts without changing anything. The model picker
    is then shown with a freshly fetched model list and live pricing, and the
    chosen model is persisted together with the provider settings.

    Args:
        config: Accepted for parity with the other ``_model_flow_*`` helpers;
            not read by this function.
        current_model: Model id forwarded to the picker as the current choice.
    """
    from hermes_cli.auth import _prompt_model_selection, _save_model_choice, deactivate_provider
    from hermes_cli.config import get_env_value, save_env_value
    from hermes_cli.models import ai_gateway_model_ids, get_pricing_for_provider

    if not get_env_value("AI_GATEWAY_API_KEY"):
        print("No Vercel AI Gateway API key configured.")
        print("Create API key here: https://vercel.com/d?to=%2F%5Bteam%5D%2F%7E%2Fai-gateway&title=AI+Gateway")
        print("Add a payment method to get $5 in free credits.")
        print()
        try:
            import getpass

            entered_key = getpass.getpass("AI Gateway API key (or Enter to cancel): ").strip()
        except (KeyboardInterrupt, EOFError):
            # Finish the interrupted prompt line before bailing out.
            print()
            return
        if not entered_key:
            print("Cancelled.")
            return
        save_env_value("AI_GATEWAY_API_KEY", entered_key)
        print("API key saved.")
        print()

    # Always refresh: the picker should reflect the live catalog and prices.
    available_models = ai_gateway_model_ids(force_refresh=True)
    live_pricing = get_pricing_for_provider("ai-gateway", force_refresh=True)
    choice = _prompt_model_selection(
        available_models,
        current_model=current_model,
        pricing=live_pricing,
    )
    if not choice:
        print("No change.")
        return

    _save_model_choice(choice)

    from hermes_cli.config import load_config, save_config

    cfg = load_config()
    model_section = cfg.get("model")
    if not isinstance(model_section, dict):
        # Promote a bare model string (or nothing) to the dict form.
        model_section = {"default": model_section} if model_section else {}
        cfg["model"] = model_section
    model_section["provider"] = "ai-gateway"
    model_section["base_url"] = AI_GATEWAY_BASE_URL
    model_section["api_mode"] = "chat_completions"
    save_config(cfg)
    deactivate_provider()
    print(f"Default model set to: {choice} (via Vercel AI Gateway)")
def _model_flow_nous(config, current_model="", args=None):
"""Nous Portal provider: ensure logged in, then pick model."""
from hermes_cli.auth import (

View File

@@ -58,6 +58,28 @@ OPENROUTER_MODELS: list[tuple[str, str]] = [
_openrouter_catalog_cache: list[tuple[str, str]] | None = None
# Fallback Vercel AI Gateway snapshot used when the live catalog is unavailable.
# OSS / open-weight models prioritized first, then closed-source by family.
VERCEL_AI_GATEWAY_MODELS: list[tuple[str, str]] = [
("moonshotai/kimi-k2.6", "recommended"),
("alibaba/qwen3.6-plus", ""),
("zai/glm-5.1", ""),
("minimax/minimax-m2.7", ""),
("anthropic/claude-sonnet-4.6", ""),
("anthropic/claude-opus-4.7", ""),
("anthropic/claude-opus-4.6", ""),
("anthropic/claude-haiku-4.5", ""),
("openai/gpt-5.4", ""),
("openai/gpt-5.4-mini", ""),
("openai/gpt-5.3-codex", ""),
("google/gemini-3.1-pro-preview", ""),
("google/gemini-3-flash", ""),
("google/gemini-3.1-flash-lite-preview", ""),
("xai/grok-4.20-reasoning", ""),
]
_ai_gateway_catalog_cache: list[tuple[str, str]] | None = None
def _codex_curated_models() -> list[str]:
"""Derive the openai-codex curated list from codex_models.py.
@@ -258,18 +280,21 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"minimax-m2.5",
],
"ai-gateway": [
"anthropic/claude-opus-4.6",
"moonshotai/kimi-k2.6",
"alibaba/qwen3.6-plus",
"zai/glm-5.1",
"minimax/minimax-m2.7",
"anthropic/claude-sonnet-4.6",
"anthropic/claude-sonnet-4.5",
"anthropic/claude-opus-4.7",
"anthropic/claude-opus-4.6",
"anthropic/claude-haiku-4.5",
"openai/gpt-5",
"openai/gpt-4.1",
"openai/gpt-4.1-mini",
"google/gemini-3-pro-preview",
"openai/gpt-5.4",
"openai/gpt-5.4-mini",
"openai/gpt-5.3-codex",
"google/gemini-3.1-pro-preview",
"google/gemini-3-flash",
"google/gemini-2.5-pro",
"google/gemini-2.5-flash",
"deepseek/deepseek-v3.2",
"google/gemini-3.1-flash-lite-preview",
"xai/grok-4.20-reasoning",
],
"kilocode": [
"anthropic/claude-opus-4.6",
@@ -516,6 +541,7 @@ class ProviderEntry(NamedTuple):
CANONICAL_PROVIDERS: list[ProviderEntry] = [
ProviderEntry("nous", "Nous Portal", "Nous Portal (Nous Research subscription)"),
ProviderEntry("openrouter", "OpenRouter", "OpenRouter (100+ models, pay-per-use)"),
ProviderEntry("ai-gateway", "Vercel AI Gateway", "Vercel AI Gateway (200+ models, $5 free credit, no markup)"),
ProviderEntry("anthropic", "Anthropic", "Anthropic (Claude models — API key or Claude Code)"),
ProviderEntry("openai-codex", "OpenAI Codex", "OpenAI Codex"),
ProviderEntry("xiaomi", "Xiaomi MiMo", "Xiaomi MiMo (MiMo-V2 models — pro, omni, flash)"),
@@ -536,7 +562,6 @@ CANONICAL_PROVIDERS: list[ProviderEntry] = [
ProviderEntry("kilocode", "Kilo Code", "Kilo Code (Kilo Gateway API)"),
ProviderEntry("opencode-zen", "OpenCode Zen", "OpenCode Zen (35+ curated models, pay-as-you-go)"),
ProviderEntry("opencode-go", "OpenCode Go", "OpenCode Go (open models, $10/month subscription)"),
ProviderEntry("ai-gateway", "Vercel AI Gateway", "Vercel AI Gateway (200+ models, pay-per-use)"),
]
# Derived dicts — used throughout the codebase
@@ -679,6 +704,90 @@ def model_ids(*, force_refresh: bool = False) -> list[str]:
def _ai_gateway_model_is_free(pricing: Any) -> bool:
"""Return True if an AI Gateway model has $0 input AND output pricing."""
if not isinstance(pricing, dict):
return False
try:
return float(pricing.get("input", "0")) == 0 and float(pricing.get("output", "0")) == 0
except (TypeError, ValueError):
return False
def fetch_ai_gateway_models(
    timeout: float = 8.0,
    *,
    force_refresh: bool = False,
) -> list[tuple[str, str]]:
    """Return the curated AI Gateway picker list, refreshed from the live catalog.

    The curated order comes from ``VERCEL_AI_GATEWAY_MODELS``; only ids that
    the live ``/models`` response also lists are kept. Each kept entry is
    tagged ``"free"`` when its live pricing is zero, otherwise ``""``.
    A free ``moonshotai/`` model found in the live catalog (curated or not)
    is moved/inserted at the top as ``"recommended"``; without one, the first
    curated entry is tagged ``"recommended"``.

    Args:
        timeout: Socket timeout in seconds for the catalog request.
        force_refresh: Bypass the module-level cache and refetch.

    Returns:
        A new list of ``(model_id, description)`` tuples. When the request
        fails, the response is not a JSON object with a list under ``data``,
        or no curated id is live, the previously cached list (if any) or the
        static fallback snapshot is returned and the cache is left untouched.
    """
    global _ai_gateway_catalog_cache
    if _ai_gateway_catalog_cache is not None and not force_refresh:
        return list(_ai_gateway_catalog_cache)

    from hermes_constants import AI_GATEWAY_BASE_URL

    fallback = list(VERCEL_AI_GATEWAY_MODELS)
    preferred_ids = [mid for mid, _ in fallback]

    try:
        req = urllib.request.Request(
            f"{AI_GATEWAY_BASE_URL.rstrip('/')}/models",
            headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read().decode())
    except Exception:
        # Best-effort refresh: any network/decoding problem falls back.
        return list(_ai_gateway_catalog_cache or fallback)

    # Valid JSON is not necessarily an object; guard before calling .get().
    live_items = payload.get("data", []) if isinstance(payload, dict) else None
    if not isinstance(live_items, list):
        return list(_ai_gateway_catalog_cache or fallback)

    live_by_id: dict[str, dict[str, Any]] = {}
    for item in live_items:
        if not isinstance(item, dict):
            continue
        mid = str(item.get("id") or "").strip()
        if not mid:
            continue
        live_by_id[mid] = item

    curated: list[tuple[str, str]] = []
    for preferred_id in preferred_ids:
        live_item = live_by_id.get(preferred_id)
        if live_item is None:
            continue
        desc = "free" if _ai_gateway_model_is_free(live_item.get("pricing")) else ""
        curated.append((preferred_id, desc))

    if not curated:
        return list(_ai_gateway_catalog_cache or fallback)

    # First free Moonshot model in live-catalog order, if any.
    free_moonshot = next(
        (
            mid
            for mid, item in live_by_id.items()
            if mid.startswith("moonshotai/") and _ai_gateway_model_is_free(item.get("pricing"))
        ),
        None,
    )
    if free_moonshot:
        # Drop a curated duplicate before re-inserting it at the top.
        curated = [(mid, desc) for mid, desc in curated if mid != free_moonshot]
        curated.insert(0, (free_moonshot, "recommended"))
    else:
        first_id, _ = curated[0]
        curated[0] = (first_id, "recommended")

    _ai_gateway_catalog_cache = curated
    return list(curated)
def ai_gateway_model_ids(*, force_refresh: bool = False) -> list[str]:
    """Return only the model-id strings of the AI Gateway picker list.

    ``force_refresh`` is forwarded unchanged to ``fetch_ai_gateway_models``.
    """
    catalog = fetch_ai_gateway_models(force_refresh=force_refresh)
    return [model_id for model_id, _description in catalog]
# ---------------------------------------------------------------------------
# Pricing helpers — fetch live pricing from OpenRouter-compatible /v1/models
# ---------------------------------------------------------------------------
@@ -821,6 +930,51 @@ def fetch_models_with_pricing(
return result
def fetch_ai_gateway_pricing(
    timeout: float = 8.0,
    *,
    force_refresh: bool = False,
) -> dict[str, dict[str, str]]:
    """Fetch the AI Gateway ``/models`` listing and return Hermes-shaped pricing.

    The gateway's ``input``/``output`` prices are renamed to
    ``prompt``/``completion``; ``input_cache_read`` and ``input_cache_write``
    are copied through when present and truthy. All values are stringified.

    Args:
        timeout: Socket timeout in seconds for the request.
        force_refresh: Ignore a cached result for this base URL and refetch.

    Returns:
        Mapping of model id to its pricing entry. An empty dict is returned
        (and cached) when the request fails; a response that is not a JSON
        object with a list under ``data`` also yields an empty mapping
        instead of raising.
    """
    from hermes_constants import AI_GATEWAY_BASE_URL

    cache_key = AI_GATEWAY_BASE_URL.rstrip("/")
    if not force_refresh and cache_key in _pricing_cache:
        return _pricing_cache[cache_key]

    try:
        req = urllib.request.Request(
            f"{cache_key}/models",
            headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read().decode())
    except Exception:
        # Best-effort: remember the failure so callers don't refetch each time.
        _pricing_cache[cache_key] = {}
        return {}

    # Valid JSON may be a list/scalar, and "data" may be null: guard both.
    items = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(items, list):
        items = []

    result: dict[str, dict[str, str]] = {}
    for item in items:
        if not isinstance(item, dict):
            continue
        mid = item.get("id")
        pricing = item.get("pricing")
        if not (mid and isinstance(pricing, dict)):
            continue
        entry: dict[str, str] = {
            "prompt": str(pricing.get("input", "")),
            "completion": str(pricing.get("output", "")),
        }
        for cache_field in ("input_cache_read", "input_cache_write"):
            if pricing.get(cache_field):
                entry[cache_field] = str(pricing[cache_field])
        result[mid] = entry

    _pricing_cache[cache_key] = result
    return result
def _resolve_openrouter_api_key() -> str:
"""Best-effort OpenRouter API key for pricing fetch."""
return os.getenv("OPENROUTER_API_KEY", "").strip()
@@ -839,7 +993,7 @@ def _resolve_nous_pricing_credentials() -> tuple[str, str]:
def get_pricing_for_provider(provider: str, *, force_refresh: bool = False) -> dict[str, dict[str, str]]:
"""Return live pricing for providers that support it (openrouter, nous)."""
"""Return live pricing for providers that support it (openrouter, ai-gateway, nous)."""
normalized = normalize_provider(provider)
if normalized == "openrouter":
return fetch_models_with_pricing(
@@ -847,11 +1001,11 @@ def get_pricing_for_provider(provider: str, *, force_refresh: bool = False) -> d
base_url="https://openrouter.ai/api",
force_refresh=force_refresh,
)
if normalized == "ai-gateway":
return fetch_ai_gateway_pricing(force_refresh=force_refresh)
if normalized == "nous":
api_key, base_url = _resolve_nous_pricing_credentials()
if base_url:
# Nous base_url typically looks like https://inference-api.nousresearch.com/v1
# We need the part before /v1 for our fetch function
stripped = base_url.rstrip("/")
if stripped.endswith("/v1"):
stripped = stripped[:-3]
@@ -1253,9 +1407,7 @@ def provider_model_ids(provider: Optional[str], *, force_refresh: bool = False)
if live:
return live
if normalized == "ai-gateway":
live = _fetch_ai_gateway_models()
if live:
return live
return ai_gateway_model_ids()
if normalized == "custom":
base_url = _get_custom_base_url()
if base_url:

View File

@@ -20,7 +20,6 @@ Usage:
response = agent.run_conversation("Tell me about the latest Python updates")
"""
import ast
import asyncio
import base64
import concurrent.futures
@@ -909,6 +908,10 @@ class AIAgent:
"X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent",
}
elif "ai-gateway.vercel.sh" in effective_base.lower():
from agent.auxiliary_client import _AI_GATEWAY_HEADERS
client_kwargs["default_headers"] = dict(_AI_GATEWAY_HEADERS)
elif "api.githubcopilot.com" in effective_base.lower():
from hermes_cli.models import copilot_default_headers
@@ -3329,119 +3332,6 @@ class AIAgent:
_VALID_API_ROLES = frozenset({"system", "user", "assistant", "tool", "function", "developer"})
@staticmethod
def _normalize_tool_call_arguments(arguments: Any) -> tuple[str, bool]:
"""Return ``(normalized_text, is_complete)`` for tool-call arguments.
Conservative by design: repairs harmless formatting quirks common in
Gemma 4 / Ollama output (whitespace, trailing commas, Python-style
single-quoted dicts, bare key/value pairs) but does NOT auto-close
truncated JSON objects. Truly incomplete fragments must remain marked
incomplete so the agent can retry instead of silently dropping fields.
"""
if isinstance(arguments, (dict, list)):
return json.dumps(arguments, ensure_ascii=False, separators=(",", ":")), True
if arguments is None:
return "{}", True
if not isinstance(arguments, str):
arguments = str(arguments)
text = arguments.strip()
if not text:
return "{}", True
def _parse_candidate(candidate: str):
try:
return json.loads(candidate)
except (json.JSONDecodeError, TypeError, ValueError):
pass
try:
return ast.literal_eval(candidate)
except (SyntaxError, ValueError):
return None
candidates: list[str] = [text]
trimmed_trailing_commas = re.sub(r",\s*([}\]])", r"\1", text)
if trimmed_trailing_commas != text:
candidates.append(trimmed_trailing_commas)
if ":" in text and not text.startswith(("{", "[")):
wrapped = "{" + text + "}"
candidates.append(wrapped)
quoted_keys = re.sub(
r'([\{,]\s*)([A-Za-z_][A-Za-z0-9_\-]*)(\s*:)',
r'\1"\2"\3',
wrapped,
)
if quoted_keys != wrapped:
candidates.append(quoted_keys)
trimmed_quoted_keys = re.sub(r",\s*([}\]])", r"\1", quoted_keys)
if trimmed_quoted_keys != quoted_keys:
candidates.append(trimmed_quoted_keys)
seen: set[str] = set()
for candidate in candidates:
if candidate in seen:
continue
seen.add(candidate)
parsed = _parse_candidate(candidate)
if isinstance(parsed, (dict, list)):
return json.dumps(parsed, ensure_ascii=False, separators=(",", ":")), True
return text, False
@staticmethod
def _merge_consecutive_assistant_tool_call_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Merge adjacent assistant messages that each carry tool_calls.
Some providers emit parallel tool calls as multiple consecutive assistant
messages instead of a single assistant message with multiple tool calls.
Merge only adjacent assistant/tool-call messages; any non-assistant
boundary flushes the current batch.
"""
merged: List[Dict[str, Any]] = []
pending: Optional[Dict[str, Any]] = None
def _flush_pending() -> None:
nonlocal pending
if pending is not None:
merged.append(pending)
pending = None
for msg in messages:
if not isinstance(msg, dict):
_flush_pending()
merged.append(msg)
continue
role = msg.get("role")
tool_calls = msg.get("tool_calls")
if role == "assistant" and isinstance(tool_calls, list) and tool_calls:
if pending is None:
pending = copy.deepcopy(msg)
continue
pending_tool_calls = pending.get("tool_calls")
if not isinstance(pending_tool_calls, list):
pending_tool_calls = []
pending["tool_calls"] = pending_tool_calls
pending_tool_calls.extend(copy.deepcopy(tool_calls))
pending_content = pending.get("content") or ""
current_content = msg.get("content") or ""
if pending_content and current_content:
pending["content"] = pending_content + "\n" + current_content
elif current_content:
pending["content"] = current_content
continue
_flush_pending()
merged.append(msg)
_flush_pending()
return merged
@staticmethod
def _sanitize_api_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Fix orphaned tool_call / tool_result pairs before every LLM call.
@@ -3461,7 +3351,7 @@ class AIAgent:
)
continue
filtered.append(msg)
messages = AIAgent._merge_consecutive_assistant_tool_call_messages(filtered)
messages = filtered
surviving_call_ids: set = set()
for msg in messages:
@@ -4781,11 +4671,13 @@ class AIAgent:
return True
def _apply_client_headers_for_base_url(self, base_url: str) -> None:
from agent.auxiliary_client import _OR_HEADERS
from agent.auxiliary_client import _AI_GATEWAY_HEADERS, _OR_HEADERS
normalized = (base_url or "").lower()
if "openrouter" in normalized:
self._client_kwargs["default_headers"] = dict(_OR_HEADERS)
elif "ai-gateway.vercel.sh" in normalized:
self._client_kwargs["default_headers"] = dict(_AI_GATEWAY_HEADERS)
elif "api.githubcopilot.com" in normalized:
from hermes_cli.models import copilot_default_headers
@@ -5368,9 +5260,12 @@ class AIAgent:
mock_tool_calls = []
for idx in sorted(tool_calls_acc):
tc = tool_calls_acc[idx]
arguments, is_complete = self._normalize_tool_call_arguments(tc["function"]["arguments"])
if not is_complete:
has_truncated_tool_args = True
arguments = tc["function"]["arguments"]
if arguments and arguments.strip():
try:
json.loads(arguments)
except json.JSONDecodeError:
has_truncated_tool_args = True
mock_tool_calls.append(SimpleNamespace(
id=tc["id"],
type=tc["type"],
@@ -6674,7 +6569,6 @@ class AIAgent:
response_item_id if isinstance(response_item_id, str) else None,
)
normalized_args, _ = self._normalize_tool_call_arguments(tool_call.function.arguments)
tc_dict = {
"id": call_id,
"call_id": call_id,
@@ -6682,7 +6576,7 @@ class AIAgent:
"type": tool_call.type,
"function": {
"name": tool_call.function.name,
"arguments": normalized_args,
"arguments": tool_call.function.arguments
},
}
# Preserve extra_content (e.g. Gemini thought_signature) so it
@@ -10143,15 +10037,21 @@ class AIAgent:
# Handle empty strings as empty objects (common model quirk)
invalid_json_args = []
for tc in assistant_message.tool_calls:
normalized_args, is_complete = self._normalize_tool_call_arguments(tc.function.arguments)
tc.function.arguments = normalized_args
if not is_complete:
try:
json.loads(normalized_args)
except json.JSONDecodeError as e:
invalid_json_args.append((tc.function.name, str(e)))
except Exception as e:
invalid_json_args.append((tc.function.name, str(e)))
args = tc.function.arguments
if isinstance(args, (dict, list)):
tc.function.arguments = json.dumps(args)
continue
if args is not None and not isinstance(args, str):
tc.function.arguments = str(args)
args = tc.function.arguments
# Treat empty/whitespace strings as empty object
if not args or not args.strip():
tc.function.arguments = "{}"
continue
try:
json.loads(args)
except json.JSONDecodeError as e:
invalid_json_args.append((tc.function.name, str(e)))
if invalid_json_args:
# Check if the invalid JSON is due to truncation rather

View File

@@ -0,0 +1,222 @@
"""AI Gateway provider UX, live pricing, and model promotion tests."""
from __future__ import annotations
import json
from unittest.mock import MagicMock, patch
import pytest
from hermes_cli import models as models_module
from hermes_cli.models import (
CANONICAL_PROVIDERS,
VERCEL_AI_GATEWAY_MODELS,
_ai_gateway_model_is_free,
ai_gateway_model_ids,
fetch_ai_gateway_models,
fetch_ai_gateway_pricing,
get_pricing_for_provider,
)
def _mock_urlopen(payload):
resp = MagicMock()
resp.read.return_value = json.dumps(payload).encode()
ctx = MagicMock()
ctx.__enter__.return_value = resp
ctx.__exit__.return_value = False
return ctx
def _reset_caches():
    """Clear the module-level catalog and pricing caches so tests start cold."""
    models_module._pricing_cache.clear()
    models_module._ai_gateway_catalog_cache = None
@pytest.fixture
def config_home(tmp_path, monkeypatch):
    """Provide a throwaway ``HERMES_HOME`` with a minimal config and an empty .env.

    Also removes any AI Gateway variables from the process environment.
    """
    hermes_home = tmp_path / "hermes"
    hermes_home.mkdir()
    seed_files = {
        "config.yaml": "model: some-old-model\n",
        ".env": "",
    }
    for filename, contents in seed_files.items():
        (hermes_home / filename).write_text(contents)

    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    for variable in ("AI_GATEWAY_API_KEY", "AI_GATEWAY_BASE_URL"):
        monkeypatch.delenv(variable, raising=False)
    return hermes_home
def test_ai_gateway_provider_is_promoted_near_top_of_picker():
    """The AI Gateway entry must be one of the first three providers listed."""
    top_three = [entry.slug for entry in CANONICAL_PROVIDERS][:3]
    assert "ai-gateway" in top_three
def test_ai_gateway_pricing_translates_input_output_to_prompt_completion():
    """Gateway ``input``/``output`` prices surface as ``prompt``/``completion``."""
    _reset_caches()
    model_id = "moonshotai/kimi-k2.5"
    gateway_pricing = {
        "input": "0.0000006",
        "output": "0.0000025",
        "input_cache_read": "0.00000015",
        "input_cache_write": "0.0000006",
    }
    payload = {"data": [{"id": model_id, "type": "language", "pricing": gateway_pricing}]}

    with patch("urllib.request.urlopen", return_value=_mock_urlopen(payload)):
        result = fetch_ai_gateway_pricing(force_refresh=True)

    entry = result[model_id]
    expected_fields = {
        "prompt": "0.0000006",
        "completion": "0.0000025",
        "input_cache_read": "0.00000015",
        "input_cache_write": "0.0000006",
    }
    for field, value in expected_fields.items():
        assert entry[field] == value
def test_get_pricing_for_provider_supports_ai_gateway():
    """``get_pricing_for_provider`` returns translated pricing for the gateway."""
    _reset_caches()
    model_entry = {
        "id": "moonshotai/kimi-k2.5",
        "type": "language",
        "pricing": {"input": "0.0001", "output": "0.0002"},
    }
    fake_response = _mock_urlopen({"data": [model_entry]})

    with patch("urllib.request.urlopen", return_value=fake_response):
        result = get_pricing_for_provider("ai-gateway", force_refresh=True)

    assert result["moonshotai/kimi-k2.5"] == {"prompt": "0.0001", "completion": "0.0002"}
def test_ai_gateway_pricing_returns_empty_on_fetch_failure():
    """A network error yields an empty pricing mapping instead of raising."""
    _reset_caches()
    network_error = OSError("network down")
    with patch("urllib.request.urlopen", side_effect=network_error):
        pricing = fetch_ai_gateway_pricing(force_refresh=True)
    assert pricing == {}
def test_ai_gateway_pricing_skips_entries_without_pricing_dict():
    """Entries whose ``pricing`` is not a dict are left out of the result."""
    _reset_caches()
    unpriced = {"id": "x/y", "pricing": None}
    priced = {"id": "a/b", "pricing": {"input": "0", "output": "0"}}
    fake_response = _mock_urlopen({"data": [unpriced, priced]})

    with patch("urllib.request.urlopen", return_value=fake_response):
        result = fetch_ai_gateway_pricing(force_refresh=True)

    assert "x/y" not in result
    assert result["a/b"] == {"prompt": "0", "completion": "0"}
def test_ai_gateway_free_detector():
    """Only zero input AND zero output pricing counts as free."""
    cases = [
        ({"input": "0", "output": "0"}, True),
        ({"input": "0", "output": "0.01"}, False),
        ({"input": "0.01", "output": "0"}, False),
        (None, False),
        ({"input": "not a number"}, False),
    ]
    for pricing, expected in cases:
        assert _ai_gateway_model_is_free(pricing) is expected
def test_fetch_ai_gateway_models_filters_against_live_catalog():
    """Only curated ids present in the live catalog survive, in curated order."""
    _reset_caches()
    live_ids = [model_id for model_id, _ in VERCEL_AI_GATEWAY_MODELS][:3]
    paid = {"input": "0.001", "output": "0.002"}
    payload = {"data": [{"id": model_id, "pricing": dict(paid)} for model_id in live_ids]}

    with patch("urllib.request.urlopen", return_value=_mock_urlopen(payload)):
        result = fetch_ai_gateway_models(force_refresh=True)

    returned_ids = [model_id for model_id, _ in result]
    assert returned_ids == live_ids
    assert result[0][1] == "recommended"
    # Served from the cache populated by the fetch above.
    assert ai_gateway_model_ids(force_refresh=False) == live_ids
def test_fetch_ai_gateway_models_tags_free_models():
    """Zero-priced curated models are tagged ``free``; the first is ``recommended``."""
    _reset_caches()
    (first_id, _), (second_id, _) = VERCEL_AI_GATEWAY_MODELS[:2]
    live_entries = [
        {"id": first_id, "pricing": {"input": "0.001", "output": "0.002"}},
        {"id": second_id, "pricing": {"input": "0", "output": "0"}},
    ]

    with patch("urllib.request.urlopen", return_value=_mock_urlopen({"data": live_entries})):
        result = fetch_ai_gateway_models(force_refresh=True)

    tags = dict(result)
    assert tags[first_id] == "recommended"
    assert tags[second_id] == "free"
def test_free_moonshot_model_auto_promoted_to_top_even_if_not_curated():
    """A free Moonshot model leads the list even when it is not in the curated set."""
    _reset_caches()
    curated_id = VERCEL_AI_GATEWAY_MODELS[0][0]
    free_moonshot_id = "moonshotai/kimi-coder-free-preview"
    live_entries = [
        {"id": curated_id, "pricing": {"input": "0.001", "output": "0.002"}},
        {"id": free_moonshot_id, "pricing": {"input": "0", "output": "0"}},
    ]

    with patch("urllib.request.urlopen", return_value=_mock_urlopen({"data": live_entries})):
        result = fetch_ai_gateway_models(force_refresh=True)

    assert result[0] == (free_moonshot_id, "recommended")
    assert any(model_id == curated_id for model_id, _ in result)
def test_paid_moonshot_does_not_get_auto_promoted():
    """A paid Moonshot variant must not displace the first curated entry."""
    _reset_caches()
    curated_id = VERCEL_AI_GATEWAY_MODELS[0][0]
    paid = {"input": "0.001", "output": "0.002"}
    live_entries = [
        {"id": curated_id, "pricing": dict(paid)},
        {"id": "moonshotai/some-paid-variant", "pricing": dict(paid)},
    ]

    with patch("urllib.request.urlopen", return_value=_mock_urlopen({"data": live_entries})):
        result = fetch_ai_gateway_models(force_refresh=True)

    leading_id, _ = result[0]
    assert leading_id == curated_id
def test_fetch_ai_gateway_models_falls_back_on_error():
    """With a cold cache, a failed fetch returns the static snapshot."""
    _reset_caches()
    network_error = OSError("network")
    with patch("urllib.request.urlopen", side_effect=network_error):
        models = fetch_ai_gateway_models(force_refresh=True)
    assert models == list(VERCEL_AI_GATEWAY_MODELS)
def test_ai_gateway_setup_flow_shows_deeplink_and_passes_pricing(config_home, monkeypatch, capsys):
    """The setup flow prints the key deep link, forwards pricing, and saves config."""
    from hermes_cli.main import _model_flow_ai_gateway
    from hermes_cli.config import load_config

    chosen_model = "moonshotai/kimi-k2.6"
    pricing = {chosen_model: {"prompt": "0", "completion": "0"}}
    monkeypatch.setenv("HERMES_HOME", str(config_home))

    with patch("getpass.getpass", return_value="vercel-key"), \
            patch("hermes_cli.models.ai_gateway_model_ids", return_value=[chosen_model]), \
            patch("hermes_cli.models.get_pricing_for_provider", return_value=pricing), \
            patch("hermes_cli.auth._prompt_model_selection", return_value=chosen_model) as prompt_selection, \
            patch("hermes_cli.auth.deactivate_provider"):
        _model_flow_ai_gateway(load_config(), "")

    printed = capsys.readouterr().out
    assert "vercel.com/d?to=%2F%5Bteam%5D%2F%7E%2Fai-gateway&title=AI+Gateway" in printed
    assert "free credits" in printed.lower()
    assert prompt_selection.call_args.kwargs["pricing"] == pricing

    import yaml

    saved_config = yaml.safe_load((config_home / "config.yaml").read_text()) or {}
    model_section = saved_config["model"]
    assert model_section["provider"] == "ai-gateway"
    assert model_section["api_mode"] == "chat_completions"

View File

@@ -0,0 +1,62 @@
"""Attribution default_headers applied per provider via base-URL detection."""
from unittest.mock import MagicMock, patch
from run_agent import AIAgent
@patch("run_agent.OpenAI")
def test_openrouter_base_url_applies_or_headers(mock_openai):
    """An OpenRouter base URL installs the OpenRouter attribution headers."""
    mock_openai.return_value = MagicMock()
    openrouter_url = "https://openrouter.ai/api/v1"
    agent = AIAgent(
        api_key="test-key",
        base_url=openrouter_url,
        model="test/model",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )

    agent._apply_client_headers_for_base_url(openrouter_url)

    applied = agent._client_kwargs["default_headers"]
    assert applied["HTTP-Referer"] == "https://hermes-agent.nousresearch.com"
    assert applied["X-OpenRouter-Title"] == "Hermes Agent"
@patch("run_agent.OpenAI")
def test_ai_gateway_base_url_applies_attribution_headers(mock_openai):
    """Switching to an AI Gateway base URL installs the gateway attribution headers."""
    mock_openai.return_value = MagicMock()
    # The agent starts on OpenRouter; only the header switch is under test.
    agent_kwargs = {
        "api_key": "test-key",
        "base_url": "https://openrouter.ai/api/v1",
        "model": "test/model",
        "quiet_mode": True,
        "skip_context_files": True,
        "skip_memory": True,
    }
    agent = AIAgent(**agent_kwargs)

    agent._apply_client_headers_for_base_url("https://ai-gateway.vercel.sh/v1")

    applied = agent._client_kwargs["default_headers"]
    assert applied["HTTP-Referer"] == "https://hermes-agent.nousresearch.com"
    assert applied["X-Title"] == "Hermes Agent"
    assert applied["User-Agent"].startswith("HermesAgent/")
@patch("run_agent.OpenAI")
def test_unknown_base_url_clears_default_headers(mock_openai):
    """An unrecognised base URL must drop any previously applied default headers."""
    mock_openai.return_value = MagicMock()
    agent_kwargs = {
        "api_key": "test-key",
        "base_url": "https://openrouter.ai/api/v1",
        "model": "test/model",
        "quiet_mode": True,
        "skip_context_files": True,
        "skip_memory": True,
    }
    agent = AIAgent(**agent_kwargs)
    # Seed a stale header so its removal can be observed.
    agent._client_kwargs["default_headers"] = {"X-Stale": "yes"}

    agent._apply_client_headers_for_base_url("https://api.example.com/v1")

    assert "default_headers" not in agent._client_kwargs

View File

@@ -1037,138 +1037,6 @@ class TestBuildAssistantMessage:
result = agent._build_assistant_message(msg, "tool_calls")
assert "extra_content" not in result["tool_calls"][0]
def test_tool_call_arguments_normalized_from_gemma4_whitespace(self, agent):
tc = _mock_tool_call(
name="read_file",
arguments=' \n {"path": "README.md"} \n ',
call_id="c4",
)
msg = _mock_assistant_msg(content="", tool_calls=[tc])
result = agent._build_assistant_message(msg, "tool_calls")
assert result["tool_calls"][0]["function"]["arguments"] == '{"path":"README.md"}'
def test_tool_call_arguments_normalized_from_single_quotes_and_trailing_comma(self, agent):
tc = _mock_tool_call(
name="read_file",
arguments="{'path': 'README.md',}",
call_id="c5",
)
msg = _mock_assistant_msg(content="", tool_calls=[tc])
result = agent._build_assistant_message(msg, "tool_calls")
assert result["tool_calls"][0]["function"]["arguments"] == '{"path":"README.md"}'
class TestNormalizeToolCallArguments:
@pytest.mark.parametrize(
("raw_args", "expected"),
[
('{"q":"test"}', '{"q":"test"}'),
(' \n {"q": "test"} \n ', '{"q":"test"}'),
('{"q": "test",}', '{"q":"test"}'),
("{'q': 'test'}", '{"q":"test"}'),
("{'path': 'README.md', 'mode': 'read'}", '{"path":"README.md","mode":"read"}'),
('"path": "README.md"', '{"path":"README.md"}'),
('path: "README.md"', '{"path":"README.md"}'),
('path: "README.md", mode: "read"', '{"path":"README.md","mode":"read"}'),
({"path": "README.md"}, '{"path":"README.md"}'),
(["README.md", "docs.md"], '["README.md","docs.md"]'),
('\t\n ', '{}'),
('{"nested": {"path": "README.md"}}', '{"nested":{"path":"README.md"}}'),
],
)
def test_complete_args_are_normalized(self, raw_args, expected):
normalized, is_complete = AIAgent._normalize_tool_call_arguments(raw_args)
assert is_complete is True
assert normalized == expected
@pytest.mark.parametrize(
"raw_args",
[
'{"path": "README.md"',
'{"a": 1, "b"',
'{"path": [1, 2}',
"{'path': 'README.md'",
'path: "README.md", mode:',
'{"command": "echo hello",',
],
)
def test_incomplete_args_are_not_marked_complete(self, raw_args):
normalized, is_complete = AIAgent._normalize_tool_call_arguments(raw_args)
assert is_complete is False
assert isinstance(normalized, str)
assert normalized == raw_args.strip()
class TestSanitizeApiMessages:
    """Checks how ``AIAgent._sanitize_api_messages`` treats assistant tool-call turns."""

    @staticmethod
    def _assistant_turn(call_id, tool_name, arguments, content=""):
        """Build an assistant message carrying exactly one function tool call."""
        call = {
            "id": call_id,
            "type": "function",
            "function": {"name": tool_name, "arguments": arguments},
        }
        return {"role": "assistant", "content": content, "tool_calls": [call]}

    @staticmethod
    def _tool_reply(call_id, content):
        """Build the tool-role message answering ``call_id``."""
        return {"role": "tool", "tool_call_id": call_id, "content": content}

    def test_merges_consecutive_assistant_tool_call_messages(self):
        """Two adjacent assistant turns collapse into one with both calls and joined content."""
        conversation = [
            self._assistant_turn("c1", "read_file", '{"path":"a.py"}', content="first"),
            self._assistant_turn("c2", "search_files", '{"pattern":"TODO"}', content="second"),
            self._tool_reply("c1", "a.py"),
            self._tool_reply("c2", "matches"),
        ]

        cleaned = AIAgent._sanitize_api_messages(conversation)

        assert len(cleaned) == 3
        merged = cleaned[0]
        assert merged["role"] == "assistant"
        assert [call["id"] for call in merged["tool_calls"]] == ["c1", "c2"]
        assert merged["content"] == "first\nsecond"

    def test_does_not_merge_assistant_tool_call_messages_across_non_assistant_boundary(self):
        """Assistant turns separated by a tool message stay as two distinct messages."""
        conversation = [
            self._assistant_turn("c1", "read_file", '{"path":"a.py"}'),
            self._tool_reply("c1", "a.py"),
            self._assistant_turn("c2", "read_file", '{"path":"b.py"}'),
            self._tool_reply("c2", "b.py"),
        ]

        cleaned = AIAgent._sanitize_api_messages(conversation)

        assistant_turns = [entry for entry in cleaned if entry.get("role") == "assistant"]
        assert len(assistant_turns) == 2
        first_turn, second_turn = assistant_turns
        assert first_turn["tool_calls"][0]["id"] == "c1"
        assert second_turn["tool_calls"][0]["id"] == "c2"

    def test_merge_preserves_tool_call_order(self):
        """Three adjacent assistant turns merge with their calls kept in original order."""
        call_specs = (
            ("c1", '{"path":"a.py"}'),
            ("c2", '{"path":"b.py"}'),
            ("c3", '{"path":"c.py"}'),
        )
        conversation = [
            self._assistant_turn(call_id, "read_file", arguments)
            for call_id, arguments in call_specs
        ]

        cleaned = AIAgent._sanitize_api_messages(conversation)

        assert [call["id"] for call in cleaned[0]["tool_calls"]] == ["c1", "c2", "c3"]
class TestFormatToolsForSystemMessage:
def test_no_tools_returns_empty_array(self, agent):
@@ -3599,59 +3467,6 @@ class TestStreamingApiCall:
assert tc[0].function.arguments == '{"path":"x.txt","content":"hel'
assert resp.choices[0].finish_reason == "length"
@pytest.mark.parametrize(
    "raw_arguments,expected",
    [
        (' \n {"path": "x.txt"} \n ', '{"path":"x.txt"}'),
        ("{'path': 'x.txt',}", '{"path":"x.txt"}'),
        ('path: "x.txt", mode: "read"', '{"path":"x.txt","mode":"read"}'),
    ],
)
def test_repairable_tool_call_args_do_not_upgrade_finish_reason_to_length(self, agent, raw_arguments, expected):
    """Streamed arguments that can be repaired are normalized and finish_reason stays "tool_calls"."""
    stream = iter(
        [
            _make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "read_file", raw_arguments)]),
            _make_chunk(finish_reason="tool_calls"),
        ]
    )
    agent.client.chat.completions.create.return_value = stream

    response = agent._interruptible_streaming_api_call({"messages": []})

    streamed_calls = response.choices[0].message.tool_calls
    assert len(streamed_calls) == 1
    only_call = streamed_calls[0]
    assert only_call.function.name == "read_file"
    assert only_call.function.arguments == expected
    assert response.choices[0].finish_reason == "tool_calls"
def test_streamed_tool_call_args_single_quotes_across_chunks_normalized(self, agent):
    """Single-quoted arguments split over two deltas are joined and emitted as compact JSON."""
    opening_delta = _make_tc_delta(0, "call_1", "read_file", "{'path':")
    closing_delta = _make_tc_delta(0, None, None, " 'x.txt',}")
    stream = iter(
        [
            _make_chunk(tool_calls=[opening_delta]),
            _make_chunk(tool_calls=[closing_delta]),
            _make_chunk(finish_reason="tool_calls"),
        ]
    )
    agent.client.chat.completions.create.return_value = stream

    response = agent._interruptible_streaming_api_call({"messages": []})

    streamed_calls = response.choices[0].message.tool_calls
    assert len(streamed_calls) == 1
    assert streamed_calls[0].function.arguments == '{"path":"x.txt"}'
    assert response.choices[0].finish_reason == "tool_calls"
def test_streamed_split_json_chunks_still_reassemble(self, agent):
    """Valid JSON arguments split over two deltas are reassembled into one compact string."""
    opening_delta = _make_tc_delta(0, "call_1", "read_file", '{"path":')
    closing_delta = _make_tc_delta(0, None, None, ' "x.txt"}')
    stream = iter(
        [
            _make_chunk(tool_calls=[opening_delta]),
            _make_chunk(tool_calls=[closing_delta]),
            _make_chunk(finish_reason="tool_calls"),
        ]
    )
    agent.client.chat.completions.create.return_value = stream

    response = agent._interruptible_streaming_api_call({"messages": []})

    streamed_calls = response.choices[0].message.tool_calls
    assert len(streamed_calls) == 1
    assert streamed_calls[0].function.arguments == '{"path":"x.txt"}'
    assert response.choices[0].finish_reason == "tool_calls"
def test_ollama_reused_index_separate_tool_calls(self, agent):
"""Ollama sends every tool call at index 0 with different ids.