[SYNC] Merge upstream NousResearch/hermes-agent — 499 commits #201
@@ -684,6 +684,13 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
if isinstance(resp, nio.SyncError):
|
||||
if self._closing:
|
||||
return
|
||||
err_msg = str(getattr(resp, "message", resp)).lower()
|
||||
if "m_unknown_token" in err_msg or "m_forbidden" in err_msg or "401" in err_msg:
|
||||
logger.error(
|
||||
"Matrix: permanent auth error from sync: %s — stopping sync",
|
||||
getattr(resp, "message", resp),
|
||||
)
|
||||
return
|
||||
logger.warning(
|
||||
"Matrix: sync returned %s: %s — retrying in 5s",
|
||||
type(resp).__name__,
|
||||
@@ -698,6 +705,12 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
except Exception as exc:
|
||||
if self._closing:
|
||||
return
|
||||
# Detect permanent auth/permission failures that will never
|
||||
# succeed on retry — stop syncing instead of looping forever.
|
||||
err_str = str(exc).lower()
|
||||
if "401" in err_str or "403" in err_str or "unauthorized" in err_str or "forbidden" in err_str:
|
||||
logger.error("Matrix: permanent auth error: %s — stopping sync", exc)
|
||||
return
|
||||
logger.warning("Matrix: sync error: %s — retrying in 5s", exc)
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
@@ -513,6 +513,16 @@ class MattermostAdapter(BasePlatformAdapter):
|
||||
except Exception as exc:
|
||||
if self._closing:
|
||||
return
|
||||
# Detect permanent auth/permission failures that will never
|
||||
# succeed on retry — stop reconnecting instead of looping forever.
|
||||
import aiohttp
|
||||
err_str = str(exc).lower()
|
||||
if isinstance(exc, aiohttp.WSServerHandshakeError) and exc.status in (401, 403):
|
||||
logger.error("Mattermost WS auth failed (HTTP %d) — stopping reconnect", exc.status)
|
||||
return
|
||||
if "401" in err_str or "403" in err_str or "unauthorized" in err_str:
|
||||
logger.error("Mattermost WS permanent error: %s — stopping reconnect", exc)
|
||||
return
|
||||
logger.warning("Mattermost WS error: %s — reconnecting in %.0fs", exc, delay)
|
||||
|
||||
if self._closing:
|
||||
|
||||
216
tests/gateway/test_ws_auth_retry.py
Normal file
216
tests/gateway/test_ws_auth_retry.py
Normal file
@@ -0,0 +1,216 @@
|
||||
"""Tests for auth-aware retry in Mattermost WS and Matrix sync loops.
|
||||
|
||||
Both Mattermost's _ws_loop and Matrix's _sync_loop previously caught all
|
||||
exceptions with a broad ``except Exception`` and retried forever. Permanent
|
||||
auth failures (401, 403, M_UNKNOWN_TOKEN) would loop indefinitely instead
|
||||
of stopping. These tests verify that auth errors now stop the reconnect.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mattermost: _ws_loop auth-aware retry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMattermostWSAuthRetry:
    """gateway/platforms/mattermost.py — _ws_loop()

    Verifies that permanent authentication failures raised while connecting
    the websocket stop the reconnect loop, while transient failures are
    retried.
    """

    # Hard cap on connection attempts made by the always-failing fakes.  Once
    # reached, the fake flips ``adapter._closing`` so the loop's
    # ``if self._closing: return`` guard ends it.  Without this, a regression
    # in the auth handling would make the test hang instead of fail.
    _MAX_ATTEMPTS = 3

    @staticmethod
    def _new_adapter():
        """Return a MattermostAdapter created without running ``__init__``."""
        from gateway.platforms.mattermost import MattermostAdapter

        adapter = MattermostAdapter.__new__(MattermostAdapter)
        adapter._closing = False
        return adapter

    def _attempts_for_handshake_error(self, status, message):
        """Run ``_ws_loop`` against a connect stub that always fails.

        The stub raises ``aiohttp.WSServerHandshakeError`` built with the
        given HTTP ``status`` and ``message``.

        Returns:
            The number of times the loop tried to connect before it exited.
        """
        import aiohttp

        exc = aiohttp.WSServerHandshakeError(
            request_info=MagicMock(),
            history=(),
            status=status,
            message=message,
            headers=MagicMock(),
        )

        adapter = self._new_adapter()
        call_count = 0

        async def fake_connect():
            nonlocal call_count
            call_count += 1
            if call_count >= self._MAX_ATTEMPTS:
                # Safety net: force the loop to exit so a regression shows up
                # as a failed assertion rather than an endless retry.
                adapter._closing = True
            raise exc

        adapter._ws_connect_and_listen = fake_connect

        async def run():
            # Patch sleep so that, if the loop does retry, no real back-off
            # delay is incurred.
            with patch("asyncio.sleep", new_callable=AsyncMock):
                await adapter._ws_loop()

        asyncio.run(run())
        return call_count

    def test_401_handshake_stops_reconnect(self):
        """A WSServerHandshakeError with status 401 should stop the loop."""
        # Should have attempted once and stopped, not retried
        assert self._attempts_for_handshake_error(401, "Unauthorized") == 1

    def test_403_handshake_stops_reconnect(self):
        """A WSServerHandshakeError with status 403 should stop the loop."""
        assert self._attempts_for_handshake_error(403, "Forbidden") == 1

    def test_transient_error_retries(self):
        """A transient ConnectionError should retry (not stop immediately)."""
        adapter = self._new_adapter()
        call_count = 0

        async def fake_connect():
            nonlocal call_count
            call_count += 1
            if call_count >= 2:
                # Stop the loop after 2 attempts
                adapter._closing = True
                return
            raise ConnectionError("connection reset")

        adapter._ws_connect_and_listen = fake_connect

        async def run():
            with patch("asyncio.sleep", new_callable=AsyncMock):
                await adapter._ws_loop()

        asyncio.run(run())

        # Should have retried at least once
        assert call_count >= 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Matrix: _sync_loop auth-aware retry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMatrixSyncAuthRetry:
    """gateway/platforms/matrix.py — _sync_loop()

    Verifies that permanent authentication failures reported by a sync
    (either as a ``SyncError`` response or as a raised exception) stop the
    sync loop, while transient failures are retried.
    """

    # Hard cap on sync attempts made by the always-failing fakes.  Once
    # reached, the fake flips ``adapter._closing`` so the loop's
    # ``if self._closing: return`` guards end it.  Without this, a regression
    # in the auth handling would make the test hang instead of fail.
    _MAX_ATTEMPTS = 3

    @staticmethod
    def _new_adapter():
        """Return a MatrixAdapter created without running ``__init__``."""
        from gateway.platforms.matrix import MatrixAdapter

        adapter = MatrixAdapter.__new__(MatrixAdapter)
        adapter._closing = False
        adapter._client = MagicMock()
        return adapter

    @staticmethod
    def _run_sync_loop(adapter, sync_error_cls):
        """Run ``adapter._sync_loop()`` with a stub ``nio`` module installed.

        ``sync_error_cls`` is exposed as ``nio.SyncError`` on the stub.  The
        stub is installed with ``patch.dict`` so that whatever was in
        ``sys.modules`` beforehand (including a real ``nio``) is restored on
        exit instead of being deleted.  ``asyncio.sleep`` is patched so any
        retry performed by the loop does not wait in real time.
        """
        import sys
        import types

        nio_mock = types.ModuleType("nio")
        nio_mock.SyncError = sync_error_cls

        async def run():
            with patch.dict(sys.modules, {"nio": nio_mock}):
                with patch("asyncio.sleep", new_callable=AsyncMock):
                    await adapter._sync_loop()

        asyncio.run(run())

    def test_unknown_token_sync_error_stops_loop(self):
        """A SyncError with M_UNKNOWN_TOKEN should stop syncing."""

        class SyncError:
            def __init__(self, message):
                self.message = message

        adapter = self._new_adapter()
        sync_count = 0

        async def fake_sync(timeout=30000):
            nonlocal sync_count
            sync_count += 1
            if sync_count >= self._MAX_ATTEMPTS:
                # Safety net: force the loop to exit so a regression shows up
                # as a failed assertion rather than an endless retry.
                adapter._closing = True
            return SyncError("M_UNKNOWN_TOKEN: Invalid access token")

        adapter._client.sync = fake_sync

        self._run_sync_loop(adapter, SyncError)
        assert sync_count == 1

    def test_exception_with_401_stops_loop(self):
        """An exception containing '401' should stop syncing."""
        adapter = self._new_adapter()
        call_count = 0

        async def fake_sync(timeout=30000):
            nonlocal call_count
            call_count += 1
            if call_count >= self._MAX_ATTEMPTS:
                # Safety net, see test_unknown_token_sync_error_stops_loop.
                adapter._closing = True
            raise RuntimeError("HTTP 401 Unauthorized")

        adapter._client.sync = fake_sync

        self._run_sync_loop(adapter, type("SyncError", (), {}))
        assert call_count == 1

    def test_transient_error_retries(self):
        """A transient error should retry (not stop immediately)."""
        adapter = self._new_adapter()
        call_count = 0

        async def fake_sync(timeout=30000):
            nonlocal call_count
            call_count += 1
            if call_count >= 2:
                adapter._closing = True
                return MagicMock()  # Normal response
            raise ConnectionError("network timeout")

        adapter._client.sync = fake_sync

        self._run_sync_loop(adapter, type("SyncError", (), {}))
        assert call_count >= 2
|
||||
Reference in New Issue
Block a user