diff --git a/scripts/memory_budget.py b/scripts/memory_budget.py new file mode 100644 index 000000000..f307a2a10 --- /dev/null +++ b/scripts/memory_budget.py @@ -0,0 +1,374 @@ +#!/usr/bin/env python3 +"""Memory Budget Enforcement Tool for hermes-agent. + +Checks and enforces character/token budgets on MEMORY.md and USER.md files. +Designed for CI integration, pre-commit hooks, and manual health checks. + +Usage: + python scripts/memory_budget.py # Check budget (exit 0/1) + python scripts/memory_budget.py --report # Detailed breakdown + python scripts/memory_budget.py --enforce # Trim entries to fit budget + python scripts/memory_budget.py --hermes-home ~/.hermes # Custom HERMES_HOME + +Exit codes: + 0 Within budget + 1 Over budget (no trimming performed) + 2 Entries were trimmed (--enforce was used) +""" + +from __future__ import annotations + +import argparse +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import List + +# --------------------------------------------------------------------------- +# Constants (must stay in sync with tools/memory_tool.py) +# --------------------------------------------------------------------------- + +ENTRY_DELIMITER = "\n§\n" +DEFAULT_MEMORY_CHAR_LIMIT = 2200 +DEFAULT_USER_CHAR_LIMIT = 1375 +WARN_THRESHOLD = 0.80 # alert when >80% of budget used +CHARS_PER_TOKEN = 4 # rough estimate matching agent/model_metadata.py + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +@dataclass +class FileReport: + """Budget analysis for a single memory file.""" + label: str # "MEMORY.md" or "USER.md" + path: Path + exists: bool + char_limit: int + raw_chars: int # raw file size in chars + entry_chars: int # chars after splitting/rejoining entries + entry_count: int + entries: List[str] # individual entry texts + + @property + def usage_pct(self) -> float: + if self.char_limit <= 0: + return 0.0 + return min(100.0, (self.entry_chars / self.char_limit) * 100) + + @property + def estimated_tokens(self) -> int: + return self.entry_chars // CHARS_PER_TOKEN + + @property + def over_budget(self) -> bool: + return self.entry_chars > self.char_limit + + @property + def warning(self) -> bool: + return self.usage_pct >= (WARN_THRESHOLD * 100) + + @property + def remaining_chars(self) -> int: + return max(0, self.char_limit - self.entry_chars) + + +def _read_entries(path: Path) -> List[str]: + """Read a memory file and split into entries (matching MemoryStore logic).""" + if not path.exists(): + return [] + try: + raw = path.read_text(encoding="utf-8") + except (OSError, IOError): + return [] + if not raw.strip(): + return [] + entries = [e.strip() for e in raw.split(ENTRY_DELIMITER)] + return [e for e in entries if e] + + +def _write_entries(path: Path, entries: List[str]) -> None: + """Write entries back to a memory file.""" + content = ENTRY_DELIMITER.join(entries) if entries else "" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + + +def analyze_file(path: Path, label: str, char_limit: int) -> FileReport: + """Analyze a single memory file against its budget.""" + exists = path.exists() + entries = _read_entries(path) if exists else [] + raw_chars = path.stat().st_size if exists else 0 + joined = ENTRY_DELIMITER.join(entries) + return FileReport( + label=label, + path=path, + exists=exists, + char_limit=char_limit, + raw_chars=raw_chars, + entry_chars=len(joined), + entry_count=len(entries), + entries=entries, + ) + + +def trim_entries(report: FileReport) -> List[str]: + """Trim oldest entries until the file fits within its budget. + + Entries are removed from the front (oldest first) because memory files + append new entries at the end. + """ + entries = list(report.entries) + joined = ENTRY_DELIMITER.join(entries) + + while len(joined) > report.char_limit and entries: + entries.pop(0) + joined = ENTRY_DELIMITER.join(entries) + + return entries + + +# --------------------------------------------------------------------------- +# Reporting +# --------------------------------------------------------------------------- + +def _bar(pct: float, width: int = 30) -> str: + """Render a text progress bar.""" + filled = int(pct / 100 * width) + bar = "#" * filled + "-" * (width - filled) + return f"[{bar}]" + + +def print_report(memory: FileReport, user: FileReport, *, verbose: bool = False) -> None: + """Print a human-readable budget report.""" + total_chars = memory.entry_chars + user.entry_chars + total_limit = memory.char_limit + user.char_limit + total_tokens = total_chars // CHARS_PER_TOKEN + total_pct = (total_chars / total_limit * 100) if total_limit > 0 else 0 + + print("=" * 60) + print(" MEMORY BUDGET REPORT") + print("=" * 60) + print() + + for rpt in (memory, user): + status = "OVER " if rpt.over_budget else ("WARN" if rpt.warning else " OK ") + print(f" {rpt.label:12s} {status} {_bar(rpt.usage_pct)} {rpt.usage_pct:5.1f}%") + print(f" {'':12s} {rpt.entry_chars:,}/{rpt.char_limit:,} chars " + f"| {rpt.entry_count} entries " + f"| ~{rpt.estimated_tokens:,} tokens") + if rpt.exists and verbose and rpt.entries: + for i, entry in enumerate(rpt.entries): + preview = entry[:72].replace("\n", " ") + if len(entry) > 72: + preview += "..." + print(f" #{i+1}: ({len(entry)} chars) {preview}") + print() + + print(f" TOTAL {_bar(total_pct)} {total_pct:5.1f}%") + print(f" {total_chars:,}/{total_limit:,} chars | ~{total_tokens:,} tokens") + print() + + # Alerts + alerts = [] + for rpt in (memory, user): + if rpt.over_budget: + overshoot = rpt.entry_chars - rpt.char_limit + alerts.append( + f" CRITICAL {rpt.label} is {overshoot:,} chars over budget " + f"({rpt.entry_chars:,}/{rpt.char_limit:,}). " + f"Run with --enforce to auto-trim." + ) + elif rpt.warning: + alerts.append( + f" WARNING {rpt.label} is at {rpt.usage_pct:.0f}% capacity. " + f"Consider compressing or cleaning up entries." + ) + + if alerts: + print(" ALERTS") + print(" ------") + for a in alerts: + print(a) + print() + + +def print_json(memory: FileReport, user: FileReport) -> None: + """Print a JSON report for machine consumption.""" + import json + + def _rpt_dict(r: FileReport) -> dict: + return { + "label": r.label, + "path": str(r.path), + "exists": r.exists, + "char_limit": r.char_limit, + "entry_chars": r.entry_chars, + "entry_count": r.entry_count, + "estimated_tokens": r.estimated_tokens, + "usage_pct": round(r.usage_pct, 1), + "over_budget": r.over_budget, + "warning": r.warning, + "remaining_chars": r.remaining_chars, + } + + total_chars = memory.entry_chars + user.entry_chars + total_limit = memory.char_limit + user.char_limit + + data = { + "memory": _rpt_dict(memory), + "user": _rpt_dict(user), + "total": { + "chars": total_chars, + "limit": total_limit, + "estimated_tokens": total_chars // CHARS_PER_TOKEN, + "usage_pct": round((total_chars / total_limit * 100) if total_limit else 0, 1), + "over_budget": memory.over_budget or user.over_budget, + "warning": memory.warning or user.warning, + }, + } + print(json.dumps(data, indent=2)) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def _resolve_hermes_home(custom: str | None) -> Path: + """Resolve HERMES_HOME directory.""" + if custom: + return Path(custom).expanduser() + import os + return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Check and enforce memory budgets for hermes-agent.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--hermes-home", metavar="DIR", + help="Custom HERMES_HOME directory (default: $HERMES_HOME or ~/.hermes)", + ) + parser.add_argument( + "--memory-limit", type=int, default=DEFAULT_MEMORY_CHAR_LIMIT, + help=f"Character limit for MEMORY.md (default: {DEFAULT_MEMORY_CHAR_LIMIT})", + ) + parser.add_argument( + "--user-limit", type=int, default=DEFAULT_USER_CHAR_LIMIT, + help=f"Character limit for USER.md (default: {DEFAULT_USER_CHAR_LIMIT})", + ) + parser.add_argument( + "--report", action="store_true", + help="Print detailed per-file budget report", + ) + parser.add_argument( + "--verbose", "-v", action="store_true", + help="Show individual entry details in report", + ) + parser.add_argument( + "--enforce", action="store_true", + help="Trim oldest entries to fit within budget (writes to disk)", + ) + parser.add_argument( + "--json", action="store_true", dest="json_output", + help="Output report as JSON (for CI/scripting)", + ) + + args = parser.parse_args() + hermes_home = _resolve_hermes_home(args.hermes_home) + memories_dir = hermes_home / "memories" + + # Analyze both files + memory = analyze_file( + memories_dir / "MEMORY.md", "MEMORY.md", args.memory_limit, + ) + user = analyze_file( + memories_dir / "USER.md", "USER.md", args.user_limit, + ) + + over_budget = memory.over_budget or user.over_budget + trimmed = False + + # Enforce budget by trimming entries + if args.enforce and over_budget: + for rpt in (memory, user): + if rpt.over_budget and rpt.exists: + trimmed_entries = trim_entries(rpt) + removed = rpt.entry_count - len(trimmed_entries) + if removed > 0: + _write_entries(rpt.path, trimmed_entries) + rpt.entries = trimmed_entries + rpt.entry_count = len(trimmed_entries) + rpt.entry_chars = len(ENTRY_DELIMITER.join(trimmed_entries)) + rpt.raw_chars = rpt.path.stat().st_size + print(f" Trimmed {removed} oldest entries from {rpt.label} " + f"({rpt.entry_chars:,}/{rpt.char_limit:,} chars now)") + trimmed = True + # Re-check after trimming + over_budget = memory.over_budget or user.over_budget + + # Output + if args.json_output: + print_json(memory, user) + elif args.report or args.verbose: + print_report(memory, user, verbose=args.verbose) + else: + # Compact summary + if over_budget: + print("Memory budget: OVER") + for rpt in (memory, user): + if rpt.over_budget: + print(f" {rpt.label}: {rpt.entry_chars:,}/{rpt.char_limit:,} chars " + f"({rpt.usage_pct:.0f}%)") + elif memory.warning or user.warning: + print("Memory budget: WARNING") + for rpt in (memory, user): + if rpt.warning: + print(f" {rpt.label}: {rpt.entry_chars:,}/{rpt.char_limit:,} chars " + f"({rpt.usage_pct:.0f}%)") + else: + print("Memory budget: OK") + for rpt in (memory, user): + if rpt.exists: + print(f" {rpt.label}: {rpt.entry_chars:,}/{rpt.char_limit:,} chars " + f"({rpt.usage_pct:.0f}%)") + + # Suggest actions when over budget but not enforced + if over_budget and not args.enforce: + suggestions = [] + for rpt in (memory, user): + if rpt.over_budget: + suggestions.append( + f" - {rpt.label}: remove stale entries or run with --enforce to auto-trim" + ) + # Identify largest entries + if rpt.entries: + indexed = sorted(enumerate(rpt.entries), key=lambda x: len(x[1]), reverse=True) + top3 = indexed[:3] + for idx, entry in top3: + preview = entry[:60].replace("\n", " ") + if len(entry) > 60: + preview += "..." + suggestions.append( + f" largest entry #{idx+1}: ({len(entry)} chars) {preview}" + ) + if suggestions: + print() + print("Suggestions:") + for s in suggestions: + print(s) + + # Exit code + if trimmed: + return 2 + if over_budget: + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main())