fix: SQLite WAL write-lock contention causing 15-20s TUI freeze (#3385)

Multiple hermes processes (gateway + CLI sessions + worktree agents) sharing
one state.db caused WAL write-lock convoy effects. SQLite's built-in busy
handler uses deterministic sleep intervals (up to 100ms) that synchronize
competing writers, creating 15-20 second freezes during agent init.

Root cause: timeout=30.0 with 7+ concurrent connections meant:
- WAL never checkpointed (294MB, readers always blocked it)
- Bloated WAL slowed all reads and writes
- Deterministic backoff caused convoy effects under contention

Fix:
- Replace 30s SQLite timeout with 1s + app-level retry (15 attempts,
  random 20-150ms jitter between retries to break convoys)
- Use BEGIN IMMEDIATE for explicit write-lock acquisition (fail fast)
- Set isolation_level=None for manual transaction control
- PASSIVE WAL checkpoint on close() and every 50 writes
- All 12 write methods converted to _execute_write() helper

Before: 15-20s frozen at create_session during agent init
After:  <1s to API call, WAL stays at ~4MB

Tested: 4355 tests pass, 3 concurrent live sessions with simultaneous
writes showed zero contention on every py-spy sample.
This commit is contained in:
Teknium
2026-03-27 05:22:57 -07:00
committed by GitHub
parent 915df02bbf
commit b7bcae49c6

View File

@@ -15,15 +15,20 @@ Key design decisions:
"""
import json
import logging
import os
import random
import re
import sqlite3
import threading
import time
from pathlib import Path
from hermes_constants import get_hermes_home
from typing import Dict, Any, List, Optional
from typing import Any, Callable, Dict, List, Optional, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
@@ -116,18 +121,38 @@ class SessionDB:
single writer via WAL mode). Each method opens its own cursor.
"""
# ── Write-contention tuning ──
# With multiple hermes processes (gateway + CLI sessions + worktree agents)
# all sharing one state.db, WAL write-lock contention causes visible TUI
# freezes. SQLite's built-in busy handler uses a deterministic sleep
# schedule that causes convoy effects under high concurrency.
#
# Instead, we keep the SQLite timeout short (1s) and handle retries at the
# application level with random jitter, which naturally staggers competing
# writers and avoids the convoy.
_WRITE_MAX_RETRIES = 15
_WRITE_RETRY_MIN_S = 0.020 # 20ms
_WRITE_RETRY_MAX_S = 0.150 # 150ms
# Attempt a PASSIVE WAL checkpoint every N successful writes.
_CHECKPOINT_EVERY_N_WRITES = 50
def __init__(self, db_path: Path = None):
self.db_path = db_path or DEFAULT_DB_PATH
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._lock = threading.Lock()
self._write_count = 0
self._conn = sqlite3.connect(
str(self.db_path),
check_same_thread=False,
# 30s gives the WAL writer (CLI or gateway) time to finish a batch
# flush before the concurrent reader/writer gives up. 10s was too
# short when the CLI is doing frequent memory flushes.
timeout=30.0,
# Short timeout — application-level retry with random jitter
# handles contention instead of sitting in SQLite's internal
# busy handler for up to 30s.
timeout=1.0,
# Autocommit mode: Python's default isolation_level="" auto-starts
# transactions on DML, which conflicts with our explicit
# BEGIN IMMEDIATE. None = we manage transactions ourselves.
isolation_level=None,
)
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL")
@@ -135,6 +160,96 @@ class SessionDB:
self._init_schema()
# ── Core write helper ──
def _execute_write(self, fn: Callable[[sqlite3.Connection], T]) -> T:
"""Execute a write transaction with BEGIN IMMEDIATE and jitter retry.
*fn* receives the connection and should perform INSERT/UPDATE/DELETE
statements. The caller must NOT call ``commit()`` — that's handled
here after *fn* returns.
BEGIN IMMEDIATE acquires the WAL write lock at transaction start
(not at commit time), so lock contention surfaces immediately.
On ``database is locked``, we release the Python lock, sleep a
random 20-150ms, and retry — breaking the convoy pattern that
SQLite's built-in deterministic backoff creates.
Returns whatever *fn* returns.
"""
last_err: Optional[Exception] = None
for attempt in range(self._WRITE_MAX_RETRIES):
try:
with self._lock:
self._conn.execute("BEGIN IMMEDIATE")
try:
result = fn(self._conn)
self._conn.commit()
except BaseException:
try:
self._conn.rollback()
except Exception:
pass
raise
# Success — periodic best-effort checkpoint.
self._write_count += 1
if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0:
self._try_wal_checkpoint()
return result
except sqlite3.OperationalError as exc:
err_msg = str(exc).lower()
if "locked" in err_msg or "busy" in err_msg:
last_err = exc
if attempt < self._WRITE_MAX_RETRIES - 1:
jitter = random.uniform(
self._WRITE_RETRY_MIN_S,
self._WRITE_RETRY_MAX_S,
)
time.sleep(jitter)
continue
# Non-lock error or retries exhausted — propagate.
raise
# Retries exhausted (shouldn't normally reach here).
raise last_err or sqlite3.OperationalError(
"database is locked after max retries"
)
def _try_wal_checkpoint(self) -> None:
"""Best-effort PASSIVE WAL checkpoint. Never blocks, never raises.
Flushes committed WAL frames back into the main DB file for any
frames that no other connection currently needs. Keeps the WAL
from growing unbounded when many processes hold persistent
connections.
"""
try:
with self._lock:
result = self._conn.execute(
"PRAGMA wal_checkpoint(PASSIVE)"
).fetchone()
if result and result[1] > 0:
logger.debug(
"WAL checkpoint: %d/%d pages checkpointed",
result[2], result[1],
)
except Exception:
pass # Best effort — never fatal.
def close(self):
"""Close the database connection.
Attempts a PASSIVE WAL checkpoint first so that exiting processes
help keep the WAL file from growing unbounded.
"""
with self._lock:
if self._conn:
try:
self._conn.execute("PRAGMA wal_checkpoint(PASSIVE)")
except Exception:
pass
self._conn.close()
self._conn = None
def _init_schema(self):
"""Create tables and FTS if they don't exist, run migrations."""
cursor = self._conn.cursor()
@@ -256,8 +371,8 @@ class SessionDB:
parent_session_id: str = None,
) -> str:
"""Create a new session record. Returns the session_id."""
with self._lock:
self._conn.execute(
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
@@ -272,35 +387,35 @@ class SessionDB:
time.time(),
),
)
self._conn.commit()
self._execute_write(_do)
return session_id
def end_session(self, session_id: str, end_reason: str) -> None:
"""Mark a session as ended."""
with self._lock:
self._conn.execute(
def _do(conn):
conn.execute(
"UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?",
(time.time(), end_reason, session_id),
)
self._conn.commit()
self._execute_write(_do)
def reopen_session(self, session_id: str) -> None:
"""Clear ended_at/end_reason so a session can be resumed."""
with self._lock:
self._conn.execute(
def _do(conn):
conn.execute(
"UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?",
(session_id,),
)
self._conn.commit()
self._execute_write(_do)
def update_system_prompt(self, session_id: str, system_prompt: str) -> None:
"""Store the full assembled system prompt snapshot."""
with self._lock:
self._conn.execute(
def _do(conn):
conn.execute(
"UPDATE sessions SET system_prompt = ? WHERE id = ?",
(system_prompt, session_id),
)
self._conn.commit()
self._execute_write(_do)
def update_token_counts(
self,
@@ -370,29 +485,27 @@ class SessionDB:
billing_mode = COALESCE(billing_mode, ?),
model = COALESCE(model, ?)
WHERE id = ?"""
with self._lock:
self._conn.execute(
sql,
(
input_tokens,
output_tokens,
cache_read_tokens,
cache_write_tokens,
reasoning_tokens,
estimated_cost_usd,
actual_cost_usd,
actual_cost_usd,
cost_status,
cost_source,
pricing_version,
billing_provider,
billing_base_url,
billing_mode,
model,
session_id,
),
)
self._conn.commit()
params = (
input_tokens,
output_tokens,
cache_read_tokens,
cache_write_tokens,
reasoning_tokens,
estimated_cost_usd,
actual_cost_usd,
actual_cost_usd,
cost_status,
cost_source,
pricing_version,
billing_provider,
billing_base_url,
billing_mode,
model,
session_id,
)
def _do(conn):
conn.execute(sql, params)
self._execute_write(_do)
def ensure_session(
self,
@@ -406,14 +519,14 @@ class SessionDB:
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
"""
with self._lock:
self._conn.execute(
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
)
self._conn.commit()
self._execute_write(_do)
def set_token_counts(
self,
@@ -439,8 +552,8 @@ class SessionDB:
conversation run (e.g. the gateway, where the cached agent's
session_prompt_tokens already reflects the running total).
"""
with self._lock:
self._conn.execute(
def _do(conn):
conn.execute(
"""UPDATE sessions SET
input_tokens = ?,
output_tokens = ?,
@@ -479,7 +592,7 @@ class SessionDB:
session_id,
),
)
self._conn.commit()
self._execute_write(_do)
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""Get a session by ID."""
@@ -573,10 +686,10 @@ class SessionDB:
Empty/whitespace-only strings are normalized to None (clearing the title).
"""
title = self.sanitize_title(title)
with self._lock:
def _do(conn):
if title:
# Check uniqueness (allow the same session to keep its own title)
cursor = self._conn.execute(
cursor = conn.execute(
"SELECT id FROM sessions WHERE title = ? AND id != ?",
(title, session_id),
)
@@ -585,12 +698,12 @@ class SessionDB:
raise ValueError(
f"Title '{title}' is already in use by session {conflict['id']}"
)
cursor = self._conn.execute(
cursor = conn.execute(
"UPDATE sessions SET title = ? WHERE id = ?",
(title, session_id),
)
self._conn.commit()
rowcount = cursor.rowcount
return cursor.rowcount
rowcount = self._execute_write(_do)
return rowcount > 0
def get_session_title(self, session_id: str) -> Optional[str]:
@@ -762,17 +875,24 @@ class SessionDB:
Also increments the session's message_count (and tool_call_count
if role is 'tool' or tool_calls is present).
"""
with self._lock:
# Serialize structured fields to JSON for storage
reasoning_details_json = (
json.dumps(reasoning_details)
if reasoning_details else None
)
codex_items_json = (
json.dumps(codex_reasoning_items)
if codex_reasoning_items else None
)
cursor = self._conn.execute(
# Serialize structured fields to JSON before entering the write txn
reasoning_details_json = (
json.dumps(reasoning_details)
if reasoning_details else None
)
codex_items_json = (
json.dumps(codex_reasoning_items)
if codex_reasoning_items else None
)
tool_calls_json = json.dumps(tool_calls) if tool_calls else None
# Pre-compute tool call count
num_tool_calls = 0
if tool_calls is not None:
num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1
def _do(conn):
cursor = conn.execute(
"""INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count, finish_reason,
reasoning, reasoning_details, codex_reasoning_items)
@@ -782,7 +902,7 @@ class SessionDB:
role,
content,
tool_call_id,
json.dumps(tool_calls) if tool_calls else None,
tool_calls_json,
tool_name,
time.time(),
token_count,
@@ -795,25 +915,20 @@ class SessionDB:
msg_id = cursor.lastrowid
# Update counters
# Count actual tool calls from the tool_calls list (not from tool responses).
# A single assistant message can contain multiple parallel tool calls.
num_tool_calls = 0
if tool_calls is not None:
num_tool_calls = len(tool_calls) if isinstance(tool_calls, list) else 1
if num_tool_calls > 0:
self._conn.execute(
conn.execute(
"""UPDATE sessions SET message_count = message_count + 1,
tool_call_count = tool_call_count + ? WHERE id = ?""",
(num_tool_calls, session_id),
)
else:
self._conn.execute(
conn.execute(
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
(session_id,),
)
return msg_id
self._conn.commit()
return msg_id
return self._execute_write(_do)
def get_messages(self, session_id: str) -> List[Dict[str, Any]]:
"""Load all messages for a session, ordered by timestamp."""
@@ -1107,54 +1222,53 @@ class SessionDB:
def clear_messages(self, session_id: str) -> None:
"""Delete all messages for a session and reset its counters."""
with self._lock:
self._conn.execute(
def _do(conn):
conn.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,)
)
self._conn.execute(
conn.execute(
"UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?",
(session_id,),
)
self._conn.commit()
self._execute_write(_do)
def delete_session(self, session_id: str) -> bool:
"""Delete a session and all its messages. Returns True if found."""
with self._lock:
cursor = self._conn.execute(
def _do(conn):
cursor = conn.execute(
"SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,)
)
if cursor.fetchone()[0] == 0:
return False
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
self._conn.commit()
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
return True
return self._execute_write(_do)
def prune_sessions(self, older_than_days: int = 90, source: str = None) -> int:
"""
Delete sessions older than N days. Returns count of deleted sessions.
Only prunes ended sessions (not active ones).
"""
import time as _time
cutoff = _time.time() - (older_than_days * 86400)
cutoff = time.time() - (older_than_days * 86400)
with self._lock:
def _do(conn):
if source:
cursor = self._conn.execute(
cursor = conn.execute(
"""SELECT id FROM sessions
WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
cursor = conn.execute(
"SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL",
(cutoff,),
)
session_ids = [row["id"] for row in cursor.fetchall()]
for sid in session_ids:
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
return len(session_ids)
self._conn.commit()
return len(session_ids)
return self._execute_write(_do)