Files
hermes-agent/tests/gateway/test_webhook_adapter.py
Test e140c02d51 feat(gateway): add webhook platform adapter for external event triggers
Add a generic webhook platform adapter that receives HTTP POSTs from
external services (GitHub, GitLab, JIRA, Stripe, etc.), validates HMAC
signatures, transforms payloads into agent prompts, and routes responses
back to the source or to another platform.

Features:
- Configurable routes with per-route HMAC secrets, event filters,
  prompt templates with dot-notation payload access, skill loading,
  and pluggable delivery (github_comment, telegram, discord, log)
- HMAC signature validation (GitHub SHA-256, GitLab token, generic)
- Rate limiting (30 req/min per route, configurable)
- Idempotency cache (1hr TTL, prevents duplicate runs on retries)
- Body size limits (1MB default, checked before reading payload)
- Setup wizard integration with security warnings and docs links
- 33 tests (29 unit + 4 integration), all passing

Security:
- HMAC secret required per route (startup validation)
- Setup wizard warns about internet exposure for webhook/SMS platforms
- Sandboxing (Docker/VM) recommended in docs for public-facing deployments

Files changed:
- gateway/config.py — Platform.WEBHOOK enum + env var overrides
- gateway/platforms/webhook.py — WebhookAdapter (~420 lines)
- gateway/run.py — factory wiring + auth bypass for webhook events
- hermes_cli/config.py — WEBHOOK_* env var definitions
- hermes_cli/setup.py — webhook section in setup_gateway()
- tests/gateway/test_webhook_adapter.py — 29 unit tests
- tests/gateway/test_webhook_integration.py — 4 integration tests
- website/docs/user-guide/messaging/webhooks.md — full user docs
- website/docs/reference/environment-variables.md — WEBHOOK_* vars
- website/sidebars.ts — nav entry
2026-03-20 06:33:36 -07:00

620 lines
22 KiB
Python

"""Unit tests for the generic webhook platform adapter.
Covers:
- HMAC signature validation (GitHub, GitLab, generic)
- Prompt rendering with dot-notation template variables
- Event type filtering
- HTTP handler behaviour (404, 202, health)
- Idempotency cache (duplicate delivery IDs)
- Rate limiting (fixed-window, per route)
- Body size limits
- INSECURE_NO_AUTH bypass
- Session isolation for concurrent webhooks
- Delivery info cleanup after send()
- connect / disconnect lifecycle
"""
import asyncio
import hashlib
import hmac
import json
import time
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SendResult
from gateway.platforms.webhook import (
WebhookAdapter,
_INSECURE_NO_AUTH,
check_webhook_requirements,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_config(
routes=None,
secret="",
rate_limit=30,
max_body_bytes=1_048_576,
host="0.0.0.0",
port=0, # let OS pick a free port in tests
):
"""Build a PlatformConfig suitable for WebhookAdapter."""
extra = {
"host": host,
"port": port,
"routes": routes or {},
"rate_limit": rate_limit,
"max_body_bytes": max_body_bytes,
}
if secret:
extra["secret"] = secret
return PlatformConfig(enabled=True, extra=extra)
def _make_adapter(routes=None, **kwargs):
"""Create a WebhookAdapter with sensible defaults for testing."""
config = _make_config(routes=routes, **kwargs)
return WebhookAdapter(config)
def _create_app(adapter: WebhookAdapter) -> web.Application:
"""Build the aiohttp Application from the adapter (without starting a full server)."""
app = web.Application()
app.router.add_get("/health", adapter._handle_health)
app.router.add_post("/webhooks/{route_name}", adapter._handle_webhook)
return app
def _mock_request(headers=None, body=b"", content_length=None, match_info=None):
"""Build a lightweight mock aiohttp request for non-HTTP tests."""
req = MagicMock()
req.headers = headers or {}
req.content_length = content_length if content_length is not None else len(body)
req.match_info = match_info or {}
req.method = "POST"
async def _read():
return body
req.read = _read
return req
def _github_signature(body: bytes, secret: str) -> str:
"""Compute X-Hub-Signature-256 for *body* using *secret*."""
return "sha256=" + hmac.new(
secret.encode(), body, hashlib.sha256
).hexdigest()
def _generic_signature(body: bytes, secret: str) -> str:
"""Compute X-Webhook-Signature (plain HMAC-SHA256 hex) for *body*."""
return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
# ===================================================================
# Signature validation
# ===================================================================
class TestValidateSignature:
"""Tests for WebhookAdapter._validate_signature."""
def test_validate_github_signature_valid(self):
"""Valid X-Hub-Signature-256 is accepted."""
adapter = _make_adapter()
body = b'{"action": "opened"}'
secret = "webhook-secret-42"
sig = _github_signature(body, secret)
req = _mock_request(headers={"X-Hub-Signature-256": sig})
assert adapter._validate_signature(req, body, secret) is True
def test_validate_github_signature_invalid(self):
"""Wrong X-Hub-Signature-256 is rejected."""
adapter = _make_adapter()
body = b'{"action": "opened"}'
secret = "webhook-secret-42"
req = _mock_request(headers={"X-Hub-Signature-256": "sha256=deadbeef"})
assert adapter._validate_signature(req, body, secret) is False
def test_validate_gitlab_token(self):
"""GitLab plain-token match via X-Gitlab-Token."""
adapter = _make_adapter()
secret = "gl-token-value"
req = _mock_request(headers={"X-Gitlab-Token": secret})
assert adapter._validate_signature(req, b"{}", secret) is True
def test_validate_gitlab_token_wrong(self):
"""Wrong X-Gitlab-Token is rejected."""
adapter = _make_adapter()
req = _mock_request(headers={"X-Gitlab-Token": "wrong"})
assert adapter._validate_signature(req, b"{}", "correct") is False
def test_validate_no_signature_with_secret_rejects(self):
"""Secret configured but no recognised signature header → reject."""
adapter = _make_adapter()
req = _mock_request(headers={}) # no sig headers at all
assert adapter._validate_signature(req, b"{}", "my-secret") is False
def test_validate_no_secret_allows_all(self):
"""When the secret is empty/falsy, the validator is never even called
by the handler (secret check is 'if secret and secret != _INSECURE...').
Verify that an empty secret isn't accidentally passed to the validator."""
# This tests the semantics: empty secret means skip validation entirely.
# The handler code does: if secret and secret != _INSECURE_NO_AUTH: validate
# So with an empty secret, _validate_signature is never reached.
# We just verify the code path is correct by constructing an adapter
# with no secret and confirming the route config resolves to "".
adapter = _make_adapter(
routes={"test": {"prompt": "hello"}},
secret="",
)
# The route has no secret, global secret is empty
route_secret = adapter._routes["test"].get("secret", adapter._global_secret)
assert not route_secret # empty → validation is skipped in handler
def test_validate_generic_signature_valid(self):
"""Valid X-Webhook-Signature (generic HMAC-SHA256 hex) is accepted."""
adapter = _make_adapter()
body = b'{"event": "push"}'
secret = "generic-secret"
sig = _generic_signature(body, secret)
req = _mock_request(headers={"X-Webhook-Signature": sig})
assert adapter._validate_signature(req, body, secret) is True
# ===================================================================
# Prompt rendering
# ===================================================================
class TestRenderPrompt:
"""Tests for WebhookAdapter._render_prompt."""
def test_render_prompt_dot_notation(self):
"""Dot-notation {pull_request.title} resolves nested keys."""
adapter = _make_adapter()
payload = {"pull_request": {"title": "Fix bug", "number": 42}}
result = adapter._render_prompt(
"PR #{pull_request.number}: {pull_request.title}",
payload,
"pull_request",
"github",
)
assert result == "PR #42: Fix bug"
def test_render_prompt_missing_key_preserved(self):
"""{nonexistent} is left as-is when key doesn't exist in payload."""
adapter = _make_adapter()
result = adapter._render_prompt(
"Hello {nonexistent}!",
{"action": "opened"},
"push",
"test",
)
assert "{nonexistent}" in result
def test_render_prompt_no_template_dumps_json(self):
"""Empty template → JSON dump fallback with event/route context."""
adapter = _make_adapter()
payload = {"key": "value"}
result = adapter._render_prompt("", payload, "push", "my-route")
assert "push" in result
assert "my-route" in result
assert "key" in result
# ===================================================================
# Delivery extra rendering
# ===================================================================
class TestRenderDeliveryExtra:
def test_render_delivery_extra_templates(self):
"""String values in deliver_extra are rendered with payload data."""
adapter = _make_adapter()
extra = {"repo": "{repository.full_name}", "pr_number": "{number}", "static": 42}
payload = {"repository": {"full_name": "org/repo"}, "number": 7}
result = adapter._render_delivery_extra(extra, payload)
assert result["repo"] == "org/repo"
assert result["pr_number"] == "7"
assert result["static"] == 42 # non-string left as-is
# ===================================================================
# Event filtering
# ===================================================================
class TestEventFilter:
"""Tests for event type filtering in _handle_webhook."""
@pytest.mark.asyncio
async def test_event_filter_accepts_matching(self):
"""Matching event type passes through."""
routes = {
"gh": {
"secret": _INSECURE_NO_AUTH,
"events": ["pull_request"],
"prompt": "PR: {action}",
}
}
adapter = _make_adapter(routes=routes)
# Stub handle_message to avoid running the agent
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/gh",
json={"action": "opened"},
headers={"X-GitHub-Event": "pull_request"},
)
assert resp.status == 202
@pytest.mark.asyncio
async def test_event_filter_rejects_non_matching(self):
"""Non-matching event type returns 200 with status=ignored."""
routes = {
"gh": {
"secret": _INSECURE_NO_AUTH,
"events": ["pull_request"],
"prompt": "test",
}
}
adapter = _make_adapter(routes=routes)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/gh",
json={"action": "opened"},
headers={"X-GitHub-Event": "push"},
)
assert resp.status == 200
data = await resp.json()
assert data["status"] == "ignored"
@pytest.mark.asyncio
async def test_event_filter_empty_allows_all(self):
"""No events list → accept any event type."""
routes = {
"all": {
"secret": _INSECURE_NO_AUTH,
"prompt": "got it",
}
}
adapter = _make_adapter(routes=routes)
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/all",
json={"action": "any"},
headers={"X-GitHub-Event": "whatever"},
)
assert resp.status == 202
# ===================================================================
# HTTP handling
# ===================================================================
class TestHTTPHandling:
@pytest.mark.asyncio
async def test_unknown_route_returns_404(self):
"""POST to an unknown route returns 404."""
adapter = _make_adapter(routes={"real": {"secret": _INSECURE_NO_AUTH, "prompt": "x"}})
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post("/webhooks/nonexistent", json={"a": 1})
assert resp.status == 404
@pytest.mark.asyncio
async def test_webhook_handler_returns_202(self):
"""Valid request returns 202 Accepted."""
routes = {"test": {"secret": _INSECURE_NO_AUTH, "prompt": "hi"}}
adapter = _make_adapter(routes=routes)
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post("/webhooks/test", json={"data": "value"})
assert resp.status == 202
data = await resp.json()
assert data["status"] == "accepted"
assert data["route"] == "test"
@pytest.mark.asyncio
async def test_health_endpoint(self):
"""GET /health returns 200 with status=ok."""
adapter = _make_adapter()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.get("/health")
assert resp.status == 200
data = await resp.json()
assert data["status"] == "ok"
assert data["platform"] == "webhook"
@pytest.mark.asyncio
async def test_connect_starts_server(self):
"""connect() starts the HTTP listener and marks adapter as connected."""
routes = {"r1": {"secret": _INSECURE_NO_AUTH, "prompt": "x"}}
adapter = _make_adapter(routes=routes, port=0)
# Use port 0 — the OS picks a free port, but aiohttp requires a real bind.
# We just test that the method completes and marks connected.
# Need to mock TCPSite to avoid actual binding.
with patch("gateway.platforms.webhook.web.AppRunner") as MockRunner, \
patch("gateway.platforms.webhook.web.TCPSite") as MockSite:
mock_runner_inst = AsyncMock()
MockRunner.return_value = mock_runner_inst
mock_site_inst = AsyncMock()
MockSite.return_value = mock_site_inst
result = await adapter.connect()
assert result is True
assert adapter.is_connected
mock_runner_inst.setup.assert_awaited_once()
mock_site_inst.start.assert_awaited_once()
await adapter.disconnect()
@pytest.mark.asyncio
async def test_disconnect_cleans_up(self):
"""disconnect() stops the server and marks adapter disconnected."""
adapter = _make_adapter()
# Simulate a runner that was previously set up
mock_runner = AsyncMock()
adapter._runner = mock_runner
adapter._running = True
await adapter.disconnect()
mock_runner.cleanup.assert_awaited_once()
assert adapter._runner is None
assert not adapter.is_connected
# ===================================================================
# Idempotency
# ===================================================================
class TestIdempotency:
@pytest.mark.asyncio
async def test_duplicate_delivery_id_returns_200(self):
"""Second request with same delivery ID returns 200 duplicate."""
routes = {"idem": {"secret": _INSECURE_NO_AUTH, "prompt": "test"}}
adapter = _make_adapter(routes=routes)
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
headers = {"X-GitHub-Delivery": "delivery-123"}
resp1 = await cli.post("/webhooks/idem", json={"a": 1}, headers=headers)
assert resp1.status == 202
resp2 = await cli.post("/webhooks/idem", json={"a": 1}, headers=headers)
assert resp2.status == 200
data = await resp2.json()
assert data["status"] == "duplicate"
@pytest.mark.asyncio
async def test_expired_delivery_id_allows_reprocess(self):
"""After TTL expires, the same delivery ID is accepted again."""
routes = {"idem": {"secret": _INSECURE_NO_AUTH, "prompt": "test"}}
adapter = _make_adapter(routes=routes)
adapter._idempotency_ttl = 1 # 1 second TTL for test speed
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
headers = {"X-GitHub-Delivery": "delivery-456"}
resp1 = await cli.post("/webhooks/idem", json={"x": 1}, headers=headers)
assert resp1.status == 202
# Backdate the cache entry so it appears expired
adapter._seen_deliveries["delivery-456"] = time.time() - 3700
resp2 = await cli.post("/webhooks/idem", json={"x": 1}, headers=headers)
assert resp2.status == 202 # re-accepted
# ===================================================================
# Rate limiting
# ===================================================================
class TestRateLimiting:
@pytest.mark.asyncio
async def test_rate_limit_rejects_excess(self):
"""Exceeding the rate limit returns 429."""
routes = {"limited": {"secret": _INSECURE_NO_AUTH, "prompt": "test"}}
adapter = _make_adapter(routes=routes, rate_limit=2)
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
# Two requests within limit
for i in range(2):
resp = await cli.post(
"/webhooks/limited",
json={"n": i},
headers={"X-GitHub-Delivery": f"d-{i}"},
)
assert resp.status == 202, f"Request {i} should be accepted"
# Third request should be rate-limited
resp = await cli.post(
"/webhooks/limited",
json={"n": 99},
headers={"X-GitHub-Delivery": "d-99"},
)
assert resp.status == 429
@pytest.mark.asyncio
async def test_rate_limit_window_resets(self):
"""After the 60-second window passes, requests are allowed again."""
routes = {"limited": {"secret": _INSECURE_NO_AUTH, "prompt": "test"}}
adapter = _make_adapter(routes=routes, rate_limit=1)
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/limited",
json={"n": 1},
headers={"X-GitHub-Delivery": "d-a"},
)
assert resp.status == 202
# Backdate all rate-limit timestamps to > 60 seconds ago
adapter._rate_counts["limited"] = [time.time() - 120]
resp = await cli.post(
"/webhooks/limited",
json={"n": 2},
headers={"X-GitHub-Delivery": "d-b"},
)
assert resp.status == 202 # allowed again
# ===================================================================
# Body size limit
# ===================================================================
class TestBodySize:
@pytest.mark.asyncio
async def test_oversized_payload_rejected(self):
"""Content-Length > max_body_bytes returns 413."""
routes = {"big": {"secret": _INSECURE_NO_AUTH, "prompt": "test"}}
adapter = _make_adapter(routes=routes, max_body_bytes=100)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
large_payload = {"data": "x" * 200}
resp = await cli.post(
"/webhooks/big",
json=large_payload,
headers={"Content-Length": "999999"},
)
assert resp.status == 413
# ===================================================================
# INSECURE_NO_AUTH
# ===================================================================
class TestInsecureNoAuth:
@pytest.mark.asyncio
async def test_insecure_no_auth_skips_validation(self):
"""Setting secret to _INSECURE_NO_AUTH bypasses signature check."""
routes = {"open": {"secret": _INSECURE_NO_AUTH, "prompt": "hello"}}
adapter = _make_adapter(routes=routes)
adapter.handle_message = AsyncMock()
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
# No signature header at all — should still be accepted
resp = await cli.post("/webhooks/open", json={"test": True})
assert resp.status == 202
# ===================================================================
# Session isolation
# ===================================================================
class TestSessionIsolation:
@pytest.mark.asyncio
async def test_concurrent_webhooks_get_independent_sessions(self):
"""Two events on the same route produce different session keys."""
routes = {"ci": {"secret": _INSECURE_NO_AUTH, "prompt": "build"}}
adapter = _make_adapter(routes=routes)
captured_events = []
async def _capture(event):
captured_events.append(event)
adapter.handle_message = _capture
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp1 = await cli.post(
"/webhooks/ci",
json={"ref": "main"},
headers={"X-GitHub-Delivery": "aaa-111"},
)
assert resp1.status == 202
resp2 = await cli.post(
"/webhooks/ci",
json={"ref": "dev"},
headers={"X-GitHub-Delivery": "bbb-222"},
)
assert resp2.status == 202
# Wait for the async tasks to be created
await asyncio.sleep(0.05)
assert len(captured_events) == 2
ids = {ev.source.chat_id for ev in captured_events}
assert len(ids) == 2, "Each delivery must have a unique session chat_id"
# ===================================================================
# Delivery info cleanup
# ===================================================================
class TestDeliveryCleanup:
@pytest.mark.asyncio
async def test_delivery_info_cleaned_after_send(self):
"""send() pops delivery_info so the entry doesn't leak memory."""
adapter = _make_adapter()
chat_id = "webhook:test:d-xyz"
adapter._delivery_info[chat_id] = {
"deliver": "log",
"deliver_extra": {},
"payload": {"x": 1},
}
result = await adapter.send(chat_id, "Agent response here")
assert result.success is True
assert chat_id not in adapter._delivery_info
# ===================================================================
# check_webhook_requirements
# ===================================================================
class TestCheckRequirements:
def test_returns_true_when_aiohttp_available(self):
assert check_webhook_requirements() is True
@patch("gateway.platforms.webhook.AIOHTTP_AVAILABLE", False)
def test_returns_false_without_aiohttp(self):
assert check_webhook_requirements() is False