fix(api_server): persist ResponseStore to SQLite across restarts (#2472)
The /v1/responses endpoint used an in-memory OrderedDict that lost all conversation state on gateway restart. Replace with SQLite-backed storage at ~/.hermes/response_store.db. - Responses and conversation name mappings survive restarts - Same LRU eviction behavior (configurable max_size) - WAL mode for concurrent read performance - Falls back to in-memory SQLite if disk path unavailable - Conversation name→response_id mapping moved into the store
This commit is contained in:
@@ -18,10 +18,10 @@ Requires:
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
import uuid
|
||||
from typing import Any, Dict, List, Optional
|
||||
@@ -54,41 +54,109 @@ def check_api_server_requirements() -> bool:
|
||||
|
||||
class ResponseStore:
|
||||
"""
|
||||
In-memory LRU store for Responses API state.
|
||||
SQLite-backed LRU store for Responses API state.
|
||||
|
||||
Each stored response includes the full internal conversation history
|
||||
(with tool calls and results) so it can be reconstructed on subsequent
|
||||
requests via previous_response_id.
|
||||
|
||||
Persists across gateway restarts. Falls back to in-memory SQLite
|
||||
if the on-disk path is unavailable.
|
||||
"""
|
||||
|
||||
def __init__(self, max_size: int = MAX_STORED_RESPONSES):
|
||||
self._store: collections.OrderedDict[str, Dict[str, Any]] = collections.OrderedDict()
|
||||
def __init__(self, max_size: int = MAX_STORED_RESPONSES, db_path: str = None):
|
||||
self._max_size = max_size
|
||||
if db_path is None:
|
||||
try:
|
||||
from hermes_cli.config import get_hermes_home
|
||||
db_path = str(get_hermes_home() / "response_store.db")
|
||||
except Exception:
|
||||
db_path = ":memory:"
|
||||
try:
|
||||
self._conn = sqlite3.connect(db_path, check_same_thread=False)
|
||||
except Exception:
|
||||
self._conn = sqlite3.connect(":memory:", check_same_thread=False)
|
||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
self._conn.execute(
|
||||
"""CREATE TABLE IF NOT EXISTS responses (
|
||||
response_id TEXT PRIMARY KEY,
|
||||
data TEXT NOT NULL,
|
||||
accessed_at REAL NOT NULL
|
||||
)"""
|
||||
)
|
||||
self._conn.execute(
|
||||
"""CREATE TABLE IF NOT EXISTS conversations (
|
||||
name TEXT PRIMARY KEY,
|
||||
response_id TEXT NOT NULL
|
||||
)"""
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def get(self, response_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Retrieve a stored response by ID (moves to end for LRU)."""
|
||||
if response_id in self._store:
|
||||
self._store.move_to_end(response_id)
|
||||
return self._store[response_id]
|
||||
return None
|
||||
"""Retrieve a stored response by ID (updates access time for LRU)."""
|
||||
row = self._conn.execute(
|
||||
"SELECT data FROM responses WHERE response_id = ?", (response_id,)
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
import time
|
||||
self._conn.execute(
|
||||
"UPDATE responses SET accessed_at = ? WHERE response_id = ?",
|
||||
(time.time(), response_id),
|
||||
)
|
||||
self._conn.commit()
|
||||
return json.loads(row[0])
|
||||
|
||||
def put(self, response_id: str, data: Dict[str, Any]) -> None:
|
||||
"""Store a response, evicting the oldest if at capacity."""
|
||||
if response_id in self._store:
|
||||
self._store.move_to_end(response_id)
|
||||
self._store[response_id] = data
|
||||
while len(self._store) > self._max_size:
|
||||
self._store.popitem(last=False)
|
||||
import time
|
||||
self._conn.execute(
|
||||
"INSERT OR REPLACE INTO responses (response_id, data, accessed_at) VALUES (?, ?, ?)",
|
||||
(response_id, json.dumps(data, default=str), time.time()),
|
||||
)
|
||||
# Evict oldest entries beyond max_size
|
||||
count = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone()[0]
|
||||
if count > self._max_size:
|
||||
self._conn.execute(
|
||||
"DELETE FROM responses WHERE response_id IN "
|
||||
"(SELECT response_id FROM responses ORDER BY accessed_at ASC LIMIT ?)",
|
||||
(count - self._max_size,),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def delete(self, response_id: str) -> bool:
|
||||
"""Remove a response from the store. Returns True if found and deleted."""
|
||||
if response_id in self._store:
|
||||
del self._store[response_id]
|
||||
return True
|
||||
return False
|
||||
cursor = self._conn.execute(
|
||||
"DELETE FROM responses WHERE response_id = ?", (response_id,)
|
||||
)
|
||||
self._conn.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
def get_conversation(self, name: str) -> Optional[str]:
|
||||
"""Get the latest response_id for a conversation name."""
|
||||
row = self._conn.execute(
|
||||
"SELECT response_id FROM conversations WHERE name = ?", (name,)
|
||||
).fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
def set_conversation(self, name: str, response_id: str) -> None:
|
||||
"""Map a conversation name to its latest response_id."""
|
||||
self._conn.execute(
|
||||
"INSERT OR REPLACE INTO conversations (name, response_id) VALUES (?, ?)",
|
||||
(name, response_id),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the database connection."""
|
||||
try:
|
||||
self._conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._store)
|
||||
row = self._conn.execute("SELECT COUNT(*) FROM responses").fetchone()
|
||||
return row[0] if row else 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -147,8 +215,6 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
self._runner: Optional["web.AppRunner"] = None
|
||||
self._site: Optional["web.TCPSite"] = None
|
||||
self._response_store = ResponseStore()
|
||||
# Conversation name → latest response_id mapping
|
||||
self._conversations: Dict[str, str] = {}
|
||||
|
||||
@staticmethod
|
||||
def _parse_cors_origins(value: Any) -> tuple[str, ...]:
|
||||
@@ -520,7 +586,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
|
||||
# Resolve conversation name to latest response_id
|
||||
if conversation:
|
||||
previous_response_id = self._conversations.get(conversation)
|
||||
previous_response_id = self._response_store.get_conversation(conversation)
|
||||
# No error if conversation doesn't exist yet — it's a new conversation
|
||||
|
||||
# Normalize input to message list
|
||||
@@ -643,7 +709,7 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
# Update conversation mapping so the next request with the same
|
||||
# conversation name automatically chains to this response
|
||||
if conversation:
|
||||
self._conversations[conversation] = response_id
|
||||
self._response_store.set_conversation(conversation, response_id)
|
||||
|
||||
return web.json_response(response_data)
|
||||
|
||||
|
||||
@@ -1295,7 +1295,7 @@ class TestConversationParameter:
|
||||
data = await resp.json()
|
||||
assert data["status"] == "completed"
|
||||
# Conversation mapping should be set
|
||||
assert "my-chat" in adapter._conversations
|
||||
assert adapter._response_store.get_conversation("my-chat") is not None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversation_chains_automatically(self, adapter):
|
||||
@@ -1369,7 +1369,7 @@ class TestConversationParameter:
|
||||
await cli.post("/v1/responses", json={"input": "conv-b msg", "conversation": "conv-b"})
|
||||
|
||||
# They should have different response IDs in the mapping
|
||||
assert adapter._conversations["conv-a"] != adapter._conversations["conv-b"]
|
||||
assert adapter._response_store.get_conversation("conv-a") != adapter._response_store.get_conversation("conv-b")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversation_store_false_no_mapping(self, adapter):
|
||||
@@ -1388,4 +1388,4 @@ class TestConversationParameter:
|
||||
})
|
||||
assert resp.status == 200
|
||||
# Conversation mapping should NOT be set since store=false
|
||||
assert "ephemeral-chat" not in adapter._conversations
|
||||
assert adapter._response_store.get_conversation("ephemeral-chat") is None
|
||||
|
||||
Reference in New Issue
Block a user