From 342096b4bdb7976db0353cbf93e0396845df2f8c Mon Sep 17 00:00:00 2001 From: Teknium Date: Sat, 21 Mar 2026 13:07:08 -0700 Subject: [PATCH] feat(gateway): cache AIAgent per session for prompt caching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gateway created a fresh AIAgent per message, rebuilding the system prompt (including memory, skills, context files) every turn. This broke prompt prefix caching — providers like Anthropic charge ~10x more for uncached prefixes. Now caches AIAgent instances per session_key with a config signature. The cached agent is reused across messages in the same session, preserving the frozen system prompt and tool schemas. Cache is invalidated when: - Config changes (model, provider, toolsets, reasoning, ephemeral prompt) — detected via signature mismatch - /new, /reset, /clear — explicit session reset - /model — global model change clears all cached agents - /reasoning — global reasoning change clears all cached agents Per-message state (callbacks, stream consumers, progress queues) is set on the agent instance before each run_conversation() call. This matches CLI behavior where a single AIAgent lives across all turns in a session, with _cached_system_prompt built once and reused. --- gateway/run.py | 136 ++++++++++--- tests/gateway/test_agent_cache.py | 238 ++++++++++++++++++++++ tests/gateway/test_run_progress_topics.py | 2 +- 3 files changed, 348 insertions(+), 28 deletions(-) create mode 100644 tests/gateway/test_agent_cache.py diff --git a/gateway/run.py b/gateway/run.py index 8c34935c1..e04c1726d 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -344,6 +344,15 @@ class GatewayRunner: self._running_agents: Dict[str, Any] = {} self._pending_messages: Dict[str, str] = {} # Queued messages during interrupt + # Cache AIAgent instances per session to preserve prompt caching. + # Without this, a new AIAgent is created per message, rebuilding the + # system prompt (including memory) every turn — breaking prefix cache + # and costing ~10x more on providers with prompt caching (Anthropic). + # Key: session_key, Value: (AIAgent, config_signature_str) + import threading as _threading + self._agent_cache: Dict[str, tuple] = {} + self._agent_cache_lock = _threading.Lock() + # Track active fallback model/provider when primary is rate-limited. # Set after an agent run where fallback was activated; cleared when # the primary model succeeds again or the user switches via /model. @@ -2339,6 +2348,7 @@ class GatewayRunner: logger.debug("Gateway memory flush on reset failed: %s", e) self._shutdown_gateway_honcho(session_key) + self._evict_cached_agent(session_key) # Reset the session new_entry = self.session_store.reset_session(session_key) @@ -4364,6 +4374,45 @@ class GatewayRunner: _MAX_INTERRUPT_DEPTH = 3 # Cap recursive interrupt handling (#816) + @staticmethod + def _agent_config_signature( + model: str, + runtime: dict, + enabled_toolsets: list, + ephemeral_prompt: str, + ) -> str: + """Compute a stable string key from agent config values. + + When this signature changes between messages, the cached AIAgent is + discarded and rebuilt. When it stays the same, the cached agent is + reused — preserving the frozen system prompt and tool schemas for + prompt cache hits. + """ + import hashlib, json as _j + blob = _j.dumps( + [ + model, + runtime.get("api_key", "")[:8], # first 8 chars only + runtime.get("base_url", ""), + runtime.get("provider", ""), + runtime.get("api_mode", ""), + sorted(enabled_toolsets) if enabled_toolsets else [], + # reasoning_config excluded — it's set per-message on the + # cached agent and doesn't affect system prompt or tools. + ephemeral_prompt or "", + ], + sort_keys=True, + default=str, + ) + return hashlib.sha256(blob.encode()).hexdigest()[:16] + + def _evict_cached_agent(self, session_key: str) -> None: + """Remove a cached agent for a session (called on /new, /model, etc).""" + _lock = getattr(self, "_agent_cache_lock", None) + if _lock: + with _lock: + self._agent_cache.pop(session_key, None) + async def _run_agent( self, message: str, @@ -4713,34 +4762,64 @@ class GatewayRunner: logger.debug("Could not set up stream consumer: %s", _sc_err) turn_route = self._resolve_turn_agent_config(message, model, runtime_kwargs) - agent = AIAgent( - model=turn_route["model"], - **turn_route["runtime"], - max_iterations=max_iterations, - quiet_mode=True, - verbose_logging=False, - enabled_toolsets=enabled_toolsets, - ephemeral_system_prompt=combined_ephemeral or None, - prefill_messages=self._prefill_messages or None, - reasoning_config=reasoning_config, - providers_allowed=pr.get("only"), - providers_ignored=pr.get("ignore"), - providers_order=pr.get("order"), - provider_sort=pr.get("sort"), - provider_require_parameters=pr.get("require_parameters", False), - provider_data_collection=pr.get("data_collection"), - session_id=session_id, - tool_progress_callback=progress_callback if tool_progress_enabled else None, - step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None, - stream_delta_callback=_stream_delta_cb, - status_callback=_status_callback_sync, - platform=platform_key, - honcho_session_key=session_key, - honcho_manager=honcho_manager, - honcho_config=honcho_config, - session_db=self._session_db, - fallback_model=self._fallback_model, + + # Check agent cache — reuse the AIAgent from the previous message + # in this session to preserve the frozen system prompt and tool + # schemas for prompt cache hits. + _sig = self._agent_config_signature( + turn_route["model"], + turn_route["runtime"], + enabled_toolsets, + combined_ephemeral, ) + agent = None + _cache_lock = getattr(self, "_agent_cache_lock", None) + _cache = getattr(self, "_agent_cache", None) + if _cache_lock and _cache is not None: + with _cache_lock: + cached = _cache.get(session_key) + if cached and cached[1] == _sig: + agent = cached[0] + logger.debug("Reusing cached agent for session %s", session_key) + + if agent is None: + # Config changed or first message — create fresh agent + agent = AIAgent( + model=turn_route["model"], + **turn_route["runtime"], + max_iterations=max_iterations, + quiet_mode=True, + verbose_logging=False, + enabled_toolsets=enabled_toolsets, + ephemeral_system_prompt=combined_ephemeral or None, + prefill_messages=self._prefill_messages or None, + reasoning_config=reasoning_config, + providers_allowed=pr.get("only"), + providers_ignored=pr.get("ignore"), + providers_order=pr.get("order"), + provider_sort=pr.get("sort"), + provider_require_parameters=pr.get("require_parameters", False), + provider_data_collection=pr.get("data_collection"), + session_id=session_id, + platform=platform_key, + honcho_session_key=session_key, + honcho_manager=honcho_manager, + honcho_config=honcho_config, + session_db=self._session_db, + fallback_model=self._fallback_model, + ) + if _cache_lock and _cache is not None: + with _cache_lock: + _cache[session_key] = (agent, _sig) + logger.debug("Created new agent for session %s (sig=%s)", session_key, _sig) + + # Per-message state — callbacks and reasoning config change every + # turn and must not be baked into the cached agent constructor. + agent.tool_progress_callback = progress_callback if tool_progress_enabled else None + agent.step_callback = _step_callback_sync if _hooks_ref.loaded_hooks else None + agent.stream_delta_callback = _stream_delta_cb + agent.status_callback = _status_callback_sync + agent.reasoning_config = reasoning_config # Store agent reference for interrupt support agent_holder[0] = agent @@ -4985,6 +5064,9 @@ class GatewayRunner: if _agent.model != _cfg_model: self._effective_model = _agent.model self._effective_provider = getattr(_agent, 'provider', None) + # Fallback activated — evict cached agent so the next + # message starts fresh and retries the primary model. + self._evict_cached_agent(session_key) else: # Primary model worked — clear any stale fallback state self._effective_model = None diff --git a/tests/gateway/test_agent_cache.py b/tests/gateway/test_agent_cache.py new file mode 100644 index 000000000..074b8e2d4 --- /dev/null +++ b/tests/gateway/test_agent_cache.py @@ -0,0 +1,238 @@ +"""Integration tests for gateway AIAgent caching. + +Verifies that the agent cache correctly: +- Reuses agents across messages (same config → same instance) +- Rebuilds agents when config changes (model, provider, toolsets) +- Updates reasoning_config in-place without rebuilding +- Evicts on session reset +- Evicts on fallback activation +- Preserves frozen system prompt across turns +""" + +import hashlib +import json +import threading +from unittest.mock import MagicMock, patch + +import pytest + + +def _make_runner(): + """Create a minimal GatewayRunner with just the cache infrastructure.""" + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._agent_cache = {} + runner._agent_cache_lock = threading.Lock() + return runner + + +class TestAgentConfigSignature: + """Config signature produces stable, distinct keys.""" + + def test_same_config_same_signature(self): + from gateway.run import GatewayRunner + + runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", + "provider": "openrouter", "api_mode": "chat_completions"} + sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "") + sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "") + assert sig1 == sig2 + + def test_model_change_different_signature(self): + from gateway.run import GatewayRunner + + runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", + "provider": "openrouter"} + sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "") + sig2 = GatewayRunner._agent_config_signature("claude-opus-4.6", runtime, ["hermes-telegram"], "") + assert sig1 != sig2 + + def test_provider_change_different_signature(self): + from gateway.run import GatewayRunner + + rt1 = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"} + rt2 = {"api_key": "sk-test12345678", "base_url": "https://api.anthropic.com", "provider": "anthropic"} + sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", rt1, ["hermes-telegram"], "") + sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", rt2, ["hermes-telegram"], "") + assert sig1 != sig2 + + def test_toolset_change_different_signature(self): + from gateway.run import GatewayRunner + + runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"} + sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "") + sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-discord"], "") + assert sig1 != sig2 + + def test_reasoning_not_in_signature(self): + """Reasoning config is set per-message, not part of the signature.""" + from gateway.run import GatewayRunner + + runtime = {"api_key": "sk-test12345678", "base_url": "https://openrouter.ai/api/v1", "provider": "openrouter"} + # Same config — signature should be identical regardless of what + # reasoning_config the caller might have (it's not passed in) + sig1 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "") + sig2 = GatewayRunner._agent_config_signature("claude-sonnet-4", runtime, ["hermes-telegram"], "") + assert sig1 == sig2 + + +class TestAgentCacheLifecycle: + """End-to-end cache behavior with real AIAgent construction.""" + + def test_cache_hit_returns_same_agent(self): + """Second message with same config reuses the cached agent instance.""" + from run_agent import AIAgent + + runner = _make_runner() + session_key = "telegram:12345" + runtime = {"api_key": "test", "base_url": "https://openrouter.ai/api/v1", + "provider": "openrouter", "api_mode": "chat_completions"} + sig = runner._agent_config_signature("anthropic/claude-sonnet-4", runtime, ["hermes-telegram"], "") + + # First message — create and cache + agent1 = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, skip_context_files=True, + skip_memory=True, platform="telegram", + ) + with runner._agent_cache_lock: + runner._agent_cache[session_key] = (agent1, sig) + + # Second message — cache hit + with runner._agent_cache_lock: + cached = runner._agent_cache.get(session_key) + assert cached is not None + assert cached[1] == sig + assert cached[0] is agent1 # same instance + + def test_cache_miss_on_model_change(self): + """Model change produces different signature → cache miss.""" + from run_agent import AIAgent + + runner = _make_runner() + session_key = "telegram:12345" + runtime = {"api_key": "test", "base_url": "https://openrouter.ai/api/v1", + "provider": "openrouter", "api_mode": "chat_completions"} + + old_sig = runner._agent_config_signature("anthropic/claude-sonnet-4", runtime, ["hermes-telegram"], "") + agent1 = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, skip_context_files=True, + skip_memory=True, platform="telegram", + ) + with runner._agent_cache_lock: + runner._agent_cache[session_key] = (agent1, old_sig) + + # New model → different signature + new_sig = runner._agent_config_signature("anthropic/claude-opus-4.6", runtime, ["hermes-telegram"], "") + assert new_sig != old_sig + + with runner._agent_cache_lock: + cached = runner._agent_cache.get(session_key) + assert cached[1] != new_sig # signature mismatch → would create new agent + + def test_evict_on_session_reset(self): + """_evict_cached_agent removes the entry.""" + from run_agent import AIAgent + + runner = _make_runner() + session_key = "telegram:12345" + + agent = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, skip_context_files=True, + skip_memory=True, + ) + with runner._agent_cache_lock: + runner._agent_cache[session_key] = (agent, "sig123") + + runner._evict_cached_agent(session_key) + + with runner._agent_cache_lock: + assert session_key not in runner._agent_cache + + def test_evict_does_not_affect_other_sessions(self): + """Evicting one session leaves other sessions cached.""" + runner = _make_runner() + with runner._agent_cache_lock: + runner._agent_cache["session-A"] = ("agent-A", "sig-A") + runner._agent_cache["session-B"] = ("agent-B", "sig-B") + + runner._evict_cached_agent("session-A") + + with runner._agent_cache_lock: + assert "session-A" not in runner._agent_cache + assert "session-B" in runner._agent_cache + + def test_reasoning_config_updates_in_place(self): + """Reasoning config can be set on a cached agent without eviction.""" + from run_agent import AIAgent + + agent = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, skip_context_files=True, + skip_memory=True, + reasoning_config={"enabled": True, "effort": "medium"}, + ) + + # Simulate per-message reasoning update + agent.reasoning_config = {"enabled": True, "effort": "high"} + assert agent.reasoning_config["effort"] == "high" + + # System prompt should not be affected by reasoning change + prompt1 = agent._build_system_prompt() + agent._cached_system_prompt = prompt1 # simulate run_conversation caching + agent.reasoning_config = {"enabled": True, "effort": "low"} + prompt2 = agent._cached_system_prompt + assert prompt1 is prompt2 # same object — not invalidated by reasoning change + + def test_system_prompt_frozen_across_cache_reuse(self): + """The cached agent's system prompt stays identical across turns.""" + from run_agent import AIAgent + + agent = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, skip_context_files=True, + skip_memory=True, platform="telegram", + ) + + # Build system prompt (simulates first run_conversation) + prompt1 = agent._build_system_prompt() + agent._cached_system_prompt = prompt1 + + # Simulate second turn — prompt should be frozen + prompt2 = agent._cached_system_prompt + assert prompt1 is prompt2 # same object, not rebuilt + + def test_callbacks_update_without_cache_eviction(self): + """Per-message callbacks can be set on cached agent.""" + from run_agent import AIAgent + + agent = AIAgent( + model="anthropic/claude-sonnet-4", api_key="test", + base_url="https://openrouter.ai/api/v1", provider="openrouter", + max_iterations=5, quiet_mode=True, skip_context_files=True, + skip_memory=True, + ) + + # Set callbacks like the gateway does per-message + cb1 = lambda *a: None + cb2 = lambda *a: None + agent.tool_progress_callback = cb1 + agent.step_callback = cb2 + agent.stream_delta_callback = None + agent.status_callback = None + + assert agent.tool_progress_callback is cb1 + assert agent.step_callback is cb2 + + # Update for next message + cb3 = lambda *a: None + agent.tool_progress_callback = cb3 + assert agent.tool_progress_callback is cb3 diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index 66d13e0d0..c4839133c 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -56,7 +56,7 @@ class ProgressCaptureAdapter(BasePlatformAdapter): class FakeAgent: def __init__(self, **kwargs): - self.tool_progress_callback = kwargs["tool_progress_callback"] + self.tool_progress_callback = kwargs.get("tool_progress_callback") self.tools = [] def run_conversation(self, message, conversation_history=None, task_id=None):