fix(state): SQLite concurrency hardening + session transcript integrity (#3249)
* fix(session-db): survive CLI/gateway concurrent write contention Closes #3139 Four layered fixes for the scenario where CLI and gateway write to state.db concurrently, causing create_session() to fail with 'database is locked' and permanently disabling session_search on the gateway side. 1. Increase SQLite connection timeout: 10s -> 30s hermes_state.py: longer window for the WAL writer to finish a batch flush before the other process gives up entirely. 2. INSERT OR IGNORE in create_session hermes_state.py: prevents IntegrityError on duplicate session IDs (e.g. gateway restarts while CLI session is still alive). 3. Don't null out _session_db on create_session failure (main fix) run_agent.py: a transient lock at agent startup must not permanently disable session_search for the lifetime of that agent instance. _session_db now stays alive so subsequent flushes and searches work once the lock clears. 4. New ensure_session() helper + call it during flush hermes_state.py: INSERT OR IGNORE for a minimal session row. run_agent.py _flush_messages_to_session_db: calls ensure_session() before appending messages, so the FK constraint is satisfied even when create_session() failed at startup. No-op when the row exists. * fix(state): release lock between context queries in search_messages The context-window queries (one per FTS5 match) were running inside the same lock acquisition as the primary FTS5 query, holding the lock for O(N) sequential SQLite round-trips. Move per-match context fetches outside the outer lock block so each acquires the lock independently, keeping critical sections short and allowing other threads to interleave. * fix(session): prefer longer source in load_transcript to prevent legacy truncation When a long-lived session pre-dates SQLite storage (e.g. 
sessions created before the DB layer was introduced, or after a clean deployment that reset the DB), _flush_messages_to_session_db only writes the *new* messages from the current turn to SQLite — it skips messages already present in conversation_history, assuming they are already persisted. That assumption fails for legacy JSONL-only sessions: Turn N (first after DB migration): load_transcript(id) → SQLite: 0 → falls back to JSONL: 994 ✓ _flush_messages_to_session_db: skip first 994, write 2 new → SQLite: 2 Turn N+1: load_transcript(id) → SQLite: 2 → returns immediately ✗ Agent sees 2 messages of history instead of 996 The same pattern causes the reported symptom: session JSON truncated to 4 messages (_save_session_log writes agent.messages which only has 2 history + 2 new = 4). Fix: always load both sources and return whichever is longer. For a fully-migrated session SQLite will always be ≥ JSONL, so there is no regression. For a legacy session that hasn't been bootstrapped yet, JSONL wins and the full history is restored. Closes #3212 * test: add load_transcript source preference tests for #3212 Covers: JSONL longer returns JSONL, SQLite longer returns SQLite, SQLite empty falls back to JSONL, both empty returns empty, equal length prefers SQLite (richer reasoning fields). --------- Co-authored-by: Mibayy <mibayy@hermes.ai> Co-authored-by: kewe63 <kewe.3217@gmail.com> Co-authored-by: Mibayy <mibayy@users.noreply.github.com>
This commit is contained in:
@@ -974,35 +974,51 @@ class SessionStore:
|
||||
|
||||
def load_transcript(self, session_id: str) -> List[Dict[str, Any]]:
|
||||
"""Load all messages from a session's transcript."""
|
||||
db_messages = []
|
||||
# Try SQLite first
|
||||
if self._db:
|
||||
try:
|
||||
messages = self._db.get_messages_as_conversation(session_id)
|
||||
if messages:
|
||||
return messages
|
||||
db_messages = self._db.get_messages_as_conversation(session_id)
|
||||
except Exception as e:
|
||||
logger.debug("Could not load messages from DB: %s", e)
|
||||
|
||||
# Fall back to legacy JSONL
|
||||
# Load legacy JSONL transcript (may contain more history than SQLite
|
||||
# for sessions created before the DB layer was introduced).
|
||||
transcript_path = self.get_transcript_path(session_id)
|
||||
|
||||
if not transcript_path.exists():
|
||||
return []
|
||||
|
||||
messages = []
|
||||
jsonl_messages = []
|
||||
if transcript_path.exists():
|
||||
with open(transcript_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
messages.append(json.loads(line))
|
||||
jsonl_messages.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
"Skipping corrupt line in transcript %s: %s",
|
||||
session_id, line[:120],
|
||||
)
|
||||
|
||||
return messages
|
||||
# Prefer whichever source has more messages.
|
||||
#
|
||||
# Background: when a session pre-dates SQLite storage (or when the DB
|
||||
# layer was added while a long-lived session was already active), the
|
||||
# first post-migration turn writes only the *new* messages to SQLite
|
||||
# (because _flush_messages_to_session_db skips messages already in
|
||||
# conversation_history, assuming they're persisted). On the *next*
|
||||
# turn load_transcript returns those few SQLite rows and ignores the
|
||||
# full JSONL history — the model sees a context of 1-4 messages instead
|
||||
# of hundreds. Using the longer source prevents this silent truncation.
|
||||
if len(jsonl_messages) > len(db_messages):
|
||||
if db_messages:
|
||||
logger.debug(
|
||||
"Session %s: JSONL has %d messages vs SQLite %d — "
|
||||
"using JSONL (legacy session not yet fully migrated)",
|
||||
session_id, len(jsonl_messages), len(db_messages),
|
||||
)
|
||||
return jsonl_messages
|
||||
|
||||
return db_messages
|
||||
|
||||
|
||||
def build_session_context(
|
||||
|
||||
@@ -124,7 +124,10 @@ class SessionDB:
|
||||
self._conn = sqlite3.connect(
|
||||
str(self.db_path),
|
||||
check_same_thread=False,
|
||||
timeout=10.0,
|
||||
# 30s gives the WAL writer (CLI or gateway) time to finish a batch
|
||||
# flush before the concurrent reader/writer gives up. 10s was too
|
||||
# short when the CLI is doing frequent memory flushes.
|
||||
timeout=30.0,
|
||||
)
|
||||
self._conn.row_factory = sqlite3.Row
|
||||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
@@ -255,7 +258,7 @@ class SessionDB:
|
||||
"""Create a new session record. Returns the session_id."""
|
||||
with self._lock:
|
||||
self._conn.execute(
|
||||
"""INSERT INTO sessions (id, source, user_id, model, model_config,
|
||||
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
|
||||
system_prompt, parent_session_id, started_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
@@ -351,6 +354,27 @@ class SessionDB:
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def ensure_session(
|
||||
self,
|
||||
session_id: str,
|
||||
source: str = "unknown",
|
||||
model: str = None,
|
||||
) -> None:
|
||||
"""Ensure a session row exists, creating it with minimal metadata if absent.
|
||||
|
||||
Used by _flush_messages_to_session_db to recover from a failed
|
||||
create_session() call (e.g. transient SQLite lock at agent startup).
|
||||
INSERT OR IGNORE is safe to call even when the row already exists.
|
||||
"""
|
||||
with self._lock:
|
||||
self._conn.execute(
|
||||
"""INSERT OR IGNORE INTO sessions
|
||||
(id, source, model, started_at)
|
||||
VALUES (?, ?, ?, ?)""",
|
||||
(session_id, source, model, time.time()),
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get a session by ID."""
|
||||
with self._lock:
|
||||
@@ -862,9 +886,11 @@ class SessionDB:
|
||||
return []
|
||||
matches = [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
# Add surrounding context (1 message before + after each match)
|
||||
# Add surrounding context (1 message before + after each match).
|
||||
# Done outside the lock so we don't hold it across N sequential queries.
|
||||
for match in matches:
|
||||
try:
|
||||
with self._lock:
|
||||
ctx_cursor = self._conn.execute(
|
||||
"""SELECT role, content FROM messages
|
||||
WHERE session_id = ? AND id >= ? - 1 AND id <= ? + 1
|
||||
|
||||
19
run_agent.py
19
run_agent.py
@@ -893,8 +893,15 @@ class AIAgent:
|
||||
user_id=None,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Session DB create_session failed — messages will NOT be indexed: %s", e)
|
||||
self._session_db = None # prevent silent data loss on every subsequent flush
|
||||
# Transient SQLite lock contention (e.g. CLI and gateway writing
|
||||
# concurrently) must NOT permanently disable session_search for
|
||||
# this agent. Keep _session_db alive — subsequent message
|
||||
# flushes and session_search calls will still work once the
|
||||
# lock clears. The session row may be missing from the index
|
||||
# for this run, but that is recoverable (flushes upsert rows).
|
||||
logger.warning(
|
||||
"Session DB create_session failed (session_search still available): %s", e
|
||||
)
|
||||
|
||||
# In-memory todo list for task planning (one per agent/session)
|
||||
from tools.todo_tool import TodoStore
|
||||
@@ -1578,6 +1585,14 @@ class AIAgent:
|
||||
return
|
||||
self._apply_persist_user_message_override(messages)
|
||||
try:
|
||||
# If create_session() failed at startup (e.g. transient lock), the
|
||||
# session row may not exist yet. ensure_session() uses INSERT OR
|
||||
# IGNORE so it is a no-op when the row is already there.
|
||||
self._session_db.ensure_session(
|
||||
self.session_id,
|
||||
source=self.platform or "cli",
|
||||
model=self.model,
|
||||
)
|
||||
start_idx = len(conversation_history) if conversation_history else 0
|
||||
flush_from = max(start_idx, self._last_flushed_db_idx)
|
||||
for msg in messages[flush_from:]:
|
||||
|
||||
@@ -386,6 +386,100 @@ class TestLoadTranscriptCorruptLines:
|
||||
assert messages[1]["content"] == "b"
|
||||
|
||||
|
||||
class TestLoadTranscriptPreferLongerSource:
|
||||
"""Regression: load_transcript must return whichever source (SQLite or JSONL)
|
||||
has more messages to prevent silent truncation. GH-3212."""
|
||||
|
||||
@pytest.fixture()
|
||||
def store_with_db(self, tmp_path):
|
||||
"""SessionStore with both SQLite and JSONL active."""
|
||||
from hermes_state import SessionDB
|
||||
|
||||
config = GatewayConfig()
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
s = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
s._db = SessionDB(db_path=tmp_path / "state.db")
|
||||
s._loaded = True
|
||||
return s
|
||||
|
||||
def test_jsonl_longer_than_sqlite_returns_jsonl(self, store_with_db):
|
||||
"""Legacy session: JSONL has full history, SQLite has only recent turn."""
|
||||
sid = "legacy_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# JSONL has 10 messages (legacy history — written before SQLite existed)
|
||||
for i in range(10):
|
||||
role = "user" if i % 2 == 0 else "assistant"
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": role, "content": f"msg-{i}"}, skip_db=True,
|
||||
)
|
||||
# SQLite has only 2 messages (recent turn after migration)
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="new-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="new-a")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 10
|
||||
assert result[0]["content"] == "msg-0"
|
||||
|
||||
def test_sqlite_longer_than_jsonl_returns_sqlite(self, store_with_db):
|
||||
"""Fully migrated session: SQLite has more (JSONL stopped growing)."""
|
||||
sid = "migrated_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# JSONL has 2 old messages
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "old-q"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "old-a"}, skip_db=True,
|
||||
)
|
||||
# SQLite has 4 messages (superset after migration)
|
||||
for i in range(4):
|
||||
role = "user" if i % 2 == 0 else "assistant"
|
||||
store_with_db._db.append_message(session_id=sid, role=role, content=f"db-{i}")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 4
|
||||
assert result[0]["content"] == "db-0"
|
||||
|
||||
def test_sqlite_empty_falls_back_to_jsonl(self, store_with_db):
|
||||
"""No SQLite rows — falls back to JSONL (original behavior preserved)."""
|
||||
sid = "no_db_rows"
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "hello"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "hi"}, skip_db=True,
|
||||
)
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
assert result[0]["content"] == "hello"
|
||||
|
||||
def test_both_empty_returns_empty(self, store_with_db):
|
||||
"""Neither source has data — returns empty list."""
|
||||
result = store_with_db.load_transcript("nonexistent")
|
||||
assert result == []
|
||||
|
||||
def test_equal_length_prefers_sqlite(self, store_with_db):
|
||||
"""When both have same count, SQLite wins (has richer fields like reasoning)."""
|
||||
sid = "equal_session"
|
||||
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
|
||||
# Write 2 messages to JSONL only
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "user", "content": "jsonl-q"}, skip_db=True,
|
||||
)
|
||||
store_with_db.append_to_transcript(
|
||||
sid, {"role": "assistant", "content": "jsonl-a"}, skip_db=True,
|
||||
)
|
||||
# Write 2 different messages to SQLite only
|
||||
store_with_db._db.append_message(session_id=sid, role="user", content="db-q")
|
||||
store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a")
|
||||
|
||||
result = store_with_db.load_transcript(sid)
|
||||
assert len(result) == 2
|
||||
# Should be the SQLite version (equal count → prefers SQLite)
|
||||
assert result[0]["content"] == "db-q"
|
||||
|
||||
|
||||
class TestWhatsAppDMSessionKeyConsistency:
|
||||
"""Regression: all session-key construction must go through build_session_key
|
||||
so DMs are isolated by chat_id across platforms."""
|
||||
|
||||
@@ -1116,3 +1116,66 @@ class TestResolveSessionByNameOrId:
|
||||
db.set_session_title("s1", "my project")
|
||||
result = db.resolve_session_by_title("my project")
|
||||
assert result == "s1"
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Concurrent write safety / lock contention fixes (#3139)
|
||||
# =========================================================================
|
||||
|
||||
class TestConcurrentWriteSafety:
|
||||
def test_create_session_insert_or_ignore_is_idempotent(self, db):
|
||||
"""create_session with the same ID twice must not raise (INSERT OR IGNORE)."""
|
||||
db.create_session(session_id="dup-1", source="cli", model="m")
|
||||
# Second call should be silent — no IntegrityError
|
||||
db.create_session(session_id="dup-1", source="gateway", model="m2")
|
||||
session = db.get_session("dup-1")
|
||||
# Row should exist (first write wins with OR IGNORE)
|
||||
assert session is not None
|
||||
assert session["source"] == "cli"
|
||||
|
||||
def test_ensure_session_creates_missing_row(self, db):
|
||||
"""ensure_session must create a minimal row when the session doesn't exist."""
|
||||
assert db.get_session("orphan-session") is None
|
||||
db.ensure_session("orphan-session", source="gateway", model="test-model")
|
||||
row = db.get_session("orphan-session")
|
||||
assert row is not None
|
||||
assert row["source"] == "gateway"
|
||||
assert row["model"] == "test-model"
|
||||
|
||||
def test_ensure_session_is_idempotent(self, db):
|
||||
"""ensure_session on an existing row must be a no-op (no overwrite)."""
|
||||
db.create_session(session_id="existing", source="cli", model="original-model")
|
||||
db.ensure_session("existing", source="gateway", model="overwrite-model")
|
||||
row = db.get_session("existing")
|
||||
# First write wins — ensure_session must not overwrite
|
||||
assert row["source"] == "cli"
|
||||
assert row["model"] == "original-model"
|
||||
|
||||
def test_ensure_session_allows_append_message_after_failed_create(self, db):
|
||||
"""Messages can be flushed even when create_session failed at startup.
|
||||
|
||||
Simulates the #3139 scenario: create_session raises (lock), then
|
||||
ensure_session is called during flush, then append_message succeeds.
|
||||
"""
|
||||
# Simulate failed create_session — row absent
|
||||
db.ensure_session("late-session", source="gateway", model="gpt-4")
|
||||
db.append_message(
|
||||
session_id="late-session",
|
||||
role="user",
|
||||
content="hello after lock",
|
||||
)
|
||||
msgs = db.get_messages("late-session")
|
||||
assert len(msgs) == 1
|
||||
assert msgs[0]["content"] == "hello after lock"
|
||||
|
||||
def test_sqlite_timeout_is_at_least_30s(self, db):
|
||||
"""Connection timeout should be >= 30s to survive CLI/gateway contention."""
|
||||
# Access the underlying connection timeout via sqlite3 introspection.
|
||||
# There is no public API, so we check the kwarg via the module default.
|
||||
import sqlite3
|
||||
import inspect
|
||||
from hermes_state import SessionDB as _SessionDB
|
||||
src = inspect.getsource(_SessionDB.__init__)
|
||||
assert "30" in src, (
|
||||
"SQLite timeout should be at least 30s to handle CLI/gateway lock contention"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user