Compare commits

..

1 Commits

Author SHA1 Message Date
Metatron
cc1c061a6b perf: cap marathon sessions at 200 messages with forced compression (closes #326)
Some checks failed
Notebook CI / notebook-smoke (push) Failing after 1m46s
Forge CI / smoke-and-build (pull_request) Failing after 51s
Add max_messages limit (default 200, configurable via HERMES_MAX_MESSAGES
env var). When conversation message count exceeds the cap, force a
compression pass that creates a new session lineage.

Prevents error cascading and tool fixation in marathon sessions (200+
messages) which empirically show 45-84% error rates.

Uses existing _compress_context mechanism — agent continues seamlessly
in a fresh session with compressed history.
2026-04-13 21:31:03 -04:00
2 changed files with 46 additions and 55 deletions

View File

@@ -13,7 +13,6 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -644,56 +643,7 @@ def _build_job_prompt(job: dict) -> str:
return "\n".join(parts)
# Regex patterns for local service references that fail on cloud endpoints
_CLOUD_INCOMPATIBLE_PATTERNS = [
(re.compile(r"\b[Cc]heck\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up|available)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\b[Vv]erify\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\bcurl\s+localhost:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bcurl\s+127\.0\.0\.1:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bpoll\s+localhost", re.IGNORECASE),
"check service health via available tools"),
]
def _rewrite_cloud_incompatible_prompt(prompt: str, base_url: str) -> str:
"""Rewrite prompt instructions that assume local service access when running on cloud.
When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, Anthropic),
instructions to "Check Ollama" or "curl localhost:11434" are impossible.
Instead of just warning, this rewrites the instruction to a cloud-compatible
equivalent that the agent can actually execute.
Returns the (possibly rewritten) prompt.
"""
try:
from agent.model_metadata import is_local_endpoint
except ImportError:
return prompt
if is_local_endpoint(base_url or ""):
return prompt # Local — no rewrite needed
rewritten = prompt
for pattern, replacement in _CLOUD_INCOMPATIBLE_PATTERNS:
rewritten = pattern.sub(replacement, rewritten)
if rewritten != prompt:
rewritten = (
"[NOTE: Some instructions were adjusted for cloud execution. "
"Local service checks were rewritten to use available tools.]
"
+ rewritten
)
return rewritten
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a single cron job.

View File

@@ -586,6 +586,9 @@ class AIAgent:
self.model = model
self.max_iterations = max_iterations
# Marathon session cap: force compression when messages exceed this.
# Eliminates error-cascading in 200+ message sessions (#326).
self.max_messages = int(os.environ.get("HERMES_MAX_MESSAGES", "200"))
# Shared iteration budget — parent creates, children inherit.
# Consumed by every LLM turn across parent + all subagents.
self.iteration_budget = iteration_budget or IterationBudget(max_iterations)
@@ -1001,10 +1004,30 @@ class AIAgent:
self._session_db = session_db
self._parent_session_id = parent_session_id
self._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes
# Lazy session creation: defer until first message flush (#314).
# _flush_messages_to_session_db() calls ensure_session() which uses
# INSERT OR IGNORE — creating the row only when messages arrive.
# This eliminates 32% of sessions that are created but never used.
if self._session_db:
try:
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
model=self.model,
model_config={
"max_iterations": self.max_iterations,
"reasoning_config": reasoning_config,
"max_tokens": max_tokens,
},
user_id=None,
parent_session_id=self._parent_session_id,
)
except Exception as e:
# Transient SQLite lock contention (e.g. CLI and gateway writing
# concurrently) must NOT permanently disable session_search for
# this agent. Keep _session_db alive — subsequent message
# flushes and session_search calls will still work once the
# lock clears. The session row may be missing from the index
# for this run, but that is recoverable (flushes upsert rows).
logger.warning(
"Session DB create_session failed (session_search still available): %s", e
)
# In-memory todo list for task planning (one per agent/session)
from tools.todo_tool import TodoStore
@@ -7308,6 +7331,24 @@ class AIAgent:
pass
while api_call_count < self.max_iterations and self.iteration_budget.remaining > 0:
# Marathon session guard: if message count exceeds max_messages,
# force a compression pass to start a fresh session lineage.
# Prevents error cascading and tool fixation in long sessions. #326
if (
self.compression_enabled
and len(messages) > self.max_messages
):
if not self.quiet_mode:
self._safe_print(
f"\n🔄 Marathon session limit hit ({len(messages)} msgs > "
f"{self.max_messages} cap). Compressing and continuing..."
)
messages, active_system_prompt = self._compress_context(
messages, system_message,
approx_tokens=estimate_messages_tokens_rough(messages),
task_id=effective_task_id,
)
# Reset per-turn checkpoint dedup so each iteration can take one snapshot
self._checkpoint_mgr.new_turn()