Files
timmy-config/scripts/gitea_webhook_handler.py

83 lines
2.9 KiB
Python

#!/usr/bin/env python3
"""
[OPS] Gitea Webhook Handler
Part of the Gemini Sovereign Infrastructure Suite.
Handles real-time events from Gitea to coordinate fleet actions.
"""
import os
import sys
import json
import argparse
from typing import Dict, Any
class WebhookHandler:
    """Route a decoded Gitea webhook payload to a per-event handler.

    Each handler only reports, via ``print``, what it would do; the real
    fleet actions are left as commented-out examples.
    """

    @staticmethod
    def _section(payload: Dict[str, Any], key: str) -> Dict[str, Any]:
        """Return ``payload[key]`` if it is a JSON object, else an empty dict.

        ``payload.get(key, {})`` only covers a *missing* key; a key that is
        present but ``null`` (or any non-object) would break the chained
        ``.get`` lookups the handlers perform.
        """
        value = payload.get(key)
        return value if isinstance(value, dict) else {}

    def handle_event(self, payload: Dict[str, Any]):
        """Log the incoming event and dispatch it to the matching handler.

        Args:
            payload: Decoded webhook body. A truthy ``"event"`` entry is used
                as the event type; otherwise the type is inferred from the
                keys present in the payload.
        """
        # Gitea webhooks often send the event type in a header,
        # but we'll try to infer it from the payload if not provided.
        event_type = payload.get("event") or self.infer_event_type(payload)
        repo_name = self._section(payload, "repository").get("name")
        sender = self._section(payload, "sender").get("username")
        print(f"[*] Received {event_type} event from {repo_name} (by {sender})")
        if event_type == "push":
            self.handle_push(payload)
        elif event_type == "pull_request":
            self.handle_pr(payload)
        elif event_type in ("issue", "issues"):
            # Inference yields "issue", while Gitea itself names the event
            # "issues"; accept both so an explicit event name is not dropped.
            self.handle_issue(payload)
        else:
            print(f"[INFO] Ignoring event type: {event_type}")

    def infer_event_type(self, payload: Dict[str, Any]) -> str:
        """Guess the event type from which top-level keys the payload has.

        Returns:
            ``"push"``, ``"pull_request"`` or ``"issue"`` for the first
            matching key (checked in that order), else ``"unknown"``.
        """
        if "commits" in payload:
            return "push"
        if "pull_request" in payload:
            return "pull_request"
        if "issue" in payload:
            return "issue"
        return "unknown"

    def handle_push(self, payload: Dict[str, Any]):
        """Report the pushed ref and announce a deployment for ``main``."""
        ref = payload.get("ref")
        print(f" [PUSH] Branch: {ref}")
        # Trigger CI or deployment
        if ref == "refs/heads/main":
            print(" [ACTION] Triggering production deployment...")
            # Example: subprocess.run(["./deploy.sh"])

    def handle_pr(self, payload: Dict[str, Any]):
        """Report a pull-request action; announce linting on open/update."""
        action = payload.get("action")
        pr_num = self._section(payload, "pull_request").get("number")
        print(f" [PR] Action: {action} | PR #{pr_num}")
        if action in ["opened", "synchronized"]:
            print(f" [ACTION] Triggering architecture linter for PR #{pr_num}...")
            # Example: subprocess.run(["python3", "scripts/architecture_linter_v2.py"])

    def handle_issue(self, payload: Dict[str, Any]):
        """Report the action and number of an issue event."""
        action = payload.get("action")
        issue_num = self._section(payload, "issue").get("number")
        print(f" [ISSUE] Action: {action} | Issue #{issue_num}")
def main(argv=None) -> None:
    """Command-line entry point: load a JSON payload file and handle it.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; ``None``
            (the default) keeps the normal command-line behaviour.

    Exits with status 1 when the payload file is missing or unreadable, is
    not valid JSON, or does not contain a JSON object at the top level.
    """
    parser = argparse.ArgumentParser(description="Gemini Webhook Handler")
    parser.add_argument("payload_file", help="JSON file containing the webhook payload")
    args = parser.parse_args(argv)

    # Open first and handle failure, rather than checking existence up front:
    # that also covers directories, permission errors and files that vanish
    # between the check and the open.
    try:
        with open(args.payload_file, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except FileNotFoundError:
        print(f"[ERROR] Payload file {args.payload_file} not found.")
        sys.exit(1)
    except OSError as exc:
        print(f"[ERROR] Cannot read payload file {args.payload_file}: {exc}")
        sys.exit(1)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; unlike a bare except, this lets KeyboardInterrupt and
        # SystemExit propagate.
        print("[ERROR] Invalid JSON payload.")
        sys.exit(1)

    # The handler looks keys up with .get(), so anything other than a JSON
    # object (list, string, number, null) cannot be processed.
    if not isinstance(payload, dict):
        print("[ERROR] Webhook payload must be a JSON object.")
        sys.exit(1)

    handler = WebhookHandler()
    handler.handle_event(payload)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()