fix: improve bluebubbles webhook registration resilience

Follow-up to cherry-picked PR #6592:
- Extract _webhook_url property to deduplicate URL construction
- Add _find_registered_webhooks() helper for reuse
- Crash resilience: check for existing registration before POSTing
  (handles restart after unclean shutdown without creating duplicates)
- Accept 200-299 status range (not just 200) for webhook creation
- Unregister removes ALL matching registrations (cleans up orphaned dupes)
- Add 17 tests covering register/unregister/find/edge cases
This commit is contained in:
Teknium
2026-04-10 03:18:03 -07:00
committed by Teknium
parent c6ff5e5d30
commit f4f8b9579e
2 changed files with 306 additions and 38 deletions

View File

@@ -226,24 +226,44 @@ class BlueBubblesAdapter(BasePlatformAdapter):
self._runner = None
self._mark_disconnected()
@property
def _webhook_url(self) -> str:
"""Compute the external webhook URL for BlueBubbles registration."""
host = self.webhook_host
if host in ("0.0.0.0", "127.0.0.1", "localhost", "::"):
host = "localhost"
return f"http://{host}:{self.webhook_port}{self.webhook_path}"
async def _find_registered_webhooks(self, url: str) -> list:
"""Return list of BB webhook entries matching *url*."""
try:
res = await self._api_get("/api/v1/webhook")
data = res.get("data")
if isinstance(data, list):
return [wh for wh in data if wh.get("url") == url]
except Exception:
pass
return []
async def _register_webhook(self) -> bool:
"""Register this webhook URL with the BlueBubbles server.
BlueBubbles requires webhooks to be registered via API before
it will send events. This method registers our listener URL
for new-message and updated-message events.
it will send events. Checks for an existing registration first
to avoid duplicates (e.g. after a crash without clean shutdown).
"""
if not self.client:
return False
webhook_url = f"http://{self.webhook_host}:{self.webhook_port}{self.webhook_path}"
# Use host.docker.internal or public IP if webhook is 0.0.0.0/127.0.0.1
# and server is on a different host
if self.webhook_host in ("0.0.0.0", "127.0.0.1", "localhost", "::"):
# For local development, we need the external IP that BlueBubbles can reach
# Default to localhost for same-machine setups
external_host = "localhost"
webhook_url = f"http://{external_host}:{self.webhook_port}{self.webhook_path}"
webhook_url = self._webhook_url
# Crash resilience — reuse an existing registration if present
existing = await self._find_registered_webhooks(webhook_url)
if existing:
logger.info(
"[bluebubbles] webhook already registered: %s", webhook_url
)
return True
payload = {
"url": webhook_url,
@@ -252,16 +272,17 @@ class BlueBubblesAdapter(BasePlatformAdapter):
try:
res = await self._api_post("/api/v1/webhook", payload)
if res.get("status") == 200:
status = res.get("status", 0)
if 200 <= status < 300:
logger.info(
"[bluebubbles] webhook registered successfully with server: %s",
"[bluebubbles] webhook registered with server: %s",
webhook_url,
)
return True
else:
logger.warning(
"[bluebubbles] webhook registration returned non-200 status: %s - %s",
res.get("status"),
"[bluebubbles] webhook registration returned status %s: %s",
status,
res.get("message"),
)
return False
@@ -275,41 +296,34 @@ class BlueBubblesAdapter(BasePlatformAdapter):
async def _unregister_webhook(self) -> bool:
"""Unregister this webhook URL from the BlueBubbles server.
Cleans up the webhook registration when the gateway shuts down.
Removes *all* matching registrations to clean up any duplicates
left by prior crashes.
"""
if not self.client:
return False
webhook_url = f"http://{self.webhook_host}:{self.webhook_port}{self.webhook_path}"
if self.webhook_host in ("0.0.0.0", "127.0.0.1", "localhost", "::"):
external_host = "localhost"
webhook_url = f"http://{external_host}:{self.webhook_port}{self.webhook_path}"
webhook_url = self._webhook_url
removed = False
try:
# Get current webhooks
webhooks = await self._api_get("/api/v1/webhook")
if webhooks.get("status") == 200:
data = webhooks.get("data", [])
for webhook in data:
if webhook.get("url") == webhook_url:
# Delete this specific webhook
webhook_id = webhook.get("id")
if webhook_id:
res = await self.client.delete(
self._api_url(f"/api/v1/webhook/{webhook_id}")
)
res.raise_for_status()
logger.info(
"[bluebubbles] webhook unregistered: %s",
webhook_url,
)
return True
for wh in await self._find_registered_webhooks(webhook_url):
wh_id = wh.get("id")
if wh_id:
res = await self.client.delete(
self._api_url(f"/api/v1/webhook/{wh_id}")
)
res.raise_for_status()
removed = True
if removed:
logger.info(
"[bluebubbles] webhook unregistered: %s", webhook_url
)
except Exception as exc:
logger.debug(
"[bluebubbles] failed to unregister webhook (non-critical): %s",
exc,
)
return False
return removed
# ------------------------------------------------------------------
# Chat GUID resolution

View File

@@ -359,3 +359,257 @@ class TestBlueBubblesAttachmentDownload:
adapter._download_attachment("att-guid", {"mimeType": "image/png"})
)
assert result is None
# ---------------------------------------------------------------------------
# Webhook registration
# ---------------------------------------------------------------------------
class TestBlueBubblesWebhookUrl:
"""_webhook_url property normalises local hosts to 'localhost'."""
def test_default_host(self, monkeypatch):
adapter = _make_adapter(monkeypatch)
# Default webhook_host is 0.0.0.0 → normalized to localhost
assert "localhost" in adapter._webhook_url
assert str(adapter.webhook_port) in adapter._webhook_url
assert adapter.webhook_path in adapter._webhook_url
@pytest.mark.parametrize("host", ["0.0.0.0", "127.0.0.1", "localhost", "::"])
def test_local_hosts_normalized(self, monkeypatch, host):
adapter = _make_adapter(monkeypatch, webhook_host=host)
assert adapter._webhook_url.startswith("http://localhost:")
def test_custom_host_preserved(self, monkeypatch):
adapter = _make_adapter(monkeypatch, webhook_host="192.168.1.50")
assert "192.168.1.50" in adapter._webhook_url
class TestBlueBubblesWebhookRegistration:
"""Tests for _register_webhook, _unregister_webhook, _find_registered_webhooks."""
@staticmethod
def _mock_client(get_response=None, post_response=None, delete_ok=True):
"""Build a tiny mock httpx.AsyncClient."""
async def mock_get(*args, **kwargs):
class R:
status_code = 200
def raise_for_status(self):
pass
def json(self):
return get_response or {"status": 200, "data": []}
return R()
async def mock_post(*args, **kwargs):
class R:
status_code = 200
def raise_for_status(self):
pass
def json(self):
return post_response or {"status": 200, "data": {}}
return R()
async def mock_delete(*args, **kwargs):
class R:
status_code = 200 if delete_ok else 500
def raise_for_status(self_inner):
if not delete_ok:
raise Exception("delete failed")
return R()
return type(
"MockClient", (),
{"get": mock_get, "post": mock_post, "delete": mock_delete},
)()
# -- _find_registered_webhooks --
def test_find_registered_webhooks_returns_matches(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
url = adapter._webhook_url
adapter.client = self._mock_client(
get_response={"status": 200, "data": [
{"id": 1, "url": url, "events": ["new-message"]},
{"id": 2, "url": "http://other:9999/hook", "events": ["message"]},
]}
)
result = asyncio.get_event_loop().run_until_complete(
adapter._find_registered_webhooks(url)
)
assert len(result) == 1
assert result[0]["id"] == 1
def test_find_registered_webhooks_empty_when_none(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = self._mock_client(
get_response={"status": 200, "data": []}
)
result = asyncio.get_event_loop().run_until_complete(
adapter._find_registered_webhooks(adapter._webhook_url)
)
assert result == []
def test_find_registered_webhooks_handles_api_error(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = self._mock_client()
# Override _api_get to raise
async def bad_get(path):
raise ConnectionError("server down")
adapter._api_get = bad_get
result = asyncio.get_event_loop().run_until_complete(
adapter._find_registered_webhooks(adapter._webhook_url)
)
assert result == []
# -- _register_webhook --
def test_register_fresh(self, monkeypatch):
"""No existing webhook → POST creates one."""
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = self._mock_client(
get_response={"status": 200, "data": []},
post_response={"status": 200, "data": {"id": 42}},
)
ok = asyncio.get_event_loop().run_until_complete(
adapter._register_webhook()
)
assert ok is True
def test_register_accepts_201(self, monkeypatch):
"""BB might return 201 Created — must still succeed."""
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = self._mock_client(
get_response={"status": 200, "data": []},
post_response={"status": 201, "data": {"id": 43}},
)
ok = asyncio.get_event_loop().run_until_complete(
adapter._register_webhook()
)
assert ok is True
def test_register_reuses_existing(self, monkeypatch):
"""Crash resilience — existing registration is reused, no POST needed."""
import asyncio
adapter = _make_adapter(monkeypatch)
url = adapter._webhook_url
adapter.client = self._mock_client(
get_response={"status": 200, "data": [
{"id": 7, "url": url, "events": ["new-message"]},
]},
)
# Track whether POST was called
post_called = False
orig_api_post = adapter._api_post
async def tracking_post(path, payload):
nonlocal post_called
post_called = True
return await orig_api_post(path, payload)
adapter._api_post = tracking_post
ok = asyncio.get_event_loop().run_until_complete(
adapter._register_webhook()
)
assert ok is True
assert not post_called, "Should reuse existing, not POST again"
def test_register_returns_false_without_client(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = None
ok = asyncio.get_event_loop().run_until_complete(
adapter._register_webhook()
)
assert ok is False
def test_register_returns_false_on_server_error(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = self._mock_client(
get_response={"status": 200, "data": []},
post_response={"status": 500, "message": "internal error"},
)
ok = asyncio.get_event_loop().run_until_complete(
adapter._register_webhook()
)
assert ok is False
# -- _unregister_webhook --
def test_unregister_removes_matching(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
url = adapter._webhook_url
adapter.client = self._mock_client(
get_response={"status": 200, "data": [
{"id": 10, "url": url},
]},
)
ok = asyncio.get_event_loop().run_until_complete(
adapter._unregister_webhook()
)
assert ok is True
def test_unregister_removes_all_duplicates(self, monkeypatch):
"""Multiple orphaned registrations for same URL — all get removed."""
import asyncio
adapter = _make_adapter(monkeypatch)
url = adapter._webhook_url
deleted_ids = []
async def mock_delete(*args, **kwargs):
# Extract ID from URL
url_str = args[0] if args else ""
deleted_ids.append(url_str)
class R:
status_code = 200
def raise_for_status(self):
pass
return R()
adapter.client = self._mock_client(
get_response={"status": 200, "data": [
{"id": 1, "url": url},
{"id": 2, "url": url},
{"id": 3, "url": "http://other/hook"},
]},
)
adapter.client.delete = mock_delete
ok = asyncio.get_event_loop().run_until_complete(
adapter._unregister_webhook()
)
assert ok is True
assert len(deleted_ids) == 2
def test_unregister_returns_false_without_client(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = None
ok = asyncio.get_event_loop().run_until_complete(
adapter._unregister_webhook()
)
assert ok is False
def test_unregister_handles_api_failure_gracefully(self, monkeypatch):
import asyncio
adapter = _make_adapter(monkeypatch)
adapter.client = self._mock_client()
async def bad_get(path):
raise ConnectionError("server down")
adapter._api_get = bad_get
ok = asyncio.get_event_loop().run_until_complete(
adapter._unregister_webhook()
)
assert ok is False