fix(honcho): isolate session routing for multi-user gateway (#1500)
Salvaged from PR #1470 by adavyas. Core fix: Honcho tool calls in a multi-session gateway could route to the wrong session because honcho_tools.py relied on process-global state. Now threads session context through the call chain: AIAgent._invoke_tool() → handle_function_call() → registry.dispatch() → handler **kw → _resolve_session_context() Changes: - Add _resolve_session_context() to prefer per-call context over globals - Plumb honcho_manager + honcho_session_key through handle_function_call - Add sync_honcho=False to run_conversation() for synthetic flush turns - Pass honcho_session_key through gateway memory flush lifecycle - Harden gateway PID detection when /proc cmdline is unreadable - Make interrupt test scripts import-safe for pytest-xdist - Wrap BibTeX examples in Jekyll raw blocks for docs build - Fix thread-order-dependent assertion in client lifecycle test - Expand Honcho docs: session isolation, lifecycle, routing internals Dropped from original PR: - Indentation change in _create_request_openai_client that would move client creation inside the lock (causes unnecessary contention) Co-authored-by: adavyas <adavyas@users.noreply.github.com>
This commit is contained in:
@@ -478,7 +478,11 @@ class GatewayRunner:
|
||||
|
||||
# -----------------------------------------------------------------
|
||||
|
||||
def _flush_memories_for_session(self, old_session_id: str):
|
||||
def _flush_memories_for_session(
|
||||
self,
|
||||
old_session_id: str,
|
||||
honcho_session_key: Optional[str] = None,
|
||||
):
|
||||
"""Prompt the agent to save memories/skills before context is lost.
|
||||
|
||||
Synchronous worker — meant to be called via run_in_executor from
|
||||
@@ -506,6 +510,7 @@ class GatewayRunner:
|
||||
quiet_mode=True,
|
||||
enabled_toolsets=["memory", "skills"],
|
||||
session_id=old_session_id,
|
||||
honcho_session_key=honcho_session_key,
|
||||
)
|
||||
|
||||
# Build conversation history from transcript
|
||||
@@ -533,6 +538,7 @@ class GatewayRunner:
|
||||
tmp_agent.run_conversation(
|
||||
user_message=flush_prompt,
|
||||
conversation_history=msgs,
|
||||
sync_honcho=False,
|
||||
)
|
||||
logger.info("Pre-reset memory flush completed for session %s", old_session_id)
|
||||
# Flush any queued Honcho writes before the session is dropped
|
||||
@@ -544,10 +550,19 @@ class GatewayRunner:
|
||||
except Exception as e:
|
||||
logger.debug("Pre-reset memory flush failed for session %s: %s", old_session_id, e)
|
||||
|
||||
async def _async_flush_memories(self, old_session_id: str):
|
||||
async def _async_flush_memories(
|
||||
self,
|
||||
old_session_id: str,
|
||||
honcho_session_key: Optional[str] = None,
|
||||
):
|
||||
"""Run the sync memory flush in a thread pool so it won't block the event loop."""
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, self._flush_memories_for_session, old_session_id)
|
||||
await loop.run_in_executor(
|
||||
None,
|
||||
self._flush_memories_for_session,
|
||||
old_session_id,
|
||||
honcho_session_key,
|
||||
)
|
||||
|
||||
@property
|
||||
def should_exit_cleanly(self) -> bool:
|
||||
@@ -923,7 +938,7 @@ class GatewayRunner:
|
||||
entry.session_id, key,
|
||||
)
|
||||
try:
|
||||
await self._async_flush_memories(entry.session_id)
|
||||
await self._async_flush_memories(entry.session_id, key)
|
||||
self._shutdown_gateway_honcho(key)
|
||||
self.session_store._pre_flushed_sessions.add(entry.session_id)
|
||||
except Exception as e:
|
||||
@@ -1904,7 +1919,9 @@ class GatewayRunner:
|
||||
try:
|
||||
old_entry = self.session_store._entries.get(session_key)
|
||||
if old_entry:
|
||||
asyncio.create_task(self._async_flush_memories(old_entry.session_id))
|
||||
asyncio.create_task(
|
||||
self._async_flush_memories(old_entry.session_id, session_key)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Gateway memory flush on reset failed: %s", e)
|
||||
|
||||
@@ -3171,7 +3188,9 @@ class GatewayRunner:
|
||||
|
||||
# Flush memories for current session before switching
|
||||
try:
|
||||
asyncio.create_task(self._async_flush_memories(current_entry.session_id))
|
||||
asyncio.create_task(
|
||||
self._async_flush_memories(current_entry.session_id, session_key)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Memory flush on resume failed: %s", e)
|
||||
|
||||
|
||||
@@ -83,8 +83,7 @@ def _looks_like_gateway_process(pid: int) -> bool:
|
||||
"""Return True when the live PID still looks like the Hermes gateway."""
|
||||
cmdline = _read_process_cmdline(pid)
|
||||
if not cmdline:
|
||||
# If we cannot inspect the process, fall back to the liveness check.
|
||||
return True
|
||||
return False
|
||||
|
||||
patterns = (
|
||||
"hermes_cli.main gateway",
|
||||
@@ -94,6 +93,24 @@ def _looks_like_gateway_process(pid: int) -> bool:
|
||||
return any(pattern in cmdline for pattern in patterns)
|
||||
|
||||
|
||||
def _record_looks_like_gateway(record: dict[str, Any]) -> bool:
|
||||
"""Validate gateway identity from PID-file metadata when cmdline is unavailable."""
|
||||
if record.get("kind") != _GATEWAY_KIND:
|
||||
return False
|
||||
|
||||
argv = record.get("argv")
|
||||
if not isinstance(argv, list) or not argv:
|
||||
return False
|
||||
|
||||
cmdline = " ".join(str(part) for part in argv)
|
||||
patterns = (
|
||||
"hermes_cli.main gateway",
|
||||
"hermes gateway",
|
||||
"gateway/run.py",
|
||||
)
|
||||
return any(pattern in cmdline for pattern in patterns)
|
||||
|
||||
|
||||
def _build_pid_record() -> dict:
|
||||
return {
|
||||
"pid": os.getpid(),
|
||||
@@ -325,8 +342,9 @@ def get_running_pid() -> Optional[int]:
|
||||
return None
|
||||
|
||||
if not _looks_like_gateway_process(pid):
|
||||
remove_pid_file()
|
||||
return None
|
||||
if not _record_looks_like_gateway(record):
|
||||
remove_pid_file()
|
||||
return None
|
||||
|
||||
return pid
|
||||
|
||||
|
||||
Reference in New Issue
Block a user