* fix: force-close TCP sockets on client cleanup, detect and recover dead connections

  When a provider drops connections mid-stream (e.g. OpenRouter outage), httpx's graceful close leaves sockets in CLOSE-WAIT indefinitely. These zombie connections accumulate and can prevent recovery without restarting.

  Changes:
  - _force_close_tcp_sockets: walks the httpx connection pool and issues socket.shutdown(SHUT_RDWR) + close() to force TCP RST on every socket when a client is closed, preventing CLOSE-WAIT accumulation
  - _cleanup_dead_connections: probes the primary client's pool for dead sockets (recv MSG_PEEK), rebuilds the client if any are found
  - Pre-turn health check at the start of each run_conversation call that auto-recovers with a user-facing status message
  - Primary client rebuild after stale stream detection to purge pool
  - User-facing messages on streaming connection failures:
      "Connection to provider dropped — Reconnecting (attempt 2/3)"
      "Connection failed after 3 attempts — try again in a moment"

  Made-with: Cursor

* fix: pool entry missing base_url for openrouter, clean error messages

  - _resolve_runtime_from_pool_entry: add OPENROUTER_BASE_URL fallback when pool entry has no runtime_base_url (pool entries from auth.json credential_pool often omit base_url)
  - Replace Rich console.print for auth errors with plain print() to prevent ANSI escape code mangling through prompt_toolkit's stdout patch
  - Force-close TCP sockets on client cleanup to prevent CLOSE-WAIT accumulation after provider outages
  - Pre-turn dead connection detection with auto-recovery and user message
  - Primary client rebuild after stale stream detection
  - User-facing status messages on streaming connection failures/retries

  Made-with: Cursor

* fix(gateway): persist memory flush state to prevent redundant re-flushes on restart

  The _session_expiry_watcher tracked flushed sessions in an in-memory set (_pre_flushed_sessions) that was lost on gateway restart. Expired sessions remained in sessions.json and were re-discovered every restart, causing redundant AIAgent runs that burned API credits and blocked the event loop.

  Fix: Add a memory_flushed boolean field to SessionEntry, persisted in sessions.json. The watcher sets it after a successful flush. On restart, the flag survives and the watcher skips already-flushed sessions.

  - Add memory_flushed field to SessionEntry with to_dict/from_dict support
  - Old sessions.json entries without the field default to False (backward compat)
  - Remove the ephemeral _pre_flushed_sessions set from SessionStore
  - Update tests: save/load roundtrip, legacy entry compat, auto-reset behavior
250 lines
8.9 KiB
Python
250 lines
8.9 KiB
Python
"""Tests for proactive memory flush on session expiry.
|
|
|
|
Verifies that:
|
|
1. _is_session_expired() works from a SessionEntry alone (no source needed)
|
|
2. The sync callback is no longer called in get_or_create_session
|
|
3. memory_flushed flag persists across save/load cycles (prevents restart re-flush)
|
|
4. The background watcher can detect expired sessions
|
|
"""
|
|
|
|
import pytest
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from gateway.config import Platform, GatewayConfig, SessionResetPolicy
|
|
from gateway.session import SessionSource, SessionStore, SessionEntry
|
|
|
|
|
|
@pytest.fixture()
def idle_store(tmp_path):
    """Provide a SessionStore whose default policy is an idle reset of 60 minutes.

    ``SessionStore._ensure_loaded`` is patched out while the store is
    constructed; afterwards the store is flagged as loaded with ``_db``
    set to ``None``.
    """
    idle_policy = SessionResetPolicy(mode="idle", idle_minutes=60)
    gateway_config = GatewayConfig(default_reset_policy=idle_policy)
    with patch("gateway.session.SessionStore._ensure_loaded"):
        store = SessionStore(sessions_dir=tmp_path, config=gateway_config)
    store._db = None
    store._loaded = True
    return store
|
|
|
|
|
|
@pytest.fixture()
def no_reset_store(tmp_path):
    """Provide a SessionStore whose default reset policy is ``mode="none"``.

    ``SessionStore._ensure_loaded`` is patched out while the store is
    constructed; afterwards the store is flagged as loaded with ``_db``
    set to ``None``.
    """
    disabled_policy = SessionResetPolicy(mode="none")
    gateway_config = GatewayConfig(default_reset_policy=disabled_policy)
    with patch("gateway.session.SessionStore._ensure_loaded"):
        store = SessionStore(sessions_dir=tmp_path, config=gateway_config)
    store._db = None
    store._loaded = True
    return store
|
|
|
|
|
|
class TestIsSessionExpired:
    """Expiry checks that pass only a SessionEntry to ``_is_session_expired``."""

    @staticmethod
    def _telegram_dm_entry(session_id, age, idle):
        """Build a Telegram DM entry created ``age`` ago and last updated ``idle`` ago."""
        now = datetime.now()
        return SessionEntry(
            session_key="agent:main:telegram:dm",
            session_id=session_id,
            created_at=now - age,
            updated_at=now - idle,
            platform=Platform.TELEGRAM,
            chat_type="dm",
        )

    def test_idle_session_expired(self, idle_store):
        """An entry idle for 120 minutes is expired under the 60-minute policy."""
        stale = self._telegram_dm_entry(
            "sid_1", age=timedelta(hours=3), idle=timedelta(minutes=120)
        )
        assert idle_store._is_session_expired(stale) is True

    def test_active_session_not_expired(self, idle_store):
        """An entry updated 10 minutes ago is not expired under the 60-minute policy."""
        fresh = self._telegram_dm_entry(
            "sid_2", age=timedelta(hours=1), idle=timedelta(minutes=10)
        )
        assert idle_store._is_session_expired(fresh) is False

    def test_none_mode_never_expires(self, no_reset_store):
        """With mode="none", a 30-day-old entry is still reported as not expired."""
        month = timedelta(days=30)
        ancient = self._telegram_dm_entry("sid_3", age=month, idle=month)
        assert no_reset_store._is_session_expired(ancient) is False

    def test_active_processes_prevent_expiry(self, idle_store):
        """Sessions with active background processes should never expire."""
        # Report every session key as having running background processes.
        idle_store._has_active_processes_fn = lambda key: True
        five_hours = timedelta(hours=5)
        busy = self._telegram_dm_entry("sid_4", age=five_hours, idle=five_hours)
        assert idle_store._is_session_expired(busy) is False

    def test_daily_mode_expired(self, tmp_path):
        """Daily mode should expire sessions from before today's reset hour."""
        daily_policy = SessionResetPolicy(mode="daily", at_hour=4)
        gateway_config = GatewayConfig(default_reset_policy=daily_policy)
        with patch("gateway.session.SessionStore._ensure_loaded"):
            daily_store = SessionStore(sessions_dir=tmp_path, config=gateway_config)
        daily_store._db = None
        daily_store._loaded = True

        two_days = timedelta(days=2)
        old = self._telegram_dm_entry("sid_5", age=two_days, idle=two_days)
        assert daily_store._is_session_expired(old) is True
|
|
|
|
|
|
class TestGetOrCreateSessionNoCallback:
    """Auto-reset in ``get_or_create_session`` must not rely on a sync flush callback."""

    @staticmethod
    def _telegram_dm_source():
        """Return the Telegram DM source (chat_id "123") shared by these tests."""
        return SessionSource(
            platform=Platform.TELEGRAM,
            chat_id="123",
            chat_type="dm",
        )

    def test_auto_reset_creates_new_session_after_flush(self, idle_store):
        """When a flushed session auto-resets, a new session_id is created."""
        source = self._telegram_dm_source()

        # First call creates the session we will later let go stale.
        first = idle_store.get_or_create_session(source)
        previous_id = first.session_id

        # Pretend the watcher already flushed it, then push it past the
        # 60-minute idle window and persist that state.
        first.memory_flushed = True
        first.updated_at = datetime.now() - timedelta(minutes=120)
        idle_store._save()

        # The next lookup should hand back a fresh, auto-reset session.
        second = idle_store.get_or_create_session(source)
        assert second.session_id != previous_id
        assert second.was_auto_reset is True
        # The replacement session starts out unflushed.
        assert second.memory_flushed is False

    def test_no_sync_callback_invoked(self, idle_store):
        """No synchronous callback should block during auto-reset."""
        source = self._telegram_dm_source()

        first = idle_store.get_or_create_session(source)
        first.updated_at = datetime.now() - timedelta(minutes=120)
        idle_store._save()

        # The store must not expose an _on_auto_reset hook at all.
        assert not hasattr(idle_store, '_on_auto_reset')

        # Auto-reset should complete without any blocking (sync LLM) call.
        second = idle_store.get_or_create_session(source)
        assert second.was_auto_reset is True
|
|
|
|
|
|
class TestMemoryFlushedFlag:
    """Default, persistence and serialization of ``SessionEntry.memory_flushed``."""

    @staticmethod
    def _save_then_reload(store, entry):
        """Save ``entry`` via ``store``, wipe in-memory state, reload, and return the entry.

        Mimics a restart: the entry is registered under its session key and
        saved, then ``_entries`` is cleared and ``_ensure_loaded`` is run
        again with ``_loaded`` reset to ``False``.
        """
        store._entries[entry.session_key] = entry
        store._save()

        store._entries.clear()
        store._loaded = False
        store._ensure_loaded()

        return store._entries[entry.session_key]

    def test_defaults_to_false(self):
        """A SessionEntry built without memory_flushed reports False."""
        now = datetime.now()
        entry = SessionEntry(
            session_key="agent:main:telegram:dm:123",
            session_id="sid_new",
            created_at=now,
            updated_at=now,
            platform=Platform.TELEGRAM,
            chat_type="dm",
        )
        assert entry.memory_flushed is False

    def test_persists_through_save_load(self, idle_store):
        """memory_flushed=True must survive a save/load cycle (simulates restart)."""
        five_hours_ago = datetime.now() - timedelta(hours=5)
        flushed = SessionEntry(
            session_key="agent:main:discord:thread:789",
            session_id="sid_flushed",
            created_at=five_hours_ago,
            updated_at=five_hours_ago,
            platform=Platform.DISCORD,
            chat_type="thread",
            memory_flushed=True,
        )
        reloaded = self._save_then_reload(idle_store, flushed)
        assert reloaded.memory_flushed is True

    def test_unflushed_entry_survives_restart_as_unflushed(self, idle_store):
        """An entry without memory_flushed stays False after reload."""
        two_hours_ago = datetime.now() - timedelta(hours=2)
        unflushed = SessionEntry(
            session_key="agent:main:telegram:dm:456",
            session_id="sid_not_flushed",
            created_at=two_hours_ago,
            updated_at=two_hours_ago,
            platform=Platform.TELEGRAM,
            chat_type="dm",
        )
        reloaded = self._save_then_reload(idle_store, unflushed)
        assert reloaded.memory_flushed is False

    def test_roundtrip_to_dict_from_dict(self):
        """to_dict/from_dict must preserve memory_flushed."""
        now = datetime.now()
        original = SessionEntry(
            session_key="agent:main:telegram:dm:999",
            session_id="sid_rt",
            created_at=now,
            updated_at=now,
            platform=Platform.TELEGRAM,
            chat_type="dm",
            memory_flushed=True,
        )
        serialized = original.to_dict()
        assert serialized["memory_flushed"] is True

        restored = SessionEntry.from_dict(serialized)
        assert restored.memory_flushed is True

    def test_legacy_entry_without_field_defaults_false(self):
        """Old sessions.json entries missing memory_flushed should default to False."""
        stamp = datetime.now().isoformat()
        # Deliberately omits the "memory_flushed" key, as older files would.
        legacy_payload = {
            "session_key": "agent:main:telegram:dm:legacy",
            "session_id": "sid_legacy",
            "created_at": stamp,
            "updated_at": stamp,
            "platform": "telegram",
            "chat_type": "dm",
        }
        legacy = SessionEntry.from_dict(legacy_payload)
        assert legacy.memory_flushed is False
|