From 8d528e00458ba313ad3ebde1e302b2694bc11a8b Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 22 Mar 2026 04:56:06 -0700 Subject: [PATCH] fix(api_server): persist ResponseStore to SQLite across restarts (#2472) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /v1/responses endpoint used an in-memory OrderedDict that lost all conversation state on gateway restart. Replace with SQLite-backed storage at ~/.hermes/response_store.db. - Responses and conversation name mappings survive restarts - Same LRU eviction behavior (configurable max_size) - WAL mode for concurrent read performance - Falls back to in-memory SQLite if disk path unavailable - Conversation name→response_id mapping moved into the store --- gateway/platforms/api_server.py | 112 ++++++++++++++++++++++++------- tests/gateway/test_api_server.py | 6 +- 2 files changed, 92 insertions(+), 26 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 78ea1137c..01339608d 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -18,10 +18,10 @@ Requires: """ import asyncio -import collections import json import logging import os +import sqlite3 import time import uuid from typing import Any, Dict, List, Optional @@ -54,41 +54,109 @@ def check_api_server_requirements() -> bool: class ResponseStore: """ - In-memory LRU store for Responses API state. + SQLite-backed LRU store for Responses API state. Each stored response includes the full internal conversation history (with tool calls and results) so it can be reconstructed on subsequent requests via previous_response_id. + + Persists across gateway restarts. Falls back to in-memory SQLite + if the on-disk path is unavailable. """ - def __init__(self, max_size: int = MAX_STORED_RESPONSES): - self._store: collections.OrderedDict[str, Dict[str, Any]] = collections.OrderedDict() + def __init__(self, max_size: int = MAX_STORED_RESPONSES, db_path: str = None): self._max_size = max_size + if db_path is None: + try: + from hermes_cli.config import get_hermes_home + db_path = str(get_hermes_home() / "response_store.db") + except Exception: + db_path = ":memory:" + try: + self._conn = sqlite3.connect(db_path, check_same_thread=False) + except Exception: + self._conn = sqlite3.connect(":memory:", check_same_thread=False) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute( + """CREATE TABLE IF NOT EXISTS responses ( + response_id TEXT PRIMARY KEY, + data TEXT NOT NULL, + accessed_at REAL NOT NULL + )""" + ) + self._conn.execute( + """CREATE TABLE IF NOT EXISTS conversations ( + name TEXT PRIMARY KEY, + response_id TEXT NOT NULL + )""" + ) + self._conn.commit() def get(self, response_id: str) -> Optional[Dict[str, Any]]: - """Retrieve a stored response by ID (moves to end for LRU).""" - if response_id in self._store: - self._store.move_to_end(response_id) - return self._store[response_id] - return None + """Retrieve a stored response by ID (updates access time for LRU).""" + row = self._conn.execute( + "SELECT data FROM responses WHERE response_id = ?", (response_id,) + ).fetchone() + if row is None: + return None + import time + self._conn.execute( + "UPDATE responses SET accessed_at = ? WHERE response_id = ?", + (time.time(), response_id), + ) + self._conn.commit() + return json.loads(row[0]) def put(self, response_id: str, data: Dict[str, Any]) -> None: """Store a response, evicting the oldest if at capacity.""" - if response_id in self._store: - self._store.move_to_end(response_id) - self._store[response_id] = data - while len(self._store) > self._max_size: - self._store.popitem(last=False) + import time + self._conn.execute( + "INSERT OR REPLACE INTO responses (response_id, data, accessed_at) VALUES (?, ?, ?)", + (response_id, json.dumps(data, default=str), time.time()), + ) + # Evict oldest entries beyond max_size + count = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone()[0] + if count > self._max_size: + self._conn.execute( + "DELETE FROM responses WHERE response_id IN " + "(SELECT response_id FROM responses ORDER BY accessed_at ASC LIMIT ?)", + (count - self._max_size,), + ) + self._conn.commit() def delete(self, response_id: str) -> bool: """Remove a response from the store. Returns True if found and deleted.""" - if response_id in self._store: - del self._store[response_id] - return True - return False + cursor = self._conn.execute( + "DELETE FROM responses WHERE response_id = ?", (response_id,) + ) + self._conn.commit() + return cursor.rowcount > 0 + + def get_conversation(self, name: str) -> Optional[str]: + """Get the latest response_id for a conversation name.""" + row = self._conn.execute( + "SELECT response_id FROM conversations WHERE name = ?", (name,) + ).fetchone() + return row[0] if row else None + + def set_conversation(self, name: str, response_id: str) -> None: + """Map a conversation name to its latest response_id.""" + self._conn.execute( + "INSERT OR REPLACE INTO conversations (name, response_id) VALUES (?, ?)", + (name, response_id), + ) + self._conn.commit() + + def close(self) -> None: + """Close the database connection.""" + try: + self._conn.close() + except Exception: + pass def __len__(self) -> int: - return len(self._store) + row = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone() + return row[0] if row else 0 # --------------------------------------------------------------------------- @@ -147,8 +215,6 @@ class APIServerAdapter(BasePlatformAdapter): self._runner: Optional["web.AppRunner"] = None self._site: Optional["web.TCPSite"] = None self._response_store = ResponseStore() - # Conversation name → latest response_id mapping - self._conversations: Dict[str, str] = {} @staticmethod def _parse_cors_origins(value: Any) -> tuple[str, ...]: @@ -520,7 +586,7 @@ class APIServerAdapter(BasePlatformAdapter): # Resolve conversation name to latest response_id if conversation: - previous_response_id = self._conversations.get(conversation) + previous_response_id = self._response_store.get_conversation(conversation) # No error if conversation doesn't exist yet — it's a new conversation # Normalize input to message list @@ -643,7 +709,7 @@ class APIServerAdapter(BasePlatformAdapter): # Update conversation mapping so the next request with the same # conversation name automatically chains to this response if conversation: - self._conversations[conversation] = response_id + self._response_store.set_conversation(conversation, response_id) return web.json_response(response_data) diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 89d713ef0..96160b5a5 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1295,7 +1295,7 @@ class TestConversationParameter: data = await resp.json() assert data["status"] == "completed" # Conversation mapping should be set - assert "my-chat" in adapter._conversations + assert adapter._response_store.get_conversation("my-chat") is not None @pytest.mark.asyncio async def test_conversation_chains_automatically(self, adapter): @@ -1369,7 +1369,7 @@ class TestConversationParameter: await cli.post("/v1/responses", json={"input": "conv-b msg", "conversation": "conv-b"}) # They should have different response IDs in the mapping - assert adapter._conversations["conv-a"] != adapter._conversations["conv-b"] + assert adapter._response_store.get_conversation("conv-a") != adapter._response_store.get_conversation("conv-b") @pytest.mark.asyncio async def test_conversation_store_false_no_mapping(self, adapter): @@ -1388,4 +1388,4 @@ class TestConversationParameter: }) assert resp.status == 200 # Conversation mapping should NOT be set since store=false - assert "ephemeral-chat" not in adapter._conversations + assert adapter._response_store.get_conversation("ephemeral-chat") is None