#!/usr/bin/env python3
"""Memory Budget Enforcement Tool for hermes-agent.

Checks and enforces character/token budgets on MEMORY.md and USER.md files.
Designed for CI integration, pre-commit hooks, and manual health checks.

Usage:
    python scripts/memory_budget.py                          # Check budget (exit 0/1)
    python scripts/memory_budget.py --report                 # Detailed breakdown
    python scripts/memory_budget.py --enforce                # Trim entries to fit budget
    python scripts/memory_budget.py --hermes-home ~/.hermes  # Custom HERMES_HOME

Exit codes:
    0  Within budget
    1  Over budget (no trimming performed)
    2  Entries were trimmed (--enforce was used)
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import List, Sequence, TextIO

# ---------------------------------------------------------------------------
# Constants (must stay in sync with tools/memory_tool.py)
# ---------------------------------------------------------------------------

ENTRY_DELIMITER = "\n§\n"
DEFAULT_MEMORY_CHAR_LIMIT = 2200
DEFAULT_USER_CHAR_LIMIT = 1375
WARN_THRESHOLD = 0.80  # alert when >80% of budget used
CHARS_PER_TOKEN = 4  # rough estimate matching agent/model_metadata.py


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------


@dataclass
class FileReport:
    """Budget analysis for a single memory file."""

    label: str  # "MEMORY.md" or "USER.md"
    path: Path
    exists: bool
    char_limit: int
    raw_chars: int  # raw file size in chars (decoded text, not bytes)
    entry_chars: int  # chars after splitting/rejoining entries
    entry_count: int
    entries: List[str]  # individual entry texts

    @property
    def usage_pct(self) -> float:
        """Percentage of the budget in use, capped at 100 (0 for a non-positive limit)."""
        if self.char_limit <= 0:
            return 0.0
        return min(100.0, (self.entry_chars / self.char_limit) * 100)

    @property
    def estimated_tokens(self) -> int:
        """Rough token estimate: ``entry_chars`` floor-divided by CHARS_PER_TOKEN."""
        return self.entry_chars // CHARS_PER_TOKEN

    @property
    def over_budget(self) -> bool:
        """True when the rejoined entries exceed ``char_limit``."""
        return self.entry_chars > self.char_limit

    @property
    def warning(self) -> bool:
        """True when usage has reached WARN_THRESHOLD (also true when over budget)."""
        return self.usage_pct >= (WARN_THRESHOLD * 100)

    @property
    def remaining_chars(self) -> int:
        """Characters left before the limit is hit; never negative."""
        return max(0, self.char_limit - self.entry_chars)


# ---------------------------------------------------------------------------
# File access
# ---------------------------------------------------------------------------


def _read_raw(path: Path) -> str:
    """Return the decoded text of *path*, or "" when it is missing or unreadable.

    A missing file is an expected state and stays silent. Any other failure
    (permissions, invalid UTF-8, ...) is reported on stderr so a broken file
    is not silently mistaken for an empty, within-budget one.
    """
    try:
        return path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return ""
    except (OSError, UnicodeDecodeError) as exc:
        print(f"warning: could not read {path}: {exc}", file=sys.stderr)
        return ""


def _split_entries(raw: str) -> List[str]:
    """Split raw file text on ENTRY_DELIMITER, stripping and dropping empty entries."""
    if not raw.strip():
        return []
    stripped = (chunk.strip() for chunk in raw.split(ENTRY_DELIMITER))
    return [entry for entry in stripped if entry]


def _read_entries(path: Path) -> List[str]:
    """Read a memory file and split into entries (matching MemoryStore logic).

    Returns an empty list when the file is missing, unreadable, or blank.
    """
    return _split_entries(_read_raw(path))


def _write_entries(path: Path, entries: List[str]) -> None:
    """Write entries back to a memory file, creating parent directories as needed.

    An empty entry list produces an empty file.
    """
    content = ENTRY_DELIMITER.join(entries) if entries else ""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


def analyze_file(path: Path, label: str, char_limit: int) -> FileReport:
    """Analyze a single memory file against its budget.

    Args:
        path: Location of the memory file (it may not exist).
        label: Display name used in reports, e.g. "MEMORY.md".
        char_limit: Character budget for the rejoined entries.

    Returns:
        A FileReport; all counts are zero when the file does not exist.
    """
    exists = path.exists()
    raw = _read_raw(path) if exists else ""
    entries = _split_entries(raw)
    joined = ENTRY_DELIMITER.join(entries)
    return FileReport(
        label=label,
        path=path,
        exists=exists,
        char_limit=char_limit,
        # Count decoded characters: the on-disk byte size over-counts any
        # non-ASCII text, including the "§" in the entry delimiter itself.
        raw_chars=len(raw),
        entry_chars=len(joined),
        entry_count=len(entries),
        entries=entries,
    )


def trim_entries(report: FileReport) -> List[str]:
    """Trim oldest entries until the file fits within its budget.

    Entries are removed from the front (oldest first) because memory files
    append new entries at the end. The result is the longest run of newest
    entries whose joined length is within ``report.char_limit``; it is empty
    when even the newest entry alone does not fit. ``report`` is not modified.
    """
    entries = report.entries
    delimiter_len = len(ENTRY_DELIMITER)
    kept_chars = 0
    keep_from = len(entries)
    # Walk newest -> oldest with a running length instead of re-joining the
    # whole list after every removal.
    for idx in range(len(entries) - 1, -1, -1):
        # Every kept entry except the first one picked also costs a delimiter.
        cost = len(entries[idx]) + (delimiter_len if keep_from < len(entries) else 0)
        if kept_chars + cost > report.char_limit:
            break
        kept_chars += cost
        keep_from = idx
    return list(entries[keep_from:])


# ---------------------------------------------------------------------------
# Reporting
# ---------------------------------------------------------------------------


def _bar(pct: float, width: int = 30) -> str:
    """Render a text progress bar of exactly *width* cells inside brackets.

    *pct* is clamped to the 0-100 range so that an over-budget total cannot
    draw a bar wider than *width*.
    """
    filled = max(0, min(width, int(pct / 100 * width)))
    bar = "#" * filled + "-" * (width - filled)
    return f"[{bar}]"


def print_report(memory: FileReport, user: FileReport, *, verbose: bool = False) -> None:
    """Print a human-readable budget report to stdout.

    Args:
        memory: Analysis of MEMORY.md.
        user: Analysis of USER.md.
        verbose: When true, also list a one-line preview of every entry.
    """
    total_chars = memory.entry_chars + user.entry_chars
    total_limit = memory.char_limit + user.char_limit
    total_tokens = total_chars // CHARS_PER_TOKEN
    total_pct = (total_chars / total_limit * 100) if total_limit > 0 else 0

    print("=" * 60)
    print(" MEMORY BUDGET REPORT")
    print("=" * 60)
    print()

    for rpt in (memory, user):
        status = "OVER " if rpt.over_budget else ("WARN" if rpt.warning else " OK ")
        print(f" {rpt.label:12s} {status} {_bar(rpt.usage_pct)} {rpt.usage_pct:5.1f}%")
        print(f" {'':12s} {rpt.entry_chars:,}/{rpt.char_limit:,} chars "
              f"| {rpt.entry_count} entries "
              f"| ~{rpt.estimated_tokens:,} tokens")

        if rpt.exists and verbose and rpt.entries:
            for i, entry in enumerate(rpt.entries):
                # Flatten newlines so each entry preview stays on one line.
                preview = entry[:72].replace("\n", " ")
                if len(entry) > 72:
                    preview += "..."
                print(f" #{i+1}: ({len(entry)} chars) {preview}")
        print()

    print(f" TOTAL {_bar(total_pct)} {total_pct:5.1f}%")
    print(f" {total_chars:,}/{total_limit:,} chars | ~{total_tokens:,} tokens")
    print()

    # Alerts
    alerts = []
    for rpt in (memory, user):
        if rpt.over_budget:
            overshoot = rpt.entry_chars - rpt.char_limit
            alerts.append(
                f" CRITICAL {rpt.label} is {overshoot:,} chars over budget "
                f"({rpt.entry_chars:,}/{rpt.char_limit:,}). "
                f"Run with --enforce to auto-trim."
            )
        elif rpt.warning:
            alerts.append(
                f" WARNING {rpt.label} is at {rpt.usage_pct:.0f}% capacity. "
                f"Consider compressing or cleaning up entries."
            )

    if alerts:
        print(" ALERTS")
        print(" ------")
        for a in alerts:
            print(a)
        print()


def print_json(memory: FileReport, user: FileReport) -> None:
    """Print a JSON report for machine consumption (a single document on stdout)."""

    def _rpt_dict(r: FileReport) -> dict:
        return {
            "label": r.label,
            "path": str(r.path),
            "exists": r.exists,
            "char_limit": r.char_limit,
            "entry_chars": r.entry_chars,
            "entry_count": r.entry_count,
            "estimated_tokens": r.estimated_tokens,
            "usage_pct": round(r.usage_pct, 1),
            "over_budget": r.over_budget,
            "warning": r.warning,
            "remaining_chars": r.remaining_chars,
        }

    total_chars = memory.entry_chars + user.entry_chars
    total_limit = memory.char_limit + user.char_limit

    data = {
        "memory": _rpt_dict(memory),
        "user": _rpt_dict(user),
        "total": {
            "chars": total_chars,
            "limit": total_limit,
            "estimated_tokens": total_chars // CHARS_PER_TOKEN,
            "usage_pct": round((total_chars / total_limit * 100) if total_limit else 0, 1),
            "over_budget": memory.over_budget or user.over_budget,
            "warning": memory.warning or user.warning,
        },
    }
    print(json.dumps(data, indent=2))


def _usage_line(rpt: FileReport) -> str:
    """Format the one-line usage summary shared by every compact-summary branch."""
    return (f" {rpt.label}: {rpt.entry_chars:,}/{rpt.char_limit:,} chars "
            f"({rpt.usage_pct:.0f}%)")


def _print_summary(reports: Sequence[FileReport]) -> None:
    """Print the compact OVER / WARNING / OK summary to stdout."""
    if any(rpt.over_budget for rpt in reports):
        print("Memory budget: OVER")
        shown = [rpt for rpt in reports if rpt.over_budget]
    elif any(rpt.warning for rpt in reports):
        print("Memory budget: WARNING")
        shown = [rpt for rpt in reports if rpt.warning]
    else:
        print("Memory budget: OK")
        shown = [rpt for rpt in reports if rpt.exists]
    for rpt in shown:
        print(_usage_line(rpt))


def _print_suggestions(reports: Sequence[FileReport], stream: TextIO) -> None:
    """Write clean-up hints for every over-budget file, naming its three largest entries."""
    suggestions = []
    for rpt in reports:
        if not rpt.over_budget:
            continue
        suggestions.append(
            f" - {rpt.label}: remove stale entries or run with --enforce to auto-trim"
        )
        # Identify largest entries (original 1-based positions are preserved).
        by_size = sorted(enumerate(rpt.entries), key=lambda pair: len(pair[1]), reverse=True)
        for idx, entry in by_size[:3]:
            preview = entry[:60].replace("\n", " ")
            if len(entry) > 60:
                preview += "..."
            suggestions.append(
                f" largest entry #{idx+1}: ({len(entry)} chars) {preview}"
            )

    if suggestions:
        print(file=stream)
        print("Suggestions:", file=stream)
        for line in suggestions:
            print(line, file=stream)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def _resolve_hermes_home(custom: str | None) -> Path:
    """Resolve HERMES_HOME directory.

    Precedence: the explicit *custom* path (with ``~`` expanded), then the
    HERMES_HOME environment variable, then ``~/.hermes``.
    """
    if custom:
        return Path(custom).expanduser()
    env_home = os.getenv("HERMES_HOME")
    if env_home is not None:
        return Path(env_home)
    # Only look up the home directory when it is actually needed.
    return Path.home() / ".hermes"


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; the module docstring doubles as the help epilog."""
    parser = argparse.ArgumentParser(
        description="Check and enforce memory budgets for hermes-agent.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--hermes-home", metavar="DIR",
        help="Custom HERMES_HOME directory (default: $HERMES_HOME or ~/.hermes)",
    )
    parser.add_argument(
        "--memory-limit", type=int, default=DEFAULT_MEMORY_CHAR_LIMIT,
        help=f"Character limit for MEMORY.md (default: {DEFAULT_MEMORY_CHAR_LIMIT})",
    )
    parser.add_argument(
        "--user-limit", type=int, default=DEFAULT_USER_CHAR_LIMIT,
        help=f"Character limit for USER.md (default: {DEFAULT_USER_CHAR_LIMIT})",
    )
    parser.add_argument(
        "--report", action="store_true",
        help="Print detailed per-file budget report",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Show individual entry details in report",
    )
    parser.add_argument(
        "--enforce", action="store_true",
        help="Trim oldest entries to fit within budget (writes to disk)",
    )
    parser.add_argument(
        "--json", action="store_true", dest="json_output",
        help="Output report as JSON (for CI/scripting)",
    )
    return parser


def _enforce_budgets(reports: Sequence[FileReport], notice: TextIO) -> bool:
    """Trim every existing over-budget file on disk and refresh its report in place.

    Returns True when at least one file was rewritten. Progress lines are
    written to *notice*.
    """
    trimmed = False
    for rpt in reports:
        if not (rpt.over_budget and rpt.exists):
            continue
        trimmed_entries = trim_entries(rpt)
        removed = rpt.entry_count - len(trimmed_entries)
        if removed <= 0:
            continue
        _write_entries(rpt.path, trimmed_entries)
        rpt.entries = trimmed_entries
        rpt.entry_count = len(trimmed_entries)
        rpt.entry_chars = len(ENTRY_DELIMITER.join(trimmed_entries))
        # The file now holds exactly the rejoined entries, so both counts agree.
        rpt.raw_chars = rpt.entry_chars
        print(f" Trimmed {removed} oldest entries from {rpt.label} "
              f"({rpt.entry_chars:,}/{rpt.char_limit:,} chars now)", file=notice)
        trimmed = True
    return trimmed


def main(argv: Sequence[str] | None = None) -> int:
    """Run the budget check and return the process exit code.

    Args:
        argv: Command-line arguments without the program name; ``None`` (the
            default) makes argparse read ``sys.argv``.

    Returns:
        0 when within budget, 1 when over budget, 2 when entries were trimmed.
    """
    args = _build_parser().parse_args(argv)

    hermes_home = _resolve_hermes_home(args.hermes_home)
    memories_dir = hermes_home / "memories"

    # Analyze both files
    memory = analyze_file(memories_dir / "MEMORY.md", "MEMORY.md", args.memory_limit)
    user = analyze_file(memories_dir / "USER.md", "USER.md", args.user_limit)
    reports = (memory, user)

    # In JSON mode stdout must carry nothing but the JSON document, so all
    # human-oriented notices are diverted to stderr.
    notice = sys.stderr if args.json_output else sys.stdout

    over_budget = memory.over_budget or user.over_budget
    trimmed = False

    # Enforce budget by trimming entries
    if args.enforce and over_budget:
        trimmed = _enforce_budgets(reports, notice)
        # Re-check after trimming
        over_budget = memory.over_budget or user.over_budget

    # Output
    if args.json_output:
        print_json(memory, user)
    elif args.report or args.verbose:
        print_report(memory, user, verbose=args.verbose)
    else:
        _print_summary(reports)

    # Suggest actions when over budget but not enforced
    if over_budget and not args.enforce:
        _print_suggestions(reports, notice)

    # Exit code
    if trimmed:
        return 2
    if over_budget:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())