Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
eab5635a7a fix: restore /usage account limits (#958)
All checks were successful
Lint / lint (pull_request) Successful in 9s
2026-04-22 10:36:49 -04:00
13 changed files with 681 additions and 492 deletions

326
agent/account_usage.py Normal file
View File

@@ -0,0 +1,326 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional
import httpx
from agent.anthropic_adapter import _is_oauth_token, resolve_anthropic_token
from hermes_cli.auth import _read_codex_tokens, resolve_codex_runtime_credentials
from hermes_cli.runtime_provider import resolve_runtime_provider
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
@dataclass(frozen=True)
class AccountUsageWindow:
label: str
used_percent: Optional[float] = None
reset_at: Optional[datetime] = None
detail: Optional[str] = None
@dataclass(frozen=True)
class AccountUsageSnapshot:
provider: str
source: str
fetched_at: datetime
title: str = "Account limits"
plan: Optional[str] = None
windows: tuple[AccountUsageWindow, ...] = ()
details: tuple[str, ...] = ()
unavailable_reason: Optional[str] = None
@property
def available(self) -> bool:
return bool(self.windows or self.details) and not self.unavailable_reason
def _title_case_slug(value: Optional[str]) -> Optional[str]:
cleaned = str(value or "").strip()
if not cleaned:
return None
return cleaned.replace("_", " ").replace("-", " ").title()
def _parse_dt(value: Any) -> Optional[datetime]:
if value in (None, ""):
return None
if isinstance(value, (int, float)):
return datetime.fromtimestamp(float(value), tz=timezone.utc)
if isinstance(value, str):
text = value.strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(text)
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
except ValueError:
return None
return None
def _format_reset(dt: Optional[datetime]) -> str:
if not dt:
return "unknown"
local_dt = dt.astimezone()
delta = dt - _utc_now()
total_seconds = int(delta.total_seconds())
if total_seconds <= 0:
return f"now ({local_dt.strftime('%Y-%m-%d %H:%M %Z')})"
hours, rem = divmod(total_seconds, 3600)
minutes = rem // 60
if hours >= 24:
days, hours = divmod(hours, 24)
rel = f"in {days}d {hours}h"
elif hours > 0:
rel = f"in {hours}h {minutes}m"
else:
rel = f"in {minutes}m"
return f"{rel} ({local_dt.strftime('%Y-%m-%d %H:%M %Z')})"
def render_account_usage_lines(snapshot: Optional[AccountUsageSnapshot], *, markdown: bool = False) -> list[str]:
if not snapshot:
return []
header = f"📈 {'**' if markdown else ''}{snapshot.title}{'**' if markdown else ''}"
lines = [header]
if snapshot.plan:
lines.append(f"Provider: {snapshot.provider} ({snapshot.plan})")
else:
lines.append(f"Provider: {snapshot.provider}")
for window in snapshot.windows:
if window.used_percent is None:
base = f"{window.label}: unavailable"
else:
remaining = max(0, round(100 - float(window.used_percent)))
used = max(0, round(float(window.used_percent)))
base = f"{window.label}: {remaining}% remaining ({used}% used)"
if window.reset_at:
base += f" • resets {_format_reset(window.reset_at)}"
elif window.detail:
base += f"{window.detail}"
lines.append(base)
for detail in snapshot.details:
lines.append(detail)
if snapshot.unavailable_reason:
lines.append(f"Unavailable: {snapshot.unavailable_reason}")
return lines
def _resolve_codex_usage_url(base_url: str) -> str:
normalized = (base_url or "").strip().rstrip("/")
if not normalized:
normalized = "https://chatgpt.com/backend-api/codex"
if normalized.endswith("/codex"):
normalized = normalized[: -len("/codex")]
if "/backend-api" in normalized:
return normalized + "/wham/usage"
return normalized + "/api/codex/usage"
def _fetch_codex_account_usage() -> Optional[AccountUsageSnapshot]:
creds = resolve_codex_runtime_credentials(refresh_if_expiring=True)
token_data = _read_codex_tokens()
tokens = token_data.get("tokens") or {}
account_id = str(tokens.get("account_id", "") or "").strip() or None
headers = {
"Authorization": f"Bearer {creds['api_key']}",
"Accept": "application/json",
"User-Agent": "codex-cli",
}
if account_id:
headers["ChatGPT-Account-Id"] = account_id
with httpx.Client(timeout=15.0) as client:
response = client.get(_resolve_codex_usage_url(creds.get("base_url", "")), headers=headers)
response.raise_for_status()
payload = response.json() or {}
rate_limit = payload.get("rate_limit") or {}
windows: list[AccountUsageWindow] = []
for key, label in (("primary_window", "Session"), ("secondary_window", "Weekly")):
window = rate_limit.get(key) or {}
used = window.get("used_percent")
if used is None:
continue
windows.append(
AccountUsageWindow(
label=label,
used_percent=float(used),
reset_at=_parse_dt(window.get("reset_at")),
)
)
details: list[str] = []
credits = payload.get("credits") or {}
if credits.get("has_credits"):
balance = credits.get("balance")
if isinstance(balance, (int, float)):
details.append(f"Credits balance: ${float(balance):.2f}")
elif credits.get("unlimited"):
details.append("Credits balance: unlimited")
return AccountUsageSnapshot(
provider="openai-codex",
source="usage_api",
fetched_at=_utc_now(),
plan=_title_case_slug(payload.get("plan_type")),
windows=tuple(windows),
details=tuple(details),
)
def _fetch_anthropic_account_usage() -> Optional[AccountUsageSnapshot]:
token = (resolve_anthropic_token() or "").strip()
if not token:
return None
if not _is_oauth_token(token):
return AccountUsageSnapshot(
provider="anthropic",
source="oauth_usage_api",
fetched_at=_utc_now(),
unavailable_reason="Anthropic account limits are only available for OAuth-backed Claude accounts.",
)
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Content-Type": "application/json",
"anthropic-beta": "oauth-2025-04-20",
"User-Agent": "claude-code/2.1.0",
}
with httpx.Client(timeout=15.0) as client:
response = client.get("https://api.anthropic.com/api/oauth/usage", headers=headers)
response.raise_for_status()
payload = response.json() or {}
windows: list[AccountUsageWindow] = []
mapping = (
("five_hour", "Current session"),
("seven_day", "Current week"),
("seven_day_opus", "Opus week"),
("seven_day_sonnet", "Sonnet week"),
)
for key, label in mapping:
window = payload.get(key) or {}
util = window.get("utilization")
if util is None:
continue
used = float(util) * 100 if float(util) <= 1 else float(util)
windows.append(
AccountUsageWindow(
label=label,
used_percent=used,
reset_at=_parse_dt(window.get("resets_at")),
)
)
details: list[str] = []
extra = payload.get("extra_usage") or {}
if extra.get("is_enabled"):
used_credits = extra.get("used_credits")
monthly_limit = extra.get("monthly_limit")
currency = extra.get("currency") or "USD"
if isinstance(used_credits, (int, float)) and isinstance(monthly_limit, (int, float)):
details.append(
f"Extra usage: {used_credits:.2f} / {monthly_limit:.2f} {currency}"
)
return AccountUsageSnapshot(
provider="anthropic",
source="oauth_usage_api",
fetched_at=_utc_now(),
windows=tuple(windows),
details=tuple(details),
)
def _fetch_openrouter_account_usage(base_url: Optional[str], api_key: Optional[str]) -> Optional[AccountUsageSnapshot]:
runtime = resolve_runtime_provider(
requested="openrouter",
explicit_base_url=base_url,
explicit_api_key=api_key,
)
token = str(runtime.get("api_key", "") or "").strip()
if not token:
return None
normalized = str(runtime.get("base_url", "") or "").rstrip("/")
credits_url = f"{normalized}/credits"
key_url = f"{normalized}/key"
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
with httpx.Client(timeout=10.0) as client:
credits_resp = client.get(credits_url, headers=headers)
credits_resp.raise_for_status()
credits = (credits_resp.json() or {}).get("data") or {}
try:
key_resp = client.get(key_url, headers=headers)
key_resp.raise_for_status()
key_data = (key_resp.json() or {}).get("data") or {}
except Exception:
key_data = {}
total_credits = float(credits.get("total_credits") or 0.0)
total_usage = float(credits.get("total_usage") or 0.0)
details = [f"Credits balance: ${max(0.0, total_credits - total_usage):.2f}"]
windows: list[AccountUsageWindow] = []
limit = key_data.get("limit")
limit_remaining = key_data.get("limit_remaining")
limit_reset = str(key_data.get("limit_reset") or "").strip()
usage = key_data.get("usage")
if (
isinstance(limit, (int, float))
and float(limit) > 0
and isinstance(limit_remaining, (int, float))
and 0 <= float(limit_remaining) <= float(limit)
):
limit_value = float(limit)
remaining_value = float(limit_remaining)
used_percent = ((limit_value - remaining_value) / limit_value) * 100
detail_parts = [f"${remaining_value:.2f} of ${limit_value:.2f} remaining"]
if limit_reset:
detail_parts.append(f"resets {limit_reset}")
windows.append(
AccountUsageWindow(
label="API key quota",
used_percent=used_percent,
detail="".join(detail_parts),
)
)
if isinstance(usage, (int, float)):
usage_parts = [f"API key usage: ${float(usage):.2f} total"]
for value, label in (
(key_data.get("usage_daily"), "today"),
(key_data.get("usage_weekly"), "this week"),
(key_data.get("usage_monthly"), "this month"),
):
if isinstance(value, (int, float)) and float(value) > 0:
usage_parts.append(f"${float(value):.2f} {label}")
details.append("".join(usage_parts))
return AccountUsageSnapshot(
provider="openrouter",
source="credits_api",
fetched_at=_utc_now(),
windows=tuple(windows),
details=tuple(details),
)
def fetch_account_usage(
provider: Optional[str],
*,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
) -> Optional[AccountUsageSnapshot]:
normalized = str(provider or "").strip().lower()
if normalized in {"", "auto", "custom"}:
return None
try:
if normalized == "openai-codex":
return _fetch_codex_account_usage()
if normalized == "anthropic":
return _fetch_anthropic_account_usage()
if normalized == "openrouter":
return _fetch_openrouter_account_usage(base_url, api_key)
except Exception:
return None
return None

View File

@@ -1,69 +0,0 @@
"""First-class context snapshot artifacts for live runtime memory evaluation."""
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any
from hermes_constants import get_hermes_home
_SAFE_SEGMENT_RE = re.compile(r"[^A-Za-z0-9_.-]+")
class ContextSnapshotRecorder:
"""Write per-call prompt-composition artifacts for a Hermes session."""
def __init__(self, session_id: str, *, enabled: bool = False, base_dir: str | Path | None = None):
self.session_id = session_id or "session"
self.enabled = bool(enabled)
self.base_dir = Path(base_dir) if base_dir else get_hermes_home() / "reports" / "context_snapshots"
@property
def session_dir(self) -> Path:
safe_session = _SAFE_SEGMENT_RE.sub("_", self.session_id).strip("._") or "session"
return self.base_dir / safe_session
def record_call(
self,
api_call_count: int,
*,
system_prompt: str,
memory_provider_system_prompt: str = "",
memory_prefetch_raw: str = "",
memory_context_block: str = "",
api_user_message: str = "",
api_messages: list[dict[str, Any]] | None = None,
metadata: dict[str, Any] | None = None,
) -> Path | None:
if not self.enabled:
return None
call_dir = self.session_dir / f"call_{api_call_count:03d}"
call_dir.mkdir(parents=True, exist_ok=True)
self._write_text(call_dir / "system_prompt.txt", system_prompt or "")
self._write_text(call_dir / "memory_provider_system_prompt.txt", memory_provider_system_prompt or "")
self._write_text(call_dir / "memory_prefetch_raw.txt", memory_prefetch_raw or "")
self._write_text(call_dir / "memory_context_block.txt", memory_context_block or "")
self._write_text(call_dir / "api_user_message.txt", api_user_message or "")
self._write_json(call_dir / "api_messages.json", api_messages or [])
self._write_json(
call_dir / "metadata.json",
{
"session_id": self.session_id,
"api_call_count": api_call_count,
**(metadata or {}),
},
)
return call_dir
@staticmethod
def _write_text(path: Path, content: str) -> None:
path.write_text(content, encoding="utf-8")
@staticmethod
def _write_json(path: Path, payload: Any) -> None:
path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")

25
cli.py
View File

@@ -13,6 +13,7 @@ Usage:
python cli.py --list-tools # List available tools and exit
"""
import concurrent.futures
import logging
import os
import shutil
@@ -63,6 +64,7 @@ from agent.usage_pricing import (
format_duration_compact,
format_token_count_compact,
)
from agent.account_usage import fetch_account_usage, render_account_usage_lines
from hermes_cli.banner import _format_context_length, format_banner_version_label
_COMMAND_SPINNER_FRAMES = ("", "", "", "", "", "", "", "", "", "")
@@ -6471,6 +6473,29 @@ class HermesCLI:
if cost_result.status == "unknown":
print(f" Note: Pricing unknown for {agent.model}")
# Account limits -- fetched off-thread with a hard timeout so slow
# provider APIs don't hang the prompt.
provider = getattr(agent, "provider", None) or getattr(self, "provider", None)
base_url = getattr(agent, "base_url", None) or getattr(self, "base_url", None)
api_key = getattr(agent, "api_key", None) or getattr(self, "api_key", None)
account_snapshot = None
if provider:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as _pool:
try:
account_snapshot = _pool.submit(
fetch_account_usage,
provider,
base_url=base_url,
api_key=api_key,
).result(timeout=10.0)
except (concurrent.futures.TimeoutError, Exception):
account_snapshot = None
account_lines = [f" {line}" for line in render_account_usage_lines(account_snapshot)]
if account_lines:
print()
for line in account_lines:
print(line)
if self.verbose:
logging.getLogger().setLevel(logging.DEBUG)
for noisy in ('openai', 'openai._base_client', 'httpx', 'httpcore', 'asyncio', 'hpack', 'grpc', 'modal'):

View File

@@ -1,132 +0,0 @@
# Hindsight local eval homes for live Hermes runtime testing
Issue: #1010
Parent: #985
This document defines a reproducible, profile-scoped evaluation layout for baseline / MemPalace / Hindsight comparisons without requiring Hindsight Cloud.
## Eval home layout
Use three separate `HERMES_HOME` directories so each run has isolated config, memory, sessions, and artifacts.
```text
~/.hermes/profiles/atlas-baseline/
config.yaml
.env
MEMORY.md
USER.md
reports/context_snapshots/
~/.hermes/profiles/atlas-mempalace/
config.yaml
.env
MEMORY.md
USER.md
reports/context_snapshots/
plugins/ # if a local MemPalace plugin is installed for this eval lane
~/.hermes/profiles/atlas-hindsight/
config.yaml
.env
MEMORY.md
USER.md
hindsight/config.json
reports/context_snapshots/
```
## Hindsight local config
The Hindsight provider already loads config from `$HERMES_HOME/hindsight/config.json` first. For the local eval lane, prefer `local_embedded` so Hermes can bring up a local Hindsight daemon without cloud signup.
Example `~/.hermes/profiles/atlas-hindsight/hindsight/config.json`:
```json
{
"mode": "local_embedded",
"memory_mode": "context",
"recall_prefetch_method": "recall",
"llm_provider": "ollama",
"llm_model": "gemma3:12b",
"api_url": "http://localhost:8888"
}
```
Notes:
- `local_embedded` avoids any Hindsight Cloud dependency.
- If `profile` is omitted, Hermes now derives a stable local Hindsight profile name from the active profile identity / `HERMES_HOME` instead of collapsing all local runs into the shared legacy `hermes` profile.
- `local_external` remains valid if you already run a local Hindsight server yourself.
## Runtime switching procedure
Switch by exporting `HERMES_HOME` before launching Hermes.
### 1. Baseline
```bash
export HERMES_HOME="$HOME/.hermes/profiles/atlas-baseline"
unset HERMES_CONTEXT_SNAPSHOTS
hermes chat
```
### 2. MemPalace lane
```bash
export HERMES_HOME="$HOME/.hermes/profiles/atlas-mempalace"
export HERMES_CONTEXT_SNAPSHOTS=1
hermes chat
```
### 3. Hindsight lane
```bash
export HERMES_HOME="$HOME/.hermes/profiles/atlas-hindsight"
export HERMES_CONTEXT_SNAPSHOTS=1
hermes chat
```
## Raw artifact capture
When `HERMES_CONTEXT_SNAPSHOTS=1` is enabled, Hermes writes first-class prompt-composition artifacts under the active home by default.
Artifact tree:
```text
$HERMES_HOME/reports/context_snapshots/<session-id>/call_001/
system_prompt.txt
memory_provider_system_prompt.txt
memory_prefetch_raw.txt
memory_context_block.txt
api_user_message.txt
api_messages.json
metadata.json
```
Minimum files a benchmark should inspect:
- `system_prompt.txt`
- `memory_prefetch_raw.txt`
- `memory_context_block.txt`
- `api_user_message.txt`
- `api_messages.json`
These prove:
- what the system prompt was
- what the provider prefetched
- what entered `<memory-context>`
- what the final API user message looked like
- what full payload reached the model
## Follow-on benchmark workflow
A benchmark issue can now consume this path without redoing integration work:
1. pick one eval home (`atlas-baseline`, `atlas-mempalace`, `atlas-hindsight`)
2. export the corresponding `HERMES_HOME`
3. run Hermes on the same prompt set
4. compare the snapshot artifacts in `reports/context_snapshots/`
5. score recall quality and answer quality separately
## Why this is sovereign
- no hosted Hindsight Cloud dependency is required
- the Hindsight config is profile-scoped under `hindsight/config.json`
- the runtime artifacts stay under the active `HERMES_HOME`
- switching between baseline / MemPalace / Hindsight is just a `HERMES_HOME` swap

View File

@@ -28,6 +28,8 @@ from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, Any, List
from agent.account_usage import fetch_account_usage, render_account_usage_lines
# ---------------------------------------------------------------------------
# SSL certificate auto-detection for NixOS and other non-standard systems.
# Must run BEFORE any HTTP library (discord, aiohttp, etc.) is imported.
@@ -6481,6 +6483,38 @@ class GatewayRunner:
if cached:
agent = cached[0]
# Resolve provider/base_url/api_key for the account-usage fetch.
# Prefer the live agent; fall back to persisted billing data on the
# SessionDB row so `/usage` still returns account info between turns
# when no agent is resident.
provider = getattr(agent, "provider", None) if agent and agent is not _AGENT_PENDING_SENTINEL else None
base_url = getattr(agent, "base_url", None) if agent and agent is not _AGENT_PENDING_SENTINEL else None
api_key = getattr(agent, "api_key", None) if agent and agent is not _AGENT_PENDING_SENTINEL else None
if not provider and getattr(self, "_session_db", None) is not None:
try:
_entry_for_billing = self.session_store.get_or_create_session(source)
persisted = self._session_db.get_session(_entry_for_billing.session_id) or {}
except Exception:
persisted = {}
provider = provider or persisted.get("billing_provider")
base_url = base_url or persisted.get("billing_base_url")
# Fetch account usage off the event loop so slow provider APIs don't
# block the gateway. Failures are non-fatal -- account_lines stays [].
account_lines: list[str] = []
if provider:
try:
account_snapshot = await asyncio.to_thread(
fetch_account_usage,
provider,
base_url=base_url,
api_key=api_key,
)
except Exception:
account_snapshot = None
if account_snapshot:
account_lines = render_account_usage_lines(account_snapshot, markdown=True)
if agent and hasattr(agent, "session_total_tokens") and agent.session_api_calls > 0:
lines = []
@@ -6538,6 +6572,10 @@ class GatewayRunner:
if ctx.compression_count:
lines.append(f"Compressions: {ctx.compression_count}")
if account_lines:
lines.append("")
lines.extend(account_lines)
return "\n".join(lines)
# No agent at all -- check session history for a rough count
@@ -6547,12 +6585,18 @@ class GatewayRunner:
from agent.model_metadata import estimate_messages_tokens_rough
msgs = [m for m in history if m.get("role") in ("user", "assistant") and m.get("content")]
approx = estimate_messages_tokens_rough(msgs)
return (
f"📊 **Session Info**\n"
f"Messages: {len(msgs)}\n"
f"Estimated context: ~{approx:,} tokens\n"
f"_(Detailed usage available after the first agent response)_"
)
lines = [
"📊 **Session Info**",
f"Messages: {len(msgs)}",
f"Estimated context: ~{approx:,} tokens",
"_(Detailed usage available after the first agent response)_",
]
if account_lines:
lines.append("")
lines.extend(account_lines)
return "\n".join(lines)
if account_lines:
return "\n".join(account_lines)
return "No usage data available for this session."
async def _handle_insights_command(self, event: MessageEvent) -> str:

View File

@@ -178,25 +178,6 @@ def _load_config() -> dict:
}
def _derive_local_profile_name(agent_identity: str = "", hermes_home: str = "") -> str:
"""Return a stable profile name for local embedded Hindsight storage.
Prefer the active Hermes profile identity when available, otherwise fall back
to the basename of the active HERMES_HOME path. This prevents all local
Hindsight eval homes from sharing the legacy default profile name "hermes".
"""
from pathlib import Path
import re
raw = (agent_identity or "").strip()
if not raw and hermes_home:
raw = Path(hermes_home).name.strip()
if not raw:
raw = "hermes"
safe = re.sub(r"[^A-Za-z0-9_.-]+", "-", raw).strip(".-_")
return safe or "hermes"
# ---------------------------------------------------------------------------
# MemoryProvider implementation
# ---------------------------------------------------------------------------
@@ -487,8 +468,6 @@ class HindsightMemoryProvider(MemoryProvider):
def initialize(self, session_id: str, **kwargs) -> None:
self._session_id = session_id
hermes_home = str(kwargs.get("hermes_home") or "")
agent_identity = str(kwargs.get("agent_identity") or "")
# Check client version and auto-upgrade if needed
try:
@@ -521,11 +500,6 @@ class HindsightMemoryProvider(MemoryProvider):
# "local" is a legacy alias for "local_embedded"
if self._mode == "local":
self._mode = "local_embedded"
if self._mode == "local_embedded" and not self._config.get("profile"):
self._config["profile"] = _derive_local_profile_name(
agent_identity=agent_identity,
hermes_home=hermes_home,
)
self._api_key = self._config.get("apiKey") or self._config.get("api_key") or os.environ.get("HINDSIGHT_API_KEY", "")
default_url = _DEFAULT_LOCAL_URL if self._mode in ("local_embedded", "local_external") else _DEFAULT_API_URL
self._api_url = self._config.get("api_url") or os.environ.get("HINDSIGHT_API_URL", default_url)

View File

@@ -604,8 +604,6 @@ class AIAgent:
checkpoint_max_snapshots: int = 50,
pass_session_id: bool = False,
persist_session: bool = True,
context_snapshots_enabled: bool | None = None,
context_snapshots_dir: str | None = None,
):
"""
Initialize the AI Agent.
@@ -1131,43 +1129,6 @@ class AIAgent:
except Exception:
_agent_cfg = {}
def _is_enabled(value):
if isinstance(value, bool):
return value
return str(value).strip().lower() in {"1", "true", "yes", "on"}
_debug_cfg = _agent_cfg.get("debug", {}) if isinstance(_agent_cfg, dict) else {}
if not isinstance(_debug_cfg, dict):
_debug_cfg = {}
_snapshot_cfg = _debug_cfg.get("context_snapshots", {})
if not isinstance(_snapshot_cfg, dict):
_snapshot_cfg = {}
_snapshots_env = os.getenv("HERMES_CONTEXT_SNAPSHOTS")
_snapshots_dir_env = os.getenv("HERMES_CONTEXT_SNAPSHOTS_DIR")
if context_snapshots_enabled is None:
if _snapshots_env is not None:
self._context_snapshots_enabled = _is_enabled(_snapshots_env)
else:
self._context_snapshots_enabled = _is_enabled(_snapshot_cfg.get("enabled", False))
else:
self._context_snapshots_enabled = bool(context_snapshots_enabled)
self._context_snapshots_dir = (
context_snapshots_dir
or _snapshots_dir_env
or _snapshot_cfg.get("dir")
or None
)
try:
from agent.context_snapshots import ContextSnapshotRecorder
self._context_snapshot_recorder = ContextSnapshotRecorder(
session_id=self.session_id,
enabled=self._context_snapshots_enabled,
base_dir=self._context_snapshots_dir,
)
except Exception as _snapshot_err:
logger.debug("Context snapshot recorder init failed: %s", _snapshot_err)
self._context_snapshot_recorder = None
# Persistent memory (MEMORY.md + USER.md) -- loaded from disk
self._memory_store = None
self._memory_enabled = False
@@ -8183,17 +8144,12 @@ class AIAgent:
# Use original_user_message (clean input) — user_message may contain
# injected skill content that bloats / breaks provider queries.
_ext_prefetch_cache = ""
_memory_provider_prompt_cache = ""
if self._memory_manager:
try:
_query = original_user_message if isinstance(original_user_message, str) else ""
_ext_prefetch_cache = self._memory_manager.prefetch_all(_query) or ""
except Exception:
pass
try:
_memory_provider_prompt_cache = self._memory_manager.build_system_prompt() or ""
except Exception:
pass
while (api_call_count < self.max_iterations and self.iteration_budget.remaining > 0) or self._budget_grace_call:
# Reset per-turn checkpoint dedup so each iteration can take one snapshot
@@ -8261,8 +8217,6 @@ class AIAgent:
# However, providers like Moonshot AI require a separate 'reasoning_content' field
# on assistant messages with tool_calls. We handle both cases here.
api_messages = []
_current_api_user_message = ""
_current_memory_context_block = ""
for idx, msg in enumerate(messages):
api_msg = msg.copy()
@@ -8277,15 +8231,12 @@ class AIAgent:
_fenced = build_memory_context_block(_ext_prefetch_cache)
if _fenced:
_injections.append(_fenced)
_current_memory_context_block = _fenced
if _plugin_user_context:
_injections.append(_plugin_user_context)
if _injections:
_base = api_msg.get("content", "")
if isinstance(_base, str):
api_msg["content"] = _base + "\n\n" + "\n\n".join(_injections)
if isinstance(api_msg.get("content"), str):
_current_api_user_message = api_msg["content"]
# For ALL assistant messages, pass reasoning back to the API
# This ensures multi-turn reasoning context is preserved
@@ -8320,13 +8271,7 @@ class AIAgent:
from agent.privacy_filter import PrivacyFilter
pf = PrivacyFilter()
# Sanitize messages before they reach the provider
_pf_result = pf.sanitize_messages(api_messages)
if isinstance(_pf_result, tuple):
api_messages, _pf_report = _pf_result
if getattr(pf, "last_report", None) is None:
pf.last_report = _pf_report
else:
api_messages = _pf_result
api_messages = pf.sanitize_messages(api_messages)
if pf.last_report and pf.last_report.had_redactions:
logger.info(f"Privacy Filter: Redacted sensitive data from turn payload. Details: {pf.last_report.summary()}")
except Exception as e:
@@ -8397,27 +8342,6 @@ class AIAgent:
new_tcs.append(tc)
am["tool_calls"] = new_tcs
if self._context_snapshot_recorder:
try:
self._context_snapshot_recorder.record_call(
api_call_count,
system_prompt=effective_system,
memory_provider_system_prompt=_memory_provider_prompt_cache,
memory_prefetch_raw=_ext_prefetch_cache,
memory_context_block=_current_memory_context_block,
api_user_message=_current_api_user_message,
api_messages=api_messages,
metadata={
"model": self.model,
"provider": self.provider,
"platform": self.platform or "",
"api_mode": self.api_mode,
"memory_providers": [p.name for p in getattr(self._memory_manager, "providers", [])],
},
)
except Exception as _snapshot_err:
logger.debug("Context snapshot capture failed: %s", _snapshot_err)
# Calculate approximate request size for logging
total_chars = sum(len(str(msg)) for msg in api_messages)
approx_tokens = estimate_messages_tokens_rough(api_messages)

View File

@@ -1,43 +0,0 @@
from pathlib import Path
from agent.context_snapshots import ContextSnapshotRecorder
def test_disabled_recorder_writes_nothing(tmp_path):
recorder = ContextSnapshotRecorder(session_id="session-1", enabled=False, base_dir=tmp_path)
out = recorder.record_call(
1,
system_prompt="system",
api_messages=[{"role": "user", "content": "hello"}],
)
assert out is None
assert not (tmp_path / "session-1").exists()
def test_enabled_recorder_writes_expected_artifacts(tmp_path):
recorder = ContextSnapshotRecorder(session_id="session-1", enabled=True, base_dir=tmp_path)
out = recorder.record_call(
1,
system_prompt="system prompt",
memory_provider_system_prompt="# Hindsight Memory\nActive.",
memory_prefetch_raw="- remembered fact",
memory_context_block="<memory-context>\nremembered\n</memory-context>",
api_user_message="What do I prefer?\n\n<memory-context>\nremembered\n</memory-context>",
api_messages=[
{"role": "system", "content": "system prompt"},
{"role": "user", "content": "What do I prefer?"},
],
metadata={"provider": "openai", "memory_providers": ["builtin", "hindsight"]},
)
assert out == tmp_path / "session-1" / "call_001"
assert (out / "system_prompt.txt").read_text(encoding="utf-8") == "system prompt"
assert (out / "memory_provider_system_prompt.txt").read_text(encoding="utf-8").startswith("# Hindsight Memory")
assert (out / "memory_prefetch_raw.txt").read_text(encoding="utf-8") == "- remembered fact"
assert "<memory-context>" in (out / "memory_context_block.txt").read_text(encoding="utf-8")
assert "What do I prefer?" in (out / "api_user_message.txt").read_text(encoding="utf-8")
assert (out / "api_messages.json").read_text(encoding="utf-8").startswith("[")
assert '"hindsight"' in (out / "metadata.json").read_text(encoding="utf-8")

View File

@@ -175,3 +175,79 @@ class TestUsageCachedAgent:
result = await runner._handle_usage_command(event)
assert "Cost: included" in result
class TestUsageAccountSection:
"""Account-limits section appended to /usage output."""
@pytest.mark.asyncio
async def test_usage_command_includes_account_section(self, monkeypatch):
agent = _make_mock_agent(provider="openai-codex")
agent.base_url = "https://chatgpt.com/backend-api/codex"
agent.api_key = "unused"
runner = _make_runner(SK, cached_agent=agent)
event = MagicMock()
monkeypatch.setattr(
"gateway.run.fetch_account_usage",
lambda provider, base_url=None, api_key=None: object(),
)
monkeypatch.setattr(
"gateway.run.render_account_usage_lines",
lambda snapshot, markdown=False: [
"📈 **Account limits**",
"Provider: openai-codex (Pro)",
"Session: 85% remaining (15% used)",
],
)
with patch("agent.rate_limit_tracker.format_rate_limit_compact", return_value="RPM: 50/60"), \
patch("agent.usage_pricing.estimate_usage_cost") as mock_cost:
mock_cost.return_value = MagicMock(amount_usd=None, status="included")
result = await runner._handle_usage_command(event)
assert "📊 **Session Token Usage**" in result
assert "📈 **Account limits**" in result
assert "Provider: openai-codex (Pro)" in result
@pytest.mark.asyncio
async def test_usage_command_uses_persisted_provider_when_agent_not_running(self, monkeypatch):
runner = _make_runner(SK)
runner._session_db = MagicMock()
runner._session_db.get_session.return_value = {
"billing_provider": "openai-codex",
"billing_base_url": "https://chatgpt.com/backend-api/codex",
}
session_entry = MagicMock()
session_entry.session_id = "sess-1"
runner.session_store.get_or_create_session.return_value = session_entry
runner.session_store.load_transcript.return_value = [
{"role": "user", "content": "earlier"},
]
calls = {}
async def _fake_to_thread(fn, *args, **kwargs):
calls["args"] = args
calls["kwargs"] = kwargs
return fn(*args, **kwargs)
monkeypatch.setattr("gateway.run.asyncio.to_thread", _fake_to_thread)
monkeypatch.setattr(
"gateway.run.fetch_account_usage",
lambda provider, base_url=None, api_key=None: object(),
)
monkeypatch.setattr(
"gateway.run.render_account_usage_lines",
lambda snapshot, markdown=False: [
"📈 **Account limits**",
"Provider: openai-codex (Pro)",
],
)
event = MagicMock()
result = await runner._handle_usage_command(event)
assert calls["args"] == ("openai-codex",)
assert calls["kwargs"]["base_url"] == "https://chatgpt.com/backend-api/codex"
assert "📊 **Session Info**" in result
assert "📈 **Account limits**" in result

View File

@@ -596,26 +596,3 @@ class TestAvailability:
monkeypatch.setenv("HINDSIGHT_MODE", "local")
p = HindsightMemoryProvider()
assert p.is_available()
def test_local_embedded_profile_defaults_to_agent_identity(self, tmp_path, monkeypatch):
config_path = tmp_path / "hindsight" / "config.json"
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(json.dumps({
"mode": "local_embedded",
"llm_provider": "ollama",
"llm_model": "gemma3:12b",
}))
monkeypatch.setattr(
"plugins.memory.hindsight.get_hermes_home",
lambda: tmp_path,
)
p = HindsightMemoryProvider()
p.initialize(
session_id="test-session",
hermes_home=str(tmp_path / "profiles" / "atlas-hindsight"),
platform="cli",
agent_identity="atlas-hindsight",
)
assert p._config["profile"] == "atlas-hindsight"

View File

@@ -1,94 +0,0 @@
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import importlib
import sys
import types
def _make_tool_defs(*names: str) -> list:
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
def _mock_response(content="Done", finish_reason="stop"):
msg = SimpleNamespace(content=content, tool_calls=None)
choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
return SimpleNamespace(choices=[choice], usage=SimpleNamespace(prompt_tokens=1, completion_tokens=1, total_tokens=2))
def _load_ai_agent():
sys.modules.setdefault("agent.auxiliary_client", types.SimpleNamespace(call_llm=lambda *a, **k: ""))
run_agent = importlib.import_module("run_agent")
return run_agent.AIAgent
def test_run_conversation_writes_context_snapshot_artifacts(tmp_path):
AIAgent = _load_ai_agent()
class _FakePrivacyFilter:
def __init__(self):
self.last_report = None
def sanitize_messages(self, messages):
return list(messages)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("hermes_cli.plugins.invoke_hook", return_value=[]),
patch.dict(sys.modules, {"agent.privacy_filter": types.SimpleNamespace(PrivacyFilter=_FakePrivacyFilter)}),
):
agent = AIAgent(
api_key="test-key-1234567890",
base_url="https://example.com/v1",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
context_snapshots_enabled=True,
context_snapshots_dir=str(tmp_path),
)
agent.client = MagicMock()
agent.client.chat.completions.create.return_value = _mock_response(content="Done")
agent._build_system_prompt = MagicMock(return_value="Core system prompt")
agent._memory_manager = MagicMock()
agent._memory_manager.prefetch_all.return_value = "- remembered preference"
agent._memory_manager.build_system_prompt.return_value = "# Hindsight Memory\nActive."
agent._memory_manager.providers = [
SimpleNamespace(name="builtin"),
SimpleNamespace(name="hindsight"),
]
result = agent.run_conversation("What do I prefer?")
assert result["final_response"] == "Done"
call_dir = tmp_path / agent.session_id / "call_001"
assert call_dir.exists()
assert (call_dir / "system_prompt.txt").read_text(encoding="utf-8") == "Core system prompt"
assert (call_dir / "memory_provider_system_prompt.txt").read_text(encoding="utf-8").startswith("# Hindsight Memory")
assert (call_dir / "memory_prefetch_raw.txt").read_text(encoding="utf-8") == "- remembered preference"
assert "<memory-context>" in (call_dir / "memory_context_block.txt").read_text(encoding="utf-8")
api_user_message = (call_dir / "api_user_message.txt").read_text(encoding="utf-8")
assert "What do I prefer?" in api_user_message
assert "remembered preference" in api_user_message
api_messages = (call_dir / "api_messages.json").read_text(encoding="utf-8")
assert '"role": "system"' in api_messages
assert '"role": "user"' in api_messages
metadata = (call_dir / "metadata.json").read_text(encoding="utf-8")
assert '"hindsight"' in metadata

203
tests/test_account_usage.py Normal file
View File

@@ -0,0 +1,203 @@
from datetime import datetime, timezone
from agent.account_usage import (
AccountUsageSnapshot,
AccountUsageWindow,
fetch_account_usage,
render_account_usage_lines,
)
class _Response:
def __init__(self, payload, status_code=200):
self._payload = payload
self.status_code = status_code
def raise_for_status(self):
if self.status_code >= 400:
raise RuntimeError(f"HTTP {self.status_code}")
def json(self):
return self._payload
class _Client:
def __init__(self, payload):
self._payload = payload
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def get(self, url, headers=None):
return _Response(self._payload)
class _RoutingClient:
def __init__(self, payloads):
self._payloads = payloads
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def get(self, url, headers=None):
return _Response(self._payloads[url])
def test_fetch_account_usage_codex(monkeypatch):
monkeypatch.setattr(
"agent.account_usage.resolve_codex_runtime_credentials",
lambda refresh_if_expiring=True: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "***",
},
)
monkeypatch.setattr(
"agent.account_usage._read_codex_tokens",
lambda: {"tokens": {"account_id": "acct_123"}},
)
monkeypatch.setattr(
"agent.account_usage.httpx.Client",
lambda timeout=15.0: _Client(
{
"plan_type": "pro",
"rate_limit": {
"primary_window": {
"used_percent": 15,
"reset_at": 1_900_000_000,
"limit_window_seconds": 18000,
},
"secondary_window": {
"used_percent": 40,
"reset_at": 1_900_500_000,
"limit_window_seconds": 604800,
},
},
"credits": {"has_credits": True, "balance": 12.5},
}
),
)
snapshot = fetch_account_usage("openai-codex")
assert snapshot is not None
assert snapshot.plan == "Pro"
assert len(snapshot.windows) == 2
assert snapshot.windows[0].label == "Session"
assert snapshot.windows[0].used_percent == 15.0
assert snapshot.windows[0].reset_at == datetime.fromtimestamp(1_900_000_000, tz=timezone.utc)
assert "Credits balance: $12.50" in snapshot.details
def test_render_account_usage_lines_includes_reset_and_provider():
snapshot = AccountUsageSnapshot(
provider="openai-codex",
source="usage_api",
fetched_at=datetime.now(timezone.utc),
plan="Pro",
windows=(
AccountUsageWindow(
label="Session",
used_percent=25,
reset_at=datetime.now(timezone.utc),
),
),
details=("Credits balance: $9.99",),
)
lines = render_account_usage_lines(snapshot)
assert lines[0] == "📈 Account limits"
assert "openai-codex (Pro)" in lines[1]
assert "Session: 75% remaining (25% used)" in lines[2]
assert "Credits balance: $9.99" in lines[3]
def test_fetch_account_usage_openrouter_uses_limit_remaining_and_ignores_deprecated_rate_limit(monkeypatch):
monkeypatch.setattr(
"agent.account_usage.resolve_runtime_provider",
lambda requested, explicit_base_url=None, explicit_api_key=None: {
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "***",
},
)
monkeypatch.setattr(
"agent.account_usage.httpx.Client",
lambda timeout=10.0: _RoutingClient(
{
"https://openrouter.ai/api/v1/credits": {
"data": {"total_credits": 300.0, "total_usage": 10.92}
},
"https://openrouter.ai/api/v1/key": {
"data": {
"limit": 100.0,
"limit_remaining": 70.0,
"limit_reset": "monthly",
"usage": 12.5,
"usage_daily": 0.5,
"usage_weekly": 2.0,
"usage_monthly": 8.0,
"rate_limit": {"requests": -1, "interval": "10s"},
}
},
}
),
)
snapshot = fetch_account_usage("openrouter")
assert snapshot is not None
assert snapshot.windows == (
AccountUsageWindow(
label="API key quota",
used_percent=30.0,
detail="$70.00 of $100.00 remaining • resets monthly",
),
)
assert "Credits balance: $289.08" in snapshot.details
assert "API key usage: $12.50 total • $0.50 today • $2.00 this week • $8.00 this month" in snapshot.details
assert all("-1 requests / 10s" not in line for line in render_account_usage_lines(snapshot))
def test_fetch_account_usage_openrouter_omits_quota_window_when_key_has_no_limit(monkeypatch):
monkeypatch.setattr(
"agent.account_usage.resolve_runtime_provider",
lambda requested, explicit_base_url=None, explicit_api_key=None: {
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "***",
},
)
monkeypatch.setattr(
"agent.account_usage.httpx.Client",
lambda timeout=10.0: _RoutingClient(
{
"https://openrouter.ai/api/v1/credits": {
"data": {"total_credits": 100.0, "total_usage": 25.5}
},
"https://openrouter.ai/api/v1/key": {
"data": {
"limit": None,
"limit_remaining": None,
"usage": 25.5,
"usage_daily": 1.25,
"usage_weekly": 4.5,
"usage_monthly": 18.0,
}
},
}
),
)
snapshot = fetch_account_usage("openrouter")
assert snapshot is not None
assert snapshot.windows == ()
assert "Credits balance: $74.50" in snapshot.details
assert "API key usage: $25.50 total • $1.25 today • $4.50 this week • $18.00 this month" in snapshot.details

View File

@@ -1,22 +0,0 @@
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DOC = ROOT / "docs" / "hindsight-local-eval.md"
def test_hindsight_local_eval_doc_exists_and_covers_switching():
assert DOC.exists(), "missing Hindsight local eval doc"
text = DOC.read_text(encoding="utf-8")
for snippet in (
"atlas-baseline",
"atlas-mempalace",
"atlas-hindsight",
"HERMES_HOME",
"HERMES_CONTEXT_SNAPSHOTS",
"memory_prefetch_raw.txt",
"api_user_message.txt",
"local_embedded",
"hindsight/config.json",
):
assert snippet in text