From b81d49dc450a56791a4ba39401f785eaff573d59 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 26 Mar 2026 13:47:14 -0700 Subject: [PATCH] fix(state): SQLite concurrency hardening + session transcript integrity (#3249) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(session-db): survive CLI/gateway concurrent write contention Closes #3139 Three layered fixes for the scenario where CLI and gateway write to state.db concurrently, causing create_session() to fail with 'database is locked' and permanently disabling session_search on the gateway side. 1. Increase SQLite connection timeout: 10s -> 30s hermes_state.py: longer window for the WAL writer to finish a batch flush before the other process gives up entirely. 2. INSERT OR IGNORE in create_session hermes_state.py: prevents IntegrityError on duplicate session IDs (e.g. gateway restarts while CLI session is still alive). 3. Don't null out _session_db on create_session failure (main fix) run_agent.py: a transient lock at agent startup must not permanently disable session_search for the lifetime of that agent instance. _session_db now stays alive so subsequent flushes and searches work once the lock clears. 4. New ensure_session() helper + call it during flush hermes_state.py: INSERT OR IGNORE for a minimal session row. run_agent.py _flush_messages_to_session_db: calls ensure_session() before appending messages, so the FK constraint is satisfied even when create_session() failed at startup. No-op when the row exists. * fix(state): release lock between context queries in search_messages The context-window queries (one per FTS5 match) were running inside the same lock acquisition as the primary FTS5 query, holding the lock for O(N) sequential SQLite round-trips. Move per-match context fetches outside the outer lock block so each acquires the lock independently, keeping critical sections short and allowing other threads to interleave. * fix(session): prefer longer source in load_transcript to prevent legacy truncation When a long-lived session pre-dates SQLite storage (e.g. sessions created before the DB layer was introduced, or after a clean deployment that reset the DB), _flush_messages_to_session_db only writes the *new* messages from the current turn to SQLite — it skips messages already present in conversation_history, assuming they are already persisted. That assumption fails for legacy JSONL-only sessions: Turn N (first after DB migration): load_transcript(id) → SQLite: 0 → falls back to JSONL: 994 ✓ _flush_messages_to_session_db: skip first 994, write 2 new → SQLite: 2 Turn N+1: load_transcript(id) → SQLite: 2 → returns immediately ✗ Agent sees 2 messages of history instead of 996 The same pattern causes the reported symptom: session JSON truncated to 4 messages (_save_session_log writes agent.messages which only has 2 history + 2 new = 4). Fix: always load both sources and return whichever is longer. For a fully-migrated session SQLite will always be ≥ JSONL, so there is no regression. For a legacy session that hasn't been bootstrapped yet, JSONL wins and the full history is restored. Closes #3212 * test: add load_transcript source preference tests for #3212 Covers: JSONL longer returns JSONL, SQLite longer returns SQLite, SQLite empty falls back to JSONL, both empty returns empty, equal length prefers SQLite (richer reasoning fields). --------- Co-authored-by: Mibayy Co-authored-by: kewe63 Co-authored-by: Mibayy --- gateway/session.py | 62 ++++++++++++++--------- hermes_state.py | 42 +++++++++++++--- run_agent.py | 19 ++++++- tests/gateway/test_session.py | 94 +++++++++++++++++++++++++++++++++++ tests/test_hermes_state.py | 63 +++++++++++++++++++++++ 5 files changed, 247 insertions(+), 33 deletions(-) diff --git a/gateway/session.py b/gateway/session.py index 68bac4b8..b85ac3e3 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -974,35 +974,51 @@ class SessionStore: def load_transcript(self, session_id: str) -> List[Dict[str, Any]]: """Load all messages from a session's transcript.""" + db_messages = [] # Try SQLite first if self._db: try: - messages = self._db.get_messages_as_conversation(session_id) - if messages: - return messages + db_messages = self._db.get_messages_as_conversation(session_id) except Exception as e: logger.debug("Could not load messages from DB: %s", e) - - # Fall back to legacy JSONL + + # Load legacy JSONL transcript (may contain more history than SQLite + # for sessions created before the DB layer was introduced). transcript_path = self.get_transcript_path(session_id) - - if not transcript_path.exists(): - return [] - - messages = [] - with open(transcript_path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line: - try: - messages.append(json.loads(line)) - except json.JSONDecodeError: - logger.warning( - "Skipping corrupt line in transcript %s: %s", - session_id, line[:120], - ) - - return messages + jsonl_messages = [] + if transcript_path.exists(): + with open(transcript_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + try: + jsonl_messages.append(json.loads(line)) + except json.JSONDecodeError: + logger.warning( + "Skipping corrupt line in transcript %s: %s", + session_id, line[:120], + ) + + # Prefer whichever source has more messages. + # + # Background: when a session pre-dates SQLite storage (or when the DB + # layer was added while a long-lived session was already active), the + # first post-migration turn writes only the *new* messages to SQLite + # (because _flush_messages_to_session_db skips messages already in + # conversation_history, assuming they're persisted). On the *next* + # turn load_transcript returns those few SQLite rows and ignores the + # full JSONL history — the model sees a context of 1-4 messages instead + # of hundreds. Using the longer source prevents this silent truncation. + if len(jsonl_messages) > len(db_messages): + if db_messages: + logger.debug( + "Session %s: JSONL has %d messages vs SQLite %d — " + "using JSONL (legacy session not yet fully migrated)", + session_id, len(jsonl_messages), len(db_messages), + ) + return jsonl_messages + + return db_messages def build_session_context( diff --git a/hermes_state.py b/hermes_state.py index eb62bc2b..3c06f101 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -124,7 +124,10 @@ class SessionDB: self._conn = sqlite3.connect( str(self.db_path), check_same_thread=False, - timeout=10.0, + # 30s gives the WAL writer (CLI or gateway) time to finish a batch + # flush before the concurrent reader/writer gives up. 10s was too + # short when the CLI is doing frequent memory flushes. + timeout=30.0, ) self._conn.row_factory = sqlite3.Row self._conn.execute("PRAGMA journal_mode=WAL") @@ -255,7 +258,7 @@ class SessionDB: """Create a new session record. Returns the session_id.""" with self._lock: self._conn.execute( - """INSERT INTO sessions (id, source, user_id, model, model_config, + """INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config, system_prompt, parent_session_id, started_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( @@ -351,6 +354,27 @@ class SessionDB: ) self._conn.commit() + def ensure_session( + self, + session_id: str, + source: str = "unknown", + model: str = None, + ) -> None: + """Ensure a session row exists, creating it with minimal metadata if absent. + + Used by _flush_messages_to_session_db to recover from a failed + create_session() call (e.g. transient SQLite lock at agent startup). + INSERT OR IGNORE is safe to call even when the row already exists. + """ + with self._lock: + self._conn.execute( + """INSERT OR IGNORE INTO sessions + (id, source, model, started_at) + VALUES (?, ?, ?, ?)""", + (session_id, source, model, time.time()), + ) + self._conn.commit() + def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Get a session by ID.""" with self._lock: @@ -862,9 +886,11 @@ class SessionDB: return [] matches = [dict(row) for row in cursor.fetchall()] - # Add surrounding context (1 message before + after each match) - for match in matches: - try: + # Add surrounding context (1 message before + after each match). + # Done outside the lock so we don't hold it across N sequential queries. + for match in matches: + try: + with self._lock: ctx_cursor = self._conn.execute( """SELECT role, content FROM messages WHERE session_id = ? AND id >= ? - 1 AND id <= ? + 1 @@ -875,9 +901,9 @@ class SessionDB: {"role": r["role"], "content": (r["content"] or "")[:200]} for r in ctx_cursor.fetchall() ] - match["context"] = context_msgs - except Exception: - match["context"] = [] + match["context"] = context_msgs + except Exception: + match["context"] = [] # Remove full content from result (snippet is enough, saves tokens) for match in matches: diff --git a/run_agent.py b/run_agent.py index 3a75d040..f2f02395 100644 --- a/run_agent.py +++ b/run_agent.py @@ -893,8 +893,15 @@ class AIAgent: user_id=None, ) except Exception as e: - logger.warning("Session DB create_session failed — messages will NOT be indexed: %s", e) - self._session_db = None # prevent silent data loss on every subsequent flush + # Transient SQLite lock contention (e.g. CLI and gateway writing + # concurrently) must NOT permanently disable session_search for + # this agent. Keep _session_db alive — subsequent message + # flushes and session_search calls will still work once the + # lock clears. The session row may be missing from the index + # for this run, but that is recoverable (flushes upsert rows). + logger.warning( + "Session DB create_session failed (session_search still available): %s", e + ) # In-memory todo list for task planning (one per agent/session) from tools.todo_tool import TodoStore @@ -1578,6 +1585,14 @@ class AIAgent: return self._apply_persist_user_message_override(messages) try: + # If create_session() failed at startup (e.g. transient lock), the + # session row may not exist yet. ensure_session() uses INSERT OR + # IGNORE so it is a no-op when the row is already there. + self._session_db.ensure_session( + self.session_id, + source=self.platform or "cli", + model=self.model, + ) start_idx = len(conversation_history) if conversation_history else 0 flush_from = max(start_idx, self._last_flushed_db_idx) for msg in messages[flush_from:]: diff --git a/tests/gateway/test_session.py b/tests/gateway/test_session.py index bf698cdd..f31a80c3 100644 --- a/tests/gateway/test_session.py +++ b/tests/gateway/test_session.py @@ -386,6 +386,100 @@ class TestLoadTranscriptCorruptLines: assert messages[1]["content"] == "b" +class TestLoadTranscriptPreferLongerSource: + """Regression: load_transcript must return whichever source (SQLite or JSONL) + has more messages to prevent silent truncation. GH-3212.""" + + @pytest.fixture() + def store_with_db(self, tmp_path): + """SessionStore with both SQLite and JSONL active.""" + from hermes_state import SessionDB + + config = GatewayConfig() + with patch("gateway.session.SessionStore._ensure_loaded"): + s = SessionStore(sessions_dir=tmp_path, config=config) + s._db = SessionDB(db_path=tmp_path / "state.db") + s._loaded = True + return s + + def test_jsonl_longer_than_sqlite_returns_jsonl(self, store_with_db): + """Legacy session: JSONL has full history, SQLite has only recent turn.""" + sid = "legacy_session" + store_with_db._db.create_session(session_id=sid, source="gateway", model="m") + # JSONL has 10 messages (legacy history — written before SQLite existed) + for i in range(10): + role = "user" if i % 2 == 0 else "assistant" + store_with_db.append_to_transcript( + sid, {"role": role, "content": f"msg-{i}"}, skip_db=True, + ) + # SQLite has only 2 messages (recent turn after migration) + store_with_db._db.append_message(session_id=sid, role="user", content="new-q") + store_with_db._db.append_message(session_id=sid, role="assistant", content="new-a") + + result = store_with_db.load_transcript(sid) + assert len(result) == 10 + assert result[0]["content"] == "msg-0" + + def test_sqlite_longer_than_jsonl_returns_sqlite(self, store_with_db): + """Fully migrated session: SQLite has more (JSONL stopped growing).""" + sid = "migrated_session" + store_with_db._db.create_session(session_id=sid, source="gateway", model="m") + # JSONL has 2 old messages + store_with_db.append_to_transcript( + sid, {"role": "user", "content": "old-q"}, skip_db=True, + ) + store_with_db.append_to_transcript( + sid, {"role": "assistant", "content": "old-a"}, skip_db=True, + ) + # SQLite has 4 messages (superset after migration) + for i in range(4): + role = "user" if i % 2 == 0 else "assistant" + store_with_db._db.append_message(session_id=sid, role=role, content=f"db-{i}") + + result = store_with_db.load_transcript(sid) + assert len(result) == 4 + assert result[0]["content"] == "db-0" + + def test_sqlite_empty_falls_back_to_jsonl(self, store_with_db): + """No SQLite rows — falls back to JSONL (original behavior preserved).""" + sid = "no_db_rows" + store_with_db.append_to_transcript( + sid, {"role": "user", "content": "hello"}, skip_db=True, + ) + store_with_db.append_to_transcript( + sid, {"role": "assistant", "content": "hi"}, skip_db=True, + ) + + result = store_with_db.load_transcript(sid) + assert len(result) == 2 + assert result[0]["content"] == "hello" + + def test_both_empty_returns_empty(self, store_with_db): + """Neither source has data — returns empty list.""" + result = store_with_db.load_transcript("nonexistent") + assert result == [] + + def test_equal_length_prefers_sqlite(self, store_with_db): + """When both have same count, SQLite wins (has richer fields like reasoning).""" + sid = "equal_session" + store_with_db._db.create_session(session_id=sid, source="gateway", model="m") + # Write 2 messages to JSONL only + store_with_db.append_to_transcript( + sid, {"role": "user", "content": "jsonl-q"}, skip_db=True, + ) + store_with_db.append_to_transcript( + sid, {"role": "assistant", "content": "jsonl-a"}, skip_db=True, + ) + # Write 2 different messages to SQLite only + store_with_db._db.append_message(session_id=sid, role="user", content="db-q") + store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a") + + result = store_with_db.load_transcript(sid) + assert len(result) == 2 + # Should be the SQLite version (equal count → prefers SQLite) + assert result[0]["content"] == "db-q" + + class TestWhatsAppDMSessionKeyConsistency: """Regression: all session-key construction must go through build_session_key so DMs are isolated by chat_id across platforms.""" diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index 381bb9d1..d4712450 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -1116,3 +1116,66 @@ class TestResolveSessionByNameOrId: db.set_session_title("s1", "my project") result = db.resolve_session_by_title("my project") assert result == "s1" + + +# ========================================================================= +# Concurrent write safety / lock contention fixes (#3139) +# ========================================================================= + +class TestConcurrentWriteSafety: + def test_create_session_insert_or_ignore_is_idempotent(self, db): + """create_session with the same ID twice must not raise (INSERT OR IGNORE).""" + db.create_session(session_id="dup-1", source="cli", model="m") + # Second call should be silent — no IntegrityError + db.create_session(session_id="dup-1", source="gateway", model="m2") + session = db.get_session("dup-1") + # Row should exist (first write wins with OR IGNORE) + assert session is not None + assert session["source"] == "cli" + + def test_ensure_session_creates_missing_row(self, db): + """ensure_session must create a minimal row when the session doesn't exist.""" + assert db.get_session("orphan-session") is None + db.ensure_session("orphan-session", source="gateway", model="test-model") + row = db.get_session("orphan-session") + assert row is not None + assert row["source"] == "gateway" + assert row["model"] == "test-model" + + def test_ensure_session_is_idempotent(self, db): + """ensure_session on an existing row must be a no-op (no overwrite).""" + db.create_session(session_id="existing", source="cli", model="original-model") + db.ensure_session("existing", source="gateway", model="overwrite-model") + row = db.get_session("existing") + # First write wins — ensure_session must not overwrite + assert row["source"] == "cli" + assert row["model"] == "original-model" + + def test_ensure_session_allows_append_message_after_failed_create(self, db): + """Messages can be flushed even when create_session failed at startup. + + Simulates the #3139 scenario: create_session raises (lock), then + ensure_session is called during flush, then append_message succeeds. + """ + # Simulate failed create_session — row absent + db.ensure_session("late-session", source="gateway", model="gpt-4") + db.append_message( + session_id="late-session", + role="user", + content="hello after lock", + ) + msgs = db.get_messages("late-session") + assert len(msgs) == 1 + assert msgs[0]["content"] == "hello after lock" + + def test_sqlite_timeout_is_at_least_30s(self, db): + """Connection timeout should be >= 30s to survive CLI/gateway contention.""" + # Access the underlying connection timeout via sqlite3 introspection. + # There is no public API, so we check the kwarg via the module default. + import sqlite3 + import inspect + from hermes_state import SessionDB as _SessionDB + src = inspect.getsource(_SessionDB.__init__) + assert "30" in src, ( + "SQLite timeout should be at least 30s to handle CLI/gateway lock contention" + )