#!/usr/bin/env python3
"""
[OPS] Gitea Webhook Handler
Part of the Gemini Sovereign Infrastructure Suite.

Handles real-time events from Gitea to coordinate fleet actions.
"""

import os
import sys
import json
import argparse
from typing import Dict, Any


class WebhookHandler:
    """Route a decoded webhook payload to the handler for its event type."""

    @staticmethod
    def _section(payload: Dict[str, Any], key: str) -> Dict[str, Any]:
        """Return ``payload[key]`` when it is a dict, otherwise an empty dict.

        A plain ``payload.get(key, {})`` still returns ``None`` when the key
        is present with a JSON ``null`` value, which would break the chained
        ``.get`` lookups in the handlers.
        """
        value = payload.get(key)
        return value if isinstance(value, dict) else {}

    def handle_event(self, payload: Dict[str, Any]):
        """Log the incoming event and dispatch it to the matching handler.

        The event type is read from the payload's ``"event"`` key; when that
        is missing or empty it is inferred from the payload's other keys.
        Event types without a handler are logged and ignored.

        Args:
            payload: Decoded webhook payload (a JSON object).
        """
        # Gitea webhooks often send the event type in a header,
        # but we'll try to infer it from the payload if not provided.
        event_type = payload.get("event") or self.infer_event_type(payload)
        repo_name = self._section(payload, "repository").get("name")
        sender = self._section(payload, "sender").get("username")

        print(f"[*] Received {event_type} event from {repo_name} (by {sender})")

        if event_type == "push":
            self.handle_push(payload)
        elif event_type == "pull_request":
            self.handle_pr(payload)
        elif event_type == "issue":
            self.handle_issue(payload)
        else:
            print(f"[INFO] Ignoring event type: {event_type}")

    def infer_event_type(self, payload: Dict[str, Any]) -> str:
        """Guess the event type from which top-level keys are present.

        Keys are checked in the order ``commits``, ``pull_request``,
        ``issue``; the first one found decides the result.

        Returns:
            ``"push"``, ``"pull_request"``, ``"issue"`` or ``"unknown"``.
        """
        if "commits" in payload:
            return "push"
        if "pull_request" in payload:
            return "pull_request"
        if "issue" in payload:
            return "issue"
        return "unknown"

    def handle_push(self, payload: Dict[str, Any]):
        """Log a push event and announce a deployment for pushes to main."""
        ref = payload.get("ref")
        print(f" [PUSH] Branch: {ref}")
        # Trigger CI or deployment
        if ref == "refs/heads/main":
            print(" [ACTION] Triggering production deployment...")
            # Example: subprocess.run(["./deploy.sh"])

    def handle_pr(self, payload: Dict[str, Any]):
        """Log a pull-request event and announce the linter when relevant.

        The linter is announced only for the ``opened`` and ``synchronized``
        actions.
        """
        action = payload.get("action")
        pr_num = self._section(payload, "pull_request").get("number")
        print(f" [PR] Action: {action} | PR #{pr_num}")
        if action in ("opened", "synchronized"):
            print(f" [ACTION] Triggering architecture linter for PR #{pr_num}...")
            # Example: subprocess.run(["python3", "scripts/architecture_linter_v2.py"])

    def handle_issue(self, payload: Dict[str, Any]):
        """Log the action and number of an issue event."""
        action = payload.get("action")
        issue_num = self._section(payload, "issue").get("number")
        print(f" [ISSUE] Action: {action} | Issue #{issue_num}")


def main():
    """Load a webhook payload from a JSON file and handle it.

    Exits with status 1 when the file is missing or unreadable, when it does
    not contain valid JSON, or when the top-level JSON value is not an object.
    """
    parser = argparse.ArgumentParser(description="Gemini Webhook Handler")
    parser.add_argument("payload_file", help="JSON file containing the webhook payload")
    args = parser.parse_args()

    # Open first and handle failure, rather than checking existence up front:
    # this avoids a check-then-open race and also covers permission errors.
    try:
        with open(args.payload_file, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except FileNotFoundError:
        print(f"[ERROR] Payload file {args.payload_file} not found.")
        sys.exit(1)
    except OSError as exc:
        print(f"[ERROR] Cannot read payload file {args.payload_file}: {exc}")
        sys.exit(1)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        print("[ERROR] Invalid JSON payload.")
        sys.exit(1)

    # The handlers call .get() on the payload, so anything but an object
    # (e.g. a top-level list or string) cannot be processed.
    if not isinstance(payload, dict):
        print("[ERROR] Invalid JSON payload.")
        sys.exit(1)

    handler = WebhookHandler()
    handler.handle_event(payload)


if __name__ == "__main__":
    main()