Replace the fragile hardcoded context length system with a multi-source resolution chain that correctly identifies context windows per provider. Key changes: - New agent/models_dev.py: Fetches and caches the models.dev registry (3800+ models across 100+ providers with per-provider context windows). In-memory cache (1hr TTL) + disk cache for cold starts. - Rewritten get_model_context_length() resolution chain: 0. Config override (model.context_length) 1. Custom providers per-model context_length 2. Persistent disk cache 3. Endpoint /models (local servers) 4. Anthropic /v1/models API (max_input_tokens, API-key only) 5. OpenRouter live API (existing, unchanged) 6. Nous suffix-match via OpenRouter (dot/dash normalization) 7. models.dev registry lookup (provider-aware) 8. Thin hardcoded defaults (broad family patterns) 9. 128K fallback (was 2M) - Provider-aware context: same model now correctly resolves to different context windows per provider (e.g. claude-opus-4.6: 1M on Anthropic, 128K on GitHub Copilot). Provider name flows through ContextCompressor. - DEFAULT_CONTEXT_LENGTHS shrunk from 80+ entries to ~16 broad patterns. models.dev replaces the per-model hardcoding. - CONTEXT_PROBE_TIERS changed from [2M, 1M, 512K, 200K, 128K, 64K, 32K] to [128K, 64K, 32K, 16K, 8K]. Unknown models no longer start at 2M. - hermes model: prompts for context_length when configuring custom endpoints. Supports shorthand (32k, 128K). Saved to custom_providers per-model config. - custom_providers schema extended with optional models dict for per-model context_length (backward compatible). - Nous Portal: suffix-matches bare IDs (claude-opus-4-6) against OpenRouter's prefixed IDs (anthropic/claude-opus-4.6) with dot/dash normalization. Handles all 15 current Nous models. - Anthropic direct: queries /v1/models for max_input_tokens. Only works with regular API keys (sk-ant-api*), not OAuth tokens. Falls through to models.dev for OAuth users. 
Tests: 5574 passed (18 new tests for models_dev + updated probe tiers) Docs: Updated configuration.md context length section, AGENTS.md Co-authored-by: Test <test@test.com>
397 lines
17 KiB
Python
"""Automatic context window compression for long conversations.
|
|
|
|
Self-contained class with its own OpenAI client for summarization.
|
|
Uses Gemini Flash (cheap/fast) to summarize middle turns while
|
|
protecting head and tail context.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from agent.auxiliary_client import call_llm
|
|
from agent.model_metadata import (
|
|
get_model_context_length,
|
|
estimate_messages_tokens_rough,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Prefix injected ahead of every compaction summary so the downstream model
# knows the summarized work may already be reflected in session state (e.g.
# files already changed) and should be continued, not repeated.
SUMMARY_PREFIX = (
    "[CONTEXT COMPACTION] Earlier turns in this conversation were compacted "
    "to save context space. The summary below describes work that was "
    "already completed, and the current session state may still reflect "
    "that work (for example, files may already be changed). Use the summary "
    "and the current state to continue from where things left off, and "
    "avoid repeating work:"
)
# Prefix emitted by earlier releases; still recognized by
# _with_summary_prefix so old summaries are re-wrapped with the current
# prefix instead of being double-prefixed.
LEGACY_SUMMARY_PREFIX = "[CONTEXT SUMMARY]:"
|
|
|
|
|
|
class ContextCompressor:
    """Compresses conversation context when approaching the model's context limit.

    Algorithm: protect first N + last N turns, summarize everything in between.
    Token tracking uses actual counts from API responses for accuracy.

    The model's context window is resolved once at construction via
    get_model_context_length() (provider-aware), and the compression
    trigger is a fixed fraction of that window (``threshold_percent``).
    """
|
|
|
|
    def __init__(
        self,
        model: str,
        threshold_percent: float = 0.50,
        protect_first_n: int = 3,
        protect_last_n: int = 4,
        summary_target_tokens: int = 2500,
        quiet_mode: bool = False,
        summary_model_override: str | None = None,
        base_url: str = "",
        api_key: str = "",
        config_context_length: int | None = None,
        provider: str = "",
    ):
        """Set up the compressor and resolve the model's context window.

        Args:
            model: Model identifier whose context window is resolved.
            threshold_percent: Fraction of the context window at which
                compression triggers (default 50%).
            protect_first_n: Leading turns that are never summarized.
            protect_last_n: Trailing turns that are never summarized.
            summary_target_tokens: Target token budget for the summary.
            quiet_mode: Suppress info/warning logging when True.
            summary_model_override: Explicit model to use for summaries;
                empty/None means let the LLM router choose.
            base_url: Endpoint base URL, forwarded to the context-length lookup.
            api_key: API key, forwarded to the context-length lookup.
            config_context_length: Explicit config override for context length.
            provider: Provider name; the same model may resolve to different
                context windows per provider.
        """
        self.model = model
        self.base_url = base_url
        self.api_key = api_key
        self.provider = provider
        self.threshold_percent = threshold_percent
        self.protect_first_n = protect_first_n
        self.protect_last_n = protect_last_n
        self.summary_target_tokens = summary_target_tokens
        self.quiet_mode = quiet_mode

        # Resolve once up-front; the lookup chain is provider-aware.
        self.context_length = get_model_context_length(
            model, base_url=base_url, api_key=api_key,
            config_context_length=config_context_length,
            provider=provider,
        )
        self.threshold_tokens = int(self.context_length * threshold_percent)
        self.compression_count = 0
        self._context_probed = False  # True after a step-down from context error

        # Actual token counts from the most recent API response.
        self.last_prompt_tokens = 0
        self.last_completion_tokens = 0
        self.last_total_tokens = 0

        self.summary_model = summary_model_override or ""
|
|
|
|
def update_from_response(self, usage: Dict[str, Any]):
|
|
"""Update tracked token usage from API response."""
|
|
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
|
|
self.last_completion_tokens = usage.get("completion_tokens", 0)
|
|
self.last_total_tokens = usage.get("total_tokens", 0)
|
|
|
|
def should_compress(self, prompt_tokens: int = None) -> bool:
|
|
"""Check if context exceeds the compression threshold."""
|
|
tokens = prompt_tokens if prompt_tokens is not None else self.last_prompt_tokens
|
|
return tokens >= self.threshold_tokens
|
|
|
|
def should_compress_preflight(self, messages: List[Dict[str, Any]]) -> bool:
|
|
"""Quick pre-flight check using rough estimate (before API call)."""
|
|
rough_estimate = estimate_messages_tokens_rough(messages)
|
|
return rough_estimate >= self.threshold_tokens
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""Get current compression status for display/logging."""
|
|
return {
|
|
"last_prompt_tokens": self.last_prompt_tokens,
|
|
"threshold_tokens": self.threshold_tokens,
|
|
"context_length": self.context_length,
|
|
"usage_percent": (self.last_prompt_tokens / self.context_length * 100) if self.context_length else 0,
|
|
"compression_count": self.compression_count,
|
|
}
|
|
|
|
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]:
|
|
"""Generate a concise summary of conversation turns.
|
|
|
|
Tries the auxiliary model first, then falls back to the user's main
|
|
model. Returns None if all attempts fail — the caller should drop
|
|
the middle turns without a summary rather than inject a useless
|
|
placeholder.
|
|
"""
|
|
parts = []
|
|
for msg in turns_to_summarize:
|
|
role = msg.get("role", "unknown")
|
|
content = msg.get("content") or ""
|
|
if len(content) > 2000:
|
|
content = content[:1000] + "\n...[truncated]...\n" + content[-500:]
|
|
tool_calls = msg.get("tool_calls", [])
|
|
if tool_calls:
|
|
tool_names = [tc.get("function", {}).get("name", "?") for tc in tool_calls if isinstance(tc, dict)]
|
|
content += f"\n[Tool calls: {', '.join(tool_names)}]"
|
|
parts.append(f"[{role.upper()}]: {content}")
|
|
|
|
content_to_summarize = "\n\n".join(parts)
|
|
prompt = f"""Create a concise handoff summary for a later assistant that will continue this conversation after earlier turns are compacted.
|
|
|
|
Describe:
|
|
1. What actions were taken (tool calls, searches, file operations)
|
|
2. Key information or results obtained
|
|
3. Important decisions, constraints, or user preferences
|
|
4. Relevant data, file names, outputs, or next steps needed to continue
|
|
|
|
Keep it factual, concise, and focused on helping the next assistant resume without repeating work. Target ~{self.summary_target_tokens} tokens.
|
|
|
|
---
|
|
TURNS TO SUMMARIZE:
|
|
{content_to_summarize}
|
|
---
|
|
|
|
Write only the summary body. Do not include any preamble or prefix; the system will add the handoff wrapper."""
|
|
|
|
# Use the centralized LLM router — handles provider resolution,
|
|
# auth, and fallback internally.
|
|
try:
|
|
call_kwargs = {
|
|
"task": "compression",
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
"temperature": 0.3,
|
|
"max_tokens": self.summary_target_tokens * 2,
|
|
"timeout": 30.0,
|
|
}
|
|
if self.summary_model:
|
|
call_kwargs["model"] = self.summary_model
|
|
response = call_llm(**call_kwargs)
|
|
content = response.choices[0].message.content
|
|
# Handle cases where content is not a string (e.g., dict from llama.cpp)
|
|
if not isinstance(content, str):
|
|
content = str(content) if content else ""
|
|
summary = content.strip()
|
|
return self._with_summary_prefix(summary)
|
|
except RuntimeError:
|
|
logging.warning("Context compression: no provider available for "
|
|
"summary. Middle turns will be dropped without summary.")
|
|
return None
|
|
except Exception as e:
|
|
logging.warning("Failed to generate context summary: %s", e)
|
|
return None
|
|
|
|
@staticmethod
|
|
def _with_summary_prefix(summary: str) -> str:
|
|
"""Normalize summary text to the current compaction handoff format."""
|
|
text = (summary or "").strip()
|
|
for prefix in (LEGACY_SUMMARY_PREFIX, SUMMARY_PREFIX):
|
|
if text.startswith(prefix):
|
|
text = text[len(prefix):].lstrip()
|
|
break
|
|
return f"{SUMMARY_PREFIX}\n{text}" if text else SUMMARY_PREFIX
|
|
|
|
# ------------------------------------------------------------------
|
|
# Tool-call / tool-result pair integrity helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _get_tool_call_id(tc) -> str:
|
|
"""Extract the call ID from a tool_call entry (dict or SimpleNamespace)."""
|
|
if isinstance(tc, dict):
|
|
return tc.get("id", "")
|
|
return getattr(tc, "id", "") or ""
|
|
|
|
    def _sanitize_tool_pairs(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Fix orphaned tool_call / tool_result pairs after compression.

        Two failure modes:
        1. A tool *result* references a call_id whose assistant tool_call was
           removed (summarized/truncated). The API rejects this with
           "No tool call found for function call output with call_id ...".
        2. An assistant message has tool_calls whose results were dropped.
           The API rejects this because every tool_call must be followed by
           a tool result with the matching call_id.

        This method removes orphaned results and inserts stub results for
        orphaned calls so the message list is always well-formed.
        """
        # Pass 1a: collect IDs of tool_calls still present on surviving
        # assistant messages.
        surviving_call_ids: set = set()
        for msg in messages:
            if msg.get("role") == "assistant":
                for tc in msg.get("tool_calls") or []:
                    cid = self._get_tool_call_id(tc)
                    if cid:
                        surviving_call_ids.add(cid)

        # Pass 1b: collect IDs referenced by surviving tool-result messages.
        result_call_ids: set = set()
        for msg in messages:
            if msg.get("role") == "tool":
                cid = msg.get("tool_call_id")
                if cid:
                    result_call_ids.add(cid)

        # 1. Remove tool results whose call_id has no matching assistant tool_call
        orphaned_results = result_call_ids - surviving_call_ids
        if orphaned_results:
            messages = [
                m for m in messages
                if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results)
            ]
            if not self.quiet_mode:
                logger.info("Compression sanitizer: removed %d orphaned tool result(s)", len(orphaned_results))

        # 2. Add stub results for assistant tool_calls whose results were dropped
        missing_results = surviving_call_ids - result_call_ids
        if missing_results:
            patched: List[Dict[str, Any]] = []
            for msg in messages:
                patched.append(msg)
                # Insert each stub immediately after its owning assistant
                # message so every surviving tool_call_id is answered.
                if msg.get("role") == "assistant":
                    for tc in msg.get("tool_calls") or []:
                        cid = self._get_tool_call_id(tc)
                        if cid in missing_results:
                            patched.append({
                                "role": "tool",
                                "content": "[Result from earlier conversation — see context summary above]",
                                "tool_call_id": cid,
                            })
            messages = patched
            if not self.quiet_mode:
                logger.info("Compression sanitizer: added %d stub tool result(s)", len(missing_results))

        return messages
|
|
|
|
def _align_boundary_forward(self, messages: List[Dict[str, Any]], idx: int) -> int:
|
|
"""Push a compress-start boundary forward past any orphan tool results.
|
|
|
|
If ``messages[idx]`` is a tool result, slide forward until we hit a
|
|
non-tool message so we don't start the summarised region mid-group.
|
|
"""
|
|
while idx < len(messages) and messages[idx].get("role") == "tool":
|
|
idx += 1
|
|
return idx
|
|
|
|
def _align_boundary_backward(self, messages: List[Dict[str, Any]], idx: int) -> int:
|
|
"""Pull a compress-end boundary backward to avoid splitting a
|
|
tool_call / result group.
|
|
|
|
If the boundary falls in the middle of a tool-result group (i.e.
|
|
there are consecutive tool messages before ``idx``), walk backward
|
|
past all of them to find the parent assistant message. If found,
|
|
move the boundary before the assistant so the entire
|
|
assistant + tool_results group is included in the summarised region
|
|
rather than being split (which causes silent data loss when
|
|
``_sanitize_tool_pairs`` removes the orphaned tail results).
|
|
"""
|
|
if idx <= 0 or idx >= len(messages):
|
|
return idx
|
|
# Walk backward past consecutive tool results
|
|
check = idx - 1
|
|
while check >= 0 and messages[check].get("role") == "tool":
|
|
check -= 1
|
|
# If we landed on the parent assistant with tool_calls, pull the
|
|
# boundary before it so the whole group gets summarised together.
|
|
if check >= 0 and messages[check].get("role") == "assistant" and messages[check].get("tool_calls"):
|
|
idx = check
|
|
return idx
|
|
|
|
    def compress(self, messages: List[Dict[str, Any]], current_tokens: int | None = None) -> List[Dict[str, Any]]:
        """Compress conversation messages by summarizing middle turns.

        Keeps first N + last N turns, summarizes everything in between.
        After compression, orphaned tool_call / tool_result pairs are cleaned
        up so the API never receives mismatched IDs.

        Args:
            messages: Full conversation message list (dicts with at least a
                ``role`` key).
            current_tokens: Actual prompt token count, if known; used only
                for logging. Falls back to the last recorded count, then a
                rough estimate.

        Returns:
            A new message list; the input is returned unchanged when there
            are too few messages to compress.
        """
        n_messages = len(messages)
        # Not enough middle turns to be worth summarizing.
        if n_messages <= self.protect_first_n + self.protect_last_n + 1:
            if not self.quiet_mode:
                logger.warning(
                    "Cannot compress: only %d messages (need > %d)",
                    n_messages,
                    self.protect_first_n + self.protect_last_n + 1,
                )
            return messages

        compress_start = self.protect_first_n
        compress_end = n_messages - self.protect_last_n
        if compress_start >= compress_end:
            return messages

        # Adjust boundaries to avoid splitting tool_call/result groups.
        compress_start = self._align_boundary_forward(messages, compress_start)
        compress_end = self._align_boundary_backward(messages, compress_end)
        if compress_start >= compress_end:
            return messages

        turns_to_summarize = messages[compress_start:compress_end]
        # Best available token figure, logging only (never affects logic).
        display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)

        if not self.quiet_mode:
            logger.info(
                "Context compression triggered (%d tokens >= %d threshold)",
                display_tokens,
                self.threshold_tokens,
            )
            logger.info(
                "Model context limit: %d tokens (%.0f%% = %d)",
                self.context_length,
                self.threshold_percent * 100,
                self.threshold_tokens,
            )
            logger.info(
                "Summarizing turns %d-%d (%d turns)",
                compress_start + 1,
                compress_end,
                len(turns_to_summarize),
            )

        # May be None if no summary model is available; middle turns are
        # then dropped without a summary.
        summary = self._generate_summary(turns_to_summarize)

        # Head: protected leading turns, with a one-time note appended to
        # the system message on the first compression only.
        compressed = []
        for i in range(compress_start):
            msg = messages[i].copy()
            if i == 0 and msg.get("role") == "system" and self.compression_count == 0:
                msg["content"] = (
                    (msg.get("content") or "")
                    + "\n\n[Note: Some earlier conversation turns have been compacted into a handoff summary to preserve context space. The current session state may still reflect earlier work, so build on that summary and state rather than re-doing work.]"
                )
            compressed.append(msg)

        _merge_summary_into_tail = False
        if summary:
            # Choose a role for the summary message that avoids two
            # consecutive same-role messages on either side of it.
            last_head_role = messages[compress_start - 1].get("role", "user") if compress_start > 0 else "user"
            first_tail_role = messages[compress_end].get("role", "user") if compress_end < n_messages else "user"
            # Pick a role that avoids consecutive same-role with both neighbors.
            # Priority: avoid colliding with head (already committed), then tail.
            if last_head_role in ("assistant", "tool"):
                summary_role = "user"
            else:
                summary_role = "assistant"
            # If the chosen role collides with the tail AND flipping wouldn't
            # collide with the head, flip it.
            if summary_role == first_tail_role:
                flipped = "assistant" if summary_role == "user" else "user"
                if flipped != last_head_role:
                    summary_role = flipped
                else:
                    # Both roles would create consecutive same-role messages
                    # (e.g. head=assistant, tail=user — neither role works).
                    # Merge the summary into the first tail message instead
                    # of inserting a standalone message that breaks alternation.
                    _merge_summary_into_tail = True
            if not _merge_summary_into_tail:
                compressed.append({"role": summary_role, "content": summary})
        else:
            if not self.quiet_mode:
                logger.warning("No summary model available — middle turns dropped without summary")

        # Tail: protected trailing turns; prepend the summary to the first
        # tail message when no standalone summary message could be inserted.
        for i in range(compress_end, n_messages):
            msg = messages[i].copy()
            if _merge_summary_into_tail and i == compress_end:
                original = msg.get("content") or ""
                msg["content"] = summary + "\n\n" + original
                _merge_summary_into_tail = False
            compressed.append(msg)

        self.compression_count += 1

        # Repair any tool_call/result pairs broken by the cut.
        compressed = self._sanitize_tool_pairs(compressed)

        if not self.quiet_mode:
            new_estimate = estimate_messages_tokens_rough(compressed)
            saved_estimate = display_tokens - new_estimate
            logger.info(
                "Compressed: %d -> %d messages (~%d tokens saved)",
                n_messages,
                len(compressed),
                saved_estimate,
            )
            logger.info("Compression #%d complete", self.compression_count)

        return compressed
|