diff --git a/run_agent.py b/run_agent.py
index 078f8f0f4..711330625 100644
--- a/run_agent.py
+++ b/run_agent.py
@@ -20,6 +20,7 @@ Usage:
     response = agent.run_conversation("Tell me about the latest Python updates")
 """

+import copy
 import json
 import logging
 import os
@@ -457,6 +458,94 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
         return compressed


+# =============================================================================
+# Anthropic Prompt Caching (system_and_3 strategy)
+# =============================================================================
+# Reduces input token costs by ~75% on multi-turn conversations by caching
+# the conversation prefix. Uses 4 cache_control breakpoints (Anthropic max):
+#   1. System prompt (stable across all turns)
+#   2-4. Last 3 non-system messages (rolling window)
+#
+# Cached tokens are read at 0.1x input price. Cache writes cost 1.25x (5m TTL)
+# or 2x (1h TTL). Only applied to Claude models via OpenRouter.
+
+def _apply_cache_marker(msg: dict, cache_marker: dict) -> None:
+    """
+    Add cache_control to a single message, handling all format variations.
+
+    - tool messages: cache_control at message level (Anthropic API quirk)
+    - string content: converted to multipart content array
+    - list content: marker added to last item
+    - None content (assistant with tool_calls): message level
+    """
+    role = msg.get("role", "")
+    content = msg.get("content")
+
+    if role == "tool":
+        msg["cache_control"] = cache_marker
+        return
+
+    if content is None:
+        msg["cache_control"] = cache_marker
+        return
+
+    if isinstance(content, str):
+        msg["content"] = [{"type": "text", "text": content, "cache_control": cache_marker}]
+        return
+
+    if isinstance(content, list) and content:
+        last = content[-1]
+        if isinstance(last, dict):
+            last["cache_control"] = cache_marker
+
+
+def apply_anthropic_cache_control(
+    api_messages: List[Dict[str, Any]],
+    cache_ttl: str = "5m",
+) -> List[Dict[str, Any]]:
+    """
+    Apply system_and_3 caching strategy to messages for Anthropic models.
+
+    Places up to 4 cache_control breakpoints:
+      1. System prompt (index 0, stable across all turns)
+      2-4. Last 3 non-system messages (rolling cache frontier)
+
+    Each breakpoint tells Anthropic "cache everything from the start up to here."
+    Multiple breakpoints create a ladder of cached prefixes at different depths,
+    which provides robust cache hits even when the most recent cache entry hasn't
+    propagated yet.
+
+    Args:
+        api_messages: Fully assembled message list (system prompt first).
+        cache_ttl: "5m" (default, 1.25x write cost) or "1h" (2x write cost).
+
+    Returns:
+        Deep copy of messages with cache_control breakpoints injected.
+ """ + messages = copy.deepcopy(api_messages) + if not messages: + return messages + + marker = {"type": "ephemeral"} + if cache_ttl == "1h": + marker["ttl"] = "1h" + + breakpoints_used = 0 + + # Breakpoint 1: System prompt (always stable, gives a guaranteed minimum hit) + if messages[0].get("role") == "system": + _apply_cache_marker(messages[0], marker) + breakpoints_used += 1 + + # Breakpoints 2-4: Last 3 non-system messages (rolling window) + remaining = 4 - breakpoints_used + non_sys = [i for i in range(len(messages)) if messages[i].get("role") != "system"] + for idx in non_sys[-remaining:]: + _apply_cache_marker(messages[idx], marker) + + return messages + + # ============================================================================= # Default System Prompt Components # ============================================================================= @@ -645,7 +734,9 @@ class AIAgent: self.ephemeral_system_prompt = ephemeral_system_prompt self.log_prefix_chars = log_prefix_chars self.log_prefix = f"{log_prefix} " if log_prefix else "" - self.base_url = base_url or "" # Store for OpenRouter detection + # Store effective base URL for feature detection (prompt caching, reasoning, etc.) + # When no base_url is provided, the client defaults to OpenRouter, so reflect that here. + self.base_url = base_url or "https://openrouter.ai/api/v1" self.tool_progress_callback = tool_progress_callback self._last_reported_tool = None # Track for "new tool" mode @@ -668,6 +759,14 @@ class AIAgent: self.reasoning_config = reasoning_config # None = use default (xhigh for OpenRouter) self.prefill_messages = prefill_messages or [] # Prefilled conversation turns + # Anthropic prompt caching: auto-enabled for Claude models via OpenRouter. + # Reduces input costs by ~75% on multi-turn conversations by caching the + # conversation prefix. Uses system_and_3 strategy (4 breakpoints). + is_openrouter = "openrouter" in self.base_url.lower() + is_claude = "claude" in self.model.lower() + self._use_prompt_caching = is_openrouter and is_claude + self._cache_ttl = "5m" # Default 5-minute TTL (1.25x write cost) + # Configure logging if self.verbose_logging: logging.basicConfig( @@ -773,6 +872,10 @@ class AIAgent: prompt_preview = self.ephemeral_system_prompt[:60] + "..." if len(self.ephemeral_system_prompt) > 60 else self.ephemeral_system_prompt print(f"🔒 Ephemeral system prompt: '{prompt_preview}' (not saved to trajectories)") + # Show prompt caching status + if self._use_prompt_caching and not self.quiet_mode: + print(f"💾 Prompt caching: ENABLED (Claude via OpenRouter, {self._cache_ttl} TTL)") + # Session logging setup - auto-save conversation trajectories for debugging self.session_start = datetime.now() if session_id: @@ -1582,6 +1685,13 @@ class AIAgent: # Insert system message at the beginning api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages + # Apply Anthropic prompt caching for Claude models via OpenRouter. + # Auto-detected: if model name contains "claude" and base_url is OpenRouter, + # inject cache_control breakpoints (system + last 3 messages) to reduce + # input token costs by ~75% on multi-turn conversations. 
+        if self._use_prompt_caching:
+            api_messages = apply_anthropic_cache_control(api_messages, cache_ttl=self._cache_ttl)
+
         # Calculate approximate request size for logging
         total_chars = sum(len(str(msg)) for msg in api_messages)
         approx_tokens = total_chars // 4  # Rough estimate: 4 chars per token
@@ -1811,6 +1921,16 @@ class AIAgent:
                 if self.verbose_logging:
                     logging.debug(f"Token usage: prompt={usage_dict['prompt_tokens']:,}, completion={usage_dict['completion_tokens']:,}, total={usage_dict['total_tokens']:,}")
+
+                # Log cache hit stats when prompt caching is active
+                if self._use_prompt_caching:
+                    details = getattr(response.usage, 'prompt_tokens_details', None)
+                    cached = (getattr(details, 'cached_tokens', 0) or 0) if details else 0
+                    written = (getattr(details, 'cache_write_tokens', 0) or 0) if details else 0
+                    prompt = usage_dict["prompt_tokens"]
+                    hit_pct = (cached / prompt * 100) if prompt > 0 else 0
+                    if not self.quiet_mode:
+                        print(f"{self.log_prefix} 💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)")
                 break  # Success, exit retry loop
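
For reviewers, a minimal sketch of what the system_and_3 strategy produces, not part of the patch: the four-message conversation below is invented, and the snippet assumes the patched run_agent module is importable.

    # Illustrative only -- this conversation is invented and not taken from run_agent.py.
    from run_agent import apply_anthropic_cache_control  # assumes the patched module is importable

    messages = [
        {"role": "system", "content": "You are a helpful research agent."},
        {"role": "user", "content": "Tell me about the latest Python updates"},
        {"role": "assistant", "content": "Here is a short summary of the release notes..."},
        {"role": "user", "content": "What changed in asyncio?"},
    ]

    cached = apply_anthropic_cache_control(messages, cache_ttl="5m")

    # Breakpoint 1: the system prompt's string content is wrapped in a multipart
    # block carrying the ephemeral marker (no "ttl" key for the default 5m TTL).
    assert cached[0]["content"][0]["cache_control"] == {"type": "ephemeral"}

    # Breakpoints 2-4: the last three non-system messages get the same marker,
    # forming the rolling cache frontier described in the docstring.
    for msg in cached[1:]:
        assert msg["content"][0]["cache_control"] == {"type": "ephemeral"}

    # The caller's list is untouched; the function deep-copies before mutating.
    assert all("cache_control" not in str(m) for m in messages)

On the following turn the same markers slide forward with the conversation, so the previously cached prefix is read at the discounted rate while only the newest messages are written to the cache.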