Files
hermes-agent/agent/context_compressor.py

384 lines
16 KiB
Python
Raw Normal View History

2026-02-21 22:31:43 -08:00
"""Automatic context window compression for long conversations.
Self-contained class with its own OpenAI client for summarization.
Uses Gemini Flash (cheap/fast) to summarize middle turns while
protecting head and tail context.
"""
import logging
import os
from typing import Any, Dict, List, Optional
2026-02-21 22:31:43 -08:00
from agent.auxiliary_client import call_llm
2026-02-21 22:31:43 -08:00
from agent.model_metadata import (
get_model_context_length,
estimate_messages_tokens_rough,
)
logger = logging.getLogger(__name__)
SUMMARY_PREFIX = (
"[CONTEXT COMPACTION] Earlier turns in this conversation were compacted "
"to save context space. The summary below describes work that was "
"already completed, and the current session state may still reflect "
"that work (for example, files may already be changed). Use the summary "
"and the current state to continue from where things left off, and "
"avoid repeating work:"
)
LEGACY_SUMMARY_PREFIX = "[CONTEXT SUMMARY]:"
2026-02-21 22:31:43 -08:00
class ContextCompressor:
"""Compresses conversation context when approaching the model's context limit.
Algorithm: protect first N + last N turns, summarize everything in between.
Token tracking uses actual counts from API responses for accuracy.
"""
def __init__(
self,
model: str,
threshold_percent: float = 0.50,
2026-02-21 22:31:43 -08:00
protect_first_n: int = 3,
protect_last_n: int = 4,
summary_target_tokens: int = 2500,
2026-02-21 22:31:43 -08:00
quiet_mode: bool = False,
summary_model_override: str = None,
base_url: str = "",
api_key: str = "",
2026-02-21 22:31:43 -08:00
):
self.model = model
self.base_url = base_url
self.api_key = api_key
2026-02-21 22:31:43 -08:00
self.threshold_percent = threshold_percent
self.protect_first_n = protect_first_n
self.protect_last_n = protect_last_n
self.summary_target_tokens = summary_target_tokens
self.quiet_mode = quiet_mode
self.context_length = get_model_context_length(model, base_url=base_url, api_key=api_key)
2026-02-21 22:31:43 -08:00
self.threshold_tokens = int(self.context_length * threshold_percent)
self.compression_count = 0
self._context_probed = False # True after a step-down from context error
2026-02-21 22:31:43 -08:00
self.last_prompt_tokens = 0
self.last_completion_tokens = 0
self.last_total_tokens = 0
self.summary_model = summary_model_override or ""
2026-02-21 22:31:43 -08:00
def update_from_response(self, usage: Dict[str, Any]):
"""Update tracked token usage from API response."""
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
self.last_completion_tokens = usage.get("completion_tokens", 0)
self.last_total_tokens = usage.get("total_tokens", 0)
def should_compress(self, prompt_tokens: int = None) -> bool:
"""Check if context exceeds the compression threshold."""
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
return tokens >= self.threshold_tokens
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
"""Quick pre-flight check using rough estimate (before API call)."""
rough_estimate = estimate_messages_tokens_rough(messages)
return rough_estimate >= self.threshold_tokens
def get_status(self) -> Dict[str, Any]:
"""Get current compression status for display/logging."""
return {
"last_prompt_tokens": self.last_prompt_tokens,
"threshold_tokens": self.threshold_tokens,
"context_length": self.context_length,
"usage_percent": (self.last_prompt_tokens / self.context_length * 100) if self.context_length else 0,
"compression_count": self.compression_count,
}
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]:
"""Generate a concise summary of conversation turns.
2026-02-21 22:31:43 -08:00
Tries the auxiliary model first, then falls back to the user's main
model. Returns None if all attempts fail the caller should drop
the middle turns without a summary rather than inject a useless
placeholder.
"""
2026-02-21 22:31:43 -08:00
parts = []
for msg in turns_to_summarize:
role = msg.get("role", "unknown")
content = msg.get("content") or ""
2026-02-21 22:31:43 -08:00
if len(content) > 2000:
content = content[:1000] + "\n...[truncated]...\n" + content[-500:]
tool_calls = msg.get("tool_calls", [])
if tool_calls:
tool_names = [tc.get("function", {}).get("name", "?") for tc in tool_calls if isinstance(tc, dict)]
content += f"\n[Tool calls: {', '.join(tool_names)}]"
parts.append(f"[{role.upper()}]: {content}")
content_to_summarize = "\n\n".join(parts)
prompt = f"""Create a concise handoff summary for a later assistant that will continue this conversation after earlier turns are compacted.
2026-02-21 22:31:43 -08:00
Describe:
2026-02-21 22:31:43 -08:00
1. What actions were taken (tool calls, searches, file operations)
2. Key information or results obtained
3. Important decisions, constraints, or user preferences
4. Relevant data, file names, outputs, or next steps needed to continue
2026-02-21 22:31:43 -08:00
Keep it factual, concise, and focused on helping the next assistant resume without repeating work. Target ~{self.summary_target_tokens} tokens.
2026-02-21 22:31:43 -08:00
---
TURNS TO SUMMARIZE:
{content_to_summarize}
---
Write only the summary body. Do not include any preamble or prefix; the system will add the handoff wrapper."""
2026-02-21 22:31:43 -08:00
# Use the centralized LLM router — handles provider resolution,
# auth, and fallback internally.
try:
call_kwargs = {
"task": "compression",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3,
"max_tokens": self.summary_target_tokens * 2,
"timeout": 30.0,
}
if self.summary_model:
call_kwargs["model"] = self.summary_model
response = call_llm(**call_kwargs)
feat(stt): add free local whisper transcription via faster-whisper (#1185) * fix: Home Assistant event filtering now closed by default Previously, when no watch_domains or watch_entities were configured, ALL state_changed events passed through to the agent, causing users to be flooded with notifications for every HA entity change. Now events are dropped by default unless the user explicitly configures: - watch_domains: list of domains to monitor (e.g. climate, light) - watch_entities: list of specific entity IDs to monitor - watch_all: true (new option — opt-in to receive all events) A warning is logged at connect time if no filters are configured, guiding users to set up their HA platform config. All 49 gateway HA tests + 52 HA tool tests pass. * docs: update Home Assistant integration documentation - homeassistant.md: Fix event filtering docs to reflect closed-by-default behavior. Add watch_all option. Replace Python dict config example with YAML. Fix defaults table (was incorrectly showing 'all'). Add required configuration warning admonition. - environment-variables.md: Add HASS_TOKEN and HASS_URL to Messaging section. - messaging/index.md: Add Home Assistant to description, architecture diagram, platform toolsets table, and Next Steps links. * fix(terminal): strip provider env vars from background and PTY subprocesses Extends the env var blocklist from #1157 to also cover the two remaining leaky paths in process_registry.py: - spawn_local() PTY path (line 156) - spawn_local() background Popen path (line 197) Both were still using raw os.environ, leaking provider vars to background processes and interactive PTY sessions. Now uses the same dynamic _HERMES_PROVIDER_ENV_BLOCKLIST from local.py. Explicit env_vars passed to spawn_local() still override the blocklist, matching the existing behavior for callers that intentionally need these. Gap identified by PR #1004 (@PeterFile). * feat(delegate): add observability metadata to subagent results Enrich delegate_task results with metadata from the child AIAgent: - model: which model the child used - exit_reason: completed | interrupted | max_iterations - tokens.input / tokens.output: token counts - tool_trace: per-tool-call trace with byte sizes and ok/error status Tool trace uses tool_call_id matching to correctly pair parallel tool calls with their results, with a fallback for messages without IDs. Cherry-picked from PR #872 by @omerkaz, with fixes: - Fixed parallel tool call trace pairing (was always updating last entry) - Removed redundant 'iterations' field (identical to existing 'api_calls') - Added test for parallel tool call trace correctness Co-authored-by: omerkaz <omerkaz@users.noreply.github.com> * feat(stt): add free local whisper transcription via faster-whisper Replace OpenAI-only STT with a dual-provider system mirroring the TTS architecture (Edge TTS free / ElevenLabs paid): STT: faster-whisper local (free, default) / OpenAI Whisper API (paid) Changes: - tools/transcription_tools.py: Full rewrite with provider dispatch, config loading, local faster-whisper backend, and OpenAI API backend. Auto-downloads model (~150MB for 'base') on first voice message. Singleton model instance reused across calls. - pyproject.toml: Add faster-whisper>=1.0.0 as core dependency - hermes_cli/config.py: Expand stt config to match TTS pattern with provider selection and per-provider model settings - agent/context_compressor.py: Fix .strip() crash when LLM returns non-string content (dict from llama.cpp, None). Fixes #1100 partially. - tests/: 23 new tests for STT providers + 2 for compressor fix - docs/: Updated Voice & TTS page with STT provider table, model sizes, config examples, and fallback behavior Fallback behavior: - Local not installed → OpenAI API (if key set) - OpenAI key not set → local whisper (if installed) - Neither → graceful error message to user Co-authored-by: Jah-yee <Jah-yee@users.noreply.github.com> --------- Co-authored-by: omerkaz <omerkaz@users.noreply.github.com> Co-authored-by: Jah-yee <Jah-yee@users.noreply.github.com>
2026-03-13 11:11:05 -07:00
content = response.choices[0].message.content
# Handle cases where content is not a string (e.g., dict from llama.cpp)
if not isinstance(content, str):
content = str(content) if content else ""
summary = content.strip()
return self._with_summary_prefix(summary)
except RuntimeError:
logging.warning("Context compression: no provider available for "
"summary. Middle turns will be dropped without summary.")
return None
except Exception as e:
logging.warning("Failed to generate context summary: %s", e)
return None
@staticmethod
def _with_summary_prefix(summary: str) -> str:
"""Normalize summary text to the current compaction handoff format."""
text = (summary or "").strip()
for prefix in (LEGACY_SUMMARY_PREFIX, SUMMARY_PREFIX):
if text.startswith(prefix):
text = text[len(prefix):].lstrip()
break
return f"{SUMMARY_PREFIX}\n{text}" if text else SUMMARY_PREFIX
# ------------------------------------------------------------------
# Tool-call / tool-result pair integrity helpers
# ------------------------------------------------------------------
@staticmethod
def _get_tool_call_id(tc) -> str:
"""Extract the call ID from a tool_call entry (dict or SimpleNamespace)."""
if isinstance(tc, dict):
return tc.get("id", "")
return getattr(tc, "id", "") or ""
def _sanitize_tool_pairs(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Fix orphaned tool_call / tool_result pairs after compression.
Two failure modes:
1. A tool *result* references a call_id whose assistant tool_call was
removed (summarized/truncated). The API rejects this with
"No tool call found for function call output with call_id ...".
2. An assistant message has tool_calls whose results were dropped.
The API rejects this because every tool_call must be followed by
a tool result with the matching call_id.
This method removes orphaned results and inserts stub results for
orphaned calls so the message list is always well-formed.
"""
surviving_call_ids: set = set()
for msg in messages:
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
cid = self._get_tool_call_id(tc)
if cid:
surviving_call_ids.add(cid)
result_call_ids: set = set()
for msg in messages:
if msg.get("role") == "tool":
cid = msg.get("tool_call_id")
if cid:
result_call_ids.add(cid)
# 1. Remove tool results whose call_id has no matching assistant tool_call
orphaned_results = result_call_ids - surviving_call_ids
if orphaned_results:
messages = [
m for m in messages
if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results)
]
if not self.quiet_mode:
logger.info("Compression sanitizer: removed %d orphaned tool result(s)", len(orphaned_results))
# 2. Add stub results for assistant tool_calls whose results were dropped
missing_results = surviving_call_ids - result_call_ids
if missing_results:
patched: List[Dict[str, Any]] = []
for msg in messages:
patched.append(msg)
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
cid = self._get_tool_call_id(tc)
if cid in missing_results:
patched.append({
"role": "tool",
"content": "[Result from earlier conversation — see context summary above]",
"tool_call_id": cid,
})
messages = patched
if not self.quiet_mode:
logger.info("Compression sanitizer: added %d stub tool result(s)", len(missing_results))
return messages
def _align_boundary_forward(self, messages: List[Dict[str, Any]], idx: int) -> int:
"""Push a compress-start boundary forward past any orphan tool results.
If ``messages[idx]`` is a tool result, slide forward until we hit a
non-tool message so we don't start the summarised region mid-group.
"""
while idx < len(messages) and messages[idx].get("role") == "tool":
idx += 1
return idx
def _align_boundary_backward(self, messages: List[Dict[str, Any]], idx: int) -> int:
"""Pull a compress-end boundary backward to avoid splitting a
tool_call / result group.
If the message just before ``idx`` is an assistant message with
tool_calls, those tool results will start at ``idx`` and would be
separated from their parent. Move backwards to include the whole
group in the summarised region.
"""
if idx <= 0 or idx >= len(messages):
return idx
prev = messages[idx - 1]
if prev.get("role") == "assistant" and prev.get("tool_calls"):
# The results for this assistant turn sit at idx..idx+k.
# Include the assistant message in the summarised region too.
idx -= 1
return idx
2026-02-21 22:31:43 -08:00
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
"""Compress conversation messages by summarizing middle turns.
Keeps first N + last N turns, summarizes everything in between.
After compression, orphaned tool_call / tool_result pairs are cleaned
up so the API never receives mismatched IDs.
2026-02-21 22:31:43 -08:00
"""
n_messages = len(messages)
if n_messages <= self.protect_first_n + self.protect_last_n + 1:
if not self.quiet_mode:
logger.warning(
"Cannot compress: only %d messages (need > %d)",
n_messages,
self.protect_first_n + self.protect_last_n + 1,
)
2026-02-21 22:31:43 -08:00
return messages
compress_start = self.protect_first_n
compress_end = n_messages - self.protect_last_n
if compress_start >= compress_end:
return messages
# Adjust boundaries to avoid splitting tool_call/result groups.
compress_start = self._align_boundary_forward(messages, compress_start)
compress_end = self._align_boundary_backward(messages, compress_end)
if compress_start >= compress_end:
return messages
2026-02-21 22:31:43 -08:00
turns_to_summarize = messages[compress_start:compress_end]
display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)
if not self.quiet_mode:
logger.info(
"Context compression triggered (%d tokens >= %d threshold)",
display_tokens,
self.threshold_tokens,
)
logger.info(
"Model context limit: %d tokens (%.0f%% = %d)",
self.context_length,
self.threshold_percent * 100,
self.threshold_tokens,
)
logger.info(
"Summarizing turns %d-%d (%d turns)",
compress_start + 1,
compress_end,
len(turns_to_summarize),
)
2026-02-21 22:31:43 -08:00
summary = self._generate_summary(turns_to_summarize)
compressed = []
for i in range(compress_start):
msg = messages[i].copy()
if i == 0 and msg.get("role") == "system" and self.compression_count == 0:
msg["content"] = (
(msg.get("content") or "")
+ "\n\n[Note: Some earlier conversation turns have been compacted into a handoff summary to preserve context space. The current session state may still reflect earlier work, so build on that summary and state rather than re-doing work.]"
)
2026-02-21 22:31:43 -08:00
compressed.append(msg)
_merge_summary_into_tail = False
if summary:
last_head_role = messages[compress_start - 1].get("role", "user") if compress_start > 0 else "user"
first_tail_role = messages[compress_end].get("role", "user") if compress_end < n_messages else "user"
# Pick a role that avoids consecutive same-role with both neighbors.
# Priority: avoid colliding with head (already committed), then tail.
if last_head_role in ("assistant", "tool"):
summary_role = "user"
else:
summary_role = "assistant"
# If the chosen role collides with the tail AND flipping wouldn't
# collide with the head, flip it.
if summary_role == first_tail_role:
flipped = "assistant" if summary_role == "user" else "user"
if flipped != last_head_role:
summary_role = flipped
else:
# Both roles would create consecutive same-role messages
# (e.g. head=assistant, tail=user — neither role works).
# Merge the summary into the first tail message instead
# of inserting a standalone message that breaks alternation.
_merge_summary_into_tail = True
if not _merge_summary_into_tail:
compressed.append({"role": summary_role, "content": summary})
else:
if not self.quiet_mode:
logger.warning("No summary model available — middle turns dropped without summary")
2026-02-21 22:31:43 -08:00
for i in range(compress_end, n_messages):
msg = messages[i].copy()
if _merge_summary_into_tail and i == compress_end:
original = msg.get("content") or ""
msg["content"] = summary + "\n\n" + original
_merge_summary_into_tail = False
compressed.append(msg)
2026-02-21 22:31:43 -08:00
self.compression_count += 1
compressed = self._sanitize_tool_pairs(compressed)
2026-02-21 22:31:43 -08:00
if not self.quiet_mode:
new_estimate = estimate_messages_tokens_rough(compressed)
saved_estimate = display_tokens - new_estimate
logger.info(
"Compressed: %d -> %d messages (~%d tokens saved)",
n_messages,
len(compressed),
saved_estimate,
)
logger.info("Compression #%d complete", self.compression_count)
2026-02-21 22:31:43 -08:00
return compressed