fix: Gitea webhook adapter — normalize events to sensory bus (#309)

Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit is contained in:
2026-03-18 18:37:01 -04:00
committed by hermes
parent 0ab1ee9378
commit 39939270b7
4 changed files with 377 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Adapters — normalize external data streams into sensory events."""

View File

@@ -0,0 +1,136 @@
"""Gitea webhook adapter — normalize webhook payloads to event bus events.
Receives raw Gitea webhook payloads and emits typed events via the
infrastructure event bus. Bot-only activity is filtered unless it
represents a PR merge (which is always noteworthy).
"""
import logging
from typing import Any
from infrastructure.events.bus import emit
logger = logging.getLogger(__name__)
# Gitea usernames considered "bot" accounts
BOT_USERNAMES = frozenset({"hermes", "kimi", "manus"})
# Owner username — activity from this user is always emitted
OWNER_USERNAME = "rockachopa"
# Mapping from Gitea webhook event type to our bus event type
_EVENT_TYPE_MAP = {
"push": "gitea.push",
"issues": "gitea.issue.opened",
"issue_comment": "gitea.issue.comment",
"pull_request": "gitea.pull_request",
}
def _extract_actor(payload: dict[str, Any]) -> str:
"""Extract the actor username from a webhook payload."""
# Gitea puts actor in sender.login for most events
sender = payload.get("sender", {})
return sender.get("login", "unknown")
def _is_bot(username: str) -> bool:
return username.lower() in BOT_USERNAMES
def _is_pr_merge(event_type: str, payload: dict[str, Any]) -> bool:
"""Check if this is a pull_request merge event."""
if event_type != "pull_request":
return False
action = payload.get("action", "")
pr = payload.get("pull_request", {})
return action == "closed" and pr.get("merged", False)
def _normalize_push(payload: dict[str, Any], actor: str) -> dict[str, Any]:
"""Normalize a push event payload."""
commits = payload.get("commits", [])
return {
"actor": actor,
"ref": payload.get("ref", ""),
"repo": payload.get("repository", {}).get("full_name", ""),
"num_commits": len(commits),
"head_message": commits[0].get("message", "").split("\n", 1)[0].strip() if commits else "",
}
def _normalize_issue_opened(payload: dict[str, Any], actor: str) -> dict[str, Any]:
"""Normalize an issue-opened event payload."""
issue = payload.get("issue", {})
return {
"actor": actor,
"action": payload.get("action", "opened"),
"repo": payload.get("repository", {}).get("full_name", ""),
"issue_number": issue.get("number", 0),
"title": issue.get("title", ""),
}
def _normalize_issue_comment(payload: dict[str, Any], actor: str) -> dict[str, Any]:
"""Normalize an issue-comment event payload."""
issue = payload.get("issue", {})
comment = payload.get("comment", {})
return {
"actor": actor,
"action": payload.get("action", "created"),
"repo": payload.get("repository", {}).get("full_name", ""),
"issue_number": issue.get("number", 0),
"issue_title": issue.get("title", ""),
"comment_body": (comment.get("body", "")[:200]),
}
def _normalize_pull_request(payload: dict[str, Any], actor: str) -> dict[str, Any]:
"""Normalize a pull-request event payload."""
pr = payload.get("pull_request", {})
return {
"actor": actor,
"action": payload.get("action", ""),
"repo": payload.get("repository", {}).get("full_name", ""),
"pr_number": pr.get("number", 0),
"title": pr.get("title", ""),
"merged": pr.get("merged", False),
}
_NORMALIZERS = {
"push": _normalize_push,
"issues": _normalize_issue_opened,
"issue_comment": _normalize_issue_comment,
"pull_request": _normalize_pull_request,
}
async def handle_webhook(event_type: str, payload: dict[str, Any]) -> bool:
"""Normalize a Gitea webhook payload and emit it to the event bus.
Args:
event_type: The Gitea event type header (e.g. "push", "issues").
payload: The raw JSON payload from the webhook.
Returns:
True if an event was emitted, False if filtered or unsupported.
"""
bus_event_type = _EVENT_TYPE_MAP.get(event_type)
if bus_event_type is None:
logger.debug("Unsupported Gitea event type: %s", event_type)
return False
actor = _extract_actor(payload)
# Filter bot-only activity — except PR merges
if _is_bot(actor) and not _is_pr_merge(event_type, payload):
logger.debug("Filtered bot activity from %s on %s", actor, event_type)
return False
normalizer = _NORMALIZERS[event_type]
data = normalizer(payload, actor)
await emit(bus_event_type, source="gitea", data=data)
logger.info("Emitted %s from %s", bus_event_type, actor)
return True

View File

View File

@@ -0,0 +1,240 @@
"""Tests for the Gitea webhook adapter."""
from unittest.mock import AsyncMock, patch
import pytest
from timmy.adapters.gitea_adapter import (
BOT_USERNAMES,
_extract_actor,
_is_bot,
_is_pr_merge,
_normalize_issue_comment,
_normalize_issue_opened,
_normalize_pull_request,
_normalize_push,
handle_webhook,
)
# ── Fixtures: sample payloads ────────────────────────────────────────────────
def _sender(login: str) -> dict:
return {"sender": {"login": login}}
def _push_payload(actor: str = "rockachopa", ref: str = "refs/heads/main") -> dict:
return {
**_sender(actor),
"ref": ref,
"repository": {"full_name": "rockachopa/Timmy-time-dashboard"},
"commits": [
{"message": "fix: something\n\nDetails here"},
{"message": "chore: cleanup"},
],
}
def _issue_payload(actor: str = "rockachopa", action: str = "opened") -> dict:
return {
**_sender(actor),
"action": action,
"repository": {"full_name": "rockachopa/Timmy-time-dashboard"},
"issue": {"number": 42, "title": "Bug in dashboard"},
}
def _issue_comment_payload(actor: str = "rockachopa") -> dict:
return {
**_sender(actor),
"action": "created",
"repository": {"full_name": "rockachopa/Timmy-time-dashboard"},
"issue": {"number": 42, "title": "Bug in dashboard"},
"comment": {"body": "I think this is related to the config change"},
}
def _pr_payload(
actor: str = "rockachopa",
action: str = "opened",
merged: bool = False,
) -> dict:
return {
**_sender(actor),
"action": action,
"repository": {"full_name": "rockachopa/Timmy-time-dashboard"},
"pull_request": {
"number": 99,
"title": "feat: add new feature",
"merged": merged,
},
}
# ── Unit tests: helpers ──────────────────────────────────────────────────────
class TestExtractActor:
def test_normal_sender(self):
assert _extract_actor({"sender": {"login": "rockachopa"}}) == "rockachopa"
def test_missing_sender(self):
assert _extract_actor({}) == "unknown"
class TestIsBot:
@pytest.mark.parametrize("name", list(BOT_USERNAMES))
def test_known_bots(self, name):
assert _is_bot(name) is True
def test_owner_not_bot(self):
assert _is_bot("rockachopa") is False
def test_case_insensitive(self):
assert _is_bot("Kimi") is True
class TestIsPrMerge:
def test_merged_pr(self):
payload = _pr_payload(action="closed", merged=True)
assert _is_pr_merge("pull_request", payload) is True
def test_closed_not_merged(self):
payload = _pr_payload(action="closed", merged=False)
assert _is_pr_merge("pull_request", payload) is False
def test_opened_pr(self):
payload = _pr_payload(action="opened")
assert _is_pr_merge("pull_request", payload) is False
def test_non_pr_event(self):
assert _is_pr_merge("push", {}) is False
# ── Unit tests: normalizers ──────────────────────────────────────────────────
class TestNormalizePush:
def test_basic(self):
data = _normalize_push(_push_payload(), "rockachopa")
assert data["actor"] == "rockachopa"
assert data["ref"] == "refs/heads/main"
assert data["num_commits"] == 2
assert data["head_message"] == "fix: something"
assert data["repo"] == "rockachopa/Timmy-time-dashboard"
def test_empty_commits(self):
payload = {**_push_payload(), "commits": []}
data = _normalize_push(payload, "rockachopa")
assert data["num_commits"] == 0
assert data["head_message"] == ""
class TestNormalizeIssueOpened:
def test_basic(self):
data = _normalize_issue_opened(_issue_payload(), "rockachopa")
assert data["issue_number"] == 42
assert data["title"] == "Bug in dashboard"
assert data["action"] == "opened"
class TestNormalizeIssueComment:
def test_basic(self):
data = _normalize_issue_comment(_issue_comment_payload(), "rockachopa")
assert data["issue_number"] == 42
assert data["comment_body"].startswith("I think this is related")
def test_long_comment_truncated(self):
payload = _issue_comment_payload()
payload["comment"]["body"] = "x" * 500
data = _normalize_issue_comment(payload, "rockachopa")
assert len(data["comment_body"]) == 200
class TestNormalizePullRequest:
def test_opened(self):
data = _normalize_pull_request(_pr_payload(), "rockachopa")
assert data["pr_number"] == 99
assert data["merged"] is False
assert data["action"] == "opened"
def test_merged(self):
payload = _pr_payload(action="closed", merged=True)
data = _normalize_pull_request(payload, "rockachopa")
assert data["merged"] is True
# ── Integration tests: handle_webhook ────────────────────────────────────────
@pytest.mark.asyncio
class TestHandleWebhook:
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_push_emitted(self, mock_emit):
result = await handle_webhook("push", _push_payload())
assert result is True
mock_emit.assert_called_once()
args = mock_emit.call_args
assert args[0][0] == "gitea.push"
assert args[1]["data"]["num_commits"] == 2
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_issue_opened_emitted(self, mock_emit):
result = await handle_webhook("issues", _issue_payload())
assert result is True
mock_emit.assert_called_once()
assert mock_emit.call_args[0][0] == "gitea.issue.opened"
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_issue_comment_emitted(self, mock_emit):
result = await handle_webhook("issue_comment", _issue_comment_payload())
assert result is True
assert mock_emit.call_args[0][0] == "gitea.issue.comment"
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_pull_request_emitted(self, mock_emit):
result = await handle_webhook("pull_request", _pr_payload())
assert result is True
assert mock_emit.call_args[0][0] == "gitea.pull_request"
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_unsupported_event_filtered(self, mock_emit):
result = await handle_webhook("fork", {"sender": {"login": "someone"}})
assert result is False
mock_emit.assert_not_called()
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_bot_push_filtered(self, mock_emit):
result = await handle_webhook("push", _push_payload(actor="kimi"))
assert result is False
mock_emit.assert_not_called()
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_bot_issue_filtered(self, mock_emit):
result = await handle_webhook("issues", _issue_payload(actor="hermes"))
assert result is False
mock_emit.assert_not_called()
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_bot_pr_merge_not_filtered(self, mock_emit):
"""Bot PR merges should still be emitted."""
payload = _pr_payload(actor="kimi", action="closed", merged=True)
result = await handle_webhook("pull_request", payload)
assert result is True
mock_emit.assert_called_once()
data = mock_emit.call_args[1]["data"]
assert data["merged"] is True
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_bot_pr_close_without_merge_filtered(self, mock_emit):
"""Bot PR close (not merge) should be filtered."""
payload = _pr_payload(actor="manus", action="closed", merged=False)
result = await handle_webhook("pull_request", payload)
assert result is False
mock_emit.assert_not_called()
@patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock)
async def test_owner_activity_always_emitted(self, mock_emit):
result = await handle_webhook("push", _push_payload(actor="rockachopa"))
assert result is True
mock_emit.assert_called_once()