Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
eab5635a7a fix: restore /usage account limits (#958)
All checks were successful
Lint / lint (pull_request) Successful in 9s
2026-04-22 10:36:49 -04:00
7 changed files with 757 additions and 518 deletions

326
agent/account_usage.py Normal file
View File

@@ -0,0 +1,326 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
import httpx
from agent.anthropic_adapter import _is_oauth_token, resolve_anthropic_token
from hermes_cli.auth import _read_codex_tokens, resolve_codex_runtime_credentials
from hermes_cli.runtime_provider import resolve_runtime_provider
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
@dataclass(frozen=True)
class AccountUsageWindow:
label: str
used_percent: Optional[float] = None
reset_at: Optional[datetime] = None
detail: Optional[str] = None
@dataclass(frozen=True)
class AccountUsageSnapshot:
provider: str
source: str
fetched_at: datetime
title: str = "Account limits"
plan: Optional[str] = None
windows: tuple[AccountUsageWindow, ...] = ()
details: tuple[str, ...] = ()
unavailable_reason: Optional[str] = None
@property
def available(self) -> bool:
return bool(self.windows or self.details) and not self.unavailable_reason
def _title_case_slug(value: Optional[str]) -> Optional[str]:
cleaned = str(value or "").strip()
if not cleaned:
return None
return cleaned.replace("_", " ").replace("-", " ").title()
def _parse_dt(value: Any) -> Optional[datetime]:
if value in (None, ""):
return None
if isinstance(value, (int, float)):
return datetime.fromtimestamp(float(value), tz=timezone.utc)
if isinstance(value, str):
text = value.strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(text)
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
except ValueError:
return None
return None
def _format_reset(dt: Optional[datetime]) -> str:
if not dt:
return "unknown"
local_dt = dt.astimezone()
delta = dt - _utc_now()
total_seconds = int(delta.total_seconds())
if total_seconds <= 0:
return f"now ({local_dt.strftime('%Y-%m-%d %H:%M %Z')})"
hours, rem = divmod(total_seconds, 3600)
minutes = rem // 60
if hours >= 24:
days, hours = divmod(hours, 24)
rel = f"in {days}d {hours}h"
elif hours > 0:
rel = f"in {hours}h {minutes}m"
else:
rel = f"in {minutes}m"
return f"{rel} ({local_dt.strftime('%Y-%m-%d %H:%M %Z')})"
def render_account_usage_lines(snapshot: Optional[AccountUsageSnapshot], *, markdown: bool = False) -> list[str]:
if not snapshot:
return []
header = f"📈 {'**' if markdown else ''}{snapshot.title}{'**' if markdown else ''}"
lines = [header]
if snapshot.plan:
lines.append(f"Provider: {snapshot.provider} ({snapshot.plan})")
else:
lines.append(f"Provider: {snapshot.provider}")
for window in snapshot.windows:
if window.used_percent is None:
base = f"{window.label}: unavailable"
else:
remaining = max(0, round(100 - float(window.used_percent)))
used = max(0, round(float(window.used_percent)))
base = f"{window.label}: {remaining}% remaining ({used}% used)"
if window.reset_at:
base += f" • resets {_format_reset(window.reset_at)}"
elif window.detail:
base += f"{window.detail}"
lines.append(base)
for detail in snapshot.details:
lines.append(detail)
if snapshot.unavailable_reason:
lines.append(f"Unavailable: {snapshot.unavailable_reason}")
return lines
def _resolve_codex_usage_url(base_url: str) -> str:
normalized = (base_url or "").strip().rstrip("/")
if not normalized:
normalized = "https://chatgpt.com/backend-api/codex"
if normalized.endswith("/codex"):
normalized = normalized[: -len("/codex")]
if "/backend-api" in normalized:
return normalized + "/wham/usage"
return normalized + "/api/codex/usage"
def _fetch_codex_account_usage() -> Optional[AccountUsageSnapshot]:
creds = resolve_codex_runtime_credentials(refresh_if_expiring=True)
token_data = _read_codex_tokens()
tokens = token_data.get("tokens") or {}
account_id = str(tokens.get("account_id", "") or "").strip() or None
headers = {
"Authorization": f"Bearer {creds['api_key']}",
"Accept": "application/json",
"User-Agent": "codex-cli",
}
if account_id:
headers["ChatGPT-Account-Id"] = account_id
with httpx.Client(timeout=15.0) as client:
response = client.get(_resolve_codex_usage_url(creds.get("base_url", "")), headers=headers)
response.raise_for_status()
payload = response.json() or {}
rate_limit = payload.get("rate_limit") or {}
windows: list[AccountUsageWindow] = []
for key, label in (("primary_window", "Session"), ("secondary_window", "Weekly")):
window = rate_limit.get(key) or {}
used = window.get("used_percent")
if used is None:
continue
windows.append(
AccountUsageWindow(
label=label,
used_percent=float(used),
reset_at=_parse_dt(window.get("reset_at")),
)
)
details: list[str] = []
credits = payload.get("credits") or {}
if credits.get("has_credits"):
balance = credits.get("balance")
if isinstance(balance, (int, float)):
details.append(f"Credits balance: ${float(balance):.2f}")
elif credits.get("unlimited"):
details.append("Credits balance: unlimited")
return AccountUsageSnapshot(
provider="openai-codex",
source="usage_api",
fetched_at=_utc_now(),
plan=_title_case_slug(payload.get("plan_type")),
windows=tuple(windows),
details=tuple(details),
)
def _fetch_anthropic_account_usage() -> Optional[AccountUsageSnapshot]:
token = (resolve_anthropic_token() or "").strip()
if not token:
return None
if not _is_oauth_token(token):
return AccountUsageSnapshot(
provider="anthropic",
source="oauth_usage_api",
fetched_at=_utc_now(),
unavailable_reason="Anthropic account limits are only available for OAuth-backed Claude accounts.",
)
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Content-Type": "application/json",
"anthropic-beta": "oauth-2025-04-20",
"User-Agent": "claude-code/2.1.0",
}
with httpx.Client(timeout=15.0) as client:
response = client.get("https://api.anthropic.com/api/oauth/usage", headers=headers)
response.raise_for_status()
payload = response.json() or {}
windows: list[AccountUsageWindow] = []
mapping = (
("five_hour", "Current session"),
("seven_day", "Current week"),
("seven_day_opus", "Opus week"),
("seven_day_sonnet", "Sonnet week"),
)
for key, label in mapping:
window = payload.get(key) or {}
util = window.get("utilization")
if util is None:
continue
used = float(util) * 100 if float(util) <= 1 else float(util)
windows.append(
AccountUsageWindow(
label=label,
used_percent=used,
reset_at=_parse_dt(window.get("resets_at")),
)
)
details: list[str] = []
extra = payload.get("extra_usage") or {}
if extra.get("is_enabled"):
used_credits = extra.get("used_credits")
monthly_limit = extra.get("monthly_limit")
currency = extra.get("currency") or "USD"
if isinstance(used_credits, (int, float)) and isinstance(monthly_limit, (int, float)):
details.append(
f"Extra usage: {used_credits:.2f} / {monthly_limit:.2f} {currency}"
)
return AccountUsageSnapshot(
provider="anthropic",
source="oauth_usage_api",
fetched_at=_utc_now(),
windows=tuple(windows),
details=tuple(details),
)
def _fetch_openrouter_account_usage(base_url: Optional[str], api_key: Optional[str]) -> Optional[AccountUsageSnapshot]:
runtime = resolve_runtime_provider(
requested="openrouter",
explicit_base_url=base_url,
explicit_api_key=api_key,
)
token = str(runtime.get("api_key", "") or "").strip()
if not token:
return None
normalized = str(runtime.get("base_url", "") or "").rstrip("/")
credits_url = f"{normalized}/credits"
key_url = f"{normalized}/key"
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
with httpx.Client(timeout=10.0) as client:
credits_resp = client.get(credits_url, headers=headers)
credits_resp.raise_for_status()
credits = (credits_resp.json() or {}).get("data") or {}
try:
key_resp = client.get(key_url, headers=headers)
key_resp.raise_for_status()
key_data = (key_resp.json() or {}).get("data") or {}
except Exception:
key_data = {}
total_credits = float(credits.get("total_credits") or 0.0)
total_usage = float(credits.get("total_usage") or 0.0)
details = [f"Credits balance: ${max(0.0, total_credits - total_usage):.2f}"]
windows: list[AccountUsageWindow] = []
limit = key_data.get("limit")
limit_remaining = key_data.get("limit_remaining")
limit_reset = str(key_data.get("limit_reset") or "").strip()
usage = key_data.get("usage")
if (
isinstance(limit, (int, float))
and float(limit) > 0
and isinstance(limit_remaining, (int, float))
and 0 <= float(limit_remaining) <= float(limit)
):
limit_value = float(limit)
remaining_value = float(limit_remaining)
used_percent = ((limit_value - remaining_value) / limit_value) * 100
detail_parts = [f"${remaining_value:.2f} of ${limit_value:.2f} remaining"]
if limit_reset:
detail_parts.append(f"resets {limit_reset}")
windows.append(
AccountUsageWindow(
label="API key quota",
used_percent=used_percent,
detail="".join(detail_parts),
)
)
if isinstance(usage, (int, float)):
usage_parts = [f"API key usage: ${float(usage):.2f} total"]
for value, label in (
(key_data.get("usage_daily"), "today"),
(key_data.get("usage_weekly"), "this week"),
(key_data.get("usage_monthly"), "this month"),
):
if isinstance(value, (int, float)) and float(value) > 0:
usage_parts.append(f"${float(value):.2f} {label}")
details.append("".join(usage_parts))
return AccountUsageSnapshot(
provider="openrouter",
source="credits_api",
fetched_at=_utc_now(),
windows=tuple(windows),
details=tuple(details),
)
def fetch_account_usage(
provider: Optional[str],
*,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
) -> Optional[AccountUsageSnapshot]:
normalized = str(provider or "").strip().lower()
if normalized in {"", "auto", "custom"}:
return None
try:
if normalized == "openai-codex":
return _fetch_codex_account_usage()
if normalized == "anthropic":
return _fetch_anthropic_account_usage()
if normalized == "openrouter":
return _fetch_openrouter_account_usage(base_url, api_key)
except Exception:
return None
return None

View File

@@ -1,70 +1,43 @@
from __future__ import annotations
"""
A2A agent card generation for fleet discovery.
Agent Card — A2A-compliant agent discovery.
Part of #843: fix: implement A2A agent card for fleet discovery (#819)
Refs #801.
Closes #802.
Provides metadata about the agent's identity, capabilities, and installed skills
for discovery by other agents in the fleet.
"""
import argparse
import json
import logging
import os
import socket
import sys
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Sequence
from urllib.parse import urlparse, urlunparse
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_cli import __version__
from hermes_cli.config import load_config
from hermes_cli.config import load_config, get_hermes_home
from agent.skill_utils import (
get_all_skills_dirs,
get_disabled_skill_names,
iter_skill_index_files,
parse_frontmatter,
skill_matches_platform,
get_all_skills_dirs,
get_disabled_skill_names,
skill_matches_platform
)
logger = logging.getLogger(__name__)
DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"
DEFAULT_INPUT_MODES = ["text/plain", "application/json"]
DEFAULT_OUTPUT_MODES = ["text/plain", "application/json"]
_REQUIRED_CAPABILITY_FLAGS = (
"streaming",
"pushNotifications",
"stateTransitionHistory",
)
@dataclass
class AgentSkill:
id: str
name: str
description: str = ""
tags: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {"id": self.id, "name": self.name}
if self.description:
data["description"] = self.description
if self.tags:
data["tags"] = self.tags
return data
version: str = "1.0.0"
@dataclass
class AgentCapabilities:
streaming: bool = True
pushNotifications: bool = False
stateTransitionHistory: bool = True
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
tools: bool = True
vision: bool = False
reasoning: bool = False
@dataclass
class AgentCard:
@@ -74,81 +47,14 @@ class AgentCard:
version: str = __version__
capabilities: AgentCapabilities = field(default_factory=AgentCapabilities)
skills: List[AgentSkill] = field(default_factory=list)
defaultInputModes: List[str] = field(default_factory=lambda: list(DEFAULT_INPUT_MODES))
defaultOutputModes: List[str] = field(default_factory=lambda: list(DEFAULT_OUTPUT_MODES))
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {
"name": self.name,
"description": self.description,
"url": self.url,
"version": self.version,
"capabilities": self.capabilities.to_dict(),
"skills": [skill.to_dict() for skill in self.skills],
"defaultInputModes": list(self.defaultInputModes),
"defaultOutputModes": list(self.defaultOutputModes),
}
if self.metadata:
data["metadata"] = dict(self.metadata)
return data
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent)
def _env_or_empty(key: str) -> str:
return os.environ.get(key, "").strip()
def _as_agent_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
if not isinstance(config, Mapping):
return {}
agent_cfg = config.get("agent")
return dict(agent_cfg) if isinstance(agent_cfg, Mapping) else {}
def _as_a2a_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
if not isinstance(config, Mapping):
return {}
a2a_cfg = config.get("a2a")
return dict(a2a_cfg) if isinstance(a2a_cfg, Mapping) else {}
def _normalize_string_list(value: Any) -> List[str]:
if value is None:
return []
if isinstance(value, str):
parts = value.split(",")
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
parts = list(value)
else:
parts = [value]
out: List[str] = []
seen = set()
for item in parts:
text = str(item).strip()
if not text or text in seen:
continue
seen.add(text)
out.append(text)
return out
def _normalize_skill_tags(frontmatter: Mapping[str, Any]) -> List[str]:
tags = _normalize_string_list(frontmatter.get("tags"))
category = str(frontmatter.get("category") or "").strip()
if category and category not in tags:
tags.append(category)
return tags
defaultInputModes: List[str] = field(default_factory=lambda: ["text/plain"])
defaultOutputModes: List[str] = field(default_factory=lambda: ["text/plain"])
def _load_skills() -> List[AgentSkill]:
"""Scan enabled skills and return A2A skill metadata."""
skills: List[AgentSkill] = []
"""Scan all enabled skills and return metadata."""
skills = []
disabled = get_disabled_skill_names()
seen_ids = set()
for skills_dir in get_all_skills_dirs():
if not skills_dir.is_dir():
continue
@@ -159,262 +65,71 @@ def _load_skills() -> List[AgentSkill]:
except Exception:
continue
skill_name = frontmatter.get("name") or skill_file.parent.name
if str(skill_name) in disabled:
continue
if not skill_matches_platform(frontmatter):
continue
skill_id = str(frontmatter.get("name") or skill_file.parent.name).strip().lower().replace(" ", "-")
if skill_id in disabled or skill_id in seen_ids:
continue
seen_ids.add(skill_id)
skills.append(AgentSkill(
id=str(skill_name),
name=str(frontmatter.get("name", skill_name)),
description=str(frontmatter.get("description", "")),
version=str(frontmatter.get("version", "1.0.0"))
))
return skills
display_name = str(frontmatter.get("title") or frontmatter.get("name") or skill_file.parent.name).strip()
description = str(frontmatter.get("description") or "").strip()
tags = _normalize_skill_tags(frontmatter)
skills.append(
AgentSkill(
id=skill_id,
name=display_name,
description=description,
tags=tags,
)
)
def build_agent_card() -> AgentCard:
"""Build the agent card from current configuration and environment."""
config = load_config()
# Identity
name = os.environ.get("HERMES_AGENT_NAME") or config.get("agent", {}).get("name") or "hermes"
description = os.environ.get("HERMES_AGENT_DESCRIPTION") or config.get("agent", {}).get("description") or "Sovereign AI agent"
# URL - try to determine from environment or config
port = os.environ.get("HERMES_WEB_PORT") or "9119"
host = os.environ.get("HERMES_WEB_HOST") or "localhost"
url = f"http://{host}:{port}"
# Capabilities
# In a real scenario, we'd check model metadata for vision/reasoning
capabilities = AgentCapabilities(
streaming=True,
tools=True,
vision=False, # Default to false unless we can confirm
reasoning=False
)
# Skills
skills = _load_skills()
return AgentCard(
name=name,
description=description,
url=url,
version=__version__,
capabilities=capabilities,
skills=skills
)
return sorted(skills, key=lambda skill: skill.id)
def _get_agent_name(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return override
env_name = _env_or_empty("HERMES_AGENT_NAME") or _env_or_empty("AGENT_NAME")
if env_name:
return env_name
agent_cfg = _as_agent_config(config)
if agent_cfg.get("name"):
return str(agent_cfg["name"]).strip()
def get_agent_card_json() -> str:
"""Return the agent card as a JSON string."""
try:
hostname = socket.gethostname().split(".", 1)[0].strip()
if hostname:
return hostname
except Exception:
pass
return "hermes"
def _get_description(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return override
env_description = _env_or_empty("HERMES_AGENT_DESCRIPTION") or _env_or_empty("AGENT_DESCRIPTION")
if env_description:
return env_description
agent_cfg = _as_agent_config(config)
if agent_cfg.get("description"):
return str(agent_cfg["description"]).strip()
return DEFAULT_DESCRIPTION
def _normalize_a2a_url(url: str) -> str:
raw = (url or "").strip()
if not raw:
return ""
parsed = urlparse(raw if "://" in raw else f"https://{raw}")
scheme = parsed.scheme or "https"
netloc = parsed.netloc or parsed.path
path = parsed.path if parsed.netloc else ""
normalized_path = path.rstrip("/") if path not in ("", "/") else ""
if not normalized_path.endswith("/a2a"):
normalized_path = f"{normalized_path}/a2a" if normalized_path else "/a2a"
return urlunparse((scheme, netloc, normalized_path, "", "", ""))
def _get_agent_url(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return _normalize_a2a_url(override)
agent_cfg = _as_agent_config(config)
a2a_cfg = _as_a2a_config(config)
explicit = (
_env_or_empty("HERMES_A2A_PUBLIC_URL")
or str(a2a_cfg.get("public_url") or "").strip()
or str(agent_cfg.get("a2a_public_url") or "").strip()
)
if explicit:
return _normalize_a2a_url(explicit)
host = (
_env_or_empty("HERMES_A2A_HOST")
or str(a2a_cfg.get("host") or "").strip()
or _env_or_empty("HERMES_WEB_HOST")
or str(agent_cfg.get("host") or "").strip()
or "localhost"
)
port = (
_env_or_empty("HERMES_A2A_PORT")
or str(a2a_cfg.get("port") or "").strip()
or _env_or_empty("HERMES_WEB_PORT")
or str(agent_cfg.get("port") or "").strip()
or "9119"
)
scheme = (
_env_or_empty("HERMES_A2A_SCHEME")
or str(a2a_cfg.get("scheme") or "").strip()
or ("https" if (_env_or_empty("HERMES_MTLS_CERT") or str(port) == "9443") else "http")
)
return _normalize_a2a_url(f"{scheme}://{host}:{port}")
def _merge_skills(base_skills: Iterable[AgentSkill], extra_skills: Iterable[AgentSkill] | None = None) -> List[AgentSkill]:
merged: Dict[str, AgentSkill] = {}
for skill in list(base_skills) + list(extra_skills or []):
if skill.id not in merged:
merged[skill.id] = skill
return [merged[key] for key in sorted(merged)]
def build_agent_card(
*,
name: str | None = None,
description: str | None = None,
url: str | None = None,
extra_skills: Iterable[AgentSkill] | None = None,
metadata: Mapping[str, Any] | None = None,
) -> AgentCard:
"""Build an A2A-compliant agent card from config, env, and installed skills."""
try:
config = load_config()
except Exception as exc:
logger.debug("Falling back to empty config while building agent card: %s", exc)
config = {}
card = AgentCard(
name=_get_agent_name(config, override=name),
description=_get_description(config, override=description),
url=_get_agent_url(config, override=url),
skills=_merge_skills(_load_skills(), extra_skills),
metadata=dict(metadata or {}),
)
return card
def validate_agent_card(card: AgentCard | Dict[str, Any]) -> List[str]:
"""Return a list of schema-validation errors for an agent card."""
data = card.to_dict() if isinstance(card, AgentCard) else dict(card)
errors: List[str] = []
for field_name in ("name", "description", "url", "version"):
value = data.get(field_name)
if not isinstance(value, str) or not value.strip():
errors.append(f"{field_name} must be a non-empty string")
url_value = str(data.get("url") or "")
parsed = urlparse(url_value)
if not parsed.scheme or not parsed.netloc:
errors.append("url must be an absolute http/https URL")
elif parsed.scheme not in {"http", "https"}:
errors.append("url must use http or https")
elif not parsed.path.rstrip("/").endswith("/a2a"):
errors.append("url must point to the /a2a endpoint")
capabilities = data.get("capabilities")
if not isinstance(capabilities, Mapping):
errors.append("capabilities must be an object")
else:
for capability_name in _REQUIRED_CAPABILITY_FLAGS:
if not isinstance(capabilities.get(capability_name), bool):
errors.append(f"capabilities.{capability_name} must be a boolean")
for field_name, required_modes in (
("defaultInputModes", DEFAULT_INPUT_MODES),
("defaultOutputModes", DEFAULT_OUTPUT_MODES),
):
modes = data.get(field_name)
if not isinstance(modes, list) or not modes:
errors.append(f"{field_name} must be a non-empty list of MIME types")
continue
for mode in modes:
if not isinstance(mode, str) or "/" not in mode:
errors.append(f"{field_name} entries must be MIME types")
for required_mode in required_modes:
if required_mode not in modes:
errors.append(f"{field_name} must include {required_mode}")
skills = data.get("skills")
if not isinstance(skills, list):
errors.append("skills must be a list")
else:
for index, skill in enumerate(skills):
if not isinstance(skill, Mapping):
errors.append(f"skills[{index}] must be an object")
continue
if not str(skill.get("id") or "").strip():
errors.append(f"skills[{index}] missing id")
if not str(skill.get("name") or "").strip():
errors.append(f"skills[{index}] missing name")
tags = skill.get("tags", [])
if tags is None:
tags = []
if not isinstance(tags, list):
errors.append(f"skills[{index}].tags must be a list")
else:
for tag in tags:
if not isinstance(tag, str) or not tag.strip():
errors.append(f"skills[{index}].tags entries must be non-empty strings")
metadata = data.get("metadata")
if metadata is not None and not isinstance(metadata, Mapping):
errors.append("metadata must be an object when present")
return errors
def get_agent_card_json(
*,
name: str | None = None,
description: str | None = None,
url: str | None = None,
metadata: Mapping[str, Any] | None = None,
indent: int = 2,
) -> str:
"""Return the local agent card as JSON, falling back to an error card on failure."""
try:
card = build_agent_card(name=name, description=description, url=url, metadata=metadata)
errors = validate_agent_card(card)
if errors:
raise ValueError("; ".join(errors))
return card.to_json(indent=indent)
except Exception as exc:
logger.error("Failed to build agent card: %s", exc)
card = build_agent_card()
return json.dumps(asdict(card), indent=2)
except Exception as e:
logger.error(f"Failed to build agent card: {e}")
# Minimal fallback card
fallback = {
"name": name or _env_or_empty("HERMES_AGENT_NAME") or "hermes",
"description": "Sovereign AI agent (agent card fallback)",
"url": url or "http://localhost:9119/a2a",
"name": "hermes",
"description": "Sovereign AI agent (fallback)",
"version": __version__,
"capabilities": AgentCapabilities().to_dict(),
"skills": [],
"defaultInputModes": list(DEFAULT_INPUT_MODES),
"defaultOutputModes": list(DEFAULT_OUTPUT_MODES),
"error": str(exc),
"error": str(e)
}
return json.dumps(fallback, indent=indent)
return json.dumps(fallback, indent=2)
def main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Generate an A2A-compliant Hermes agent card")
parser.add_argument("--name", help="Override the agent name")
parser.add_argument("--description", help="Override the agent description")
parser.add_argument("--url", help="Override the public A2A URL")
parser.add_argument("--validate", action="store_true", help="Validate before printing; exit 1 on schema errors")
args = parser.parse_args(list(argv) if argv is not None else None)
card = build_agent_card(name=args.name, description=args.description, url=args.url)
errors = validate_agent_card(card)
if args.validate and errors:
for error in errors:
print(error, file=sys.stderr)
return 1
print(card.to_json(indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
def validate_agent_card(card_data: Dict[str, Any]) -> bool:
"""Check if the card data complies with the A2A schema."""
required = ["name", "description", "url", "version"]
return all(k in card_data for k in required)

25
cli.py
View File

@@ -13,6 +13,7 @@ Usage:
python cli.py --list-tools # List available tools and exit
"""
import concurrent.futures
import logging
import os
import shutil
@@ -63,6 +64,7 @@ from agent.usage_pricing import (
format_duration_compact,
format_token_count_compact,
)
from agent.account_usage import fetch_account_usage, render_account_usage_lines
from hermes_cli.banner import _format_context_length, format_banner_version_label
_COMMAND_SPINNER_FRAMES = ("", "", "", "", "", "", "", "", "", "")
@@ -6471,6 +6473,29 @@ class HermesCLI:
if cost_result.status == "unknown":
print(f" Note: Pricing unknown for {agent.model}")
# Account limits -- fetched off-thread with a hard timeout so slow
# provider APIs don't hang the prompt.
provider = getattr(agent, "provider", None) or getattr(self, "provider", None)
base_url = getattr(agent, "base_url", None) or getattr(self, "base_url", None)
api_key = getattr(agent, "api_key", None) or getattr(self, "api_key", None)
account_snapshot = None
if provider:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as _pool:
try:
account_snapshot = _pool.submit(
fetch_account_usage,
provider,
base_url=base_url,
api_key=api_key,
).result(timeout=10.0)
except (concurrent.futures.TimeoutError, Exception):
account_snapshot = None
account_lines = [f" {line}" for line in render_account_usage_lines(account_snapshot)]
if account_lines:
print()
for line in account_lines:
print(line)
if self.verbose:
logging.getLogger().setLevel(logging.DEBUG)
for noisy in ('openai', 'openai._base_client', 'httpx', 'httpcore', 'asyncio', 'hpack', 'grpc', 'modal'):

View File

@@ -28,6 +28,8 @@ from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, Any, List
from agent.account_usage import fetch_account_usage, render_account_usage_lines
# ---------------------------------------------------------------------------
# SSL certificate auto-detection for NixOS and other non-standard systems.
# Must run BEFORE any HTTP library (discord, aiohttp, etc.) is imported.
@@ -6481,6 +6483,38 @@ class GatewayRunner:
if cached:
agent = cached[0]
# Resolve provider/base_url/api_key for the account-usage fetch.
# Prefer the live agent; fall back to persisted billing data on the
# SessionDB row so `/usage` still returns account info between turns
# when no agent is resident.
provider = getattr(agent, "provider", None) if agent and agent is not _AGENT_PENDING_SENTINEL else None
base_url = getattr(agent, "base_url", None) if agent and agent is not _AGENT_PENDING_SENTINEL else None
api_key = getattr(agent, "api_key", None) if agent and agent is not _AGENT_PENDING_SENTINEL else None
if not provider and getattr(self, "_session_db", None) is not None:
try:
_entry_for_billing = self.session_store.get_or_create_session(source)
persisted = self._session_db.get_session(_entry_for_billing.session_id) or {}
except Exception:
persisted = {}
provider = provider or persisted.get("billing_provider")
base_url = base_url or persisted.get("billing_base_url")
# Fetch account usage off the event loop so slow provider APIs don't
# block the gateway. Failures are non-fatal -- account_lines stays [].
account_lines: list[str] = []
if provider:
try:
account_snapshot = await asyncio.to_thread(
fetch_account_usage,
provider,
base_url=base_url,
api_key=api_key,
)
except Exception:
account_snapshot = None
if account_snapshot:
account_lines = render_account_usage_lines(account_snapshot, markdown=True)
if agent and hasattr(agent, "session_total_tokens") and agent.session_api_calls > 0:
lines = []
@@ -6538,6 +6572,10 @@ class GatewayRunner:
if ctx.compression_count:
lines.append(f"Compressions: {ctx.compression_count}")
if account_lines:
lines.append("")
lines.extend(account_lines)
return "\n".join(lines)
# No agent at all -- check session history for a rough count
@@ -6547,12 +6585,18 @@ class GatewayRunner:
from agent.model_metadata import estimate_messages_tokens_rough
msgs = [m for m in history if m.get("role") in ("user", "assistant") and m.get("content")]
approx = estimate_messages_tokens_rough(msgs)
return (
f"📊 **Session Info**\n"
f"Messages: {len(msgs)}\n"
f"Estimated context: ~{approx:,} tokens\n"
f"_(Detailed usage available after the first agent response)_"
)
lines = [
"📊 **Session Info**",
f"Messages: {len(msgs)}",
f"Estimated context: ~{approx:,} tokens",
"_(Detailed usage available after the first agent response)_",
]
if account_lines:
lines.append("")
lines.extend(account_lines)
return "\n".join(lines)
if account_lines:
return "\n".join(account_lines)
return "No usage data available for this session."
async def _handle_insights_command(self, event: MessageEvent) -> str:

View File

@@ -1,150 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
import pytest
from agent import agent_card as mod
DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"
def _set_base_context(monkeypatch, *, name: str = "Timmy", description: str = DEFAULT_DESCRIPTION, url: str = "https://timmy.local:9443/a2a", skills=None):
monkeypatch.setattr(mod, "load_config", lambda: {"agent": {"name": name, "description": description}})
monkeypatch.setattr(
mod,
"_load_skills",
lambda: list(
skills
if skills is not None
else [
mod.AgentSkill(
id="code",
name="Code Implementation",
description="Implement and patch code",
tags=["python", "gitea"],
)
]
),
)
monkeypatch.setenv("HERMES_A2A_PUBLIC_URL", url)
monkeypatch.delenv("HERMES_AGENT_NAME", raising=False)
monkeypatch.delenv("AGENT_NAME", raising=False)
monkeypatch.delenv("HERMES_AGENT_DESCRIPTION", raising=False)
monkeypatch.delenv("AGENT_DESCRIPTION", raising=False)
def test_build_agent_card_matches_issue_802_schema(monkeypatch):
_set_base_context(monkeypatch)
card = mod.build_agent_card()
payload = card.to_dict()
assert payload["name"] == "Timmy"
assert payload["description"] == DEFAULT_DESCRIPTION
assert payload["url"] == "https://timmy.local:9443/a2a"
assert payload["capabilities"] == {
"streaming": True,
"pushNotifications": False,
"stateTransitionHistory": True,
}
assert payload["defaultInputModes"] == ["text/plain", "application/json"]
assert payload["defaultOutputModes"] == ["text/plain", "application/json"]
assert payload["skills"][0]["tags"] == ["python", "gitea"]
assert mod.validate_agent_card(payload) == []
@pytest.mark.parametrize(
("name", "url"),
[
("Timmy", "https://timmy.local:9443/a2a"),
("Allegro", "https://allegro.local:9443/a2a"),
("Ezra", "https://ezra.local:9443/a2a"),
],
)
def test_build_agent_card_supports_fleet_members(monkeypatch, name, url):
_set_base_context(monkeypatch, name=name, url=url, skills=[])
payload = mod.build_agent_card().to_dict()
assert payload["name"] == name
assert payload["url"] == url
assert mod.validate_agent_card(payload) == []
def test_load_skills_collects_tags_and_category(monkeypatch, tmp_path):
skill_root = tmp_path / "skills"
skill_dir = skill_root / "code-implementation"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"""---
name: Code Implementation
description: Implement and patch code
tags: [python, gitea]
category: discovery
---
# Code Implementation
""",
encoding="utf-8",
)
monkeypatch.setattr(mod, "get_all_skills_dirs", lambda: [skill_root])
monkeypatch.setattr(mod, "get_disabled_skill_names", lambda: set())
monkeypatch.setattr(mod, "skill_matches_platform", lambda _frontmatter: True)
skills = mod._load_skills()
assert len(skills) == 1
assert skills[0].id == "code-implementation"
assert skills[0].name == "Code Implementation"
assert skills[0].description == "Implement and patch code"
assert skills[0].tags == ["python", "gitea", "discovery"]
def test_validate_agent_card_reports_schema_errors():
errors = mod.validate_agent_card(
{
"name": "",
"description": "",
"url": "timmy.local",
"version": "",
"capabilities": {"streaming": True},
"skills": [{"id": "", "name": "", "tags": "python"}],
"defaultInputModes": ["text/plain"],
"defaultOutputModes": ["plain"],
"metadata": [],
}
)
assert any("name must be a non-empty string" in error for error in errors)
assert any("url must be an absolute http/https URL" in error for error in errors)
assert any("capabilities.pushNotifications" in error for error in errors)
assert any("skills[0] missing id" in error for error in errors)
assert any("skills[0].tags must be a list" in error for error in errors)
assert any("defaultInputModes must include application/json" in error for error in errors)
assert any("defaultOutputModes entries must be MIME types" in error for error in errors)
assert any("metadata must be an object" in error for error in errors)
def test_get_agent_card_json_emits_valid_json(monkeypatch):
_set_base_context(monkeypatch)
payload = json.loads(mod.get_agent_card_json())
assert payload["name"] == "Timmy"
assert mod.validate_agent_card(payload) == []
def test_main_validate_prints_card(monkeypatch, capsys):
_set_base_context(monkeypatch)
exit_code = mod.main(["--validate"])
captured = capsys.readouterr()
assert exit_code == 0
payload = json.loads(captured.out)
assert payload["url"] == "https://timmy.local:9443/a2a"
assert captured.err == ""

View File

@@ -175,3 +175,79 @@ class TestUsageCachedAgent:
result = await runner._handle_usage_command(event)
assert "Cost: included" in result
class TestUsageAccountSection:
"""Account-limits section appended to /usage output."""
@pytest.mark.asyncio
async def test_usage_command_includes_account_section(self, monkeypatch):
agent = _make_mock_agent(provider="openai-codex")
agent.base_url = "https://chatgpt.com/backend-api/codex"
agent.api_key = "unused"
runner = _make_runner(SK, cached_agent=agent)
event = MagicMock()
monkeypatch.setattr(
"gateway.run.fetch_account_usage",
lambda provider, base_url=None, api_key=None: object(),
)
monkeypatch.setattr(
"gateway.run.render_account_usage_lines",
lambda snapshot, markdown=False: [
"📈 **Account limits**",
"Provider: openai-codex (Pro)",
"Session: 85% remaining (15% used)",
],
)
with patch("agent.rate_limit_tracker.format_rate_limit_compact", return_value="RPM: 50/60"), \
patch("agent.usage_pricing.estimate_usage_cost") as mock_cost:
mock_cost.return_value = MagicMock(amount_usd=None, status="included")
result = await runner._handle_usage_command(event)
assert "📊 **Session Token Usage**" in result
assert "📈 **Account limits**" in result
assert "Provider: openai-codex (Pro)" in result
@pytest.mark.asyncio
async def test_usage_command_uses_persisted_provider_when_agent_not_running(self, monkeypatch):
runner = _make_runner(SK)
runner._session_db = MagicMock()
runner._session_db.get_session.return_value = {
"billing_provider": "openai-codex",
"billing_base_url": "https://chatgpt.com/backend-api/codex",
}
session_entry = MagicMock()
session_entry.session_id = "sess-1"
runner.session_store.get_or_create_session.return_value = session_entry
runner.session_store.load_transcript.return_value = [
{"role": "user", "content": "earlier"},
]
calls = {}
async def _fake_to_thread(fn, *args, **kwargs):
calls["args"] = args
calls["kwargs"] = kwargs
return fn(*args, **kwargs)
monkeypatch.setattr("gateway.run.asyncio.to_thread", _fake_to_thread)
monkeypatch.setattr(
"gateway.run.fetch_account_usage",
lambda provider, base_url=None, api_key=None: object(),
)
monkeypatch.setattr(
"gateway.run.render_account_usage_lines",
lambda snapshot, markdown=False: [
"📈 **Account limits**",
"Provider: openai-codex (Pro)",
],
)
event = MagicMock()
result = await runner._handle_usage_command(event)
assert calls["args"] == ("openai-codex",)
assert calls["kwargs"]["base_url"] == "https://chatgpt.com/backend-api/codex"
assert "📊 **Session Info**" in result
assert "📈 **Account limits**" in result

203
tests/test_account_usage.py Normal file
View File

@@ -0,0 +1,203 @@
from datetime import datetime, timezone
from agent.account_usage import (
AccountUsageSnapshot,
AccountUsageWindow,
fetch_account_usage,
render_account_usage_lines,
)
class _Response:
def __init__(self, payload, status_code=200):
self._payload = payload
self.status_code = status_code
def raise_for_status(self):
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
def json(self):
return self._payload
class _Client:
def __init__(self, payload):
self._payload = payload
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def get(self, url, headers=None):
return _Response(self._payload)
class _RoutingClient:
def __init__(self, payloads):
self._payloads = payloads
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def get(self, url, headers=None):
return _Response(self._payloads[url])
def test_fetch_account_usage_codex(monkeypatch):
monkeypatch.setattr(
"agent.account_usage.resolve_codex_runtime_credentials",
lambda refresh_if_expiring=True: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "***",
},
)
monkeypatch.setattr(
"agent.account_usage._read_codex_tokens",
lambda: {"tokens": {"account_id": "acct_123"}},
)
monkeypatch.setattr(
"agent.account_usage.httpx.Client",
lambda timeout=15.0: _Client(
{
"plan_type": "pro",
"rate_limit": {
"primary_window": {
"used_percent": 15,
"reset_at": 1_900_000_000,
"limit_window_seconds": 18000,
},
"secondary_window": {
"used_percent": 40,
"reset_at": 1_900_500_000,
"limit_window_seconds": 604800,
},
},
"credits": {"has_credits": True, "balance": 12.5},
}
),
)
snapshot = fetch_account_usage("openai-codex")
assert snapshot is not None
assert snapshot.plan == "Pro"
assert len(snapshot.windows) == 2
assert snapshot.windows[0].label == "Session"
assert snapshot.windows[0].used_percent == 15.0
assert snapshot.windows[0].reset_at == datetime.fromtimestamp(1_900_000_000, tz=timezone.utc)
assert "Credits balance: $12.50" in snapshot.details
def test_render_account_usage_lines_includes_reset_and_provider():
snapshot = AccountUsageSnapshot(
provider="openai-codex",
source="usage_api",
fetched_at=datetime.now(timezone.utc),
plan="Pro",
windows=(
AccountUsageWindow(
label="Session",
used_percent=25,
reset_at=datetime.now(timezone.utc),
),
),
details=("Credits balance: $9.99",),
)
lines = render_account_usage_lines(snapshot)
assert lines[0] == "📈 Account limits"
assert "openai-codex (Pro)" in lines[1]
assert "Session: 75% remaining (25% used)" in lines[2]
assert "Credits balance: $9.99" in lines[3]
def test_fetch_account_usage_openrouter_uses_limit_remaining_and_ignores_deprecated_rate_limit(monkeypatch):
monkeypatch.setattr(
"agent.account_usage.resolve_runtime_provider",
lambda requested, explicit_base_url=None, explicit_api_key=None: {
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "***",
},
)
monkeypatch.setattr(
"agent.account_usage.httpx.Client",
lambda timeout=10.0: _RoutingClient(
{
"https://openrouter.ai/api/v1/credits": {
"data": {"total_credits": 300.0, "total_usage": 10.92}
},
"https://openrouter.ai/api/v1/key": {
"data": {
"limit": 100.0,
"limit_remaining": 70.0,
"limit_reset": "monthly",
"usage": 12.5,
"usage_daily": 0.5,
"usage_weekly": 2.0,
"usage_monthly": 8.0,
"rate_limit": {"requests": -1, "interval": "10s"},
}
},
}
),
)
snapshot = fetch_account_usage("openrouter")
assert snapshot is not None
assert snapshot.windows == (
AccountUsageWindow(
label="API key quota",
used_percent=30.0,
detail="$70.00 of $100.00 remaining • resets monthly",
),
)
assert "Credits balance: $289.08" in snapshot.details
assert "API key usage: $12.50 total • $0.50 today • $2.00 this week • $8.00 this month" in snapshot.details
assert all("-1 requests / 10s" not in line for line in render_account_usage_lines(snapshot))
def test_fetch_account_usage_openrouter_omits_quota_window_when_key_has_no_limit(monkeypatch):
monkeypatch.setattr(
"agent.account_usage.resolve_runtime_provider",
lambda requested, explicit_base_url=None, explicit_api_key=None: {
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "***",
},
)
monkeypatch.setattr(
"agent.account_usage.httpx.Client",
lambda timeout=10.0: _RoutingClient(
{
"https://openrouter.ai/api/v1/credits": {
"data": {"total_credits": 100.0, "total_usage": 25.5}
},
"https://openrouter.ai/api/v1/key": {
"data": {
"limit": None,
"limit_remaining": None,
"usage": 25.5,
"usage_daily": 1.25,
"usage_weekly": 4.5,
"usage_monthly": 18.0,
}
},
}
),
)
snapshot = fetch_account_usage("openrouter")
assert snapshot is not None
assert snapshot.windows == ()
assert "Credits balance: $74.50" in snapshot.details
assert "API key usage: $25.50 total • $1.25 today • $4.50 this week • $18.00 this month" in snapshot.details