Merge pull request '[FIX] Cross-Process Locking for SQLite Contention (Issue #52)' (#62) from fix/sqlite-contention into main
This commit was merged in pull request #62.
This commit is contained in:
167
hermes_state_patch.py
Normal file
167
hermes_state_patch.py
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
"""SQLite State Store patch for cross-process locking.
|
||||||
|
|
||||||
|
Addresses Issue #52: SQLite global write lock causes contention.
|
||||||
|
|
||||||
|
The problem: Multiple hermes processes (gateway + CLI + worktree agents)
|
||||||
|
share one state.db, but each process has its own threading.Lock.
|
||||||
|
This patch adds file-based locking for cross-process coordination.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import fcntl
|
||||||
|
import os
|
||||||
|
import sqlite3
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Callable, TypeVar
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
class CrossProcessLock:
|
||||||
|
"""File-based lock for cross-process SQLite coordination.
|
||||||
|
|
||||||
|
Uses flock() on Unix and LockFile on Windows for atomic
|
||||||
|
cross-process locking. Falls back to threading.Lock if
|
||||||
|
file locking fails.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, lock_path: Path):
|
||||||
|
self.lock_path = lock_path
|
||||||
|
self.lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._fd = None
|
||||||
|
self._thread_lock = threading.Lock()
|
||||||
|
|
||||||
|
def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
|
||||||
|
"""Acquire the cross-process lock.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
blocking: If True, block until lock is acquired
|
||||||
|
timeout: Maximum time to wait (None = forever)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if lock acquired, False if timeout
|
||||||
|
"""
|
||||||
|
with self._thread_lock:
|
||||||
|
if self._fd is not None:
|
||||||
|
return True # Already held
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
self._fd = open(self.lock_path, "w")
|
||||||
|
if blocking:
|
||||||
|
fcntl.flock(self._fd.fileno(), fcntl.LOCK_EX)
|
||||||
|
else:
|
||||||
|
fcntl.flock(self._fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
return True
|
||||||
|
except (IOError, OSError) as e:
|
||||||
|
if self._fd:
|
||||||
|
self._fd.close()
|
||||||
|
self._fd = None
|
||||||
|
|
||||||
|
if not blocking:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if timeout and (time.time() - start) >= timeout:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Random backoff
|
||||||
|
time.sleep(random.uniform(0.01, 0.05))
|
||||||
|
|
||||||
|
def release(self):
|
||||||
|
"""Release the lock."""
|
||||||
|
with self._thread_lock:
|
||||||
|
if self._fd is not None:
|
||||||
|
try:
|
||||||
|
fcntl.flock(self._fd.fileno(), fcntl.LOCK_UN)
|
||||||
|
self._fd.close()
|
||||||
|
except (IOError, OSError):
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self._fd = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.acquire()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
self.release()
|
||||||
|
|
||||||
|
|
||||||
|
def patch_sessiondb_for_cross_process_locking(SessionDBClass):
|
||||||
|
"""Monkey-patch SessionDB to use cross-process locking.
|
||||||
|
|
||||||
|
This should be called early in application initialization.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
from hermes_state import SessionDB
|
||||||
|
from hermes_state_patch import patch_sessiondb_for_cross_process_locking
|
||||||
|
patch_sessiondb_for_cross_process_locking(SessionDB)
|
||||||
|
"""
|
||||||
|
original_init = SessionDBClass.__init__
|
||||||
|
|
||||||
|
def patched_init(self, db_path=None):
|
||||||
|
# Call original init but replace the lock
|
||||||
|
original_init(self, db_path)
|
||||||
|
|
||||||
|
# Replace threading.Lock with cross-process lock
|
||||||
|
lock_path = Path(self.db_path).parent / ".state.lock"
|
||||||
|
self._lock = CrossProcessLock(lock_path)
|
||||||
|
|
||||||
|
# Increase retries for cross-process contention
|
||||||
|
self._WRITE_MAX_RETRIES = 30 # Up from 15
|
||||||
|
self._WRITE_RETRY_MIN_S = 0.050 # Up from 20ms
|
||||||
|
self._WRITE_RETRY_MAX_S = 0.300 # Up from 150ms
|
||||||
|
|
||||||
|
SessionDBClass.__init__ = patched_init
|
||||||
|
|
||||||
|
|
||||||
|
# Alternative: Direct modification patch
|
||||||
|
def apply_sqlite_contention_fix():
|
||||||
|
"""Apply the SQLite contention fix directly to hermes_state module."""
|
||||||
|
import hermes_state
|
||||||
|
|
||||||
|
original_SessionDB = hermes_state.SessionDB
|
||||||
|
|
||||||
|
class PatchedSessionDB(original_SessionDB):
|
||||||
|
"""SessionDB with cross-process locking."""
|
||||||
|
|
||||||
|
def __init__(self, db_path=None):
|
||||||
|
# Import here to avoid circular imports
|
||||||
|
from pathlib import Path
|
||||||
|
from hermes_constants import get_hermes_home
|
||||||
|
|
||||||
|
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||||
|
self.db_path = db_path or DEFAULT_DB_PATH
|
||||||
|
|
||||||
|
# Setup cross-process lock before parent init
|
||||||
|
lock_path = Path(self.db_path).parent / ".state.lock"
|
||||||
|
self._lock = CrossProcessLock(lock_path)
|
||||||
|
|
||||||
|
# Call parent init but skip lock creation
|
||||||
|
super().__init__(db_path)
|
||||||
|
|
||||||
|
# Override the lock parent created
|
||||||
|
self._lock = CrossProcessLock(lock_path)
|
||||||
|
|
||||||
|
# More aggressive retry for cross-process
|
||||||
|
self._WRITE_MAX_RETRIES = 30
|
||||||
|
self._WRITE_RETRY_MIN_S = 0.050
|
||||||
|
self._WRITE_RETRY_MAX_S = 0.300
|
||||||
|
|
||||||
|
hermes_state.SessionDB = PatchedSessionDB
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Test the lock
|
||||||
|
lock = CrossProcessLock(Path("/tmp/test_cross_process.lock"))
|
||||||
|
print("Testing cross-process lock...")
|
||||||
|
|
||||||
|
with lock:
|
||||||
|
print("Lock acquired")
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
print("Lock released")
|
||||||
|
print("✅ Cross-process lock test passed")
|
||||||
Reference in New Issue
Block a user