forked from Rockachopa/Timmy-time-dashboard
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
"""Gitea webhook adapter — normalize webhook payloads to event bus events.

Receives raw Gitea webhook payloads and emits typed events via the
infrastructure event bus. Bot-only activity is filtered unless it
represents a PR merge (which is always noteworthy).
"""
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from infrastructure.events.bus import emit
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Gitea accounts treated as bots (names are stored in lowercase).
BOT_USERNAMES = frozenset(("hermes", "kimi", "manus"))

# Username of the repository owner.
# NOTE(review): intended as "activity from this user is always emitted", but
# nothing in the visible code references this constant — confirm it is used.
OWNER_USERNAME = "rockachopa"
|
|
|
|
# Mapping from Gitea webhook event type to our bus event type
|
|
_EVENT_TYPE_MAP = {
|
|
"push": "gitea.push",
|
|
"issues": "gitea.issue.opened",
|
|
"issue_comment": "gitea.issue.comment",
|
|
"pull_request": "gitea.pull_request",
|
|
}
|
|
|
|
|
|
def _extract_actor(payload: dict[str, Any]) -> str:
|
|
"""Extract the actor username from a webhook payload."""
|
|
# Gitea puts actor in sender.login for most events
|
|
sender = payload.get("sender", {})
|
|
return sender.get("login", "unknown")
|
|
|
|
|
|
def _is_bot(username: str) -> bool:
    """Return True when *username*, lowercased, is in ``BOT_USERNAMES``."""
    lowered = username.lower()
    return lowered in BOT_USERNAMES
|
|
|
|
|
|
def _is_pr_merge(event_type: str, payload: dict[str, Any]) -> bool:
|
|
"""Check if this is a pull_request merge event."""
|
|
if event_type != "pull_request":
|
|
return False
|
|
action = payload.get("action", "")
|
|
pr = payload.get("pull_request", {})
|
|
return action == "closed" and pr.get("merged", False)
|
|
|
|
|
|
def _normalize_push(payload: dict[str, Any], actor: str) -> dict[str, Any]:
|
|
"""Normalize a push event payload."""
|
|
commits = payload.get("commits", [])
|
|
return {
|
|
"actor": actor,
|
|
"ref": payload.get("ref", ""),
|
|
"repo": payload.get("repository", {}).get("full_name", ""),
|
|
"num_commits": len(commits),
|
|
"head_message": commits[0].get("message", "").split("\n", 1)[0].strip() if commits else "",
|
|
}
|
|
|
|
|
|
def _normalize_issue_opened(payload: dict[str, Any], actor: str) -> dict[str, Any]:
|
|
"""Normalize an issue-opened event payload."""
|
|
issue = payload.get("issue", {})
|
|
return {
|
|
"actor": actor,
|
|
"action": payload.get("action", "opened"),
|
|
"repo": payload.get("repository", {}).get("full_name", ""),
|
|
"issue_number": issue.get("number", 0),
|
|
"title": issue.get("title", ""),
|
|
}
|
|
|
|
|
|
def _normalize_issue_comment(payload: dict[str, Any], actor: str) -> dict[str, Any]:
|
|
"""Normalize an issue-comment event payload."""
|
|
issue = payload.get("issue", {})
|
|
comment = payload.get("comment", {})
|
|
return {
|
|
"actor": actor,
|
|
"action": payload.get("action", "created"),
|
|
"repo": payload.get("repository", {}).get("full_name", ""),
|
|
"issue_number": issue.get("number", 0),
|
|
"issue_title": issue.get("title", ""),
|
|
"comment_body": (comment.get("body", "")[:200]),
|
|
}
|
|
|
|
|
|
def _normalize_pull_request(payload: dict[str, Any], actor: str) -> dict[str, Any]:
|
|
"""Normalize a pull-request event payload."""
|
|
pr = payload.get("pull_request", {})
|
|
return {
|
|
"actor": actor,
|
|
"action": payload.get("action", ""),
|
|
"repo": payload.get("repository", {}).get("full_name", ""),
|
|
"pr_number": pr.get("number", 0),
|
|
"title": pr.get("title", ""),
|
|
"merged": pr.get("merged", False),
|
|
}
|
|
|
|
|
|
# Dispatch table: Gitea event type -> function turning the raw payload into
# bus event data. Each entry is called as ``normalizer(payload, actor)``.
_NORMALIZERS = {
    "push": _normalize_push,
    "issues": _normalize_issue_opened,
    "issue_comment": _normalize_issue_comment,
    "pull_request": _normalize_pull_request,
}
|
|
|
|
|
|
async def handle_webhook(event_type: str, payload: dict[str, Any]) -> bool:
    """Translate a Gitea webhook delivery into an event on the bus.

    Unsupported event types are dropped. Events whose actor is a bot are
    dropped too, unless the event is a pull-request merge.

    Args:
        event_type: The Gitea event type header (e.g. "push", "issues").
        payload: The raw JSON payload from the webhook.

    Returns:
        True if an event was emitted, False if filtered or unsupported.
    """
    target_type = _EVENT_TYPE_MAP.get(event_type)
    if target_type is None:
        logger.debug("Unsupported Gitea event type: %s", event_type)
        return False

    sender_name = _extract_actor(payload)

    # Bot activity is noise, except for merges, which always go through.
    suppressed = _is_bot(sender_name) and not _is_pr_merge(event_type, payload)
    if suppressed:
        logger.debug("Filtered bot activity from %s on %s", sender_name, event_type)
        return False

    event_data = _NORMALIZERS[event_type](payload, sender_name)

    await emit(target_type, source="gitea", data=event_data)
    logger.info("Emitted %s from %s", target_type, sender_name)
    return True
|