Based on #6079 by @tunamitom with critical fixes and comprehensive tests. Changes from #6079: - Fix: sanitization overwrite bug — Qwen message prep now runs AFTER codex field sanitization, not before (was silently discarding Qwen transforms) - Fix: missing try/except AuthError in runtime_provider.py — stale Qwen credentials now fall through to next provider on auto-detect - Fix: 'qwen' alias conflict — bare 'qwen' stays mapped to 'alibaba' (DashScope); use 'qwen-portal' or 'qwen-cli' for the OAuth provider - Fix: hardcoded ['coder-model'] replaced with live API fetch + curated fallback list (qwen3-coder-plus, qwen3-coder) - Fix: extract _is_qwen_portal() helper + _qwen_portal_headers() to replace 5 inline 'portal.qwen.ai' string checks and share headers between init and credential swap - Fix: add Qwen branch to _apply_client_headers_for_base_url for mid-session credential swaps - Fix: remove suspicious TypeError catch blocks around _prompt_provider_choice - Fix: handle bare string items in content lists (were silently dropped) - Fix: remove redundant dict() copies after deepcopy in message prep - Revert: unrelated ai-gateway test mock removal and model_switch.py comment deletion New tests (30 test functions): - _qwen_cli_auth_path, _read_qwen_cli_tokens (success + 3 error paths) - _save_qwen_cli_tokens (roundtrip, parent creation, permissions) - _qwen_access_token_is_expiring (5 edge cases: fresh, expired, within skew, None, non-numeric) - _refresh_qwen_cli_tokens (success, preserve old refresh, 4 error paths, default expires_in, disk persistence) - resolve_qwen_runtime_credentials (fresh, auto-refresh, force-refresh, missing token, env override) - get_qwen_auth_status (logged in, not logged in) - Runtime provider resolution (direct, pool entry, alias) - _build_api_kwargs (metadata, vl_high_resolution_images, message formatting, max_tokens suppression)
400 lines
13 KiB
Python
400 lines
13 KiB
Python
"""Tests for Qwen OAuth provider authentication (hermes_cli/auth.py).
|
|
|
|
Covers: _qwen_cli_auth_path, _read_qwen_cli_tokens, _save_qwen_cli_tokens,
|
|
_qwen_access_token_is_expiring, _refresh_qwen_cli_tokens,
|
|
resolve_qwen_runtime_credentials, get_qwen_auth_status.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import stat
|
|
import time
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from hermes_cli.auth import (
|
|
AuthError,
|
|
DEFAULT_QWEN_BASE_URL,
|
|
QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
|
|
_qwen_cli_auth_path,
|
|
_read_qwen_cli_tokens,
|
|
_save_qwen_cli_tokens,
|
|
_qwen_access_token_is_expiring,
|
|
_refresh_qwen_cli_tokens,
|
|
resolve_qwen_runtime_credentials,
|
|
get_qwen_auth_status,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_qwen_tokens(
|
|
access_token="test-access-token",
|
|
refresh_token="test-refresh-token",
|
|
expiry_date=None,
|
|
**extra,
|
|
):
|
|
"""Create a minimal Qwen CLI OAuth credential dict."""
|
|
if expiry_date is None:
|
|
# 1 hour from now in milliseconds
|
|
expiry_date = int((time.time() + 3600) * 1000)
|
|
data = {
|
|
"access_token": access_token,
|
|
"refresh_token": refresh_token,
|
|
"token_type": "Bearer",
|
|
"expiry_date": expiry_date,
|
|
"resource_url": "portal.qwen.ai",
|
|
}
|
|
data.update(extra)
|
|
return data
|
|
|
|
|
|
def _write_qwen_creds(tmp_path, tokens=None):
|
|
"""Write tokens to the Qwen CLI credentials file and return the path."""
|
|
qwen_dir = tmp_path / ".qwen"
|
|
qwen_dir.mkdir(parents=True, exist_ok=True)
|
|
creds_path = qwen_dir / "oauth_creds.json"
|
|
if tokens is None:
|
|
tokens = _make_qwen_tokens()
|
|
creds_path.write_text(json.dumps(tokens), encoding="utf-8")
|
|
return creds_path
|
|
|
|
|
|
@pytest.fixture()
|
|
def qwen_env(tmp_path, monkeypatch):
|
|
"""Redirect _qwen_cli_auth_path to tmp_path/.qwen/oauth_creds.json."""
|
|
creds_path = tmp_path / ".qwen" / "oauth_creds.json"
|
|
monkeypatch.setattr(
|
|
"hermes_cli.auth._qwen_cli_auth_path", lambda: creds_path
|
|
)
|
|
return tmp_path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _qwen_cli_auth_path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_qwen_cli_auth_path_returns_expected_location():
|
|
path = _qwen_cli_auth_path()
|
|
assert path == Path.home() / ".qwen" / "oauth_creds.json"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _read_qwen_cli_tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_read_qwen_cli_tokens_success(qwen_env):
|
|
tokens = _make_qwen_tokens(access_token="my-access")
|
|
_write_qwen_creds(qwen_env, tokens)
|
|
result = _read_qwen_cli_tokens()
|
|
assert result["access_token"] == "my-access"
|
|
assert result["refresh_token"] == "test-refresh-token"
|
|
|
|
|
|
def test_read_qwen_cli_tokens_missing_file(qwen_env):
|
|
with pytest.raises(AuthError) as exc:
|
|
_read_qwen_cli_tokens()
|
|
assert exc.value.code == "qwen_auth_missing"
|
|
|
|
|
|
def test_read_qwen_cli_tokens_invalid_json(qwen_env):
|
|
creds_path = qwen_env / ".qwen" / "oauth_creds.json"
|
|
creds_path.parent.mkdir(parents=True, exist_ok=True)
|
|
creds_path.write_text("not json{{{", encoding="utf-8")
|
|
with pytest.raises(AuthError) as exc:
|
|
_read_qwen_cli_tokens()
|
|
assert exc.value.code == "qwen_auth_read_failed"
|
|
|
|
|
|
def test_read_qwen_cli_tokens_non_dict(qwen_env):
|
|
creds_path = qwen_env / ".qwen" / "oauth_creds.json"
|
|
creds_path.parent.mkdir(parents=True, exist_ok=True)
|
|
creds_path.write_text(json.dumps(["a", "b"]), encoding="utf-8")
|
|
with pytest.raises(AuthError) as exc:
|
|
_read_qwen_cli_tokens()
|
|
assert exc.value.code == "qwen_auth_invalid"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _save_qwen_cli_tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_save_qwen_cli_tokens_roundtrip(qwen_env):
|
|
tokens = _make_qwen_tokens(access_token="saved-token")
|
|
saved_path = _save_qwen_cli_tokens(tokens)
|
|
assert saved_path.exists()
|
|
loaded = json.loads(saved_path.read_text(encoding="utf-8"))
|
|
assert loaded["access_token"] == "saved-token"
|
|
|
|
|
|
def test_save_qwen_cli_tokens_creates_parent(qwen_env):
|
|
tokens = _make_qwen_tokens()
|
|
saved_path = _save_qwen_cli_tokens(tokens)
|
|
assert saved_path.parent.exists()
|
|
|
|
|
|
def test_save_qwen_cli_tokens_permissions(qwen_env):
|
|
tokens = _make_qwen_tokens()
|
|
saved_path = _save_qwen_cli_tokens(tokens)
|
|
mode = saved_path.stat().st_mode
|
|
assert mode & stat.S_IRUSR # owner read
|
|
assert mode & stat.S_IWUSR # owner write
|
|
assert not (mode & stat.S_IRGRP) # no group read
|
|
assert not (mode & stat.S_IROTH) # no other read
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _qwen_access_token_is_expiring
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_expiring_token_not_expired():
|
|
# 1 hour from now in milliseconds
|
|
future_ms = int((time.time() + 3600) * 1000)
|
|
assert not _qwen_access_token_is_expiring(future_ms)
|
|
|
|
|
|
def test_expiring_token_already_expired():
|
|
# 1 hour ago in milliseconds
|
|
past_ms = int((time.time() - 3600) * 1000)
|
|
assert _qwen_access_token_is_expiring(past_ms)
|
|
|
|
|
|
def test_expiring_token_within_skew():
|
|
# Just inside the default skew window
|
|
near_ms = int((time.time() + QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS - 5) * 1000)
|
|
assert _qwen_access_token_is_expiring(near_ms)
|
|
|
|
|
|
def test_expiring_token_none_returns_true():
|
|
assert _qwen_access_token_is_expiring(None)
|
|
|
|
|
|
def test_expiring_token_non_numeric_returns_true():
|
|
assert _qwen_access_token_is_expiring("not-a-number")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _refresh_qwen_cli_tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_refresh_qwen_cli_tokens_success(qwen_env):
|
|
tokens = _make_qwen_tokens(refresh_token="old-refresh")
|
|
|
|
resp = MagicMock()
|
|
resp.status_code = 200
|
|
resp.json.return_value = {
|
|
"access_token": "new-access",
|
|
"refresh_token": "new-refresh",
|
|
"expires_in": 7200,
|
|
}
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.return_value = resp
|
|
result = _refresh_qwen_cli_tokens(tokens)
|
|
|
|
assert result["access_token"] == "new-access"
|
|
assert result["refresh_token"] == "new-refresh"
|
|
assert "expiry_date" in result
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_preserves_old_refresh_if_not_in_response(qwen_env):
|
|
tokens = _make_qwen_tokens(refresh_token="keep-me")
|
|
|
|
resp = MagicMock()
|
|
resp.status_code = 200
|
|
resp.json.return_value = {
|
|
"access_token": "new-access",
|
|
# No refresh_token in response — should keep old one
|
|
"expires_in": 3600,
|
|
}
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.return_value = resp
|
|
result = _refresh_qwen_cli_tokens(tokens)
|
|
|
|
assert result["refresh_token"] == "keep-me"
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_missing_refresh_token():
|
|
tokens = {"access_token": "at", "refresh_token": ""}
|
|
with pytest.raises(AuthError) as exc:
|
|
_refresh_qwen_cli_tokens(tokens)
|
|
assert exc.value.code == "qwen_refresh_token_missing"
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_http_error(qwen_env):
|
|
tokens = _make_qwen_tokens()
|
|
|
|
resp = MagicMock()
|
|
resp.status_code = 401
|
|
resp.text = "unauthorized"
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.return_value = resp
|
|
with pytest.raises(AuthError) as exc:
|
|
_refresh_qwen_cli_tokens(tokens)
|
|
assert exc.value.code == "qwen_refresh_failed"
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_network_error(qwen_env):
|
|
tokens = _make_qwen_tokens()
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.side_effect = ConnectionError("timeout")
|
|
with pytest.raises(AuthError) as exc:
|
|
_refresh_qwen_cli_tokens(tokens)
|
|
assert exc.value.code == "qwen_refresh_failed"
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_invalid_json_response(qwen_env):
|
|
tokens = _make_qwen_tokens()
|
|
|
|
resp = MagicMock()
|
|
resp.status_code = 200
|
|
resp.json.side_effect = ValueError("bad json")
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.return_value = resp
|
|
with pytest.raises(AuthError) as exc:
|
|
_refresh_qwen_cli_tokens(tokens)
|
|
assert exc.value.code == "qwen_refresh_invalid_json"
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_missing_access_token_in_response(qwen_env):
|
|
tokens = _make_qwen_tokens()
|
|
|
|
resp = MagicMock()
|
|
resp.status_code = 200
|
|
resp.json.return_value = {"something": "but no access_token"}
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.return_value = resp
|
|
with pytest.raises(AuthError) as exc:
|
|
_refresh_qwen_cli_tokens(tokens)
|
|
assert exc.value.code == "qwen_refresh_invalid_response"
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_default_expires_in(qwen_env):
|
|
"""When expires_in is missing, default to 6 hours."""
|
|
tokens = _make_qwen_tokens()
|
|
|
|
resp = MagicMock()
|
|
resp.status_code = 200
|
|
resp.json.return_value = {"access_token": "new"}
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.return_value = resp
|
|
result = _refresh_qwen_cli_tokens(tokens)
|
|
|
|
# Verify expiry_date is roughly now + 6h (within 60s tolerance)
|
|
expected_ms = int(time.time() * 1000) + 6 * 60 * 60 * 1000
|
|
assert abs(result["expiry_date"] - expected_ms) < 60_000
|
|
|
|
|
|
def test_refresh_qwen_cli_tokens_saves_to_disk(qwen_env):
|
|
tokens = _make_qwen_tokens()
|
|
|
|
resp = MagicMock()
|
|
resp.status_code = 200
|
|
resp.json.return_value = {
|
|
"access_token": "disk-check",
|
|
"expires_in": 3600,
|
|
}
|
|
|
|
with patch("hermes_cli.auth.httpx") as mock_httpx:
|
|
mock_httpx.post.return_value = resp
|
|
_refresh_qwen_cli_tokens(tokens)
|
|
|
|
# Verify it was persisted
|
|
creds_path = qwen_env / ".qwen" / "oauth_creds.json"
|
|
assert creds_path.exists()
|
|
saved = json.loads(creds_path.read_text(encoding="utf-8"))
|
|
assert saved["access_token"] == "disk-check"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# resolve_qwen_runtime_credentials
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_resolve_qwen_runtime_credentials_fresh_token(qwen_env):
|
|
tokens = _make_qwen_tokens(access_token="fresh-at")
|
|
_write_qwen_creds(qwen_env, tokens)
|
|
|
|
creds = resolve_qwen_runtime_credentials(refresh_if_expiring=False)
|
|
assert creds["provider"] == "qwen-oauth"
|
|
assert creds["api_key"] == "fresh-at"
|
|
assert creds["base_url"] == DEFAULT_QWEN_BASE_URL
|
|
assert creds["source"] == "qwen-cli"
|
|
|
|
|
|
def test_resolve_qwen_runtime_credentials_triggers_refresh(qwen_env):
|
|
# Write an expired token
|
|
expired_ms = int((time.time() - 3600) * 1000)
|
|
tokens = _make_qwen_tokens(access_token="old", expiry_date=expired_ms)
|
|
_write_qwen_creds(qwen_env, tokens)
|
|
|
|
refreshed = _make_qwen_tokens(access_token="refreshed-at")
|
|
|
|
with patch(
|
|
"hermes_cli.auth._refresh_qwen_cli_tokens", return_value=refreshed
|
|
) as mock_refresh:
|
|
creds = resolve_qwen_runtime_credentials()
|
|
mock_refresh.assert_called_once()
|
|
assert creds["api_key"] == "refreshed-at"
|
|
|
|
|
|
def test_resolve_qwen_runtime_credentials_force_refresh(qwen_env):
|
|
tokens = _make_qwen_tokens(access_token="old-at")
|
|
_write_qwen_creds(qwen_env, tokens)
|
|
|
|
refreshed = _make_qwen_tokens(access_token="force-refreshed")
|
|
|
|
with patch(
|
|
"hermes_cli.auth._refresh_qwen_cli_tokens", return_value=refreshed
|
|
) as mock_refresh:
|
|
creds = resolve_qwen_runtime_credentials(force_refresh=True)
|
|
mock_refresh.assert_called_once()
|
|
assert creds["api_key"] == "force-refreshed"
|
|
|
|
|
|
def test_resolve_qwen_runtime_credentials_missing_access_token(qwen_env):
|
|
tokens = _make_qwen_tokens(access_token="")
|
|
_write_qwen_creds(qwen_env, tokens)
|
|
|
|
with pytest.raises(AuthError) as exc:
|
|
resolve_qwen_runtime_credentials(refresh_if_expiring=False)
|
|
assert exc.value.code == "qwen_access_token_missing"
|
|
|
|
|
|
def test_resolve_qwen_runtime_credentials_base_url_env_override(qwen_env, monkeypatch):
|
|
tokens = _make_qwen_tokens(access_token="at")
|
|
_write_qwen_creds(qwen_env, tokens)
|
|
monkeypatch.setenv("HERMES_QWEN_BASE_URL", "https://custom.qwen.ai/v1")
|
|
|
|
creds = resolve_qwen_runtime_credentials(refresh_if_expiring=False)
|
|
assert creds["base_url"] == "https://custom.qwen.ai/v1"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_qwen_auth_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_get_qwen_auth_status_logged_in(qwen_env):
|
|
tokens = _make_qwen_tokens(access_token="status-at")
|
|
_write_qwen_creds(qwen_env, tokens)
|
|
|
|
status = get_qwen_auth_status()
|
|
assert status["logged_in"] is True
|
|
assert status["api_key"] == "status-at"
|
|
|
|
|
|
def test_get_qwen_auth_status_not_logged_in(qwen_env):
|
|
# No credentials file
|
|
status = get_qwen_auth_status()
|
|
assert status["logged_in"] is False
|
|
assert "error" in status
|