Implement Anthropic prompt caching for Claude models via OpenRouter
- Introduced a caching strategy that reduces input token costs by ~75% on multi-turn conversations by caching the conversation prefix.
- Added functions to apply cache control markers to messages, enhancing efficiency in token usage.
- Updated AIAgent to auto-enable prompt caching for Claude models, with configurable cache TTL.
- Enhanced logging to track cache hit statistics when caching is active, improving monitoring of token usage.
This commit is contained in:
122
run_agent.py
122
run_agent.py
@@ -20,6 +20,7 @@ Usage:
|
||||
response = agent.run_conversation("Tell me about the latest Python updates")
|
||||
"""
|
||||
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -457,6 +458,94 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
|
||||
return compressed
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Anthropic Prompt Caching (system_and_3 strategy)
|
||||
# =============================================================================
|
||||
# Reduces input token costs by ~75% on multi-turn conversations by caching
|
||||
# the conversation prefix. Uses 4 cache_control breakpoints (Anthropic max):
|
||||
# 1. System prompt (stable across all turns)
|
||||
# 2-4. Last 3 non-system messages (rolling window)
|
||||
#
|
||||
# Cached tokens are read at 0.1x input price. Cache writes cost 1.25x (5m TTL)
|
||||
# or 2x (1h TTL). Only applied to Claude models via OpenRouter.
|
||||
|
||||
def _apply_cache_marker(msg: dict, cache_marker: dict) -> None:
|
||||
"""
|
||||
Add cache_control to a single message, handling all format variations.
|
||||
|
||||
- tool messages: cache_control at message level (Anthropic API quirk)
|
||||
- string content: converted to multipart content array
|
||||
- list content: marker added to last item
|
||||
- None content (assistant with tool_calls): message level
|
||||
"""
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content")
|
||||
|
||||
if role == "tool":
|
||||
msg["cache_control"] = cache_marker
|
||||
return
|
||||
|
||||
if content is None:
|
||||
msg["cache_control"] = cache_marker
|
||||
return
|
||||
|
||||
if isinstance(content, str):
|
||||
msg["content"] = [{"type": "text", "text": content, "cache_control": cache_marker}]
|
||||
return
|
||||
|
||||
if isinstance(content, list) and content:
|
||||
last = content[-1]
|
||||
if isinstance(last, dict):
|
||||
last["cache_control"] = cache_marker
|
||||
|
||||
|
||||
def apply_anthropic_cache_control(
    api_messages: List[Dict[str, Any]],
    cache_ttl: str = "5m",
) -> List[Dict[str, Any]]:
    """
    Apply the system_and_3 caching strategy to messages for Anthropic models.

    Places up to 4 cache_control breakpoints (the Anthropic maximum):
      1.   System prompt (index 0, stable across all turns)
      2-4. Last 3 non-system messages (rolling cache frontier)

    Each breakpoint tells Anthropic "cache everything from the start up to
    here." Multiple breakpoints create a ladder of cached prefixes at
    different depths, which provides robust cache hits even when the most
    recent cache entry hasn't propagated yet.

    Args:
        api_messages: Fully assembled message list (system prompt first).
        cache_ttl: "5m" (default, 1.25x write cost) or "1h" (2x write cost).
            Any other value falls back to the 5-minute default.

    Returns:
        Deep copy of messages with cache_control breakpoints injected; the
        input list is never mutated.
    """
    messages = copy.deepcopy(api_messages)
    if not messages:
        return messages

    def _fresh_marker() -> Dict[str, Any]:
        # Build a new dict per breakpoint so the injected markers are not
        # aliased across messages: mutating one marker (e.g. during request
        # post-processing) must not silently change the others.
        marker: Dict[str, Any] = {"type": "ephemeral"}
        if cache_ttl == "1h":
            marker["ttl"] = "1h"  # omit "ttl" for the default 5m TTL
        return marker

    breakpoints_used = 0

    # Breakpoint 1: system prompt (always stable, gives a guaranteed minimum hit).
    if messages[0].get("role") == "system":
        _apply_cache_marker(messages[0], _fresh_marker())
        breakpoints_used += 1

    # Breakpoints 2-4: last 3 non-system messages (rolling window).
    remaining = 4 - breakpoints_used
    non_system_indices = [
        idx for idx, msg in enumerate(messages) if msg.get("role") != "system"
    ]
    for idx in non_system_indices[-remaining:]:
        _apply_cache_marker(messages[idx], _fresh_marker())

    return messages
||||
# =============================================================================
|
||||
# Default System Prompt Components
|
||||
# =============================================================================
|
||||
@@ -645,7 +734,9 @@ class AIAgent:
|
||||
self.ephemeral_system_prompt = ephemeral_system_prompt
|
||||
self.log_prefix_chars = log_prefix_chars
|
||||
self.log_prefix = f"{log_prefix} " if log_prefix else ""
|
||||
self.base_url = base_url or "" # Store for OpenRouter detection
|
||||
# Store effective base URL for feature detection (prompt caching, reasoning, etc.)
|
||||
# When no base_url is provided, the client defaults to OpenRouter, so reflect that here.
|
||||
self.base_url = base_url or "https://openrouter.ai/api/v1"
|
||||
self.tool_progress_callback = tool_progress_callback
|
||||
self._last_reported_tool = None # Track for "new tool" mode
|
||||
|
||||
@@ -668,6 +759,14 @@ class AIAgent:
|
||||
self.reasoning_config = reasoning_config # None = use default (xhigh for OpenRouter)
|
||||
self.prefill_messages = prefill_messages or [] # Prefilled conversation turns
|
||||
|
||||
# Anthropic prompt caching: auto-enabled for Claude models via OpenRouter.
|
||||
# Reduces input costs by ~75% on multi-turn conversations by caching the
|
||||
# conversation prefix. Uses system_and_3 strategy (4 breakpoints).
|
||||
is_openrouter = "openrouter" in self.base_url.lower()
|
||||
is_claude = "claude" in self.model.lower()
|
||||
self._use_prompt_caching = is_openrouter and is_claude
|
||||
self._cache_ttl = "5m" # Default 5-minute TTL (1.25x write cost)
|
||||
|
||||
# Configure logging
|
||||
if self.verbose_logging:
|
||||
logging.basicConfig(
|
||||
@@ -773,6 +872,10 @@ class AIAgent:
|
||||
prompt_preview = self.ephemeral_system_prompt[:60] + "..." if len(self.ephemeral_system_prompt) > 60 else self.ephemeral_system_prompt
|
||||
print(f"🔒 Ephemeral system prompt: '{prompt_preview}' (not saved to trajectories)")
|
||||
|
||||
# Show prompt caching status
|
||||
if self._use_prompt_caching and not self.quiet_mode:
|
||||
print(f"💾 Prompt caching: ENABLED (Claude via OpenRouter, {self._cache_ttl} TTL)")
|
||||
|
||||
# Session logging setup - auto-save conversation trajectories for debugging
|
||||
self.session_start = datetime.now()
|
||||
if session_id:
|
||||
@@ -1582,6 +1685,13 @@ class AIAgent:
|
||||
# Insert system message at the beginning
|
||||
api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages
|
||||
|
||||
# Apply Anthropic prompt caching for Claude models via OpenRouter.
|
||||
# Auto-detected: if model name contains "claude" and base_url is OpenRouter,
|
||||
# inject cache_control breakpoints (system + last 3 messages) to reduce
|
||||
# input token costs by ~75% on multi-turn conversations.
|
||||
if self._use_prompt_caching:
|
||||
api_messages = apply_anthropic_cache_control(api_messages, cache_ttl=self._cache_ttl)
|
||||
|
||||
# Calculate approximate request size for logging
|
||||
total_chars = sum(len(str(msg)) for msg in api_messages)
|
||||
approx_tokens = total_chars // 4 # Rough estimate: 4 chars per token
|
||||
@@ -1811,6 +1921,16 @@ class AIAgent:
|
||||
|
||||
if self.verbose_logging:
|
||||
logging.debug(f"Token usage: prompt={usage_dict['prompt_tokens']:,}, completion={usage_dict['completion_tokens']:,}, total={usage_dict['total_tokens']:,}")
|
||||
|
||||
# Log cache hit stats when prompt caching is active
|
||||
if self._use_prompt_caching:
|
||||
details = getattr(response.usage, 'prompt_tokens_details', None)
|
||||
cached = getattr(details, 'cached_tokens', 0) or 0 if details else 0
|
||||
written = getattr(details, 'cache_write_tokens', 0) or 0 if details else 0
|
||||
prompt = usage_dict["prompt_tokens"]
|
||||
hit_pct = (cached / prompt * 100) if prompt > 0 else 0
|
||||
if not self.quiet_mode:
|
||||
print(f"{self.log_prefix} 💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)")
|
||||
|
||||
break # Success, exit retry loop
|
||||
|
||||
|
||||
Reference in New Issue
Block a user