Compare commits


1 Commit

Author SHA1 Message Date
Timmy
fbc91dea82 fix(#296): poka-yoke — prevent silent context overflow with mandatory compression
Some checks failed: Forge CI / smoke-and-build (pull_request), failing after 1m5s
## Problem
When the context approaches the model limit, there is no guaranteed fail-safe.
The agent can ignore compression warnings, causing silent data loss when the
context overflows.

## Solution (Poka-yoke: Mitigation)
1. Hard token budget check at 85% of context length — auto-compression
2. At 95%, BLOCK tool calls and force compression before continuing
3. Add /context-status slash command (aliases: /ctx, /context)
4. Log context overflow events separately from normal compression
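
The two hard thresholds compose into a simple policy. A minimal standalone sketch (names here are illustrative; the real logic lives in `ContextCompressor` and the `run_agent.py` main loop):

```python
# Sketch of the poka-yoke guard; thresholds are the hard-coded values
# from this commit, function and return names are illustrative.
WARNING_THRESHOLD = 0.85   # auto-compression trigger
CRITICAL_THRESHOLD = 0.95  # block tool calls, force compression

def guard_action(prompt_tokens: int, context_length: int) -> str:
    """Decide what the agent loop must do at the current usage level."""
    usage = prompt_tokens / context_length
    if usage >= CRITICAL_THRESHOLD:
        return "block_tools_and_compress"  # hard stop until compression completes
    if usage >= WARNING_THRESHOLD:
        return "auto_compress"             # compress now, then continue
    return "continue"                      # normal operation
```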

## Changes

### agent/context_compressor.py
- WARNING_THRESHOLD (0.85), CRITICAL_THRESHOLD (0.95)
- should_auto_compress(), should_block_tools(), get_usage_level()
- Extended get_status() with usage_level and thresholds

### run_agent.py
- 85%/95% checks before compression in main loop
- overflow_triggered param on _compress_context()
- Separate CONTEXT WARNING/CRITICAL logging

### hermes_cli/commands.py
- CommandDef("context-status", ...) with aliases (ctx, context)

### cli.py
- _show_context_status() with progress bar and threshold display

Refs #296
2026-04-13 21:38:13 -04:00
6 changed files with 165 additions and 479 deletions

agent/context_compressor.py
View File

@@ -138,6 +138,48 @@ class ContextCompressor:
        rough_estimate = estimate_messages_tokens_rough(messages)
        return rough_estimate >= self.threshold_tokens

    # ── Poka-yoke: Hard context overflow safeguards (#296) ──────────────
    WARNING_THRESHOLD = 0.85   # 85% — auto-compression trigger
    CRITICAL_THRESHOLD = 0.95  # 95% — block tools, force compression

    def get_context_usage_percent(self, prompt_tokens: int = None) -> float:
        """Return context usage as a percentage of total context length (0-100)."""
        tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
        if self.context_length <= 0:
            return 0.0
        return min(100.0, (tokens / self.context_length) * 100)

    def get_usage_level(self, prompt_tokens: int = None) -> str:
        """Return the current context pressure level: 'normal', 'warning', 'critical'."""
        pct = self.get_context_usage_percent(prompt_tokens) / 100.0
        if pct >= self.CRITICAL_THRESHOLD:
            return "critical"
        elif pct >= self.WARNING_THRESHOLD:
            return "warning"
        return "normal"

    def should_auto_compress(self, prompt_tokens: int = None) -> bool:
        """Check if context exceeds the WARNING threshold (85% of context length).

        This is a HARD trigger — unlike should_compress() which uses the
        configurable threshold_percent, this always fires at 85% regardless
        of configuration. Poka-yoke: don't trust voluntary behavior.
        """
        tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
        warning_tokens = int(self.context_length * self.WARNING_THRESHOLD)
        return tokens >= warning_tokens

    def should_block_tools(self, prompt_tokens: int = None) -> bool:
        """Check if context exceeds the CRITICAL threshold (95% of context length).

        When True, the agent MUST NOT make further tool calls until compression
        completes. Poka-yoke: enforce the constraint mechanically.
        """
        tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
        critical_tokens = int(self.context_length * self.CRITICAL_THRESHOLD)
        return tokens >= critical_tokens

    def get_status(self) -> Dict[str, Any]:
        """Get current compression status for display/logging."""
        return {
@@ -146,6 +188,9 @@ class ContextCompressor:
            "context_length": self.context_length,
            "usage_percent": min(100, (self.last_prompt_tokens / self.context_length * 100)) if self.context_length else 0,
            "compression_count": self.compression_count,
            "usage_level": self.get_usage_level(),
            "warning_threshold_tokens": int(self.context_length * self.WARNING_THRESHOLD),
            "critical_threshold_tokens": int(self.context_length * self.CRITICAL_THRESHOLD),
        }

    # ------------------------------------------------------------------
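
A quick illustration of the new methods (a hedged sketch: the constructor call and the direct attribute assignment are assumed for the example; only the method calls come from the hunk above):

```python
# Assumed construction; only the queried methods are from the diff.
compressor = ContextCompressor(context_length=128_000)
compressor.last_prompt_tokens = 115_000  # ~89.8% of the window

compressor.get_context_usage_percent()  # 89.84...
compressor.get_usage_level()            # "warning"  (>= 85%, < 95%)
compressor.should_auto_compress()       # True  (115_000 >= 108_800)
compressor.should_block_tools()         # False (115_000 <  121_600)
```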

cli.py (46 changed lines)
View File

@@ -4658,6 +4658,8 @@ def _upload_0x0st(content: str) -> str | None:
            self._handle_reasoning_command(cmd_original)
        elif canonical == "compress":
            self._manual_compress()
        elif canonical == "context-status":
            self._show_context_status()
        elif canonical == "usage":
            self._show_usage()
        elif canonical == "insights":
@@ -5474,6 +5476,50 @@ def _upload_0x0st(content: str) -> str | None:
        except Exception as e:
            print(f" ❌ Compression failed: {e}")

    def _show_context_status(self):
        """Show context usage, compression history, and remaining budget."""
        if not self.agent:
            print("(._.) No active agent -- send a message first.")
            return
        compressor = getattr(self.agent, "context_compressor", None)
        if not compressor:
            print("(._.) Context compressor not initialized.")
            return
        status = compressor.get_status()

        # Calculate real token usage
        from agent.model_metadata import estimate_messages_tokens_rough
        real_tokens = status.get("last_prompt_tokens", 0)
        if not real_tokens and self.conversation_history:
            real_tokens = estimate_messages_tokens_rough(self.conversation_history)
        context_length = status.get("context_length", 0)
        usage_percent = (real_tokens / context_length * 100) if context_length > 0 else 0
        usage_level = status.get("usage_level", "normal")

        # Format usage level with emoji
        level_emoji = {"normal": "", "warning": "⚠️", "critical": "🔴"}.get(usage_level, "")
        print(f"\n📊 Context Status")
        print(f" {level_emoji} Status: {usage_level.upper()}")
        print(f" Usage: {real_tokens:,} / {context_length:,} tokens ({usage_percent:.1f}%)")
        print(f" Compression threshold: {status.get('threshold_tokens', 0):,} tokens ({status.get('threshold_tokens', 0) / context_length * 100:.0f}%)" if context_length else "")
        print(f" Warning threshold (85%): {status.get('warning_threshold_tokens', 0):,} tokens")
        print(f" Critical threshold (95%): {status.get('critical_threshold_tokens', 0):,} tokens")
        print(f" Compressions: {status.get('compression_count', 0)}")

        # Progress bar (fill glyphs assumed: "█" filled, "░" empty)
        bar_length = 40
        filled = int(bar_length * min(usage_percent, 100) / 100)
        bar = "█" * filled + "░" * (bar_length - filled)
        print(f" [{bar}] {usage_percent:.1f}%")

        # Remaining budget
        remaining = max(0, context_length - real_tokens)
        print(f" Remaining: {remaining:,} tokens")

    def _show_usage(self):
        """Show cumulative token usage for the current session."""
        if not self.agent:
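
For reference, /context-status output would look roughly like this (illustrative numbers, not from a real session):

```
📊 Context Status
 ⚠️ Status: WARNING
 Usage: 115,000 / 128,000 tokens (89.8%)
 Compression threshold: 96,000 tokens (75%)
 Warning threshold (85%): 108,800 tokens
 Critical threshold (95%): 121,600 tokens
 Compressions: 1
 [███████████████████████████████████░░░░░] 89.8%
 Remaining: 13,000 tokens
```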

View File

@@ -1,89 +0,0 @@
#!/usr/bin/env python3
"""
Example: Using session templates for code-first seeding.

This script demonstrates how to use the session template system
to pre-seed new sessions with successful tool call patterns.
"""

import sys
from pathlib import Path

# Add the parent directory to the path
sys.path.insert(0, str(Path(__file__).parent.parent))

from tools.session_templates import SessionTemplates, TaskType


def main():
    """Demonstrate session template usage."""
    # Create template manager
    templates = SessionTemplates()

    print("Session Templates Example")
    print("=" * 50)

    # List existing templates
    print("\n1. Existing templates:")
    template_list = templates.list_templates()
    if template_list:
        for t in template_list:
            print(f" - {t.name}: {t.task_type.value} ({len(t.examples)} examples)")
    else:
        print(" No templates found")

    # Example: Create a template from a session
    print("\n2. Creating a template from a session:")
    print(" (This would normally use a real session ID)")

    # Example: Get a template for code tasks
    print("\n3. Getting a template for CODE tasks:")
    code_template = templates.get_template(TaskType.CODE)
    if code_template:
        print(f" Found template: {code_template.name}")
        print(f" Type: {code_template.task_type.value}")
        print(f" Examples: {len(code_template.examples)}")

        # Show first example
        if code_template.examples:
            example = code_template.examples[0]
            print(f" First example: {example.tool_name}")
            print(f" Arguments: {example.arguments}")
            print(f" Result preview: {example.result[:100]}...")
    else:
        print(" No CODE template found")

    # Example: Inject template into messages
    print("\n4. Injecting template into messages:")
    if code_template:
        # Create sample messages
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Help me write some code"}
        ]

        # Inject template
        updated_messages = templates.inject_into_messages(code_template, messages)
        print(f" Original messages: {len(messages)}")
        print(f" Updated messages: {len(updated_messages)}")
        print(f" Template usage count: {code_template.usage_count}")

        # Show the injection
        print("\n Injected messages:")
        for i, msg in enumerate(updated_messages[:6]):  # Show first 6
            role = msg.get('role', 'unknown')
            content = msg.get('content', '')
            if content:
                content_preview = content[:50] + "..." if len(content) > 50 else content
                print(f" {i}: {role} - {content_preview}")
            else:
                print(f" {i}: {role} - (tool call)")

    print("\n" + "=" * 50)
    print("Example complete!")


if __name__ == "__main__":
    main()

hermes_cli/commands.py
View File

@@ -60,6 +60,8 @@ COMMAND_REGISTRY: list[CommandDef] = [
    CommandDef("branch", "Branch the current session (explore a different path)", "Session",
               aliases=("fork",), args_hint="[name]"),
    CommandDef("compress", "Manually compress conversation context", "Session"),
    CommandDef("context-status", "Show context usage, compression history, and remaining budget", "Session",
               aliases=("ctx", "context")),
    CommandDef("rollback", "List or restore filesystem checkpoints", "Session",
               args_hint="[number]"),
    CommandDef("stop", "Kill all running background processes", "Session"),

run_agent.py
View File

@@ -5931,18 +5931,28 @@ class AIAgent:
        if messages and messages[-1].get("_flush_sentinel") == _sentinel:
            messages.pop()

    def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default") -> tuple:
    def _compress_context(self, messages: list, system_message: str, *, approx_tokens: int = None, task_id: str = "default", overflow_triggered: bool = False) -> tuple:
        """Compress conversation context and split the session in SQLite.

        Returns:
            (compressed_messages, new_system_prompt) tuple
        """
        _pre_msg_count = len(messages)
        logger.info(
            "context compression started: session=%s messages=%d tokens=~%s model=%s",
            self.session_id or "none", _pre_msg_count,
            f"{approx_tokens:,}" if approx_tokens else "unknown", self.model,
        )
        # Log overflow events separately (#296)
        if overflow_triggered:
            logger.warning(
                "CONTEXT OVERFLOW COMPRESSION: session=%s messages=%d tokens=~%s model=%s "
                "reason=overflow_triggered",
                self.session_id or "none", _pre_msg_count,
                f"{approx_tokens:,}" if approx_tokens else "unknown", self.model,
            )
        else:
            logger.info(
                "context compression started: session=%s messages=%d tokens=~%s model=%s",
                self.session_id or "none", _pre_msg_count,
                f"{approx_tokens:,}" if approx_tokens else "unknown", self.model,
            )

        # Pre-compression memory flush: let the model save memories before they're lost
        self.flush_memories(messages, min_turns=0)
@@ -9001,6 +9011,62 @@ class AIAgent:
            self._context_pressure_warned = True
            self._emit_context_pressure(_compaction_progress, _compressor)

        # ── Poka-yoke: Hard context overflow safeguards (#296) ──────────
        _usage_level = _compressor.get_usage_level(_real_tokens)

        # Log context overflow events separately
        if _usage_level == "critical" and not getattr(self, '_context_critical_logged', False):
            self._context_critical_logged = True
            logger.warning(
                "CONTEXT CRITICAL: %.1f%% of context used (%d/%d tokens). "
                "Tool calls BLOCKED until compression completes.",
                _compressor.get_context_usage_percent(_real_tokens),
                _real_tokens, _compressor.context_length,
            )
        elif _usage_level == "warning" and not getattr(self, '_context_warning_logged', False):
            self._context_warning_logged = True
            logger.warning(
                "CONTEXT WARNING: %.1f%% of context used (%d/%d tokens). "
                "Auto-compression triggered.",
                _compressor.get_context_usage_percent(_real_tokens),
                _real_tokens, _compressor.context_length,
            )

        # Reset log flags when context drops below warning
        if _usage_level == "normal":
            self._context_critical_logged = False
            self._context_warning_logged = False

        # CRITICAL (95%): Block tool calls — force compression
        if self.compression_enabled and _compressor.should_block_tools(_real_tokens):
            logger.warning(
                "CONTEXT CRITICAL: Blocking tool calls and forcing compression. "
                "%.1f%% of context used.",
                _compressor.get_context_usage_percent(_real_tokens),
            )
            messages, active_system_prompt = self._compress_context(
                messages, system_message,
                approx_tokens=_real_tokens,
                task_id=effective_task_id,
                overflow_triggered=True,  # route to the overflow log path above
            )
            conversation_history = None
            self._session_messages = messages
            self._save_session_log(messages)
            continue

        # WARNING (85%): Auto-compression trigger (hard-coded threshold)
        if self.compression_enabled and _compressor.should_auto_compress(_real_tokens):
            messages, active_system_prompt = self._compress_context(
                messages, system_message,
                approx_tokens=_real_tokens,
                task_id=effective_task_id,
            )
            conversation_history = None
            self._session_messages = messages
            self._save_session_log(messages)
            continue

        # Standard compression (configurable threshold_percent)
        if self.compression_enabled and _compressor.should_compress(_real_tokens):
            messages, active_system_prompt = self._compress_context(
                messages, system_message,
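
Taken together, the three triggers are checked strictest-first. A self-contained trace of which branch fires at a few token counts, assuming a 128k-token context (illustrative numbers only):

```python
# Mirrors the precedence above without the agent plumbing.
ctx = 128_000
for tokens in (100_000, 110_000, 123_000):
    if tokens >= int(ctx * 0.95):    # critical: block tools, force compression
        branch = "critical"
    elif tokens >= int(ctx * 0.85):  # warning: hard auto-compression
        branch = "warning"
    else:                            # standard, configurable threshold (or none)
        branch = "standard/none"
    print(f"{tokens:,} tokens -> {branch}")
# 100,000 tokens -> standard/none
# 110,000 tokens -> warning
# 123,000 tokens -> critical
```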

tools/session_templates.py
View File

@@ -1,384 +0,0 @@
"""
Session templates for code-first seeding.
Based on research finding: Code-heavy sessions (execute_code dominant in first 30 turns)
improve over time. File-heavy sessions degrade. The key is deterministic feedback loops.
This module provides:
1. Template extraction from successful sessions
2. Task type classification (code, file, research)
3. Template storage in ~/.hermes/session-templates/
4. Template injection into new sessions
"""
import json
import logging
import os
import sqlite3
import time
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
logger = logging.getLogger(__name__)
# Default template directory
DEFAULT_TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
class TaskType(Enum):
"""Task type classification."""
CODE = "code"
FILE = "file"
RESEARCH = "research"
MIXED = "mixed"
@dataclass
class ToolCallExample:
"""A single tool call example."""
tool_name: str
arguments: Dict[str, Any]
result: str
success: bool
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ToolCallExample':
return cls(**data)
@dataclass
class SessionTemplate:
"""A session template with tool call examples."""
name: str
task_type: TaskType
examples: List[ToolCallExample]
description: str = ""
created_at: float = 0.0
usage_count: int = 0
def __post_init__(self):
if self.created_at == 0.0:
self.created_at = time.time()
def to_dict(self) -> Dict[str, Any]:
data = asdict(self)
data['task_type'] = self.task_type.value
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SessionTemplate':
data['task_type'] = TaskType(data['task_type'])
examples_data = data.get('examples', [])
data['examples'] = [ToolCallExample.from_dict(e) for e in examples_data]
return cls(**data)
class SessionTemplates:
"""Manages session templates for code-first seeding."""
def __init__(self, template_dir: Optional[Path] = None):
self.template_dir = template_dir or DEFAULT_TEMPLATE_DIR
self.template_dir.mkdir(parents=True, exist_ok=True)
self.templates: Dict[str, SessionTemplate] = {}
self._load_templates()
def _load_templates(self):
"""Load all templates from disk."""
for template_file in self.template_dir.glob("*.json"):
try:
with open(template_file, 'r') as f:
data = json.load(f)
template = SessionTemplate.from_dict(data)
self.templates[template.name] = template
except Exception as e:
logger.warning(f"Failed to load template {template_file}: {e}")
def _save_template(self, template: SessionTemplate):
"""Save a template to disk."""
template_file = self.template_dir / f"{template.name}.json"
with open(template_file, 'w') as f:
json.dump(template.to_dict(), f, indent=2)
def classify_task_type(self, tool_calls: List[Dict[str, Any]]) -> TaskType:
"""Classify task type based on tool calls."""
if not tool_calls:
return TaskType.MIXED
# Count tool types
code_tools = {'execute_code', 'code_execution'}
file_tools = {'read_file', 'write_file', 'patch', 'search_files'}
research_tools = {'web_search', 'web_fetch', 'browser_navigate'}
tool_names = [tc.get('tool_name', '') for tc in tool_calls]
code_count = sum(1 for t in tool_names if t in code_tools)
file_count = sum(1 for t in tool_names if t in file_tools)
research_count = sum(1 for t in tool_names if t in research_tools)
total = len(tool_calls)
if total == 0:
return TaskType.MIXED
# Determine dominant type (60% threshold)
if code_count / total > 0.6:
return TaskType.CODE
elif file_count / total > 0.6:
return TaskType.FILE
elif research_count / total > 0.6:
return TaskType.RESEARCH
else:
return TaskType.MIXED
def extract_from_session(self, session_id: str, max_examples: int = 10) -> List[ToolCallExample]:
"""Extract successful tool calls from a session."""
db_path = Path.home() / ".hermes" / "state.db"
if not db_path.exists():
return []
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
# Get messages with tool calls
cursor = conn.execute("""
SELECT role, content, tool_calls, tool_name
FROM messages
WHERE session_id = ?
ORDER BY timestamp
LIMIT 100
""", (session_id,))
messages = cursor.fetchall()
conn.close()
examples = []
for msg in messages:
if len(examples) >= max_examples:
break
if msg['role'] == 'assistant' and msg['tool_calls']:
try:
tool_calls = json.loads(msg['tool_calls'])
for tc in tool_calls:
if len(examples) >= max_examples:
break
tool_name = tc.get('function', {}).get('name')
if not tool_name:
continue
try:
arguments = json.loads(tc.get('function', {}).get('arguments', '{}'))
except:
arguments = {}
examples.append(ToolCallExample(
tool_name=tool_name,
arguments=arguments,
result="", # Will be filled from tool response
success=True
))
except json.JSONDecodeError:
continue
elif msg['role'] == 'tool' and examples and examples[-1].result == "":
examples[-1].result = msg['content'] or ""
return examples
except Exception as e:
logger.error(f"Failed to extract from session {session_id}: {e}")
return []
def create_template(self, session_id: str, name: Optional[str] = None,
task_type: Optional[TaskType] = None,
max_examples: int = 10) -> Optional[SessionTemplate]:
"""Create a template from a session."""
examples = self.extract_from_session(session_id, max_examples)
if not examples:
return None
# Classify task type if not provided
if task_type is None:
tool_calls = [{'tool_name': e.tool_name} for e in examples]
task_type = self.classify_task_type(tool_calls)
# Generate name if not provided
if name is None:
name = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
# Create template
template = SessionTemplate(
name=name,
task_type=task_type,
examples=examples,
description=f"Template with {len(examples)} examples"
)
# Save template
self.templates[name] = template
self._save_template(template)
logger.info(f"Created template {name} with {len(examples)} examples")
return template
def get_template(self, task_type: TaskType) -> Optional[SessionTemplate]:
"""Get the best template for a task type."""
matching = [t for t in self.templates.values() if t.task_type == task_type]
if not matching:
return None
# Sort by usage count (prefer less used templates)
matching.sort(key=lambda t: t.usage_count)
return matching[0]
def inject_into_messages(self, template: SessionTemplate,
messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Inject template examples into messages."""
if not template.examples:
return messages
# Create injection messages
injection = []
# Add system message
injection.append({
"role": "system",
"content": f"Session template: {template.name} ({template.task_type.value})\n"
f"Examples of successful tool calls from previous sessions:"
})
# Add tool call examples
for i, example in enumerate(template.examples):
# Assistant message with tool call
injection.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": f"template_{i}",
"type": "function",
"function": {
"name": example.tool_name,
"arguments": json.dumps(example.arguments)
}
}]
})
# Tool response
injection.append({
"role": "tool",
"tool_call_id": f"template_{i}",
"content": example.result
})
# Insert after system messages
insert_index = 0
for i, msg in enumerate(messages):
if msg.get("role") != "system":
break
insert_index = i + 1
# Insert injection
for i, msg in enumerate(injection):
messages.insert(insert_index + i, msg)
# Update usage count
template.usage_count += 1
self._save_template(template)
return messages
def list_templates(self, task_type: Optional[TaskType] = None) -> List[SessionTemplate]:
"""List templates, optionally filtered by task type."""
templates = list(self.templates.values())
if task_type:
templates = [t for t in templates if t.task_type == task_type]
templates.sort(key=lambda t: t.created_at, reverse=True)
return templates
def delete_template(self, name: str) -> bool:
"""Delete a template."""
if name not in self.templates:
return False
del self.templates[name]
template_file = self.template_dir / f"{name}.json"
if template_file.exists():
template_file.unlink()
logger.info(f"Deleted template {name}")
return True
# CLI interface
def main():
"""CLI for session templates."""
import argparse
parser = argparse.ArgumentParser(description="Session Templates")
subparsers = parser.add_subparsers(dest="command")
# List templates
list_parser = subparsers.add_parser("list", help="List templates")
list_parser.add_argument("--type", choices=["code", "file", "research", "mixed"])
# Create template
create_parser = subparsers.add_parser("create", help="Create template from session")
create_parser.add_argument("session_id", help="Session ID")
create_parser.add_argument("--name", help="Template name")
create_parser.add_argument("--type", choices=["code", "file", "research", "mixed"])
create_parser.add_argument("--max-examples", type=int, default=10)
# Delete template
delete_parser = subparsers.add_parser("delete", help="Delete template")
delete_parser.add_argument("name", help="Template name")
args = parser.parse_args()
templates = SessionTemplates()
if args.command == "list":
task_type = TaskType(args.type) if args.type else None
template_list = templates.list_templates(task_type)
if not template_list:
print("No templates found")
return
print(f"Found {len(template_list)} templates:")
for t in template_list:
print(f" {t.name}: {t.task_type.value} ({len(t.examples)} examples, used {t.usage_count} times)")
elif args.command == "create":
task_type = TaskType(args.type) if args.type else None
template = templates.create_template(
args.session_id,
name=args.name,
task_type=task_type,
max_examples=args.max_examples
)
if template:
print(f"Created template: {template.name}")
print(f" Type: {template.task_type.value}")
print(f" Examples: {len(template.examples)}")
else:
print("Failed to create template")
elif args.command == "delete":
if templates.delete_template(args.name):
print(f"Deleted template: {args.name}")
else:
print(f"Template not found: {args.name}")
else:
parser.print_help()
if __name__ == "__main__":
main()