diff --git a/src/timmy/adapters/__init__.py b/src/timmy/adapters/__init__.py new file mode 100644 index 0000000..ca62a9d --- /dev/null +++ b/src/timmy/adapters/__init__.py @@ -0,0 +1 @@ +"""Adapters — normalize external data streams into sensory events.""" diff --git a/src/timmy/adapters/gitea_adapter.py b/src/timmy/adapters/gitea_adapter.py new file mode 100644 index 0000000..be3f4c9 --- /dev/null +++ b/src/timmy/adapters/gitea_adapter.py @@ -0,0 +1,136 @@ +"""Gitea webhook adapter — normalize webhook payloads to event bus events. + +Receives raw Gitea webhook payloads and emits typed events via the +infrastructure event bus. Bot-only activity is filtered unless it +represents a PR merge (which is always noteworthy). +""" + +import logging +from typing import Any + +from infrastructure.events.bus import emit + +logger = logging.getLogger(__name__) + +# Gitea usernames considered "bot" accounts +BOT_USERNAMES = frozenset({"hermes", "kimi", "manus"}) + +# Owner username — activity from this user is always emitted +OWNER_USERNAME = "rockachopa" + +# Mapping from Gitea webhook event type to our bus event type +_EVENT_TYPE_MAP = { + "push": "gitea.push", + "issues": "gitea.issue.opened", + "issue_comment": "gitea.issue.comment", + "pull_request": "gitea.pull_request", +} + + +def _extract_actor(payload: dict[str, Any]) -> str: + """Extract the actor username from a webhook payload.""" + # Gitea puts actor in sender.login for most events + sender = payload.get("sender", {}) + return sender.get("login", "unknown") + + +def _is_bot(username: str) -> bool: + return username.lower() in BOT_USERNAMES + + +def _is_pr_merge(event_type: str, payload: dict[str, Any]) -> bool: + """Check if this is a pull_request merge event.""" + if event_type != "pull_request": + return False + action = payload.get("action", "") + pr = payload.get("pull_request", {}) + return action == "closed" and pr.get("merged", False) + + +def _normalize_push(payload: dict[str, Any], actor: str) -> dict[str, Any]: + """Normalize a push event payload.""" + commits = payload.get("commits", []) + return { + "actor": actor, + "ref": payload.get("ref", ""), + "repo": payload.get("repository", {}).get("full_name", ""), + "num_commits": len(commits), + "head_message": commits[0].get("message", "").split("\n", 1)[0].strip() if commits else "", + } + + +def _normalize_issue_opened(payload: dict[str, Any], actor: str) -> dict[str, Any]: + """Normalize an issue-opened event payload.""" + issue = payload.get("issue", {}) + return { + "actor": actor, + "action": payload.get("action", "opened"), + "repo": payload.get("repository", {}).get("full_name", ""), + "issue_number": issue.get("number", 0), + "title": issue.get("title", ""), + } + + +def _normalize_issue_comment(payload: dict[str, Any], actor: str) -> dict[str, Any]: + """Normalize an issue-comment event payload.""" + issue = payload.get("issue", {}) + comment = payload.get("comment", {}) + return { + "actor": actor, + "action": payload.get("action", "created"), + "repo": payload.get("repository", {}).get("full_name", ""), + "issue_number": issue.get("number", 0), + "issue_title": issue.get("title", ""), + "comment_body": (comment.get("body", "")[:200]), + } + + +def _normalize_pull_request(payload: dict[str, Any], actor: str) -> dict[str, Any]: + """Normalize a pull-request event payload.""" + pr = payload.get("pull_request", {}) + return { + "actor": actor, + "action": payload.get("action", ""), + "repo": payload.get("repository", {}).get("full_name", ""), + "pr_number": pr.get("number", 0), + "title": pr.get("title", ""), + "merged": pr.get("merged", False), + } + + +_NORMALIZERS = { + "push": _normalize_push, + "issues": _normalize_issue_opened, + "issue_comment": _normalize_issue_comment, + "pull_request": _normalize_pull_request, +} + + +async def handle_webhook(event_type: str, payload: dict[str, Any]) -> bool: + """Normalize a Gitea webhook payload and emit it to the event bus. + + Args: + event_type: The Gitea event type header (e.g. "push", "issues"). + payload: The raw JSON payload from the webhook. + + Returns: + True if an event was emitted, False if filtered or unsupported. + """ + bus_event_type = _EVENT_TYPE_MAP.get(event_type) + if bus_event_type is None: + logger.debug("Unsupported Gitea event type: %s", event_type) + return False + + actor = _extract_actor(payload) + + # Filter bot-only activity — except PR merges + if _is_bot(actor) and not _is_pr_merge(event_type, payload): + logger.debug("Filtered bot activity from %s on %s", actor, event_type) + return False + + normalizer = _NORMALIZERS[event_type] + data = normalizer(payload, actor) + + await emit(bus_event_type, source="gitea", data=data) + logger.info("Emitted %s from %s", bus_event_type, actor) + return True diff --git a/tests/timmy/adapters/__init__.py b/tests/timmy/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/timmy/adapters/test_gitea_adapter.py b/tests/timmy/adapters/test_gitea_adapter.py new file mode 100644 index 0000000..05e8a36 --- /dev/null +++ b/tests/timmy/adapters/test_gitea_adapter.py @@ -0,0 +1,240 @@ +"""Tests for the Gitea webhook adapter.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from timmy.adapters.gitea_adapter import ( + BOT_USERNAMES, + _extract_actor, + _is_bot, + _is_pr_merge, + _normalize_issue_comment, + _normalize_issue_opened, + _normalize_pull_request, + _normalize_push, + handle_webhook, +) + +# ── Fixtures: sample payloads ──────────────────────────────────────────────── + + +def _sender(login: str) -> dict: + return {"sender": {"login": login}} + + +def _push_payload(actor: str = "rockachopa", ref: str = "refs/heads/main") -> dict: + return { + **_sender(actor), + "ref": ref, + "repository": {"full_name": "rockachopa/Timmy-time-dashboard"}, + "commits": [ + {"message": "fix: something\n\nDetails here"}, + {"message": "chore: cleanup"}, + ], + } + + +def _issue_payload(actor: str = "rockachopa", action: str = "opened") -> dict: + return { + **_sender(actor), + "action": action, + "repository": {"full_name": "rockachopa/Timmy-time-dashboard"}, + "issue": {"number": 42, "title": "Bug in dashboard"}, + } + + +def _issue_comment_payload(actor: str = "rockachopa") -> dict: + return { + **_sender(actor), + "action": "created", + "repository": {"full_name": "rockachopa/Timmy-time-dashboard"}, + "issue": {"number": 42, "title": "Bug in dashboard"}, + "comment": {"body": "I think this is related to the config change"}, + } + + +def _pr_payload( + actor: str = "rockachopa", + action: str = "opened", + merged: bool = False, +) -> dict: + return { + **_sender(actor), + "action": action, + "repository": {"full_name": "rockachopa/Timmy-time-dashboard"}, + "pull_request": { + "number": 99, + "title": "feat: add new feature", + "merged": merged, + }, + } + + +# ── Unit tests: helpers ────────────────────────────────────────────────────── + + +class TestExtractActor: + def test_normal_sender(self): + assert _extract_actor({"sender": {"login": "rockachopa"}}) == "rockachopa" + + def test_missing_sender(self): + assert _extract_actor({}) == "unknown" + + +class TestIsBot: + @pytest.mark.parametrize("name", list(BOT_USERNAMES)) + def test_known_bots(self, name): + assert _is_bot(name) is True + + def test_owner_not_bot(self): + assert _is_bot("rockachopa") is False + + def test_case_insensitive(self): + assert _is_bot("Kimi") is True + + +class TestIsPrMerge: + def test_merged_pr(self): + payload = _pr_payload(action="closed", merged=True) + assert _is_pr_merge("pull_request", payload) is True + + def test_closed_not_merged(self): + payload = _pr_payload(action="closed", merged=False) + assert _is_pr_merge("pull_request", payload) is False + + def test_opened_pr(self): + payload = _pr_payload(action="opened") + assert _is_pr_merge("pull_request", payload) is False + + def test_non_pr_event(self): + assert _is_pr_merge("push", {}) is False + + +# ── Unit tests: normalizers ────────────────────────────────────────────────── + + +class TestNormalizePush: + def test_basic(self): + data = _normalize_push(_push_payload(), "rockachopa") + assert data["actor"] == "rockachopa" + assert data["ref"] == "refs/heads/main" + assert data["num_commits"] == 2 + assert data["head_message"] == "fix: something" + assert data["repo"] == "rockachopa/Timmy-time-dashboard" + + def test_empty_commits(self): + payload = {**_push_payload(), "commits": []} + data = _normalize_push(payload, "rockachopa") + assert data["num_commits"] == 0 + assert data["head_message"] == "" + + +class TestNormalizeIssueOpened: + def test_basic(self): + data = _normalize_issue_opened(_issue_payload(), "rockachopa") + assert data["issue_number"] == 42 + assert data["title"] == "Bug in dashboard" + assert data["action"] == "opened" + + +class TestNormalizeIssueComment: + def test_basic(self): + data = _normalize_issue_comment(_issue_comment_payload(), "rockachopa") + assert data["issue_number"] == 42 + assert data["comment_body"].startswith("I think this is related") + + def test_long_comment_truncated(self): + payload = _issue_comment_payload() + payload["comment"]["body"] = "x" * 500 + data = _normalize_issue_comment(payload, "rockachopa") + assert len(data["comment_body"]) == 200 + + +class TestNormalizePullRequest: + def test_opened(self): + data = _normalize_pull_request(_pr_payload(), "rockachopa") + assert data["pr_number"] == 99 + assert data["merged"] is False + assert data["action"] == "opened" + + def test_merged(self): + payload = _pr_payload(action="closed", merged=True) + data = _normalize_pull_request(payload, "rockachopa") + assert data["merged"] is True + + +# ── Integration tests: handle_webhook ──────────────────────────────────────── + + +@pytest.mark.asyncio +class TestHandleWebhook: + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_push_emitted(self, mock_emit): + result = await handle_webhook("push", _push_payload()) + assert result is True + mock_emit.assert_called_once() + args = mock_emit.call_args + assert args[0][0] == "gitea.push" + assert args[1]["data"]["num_commits"] == 2 + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_issue_opened_emitted(self, mock_emit): + result = await handle_webhook("issues", _issue_payload()) + assert result is True + mock_emit.assert_called_once() + assert mock_emit.call_args[0][0] == "gitea.issue.opened" + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_issue_comment_emitted(self, mock_emit): + result = await handle_webhook("issue_comment", _issue_comment_payload()) + assert result is True + assert mock_emit.call_args[0][0] == "gitea.issue.comment" + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_pull_request_emitted(self, mock_emit): + result = await handle_webhook("pull_request", _pr_payload()) + assert result is True + assert mock_emit.call_args[0][0] == "gitea.pull_request" + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_unsupported_event_filtered(self, mock_emit): + result = await handle_webhook("fork", {"sender": {"login": "someone"}}) + assert result is False + mock_emit.assert_not_called() + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_bot_push_filtered(self, mock_emit): + result = await handle_webhook("push", _push_payload(actor="kimi")) + assert result is False + mock_emit.assert_not_called() + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_bot_issue_filtered(self, mock_emit): + result = await handle_webhook("issues", _issue_payload(actor="hermes")) + assert result is False + mock_emit.assert_not_called() + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_bot_pr_merge_not_filtered(self, mock_emit): + """Bot PR merges should still be emitted.""" + payload = _pr_payload(actor="kimi", action="closed", merged=True) + result = await handle_webhook("pull_request", payload) + assert result is True + mock_emit.assert_called_once() + data = mock_emit.call_args[1]["data"] + assert data["merged"] is True + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_bot_pr_close_without_merge_filtered(self, mock_emit): + """Bot PR close (not merge) should be filtered.""" + payload = _pr_payload(actor="manus", action="closed", merged=False) + result = await handle_webhook("pull_request", payload) + assert result is False + mock_emit.assert_not_called() + + @patch("timmy.adapters.gitea_adapter.emit", new_callable=AsyncMock) + async def test_owner_activity_always_emitted(self, mock_emit): + result = await handle_webhook("push", _push_payload(actor="rockachopa")) + assert result is True + mock_emit.assert_called_once()