Compare commits

..

10 Commits

Author SHA1 Message Date
Alexander Whitestone
0d92b9ad15 feat(scripts): add memory budget enforcement tool (#256)
All checks were successful
Forge CI / smoke-and-build (pull_request) Successful in 40s
Add scripts/memory_budget.py — a CI-friendly tool for checking and
enforcing character budgets on MEMORY.md and USER.md memory files.

Features:
- Checks MEMORY.md vs memory_char_limit (default 2200)
- Checks USER.md vs user_char_limit (default 1375)
- Estimates total injection cost (chars / ~4 chars per token)
- Alerts when approaching limits (>80% usage)
- --report flag for detailed breakdown with progress bars
- --verbose flag for per-entry details
- --enforce flag trims oldest entries to fit budget
- --json flag for machine-readable output (CI integration)
- Exit codes: 0=within budget, 1=over budget, 2=trimmed
- Suggestions for largest entries when over budget

Relates to #256
2026-04-09 21:13:01 -04:00
7d2421a15f Merge pull request 'ci: add duplicate model detection check' (#235) from feat/ci-no-duplicate-models into main
All checks were successful
Forge CI / smoke-and-build (push) Successful in 54s
2026-04-08 22:55:16 +00:00
Alexander Whitestone
5a942d71a1 ci: add duplicate model check step to CI workflow
All checks were successful
Forge CI / smoke-and-build (pull_request) Successful in 49s
2026-04-08 08:16:00 -04:00
Alexander Whitestone
044f0f8951 ci: add check_no_duplicate_models.py - catches duplicate model IDs (#224) 2026-04-08 08:15:27 -04:00
61c59ce332 Merge pull request 'fix(config): replace kimi-for-coding with kimi-k2.5 across codebase' (#225) from fix/kimi-fallback-rebase into main
Some checks failed
Forge CI / smoke-and-build (push) Successful in 50s
Notebook CI / notebook-smoke (push) Failing after 13s
2026-04-08 06:57:03 +00:00
01ce8ae889 fix: remove duplicate kimi-k2.5 entries from model lists
All checks were successful
Forge CI / smoke-and-build (pull_request) Successful in 47s
2026-04-08 00:49:52 +00:00
Alexander Whitestone
b179250ab8 fix(config): replace kimi-for-coding with kimi-k2.5 in all refs
All checks were successful
Forge CI / smoke-and-build (pull_request) Successful in 36s
- model_metadata.py
- fallback-config.yaml
- hermes_cli/auth.py, main.py, models.py
- test_api_key_providers.py
- docs/integrations/providers.md
- ezra quarterly report
2026-04-07 12:58:44 -04:00
01a3f47a5b Merge pull request '[claude] Fix syntax errors in Ollama provider wiring (#223)' (#224) from claude/issue-223 into main
All checks were successful
Forge CI / smoke-and-build (push) Successful in 57s
2026-04-07 16:40:34 +00:00
Alexander Whitestone
4538e11f97 fix(auxiliary_client): repair syntax errors in Ollama provider wiring
All checks were successful
Forge CI / smoke-and-build (pull_request) Successful in 45s
The Ollama feature commit introduced two broken `OpenAI(api_key=*** base_url=...)` calls
where `***` was a redacted variable name and the separating comma was missing.
Replace both occurrences with `api_key=api_key, base_url=base_url`.

Fixes #223
2026-04-07 12:04:40 -04:00
7936483ffc feat(provider): first-class Ollama support + Gemma 4 defaults (#169)
- Add 'ollama' to CLI provider choices and auth aliases
- Wire Ollama through resolve_provider_client with auto-detection
- Add _try_ollama to auxiliary fallback chain (before local/custom)
- Add ollama to vision provider order
- Update model_metadata.py: ollama prefix + gemma-4-* context lengths (256K)
- Default model: gemma4:12b when provider=ollama
2026-04-07 12:04:10 -04:00
8 changed files with 464 additions and 11 deletions

View File

@@ -47,6 +47,11 @@ jobs:
source .venv/bin/activate
python scripts/syntax_guard.py
- name: No duplicate models
run: |
source .venv/bin/activate
python scripts/check_no_duplicate_models.py
- name: Green-path E2E
run: |
source .venv/bin/activate

View File

@@ -940,7 +940,7 @@ def _try_ollama() -> Tuple[Optional[OpenAI], Optional[str]]:
return None, None
api_key = (os.getenv("OLLAMA_API_KEY", "") or "ollama").strip()
model = _read_main_model() or "gemma4:12b"
return OpenAI(api_key=*** base_url=base_url), model
return OpenAI(api_key=api_key, base_url=base_url), model
def _get_provider_chain() -> List[tuple]:
@@ -1216,7 +1216,7 @@ def resolve_provider_client(
base_url = base_url + "/v1" if not base_url.endswith("/v1") else base_url
api_key = (explicit_api_key or os.getenv("OLLAMA_API_KEY", "") or "ollama").strip()
final_model = model or _read_main_model() or "gemma4:12b"
client = OpenAI(api_key=*** base_url=base_url)
client = OpenAI(api_key=api_key, base_url=base_url)
return (_to_async_client(client, final_model) if async_mode else (client, final_model))
# ── Custom endpoint (OPENAI_BASE_URL + OPENAI_API_KEY) ───────────

View File

@@ -148,7 +148,7 @@ PROVIDER_TO_MODELS_DEV: Dict[str, str] = {
"openrouter": "openrouter",
"anthropic": "anthropic",
"zai": "zai",
"kimi-coding": "kimi-for-coding",
"kimi-coding": "kimi-k2.5",
"minimax": "minimax",
"minimax-cn": "minimax-cn",
"deepseek": "deepseek",

View File

@@ -2126,7 +2126,7 @@ def _model_flow_kimi(config, current_model=""):
# Step 3: Model selection — show appropriate models for the endpoint
if is_coding_plan:
# Coding Plan models (kimi-k2.5 first — kimi-for-coding retired due to 403)
# Coding Plan models (kimi-k2.5 first)
model_list = [
"kimi-k2.5",
"kimi-k2-thinking",

View File

@@ -78,7 +78,7 @@ HERMES_OVERLAYS: Dict[str, HermesOverlay] = {
extra_env_vars=("GLM_API_KEY", "ZAI_API_KEY", "Z_AI_API_KEY"),
base_url_env_var="GLM_BASE_URL",
),
"kimi-for-coding": HermesOverlay(
"kimi-k2.5": HermesOverlay(
transport="openai_chat",
base_url_env_var="KIMI_BASE_URL",
),
@@ -162,10 +162,10 @@ ALIASES: Dict[str, str] = {
"z.ai": "zai",
"zhipu": "zai",
# kimi-for-coding (models.dev ID)
"kimi": "kimi-for-coding",
"kimi-coding": "kimi-for-coding",
"moonshot": "kimi-for-coding",
# kimi-k2.5 (models.dev ID)
"kimi": "kimi-k2.5",
"kimi-coding": "kimi-k2.5",
"moonshot": "kimi-k2.5",
# minimax-cn
"minimax-china": "minimax-cn",
@@ -376,7 +376,7 @@ LABELS: Dict[str, str] = {
"github-copilot": "GitHub Copilot",
"anthropic": "Anthropic",
"zai": "Z.AI / GLM",
"kimi-for-coding": "Kimi / Moonshot",
"kimi-k2.5": "Kimi / Moonshot",
"minimax": "MiniMax",
"minimax-cn": "MiniMax (China)",
"deepseek": "DeepSeek",

View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""CI check: ensure no duplicate model IDs exist in provider configs.
Catches the class of bugs where a rename introduces a duplicate entry
(e.g. PR #225 kimi-for-coding -> kimi-k2.5 when kimi-k2.5 already existed).
Runtime target: < 2 seconds.
"""
from __future__ import annotations
import sys
from pathlib import Path
# Allow running from repo root
REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(REPO_ROOT))
def check_openrouter_models() -> list[str]:
"""Check OPENROUTER_MODELS for duplicate model IDs."""
try:
from hermes_cli.models import OPENROUTER_MODELS
except ImportError:
return []
errors = []
seen: dict[str, int] = {}
for i, (model_id, _desc) in enumerate(OPENROUTER_MODELS):
if model_id in seen:
errors.append(
f" OPENROUTER_MODELS: duplicate '{model_id}' "
f"(index {seen[model_id]} and {i})"
)
else:
seen[model_id] = i
return errors
def check_provider_models() -> list[str]:
"""Check _PROVIDER_MODELS for duplicate model IDs within each provider list."""
from hermes_cli.models import _PROVIDER_MODELS
errors = []
for provider, models in _PROVIDER_MODELS.items():
seen: dict[str, int] = {}
for i, model_id in enumerate(models):
if model_id in seen:
errors.append(
f" _PROVIDER_MODELS['{provider}']: duplicate '{model_id}' "
f"(index {seen[model_id]} and {i})"
)
else:
seen[model_id] = i
return errors
def main() -> int:
errors = []
errors.extend(check_openrouter_models())
errors.extend(check_provider_models())
if errors:
print(f"FAIL: {len(errors)} duplicate model(s) found:")
for e in errors:
print(e)
return 1
print("OK: no duplicate model entries")
return 0
if __name__ == "__main__":
raise SystemExit(main())

374
scripts/memory_budget.py Normal file
View File

@@ -0,0 +1,374 @@
#!/usr/bin/env python3
"""Memory Budget Enforcement Tool for hermes-agent.
Checks and enforces character/token budgets on MEMORY.md and USER.md files.
Designed for CI integration, pre-commit hooks, and manual health checks.
Usage:
python scripts/memory_budget.py # Check budget (exit 0/1)
python scripts/memory_budget.py --report # Detailed breakdown
python scripts/memory_budget.py --enforce # Trim entries to fit budget
python scripts/memory_budget.py --hermes-home ~/.hermes # Custom HERMES_HOME
Exit codes:
0 Within budget
1 Over budget (no trimming performed)
2 Entries were trimmed (--enforce was used)
"""
from __future__ import annotations
import argparse
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import List
# ---------------------------------------------------------------------------
# Constants (must stay in sync with tools/memory_tool.py)
# ---------------------------------------------------------------------------
ENTRY_DELIMITER = "\n§\n"
DEFAULT_MEMORY_CHAR_LIMIT = 2200
DEFAULT_USER_CHAR_LIMIT = 1375
WARN_THRESHOLD = 0.80 # alert when >80% of budget used
CHARS_PER_TOKEN = 4 # rough estimate matching agent/model_metadata.py
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class FileReport:
"""Budget analysis for a single memory file."""
label: str # "MEMORY.md" or "USER.md"
path: Path
exists: bool
char_limit: int
raw_chars: int # raw file size in chars
entry_chars: int # chars after splitting/rejoining entries
entry_count: int
entries: List[str] # individual entry texts
@property
def usage_pct(self) -> float:
if self.char_limit <= 0:
return 0.0
return min(100.0, (self.entry_chars / self.char_limit) * 100)
@property
def estimated_tokens(self) -> int:
return self.entry_chars // CHARS_PER_TOKEN
@property
def over_budget(self) -> bool:
return self.entry_chars > self.char_limit
@property
def warning(self) -> bool:
return self.usage_pct >= (WARN_THRESHOLD * 100)
@property
def remaining_chars(self) -> int:
return max(0, self.char_limit - self.entry_chars)
def _read_entries(path: Path) -> List[str]:
"""Read a memory file and split into entries (matching MemoryStore logic)."""
if not path.exists():
return []
try:
raw = path.read_text(encoding="utf-8")
except (OSError, IOError):
return []
if not raw.strip():
return []
entries = [e.strip() for e in raw.split(ENTRY_DELIMITER)]
return [e for e in entries if e]
def _write_entries(path: Path, entries: List[str]) -> None:
"""Write entries back to a memory file."""
content = ENTRY_DELIMITER.join(entries) if entries else ""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
def analyze_file(path: Path, label: str, char_limit: int) -> FileReport:
"""Analyze a single memory file against its budget."""
exists = path.exists()
entries = _read_entries(path) if exists else []
raw_chars = path.stat().st_size if exists else 0
joined = ENTRY_DELIMITER.join(entries)
return FileReport(
label=label,
path=path,
exists=exists,
char_limit=char_limit,
raw_chars=raw_chars,
entry_chars=len(joined),
entry_count=len(entries),
entries=entries,
)
def trim_entries(report: FileReport) -> List[str]:
"""Trim oldest entries until the file fits within its budget.
Entries are removed from the front (oldest first) because memory files
append new entries at the end.
"""
entries = list(report.entries)
joined = ENTRY_DELIMITER.join(entries)
while len(joined) > report.char_limit and entries:
entries.pop(0)
joined = ENTRY_DELIMITER.join(entries)
return entries
# ---------------------------------------------------------------------------
# Reporting
# ---------------------------------------------------------------------------
def _bar(pct: float, width: int = 30) -> str:
"""Render a text progress bar."""
filled = int(pct / 100 * width)
bar = "#" * filled + "-" * (width - filled)
return f"[{bar}]"
def print_report(memory: FileReport, user: FileReport, *, verbose: bool = False) -> None:
"""Print a human-readable budget report."""
total_chars = memory.entry_chars + user.entry_chars
total_limit = memory.char_limit + user.char_limit
total_tokens = total_chars // CHARS_PER_TOKEN
total_pct = (total_chars / total_limit * 100) if total_limit > 0 else 0
print("=" * 60)
print(" MEMORY BUDGET REPORT")
print("=" * 60)
print()
for rpt in (memory, user):
status = "OVER " if rpt.over_budget else ("WARN" if rpt.warning else " OK ")
print(f" {rpt.label:12s} {status} {_bar(rpt.usage_pct)} {rpt.usage_pct:5.1f}%")
print(f" {'':12s} {rpt.entry_chars:,}/{rpt.char_limit:,} chars "
f"| {rpt.entry_count} entries "
f"| ~{rpt.estimated_tokens:,} tokens")
if rpt.exists and verbose and rpt.entries:
for i, entry in enumerate(rpt.entries):
preview = entry[:72].replace("\n", " ")
if len(entry) > 72:
preview += "..."
print(f" #{i+1}: ({len(entry)} chars) {preview}")
print()
print(f" TOTAL {_bar(total_pct)} {total_pct:5.1f}%")
print(f" {total_chars:,}/{total_limit:,} chars | ~{total_tokens:,} tokens")
print()
# Alerts
alerts = []
for rpt in (memory, user):
if rpt.over_budget:
overshoot = rpt.entry_chars - rpt.char_limit
alerts.append(
f" CRITICAL {rpt.label} is {overshoot:,} chars over budget "
f"({rpt.entry_chars:,}/{rpt.char_limit:,}). "
f"Run with --enforce to auto-trim."
)
elif rpt.warning:
alerts.append(
f" WARNING {rpt.label} is at {rpt.usage_pct:.0f}% capacity. "
f"Consider compressing or cleaning up entries."
)
if alerts:
print(" ALERTS")
print(" ------")
for a in alerts:
print(a)
print()
def print_json(memory: FileReport, user: FileReport) -> None:
"""Print a JSON report for machine consumption."""
import json
def _rpt_dict(r: FileReport) -> dict:
return {
"label": r.label,
"path": str(r.path),
"exists": r.exists,
"char_limit": r.char_limit,
"entry_chars": r.entry_chars,
"entry_count": r.entry_count,
"estimated_tokens": r.estimated_tokens,
"usage_pct": round(r.usage_pct, 1),
"over_budget": r.over_budget,
"warning": r.warning,
"remaining_chars": r.remaining_chars,
}
total_chars = memory.entry_chars + user.entry_chars
total_limit = memory.char_limit + user.char_limit
data = {
"memory": _rpt_dict(memory),
"user": _rpt_dict(user),
"total": {
"chars": total_chars,
"limit": total_limit,
"estimated_tokens": total_chars // CHARS_PER_TOKEN,
"usage_pct": round((total_chars / total_limit * 100) if total_limit else 0, 1),
"over_budget": memory.over_budget or user.over_budget,
"warning": memory.warning or user.warning,
},
}
print(json.dumps(data, indent=2))
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _resolve_hermes_home(custom: str | None) -> Path:
"""Resolve HERMES_HOME directory."""
if custom:
return Path(custom).expanduser()
import os
return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
def main() -> int:
parser = argparse.ArgumentParser(
description="Check and enforce memory budgets for hermes-agent.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument(
"--hermes-home", metavar="DIR",
help="Custom HERMES_HOME directory (default: $HERMES_HOME or ~/.hermes)",
)
parser.add_argument(
"--memory-limit", type=int, default=DEFAULT_MEMORY_CHAR_LIMIT,
help=f"Character limit for MEMORY.md (default: {DEFAULT_MEMORY_CHAR_LIMIT})",
)
parser.add_argument(
"--user-limit", type=int, default=DEFAULT_USER_CHAR_LIMIT,
help=f"Character limit for USER.md (default: {DEFAULT_USER_CHAR_LIMIT})",
)
parser.add_argument(
"--report", action="store_true",
help="Print detailed per-file budget report",
)
parser.add_argument(
"--verbose", "-v", action="store_true",
help="Show individual entry details in report",
)
parser.add_argument(
"--enforce", action="store_true",
help="Trim oldest entries to fit within budget (writes to disk)",
)
parser.add_argument(
"--json", action="store_true", dest="json_output",
help="Output report as JSON (for CI/scripting)",
)
args = parser.parse_args()
hermes_home = _resolve_hermes_home(args.hermes_home)
memories_dir = hermes_home / "memories"
# Analyze both files
memory = analyze_file(
memories_dir / "MEMORY.md", "MEMORY.md", args.memory_limit,
)
user = analyze_file(
memories_dir / "USER.md", "USER.md", args.user_limit,
)
over_budget = memory.over_budget or user.over_budget
trimmed = False
# Enforce budget by trimming entries
if args.enforce and over_budget:
for rpt in (memory, user):
if rpt.over_budget and rpt.exists:
trimmed_entries = trim_entries(rpt)
removed = rpt.entry_count - len(trimmed_entries)
if removed > 0:
_write_entries(rpt.path, trimmed_entries)
rpt.entries = trimmed_entries
rpt.entry_count = len(trimmed_entries)
rpt.entry_chars = len(ENTRY_DELIMITER.join(trimmed_entries))
rpt.raw_chars = rpt.path.stat().st_size
print(f" Trimmed {removed} oldest entries from {rpt.label} "
f"({rpt.entry_chars:,}/{rpt.char_limit:,} chars now)")
trimmed = True
# Re-check after trimming
over_budget = memory.over_budget or user.over_budget
# Output
if args.json_output:
print_json(memory, user)
elif args.report or args.verbose:
print_report(memory, user, verbose=args.verbose)
else:
# Compact summary
if over_budget:
print("Memory budget: OVER")
for rpt in (memory, user):
if rpt.over_budget:
print(f" {rpt.label}: {rpt.entry_chars:,}/{rpt.char_limit:,} chars "
f"({rpt.usage_pct:.0f}%)")
elif memory.warning or user.warning:
print("Memory budget: WARNING")
for rpt in (memory, user):
if rpt.warning:
print(f" {rpt.label}: {rpt.entry_chars:,}/{rpt.char_limit:,} chars "
f"({rpt.usage_pct:.0f}%)")
else:
print("Memory budget: OK")
for rpt in (memory, user):
if rpt.exists:
print(f" {rpt.label}: {rpt.entry_chars:,}/{rpt.char_limit:,} chars "
f"({rpt.usage_pct:.0f}%)")
# Suggest actions when over budget but not enforced
if over_budget and not args.enforce:
suggestions = []
for rpt in (memory, user):
if rpt.over_budget:
suggestions.append(
f" - {rpt.label}: remove stale entries or run with --enforce to auto-trim"
)
# Identify largest entries
if rpt.entries:
indexed = sorted(enumerate(rpt.entries), key=lambda x: len(x[1]), reverse=True)
top3 = indexed[:3]
for idx, entry in top3:
preview = entry[:60].replace("\n", " ")
if len(entry) > 60:
preview += "..."
suggestions.append(
f" largest entry #{idx+1}: ({len(entry)} chars) {preview}"
)
if suggestions:
print()
print("Suggestions:")
for s in suggestions:
print(s)
# Exit code
if trimmed:
return 2
if over_budget:
return 1
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -895,7 +895,7 @@ class TestKimiMoonshotModelListIsolation:
def test_moonshot_list_excludes_coding_plan_only_models(self):
from hermes_cli.main import _PROVIDER_MODELS
moonshot_models = _PROVIDER_MODELS["moonshot"]
coding_plan_only = {"kimi-k2-thinking-turbo"}
coding_plan_only = {"kimi-k2.5", "kimi-k2-thinking-turbo"}
leaked = set(moonshot_models) & coding_plan_only
assert not leaked, f"Moonshot list contains Coding Plan-only models: {leaked}"