#!/usr/bin/env python3
"""
Operator Ingress → Gitea Mutation Gate

Transport-agnostic, idempotent gate for all operator-originated Gitea mutations.
This is the single canonical write path for create/comment/assign/merge/close actions.

Design invariants:
  1. Every command gets a deterministic idempotency key.
  2. Before executing, the gate checks Gitea for evidence of prior execution.
  3. After executing, the gate records proof of execution (via comment trail or local ledger).
  4. Replay of the same command returns the previously generated result without double-writing.
"""

import hashlib
import json
import logging
import os
import urllib.request
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
TOKEN_PATHS = ["/root/.gitea_token", os.path.expanduser("~/.gitea_token")]
_LEDGER_PATH = Path(os.environ.get("GATE_LEDGER_PATH", "/root/.operator_gate_ledger.jsonl"))

_log = logging.getLogger(__name__)


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string with a ``Z`` suffix.

    Produces the same format as ``datetime.utcnow().isoformat() + "Z"`` without
    relying on the deprecated ``utcnow``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class Command:
    """An operator-originated mutation request, independent of transport."""

    source: str          # e.g. "nostr:npub10trqk..." or "matrix:@alex:..."
    action: str          # "create_issue" | "add_comment" | "close_issue" | "merge_pr" | "assign_issue"
    repo: str            # "Timmy_Foundation/timmy-config"
    payload: Dict[str, Any]
    timestamp_utc: str   # ISO-8601, used for idempotency windowing

    def idempotency_key(self) -> str:
        """Deterministic key: replay-safe, collision-resistant.

        The key is the first 32 hex characters of the SHA-256 of source,
        action, repo and the key-sorted JSON payload. ``timestamp_utc`` is not
        part of the key, so the same command issued at different times maps to
        the same key.
        """
        payload_str = json.dumps(self.payload, sort_keys=True, ensure_ascii=True)
        raw = f"{self.source}:{self.action}:{self.repo}:{payload_str}"
        return hashlib.sha256(raw.encode()).hexdigest()[:32]


@dataclass
class Ack:
    """Result of passing a :class:`Command` through the gate."""

    success: bool
    idempotency_key: str
    gitea_url: Optional[str]
    gitea_number: Optional[int]
    message: str
    prior_execution: bool            # True when the mutation was found already applied
    executed_at: Optional[str] = None


# ---------------------------------------------------------------------------
# Low-level Gitea API
# ---------------------------------------------------------------------------

def _load_token() -> str:
    """Return the first non-empty token found in ``TOKEN_PATHS``.

    Raises:
        RuntimeError: if no path yields a non-empty token.
    """
    for path in TOKEN_PATHS:
        try:
            with open(path, encoding="utf-8") as f:
                token = f.read().strip()
        except OSError:
            # Missing *or unreadable* (e.g. /root/... as a non-root user):
            # fall through to the next candidate instead of aborting.
            continue
        if token:
            return token
    raise RuntimeError("Gitea token not found")


def _headers() -> Dict[str, str]:
    """Build the auth and content-type headers for a Gitea API call."""
    return {"Authorization": f"token {_load_token()}", "Content-Type": "application/json"}


def _api_request(method: str, path: str, data: Optional[Dict] = None) -> Dict[str, Any]:
    """Send a JSON request to the Gitea API and return the decoded response.

    Args:
        method: HTTP method, e.g. ``"POST"`` or ``"PATCH"``.
        path: API path appended to ``{GITEA_URL}/api/v1``.
        data: JSON-serialisable body, or ``None`` for no body.

    Returns:
        The decoded JSON response, or ``{"status": <http status>}`` when the
        response body is empty.

    Raises:
        urllib.error.URLError: on transport failures and HTTP error statuses.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    # `is not None` so an explicit empty object is still sent as "{}".
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=body, headers=_headers(), method=method)
    with urllib.request.urlopen(req, timeout=20) as resp:
        text = resp.read().decode()
        return json.loads(text) if text else {"status": resp.status}


def _api_get(path: str) -> Any:
    """GET ``path`` from the Gitea API and return the decoded JSON body."""
    url = f"{GITEA_URL}/api/v1{path}"
    req = urllib.request.Request(url, headers=_headers())
    with urllib.request.urlopen(req, timeout=20) as resp:
        return json.loads(resp.read().decode())


# ---------------------------------------------------------------------------
# Ledger (local idempotency cache)
# ---------------------------------------------------------------------------

def _ledger_load() -> Dict[str, Dict]:
    """Load the JSONL ledger into a dict keyed by idempotency key.

    A missing ledger yields an empty dict. Blank lines, lines that are not
    valid JSON, and JSON values that are not objects carrying a string
    ``idempotency_key`` are skipped, so one corrupt line cannot take the gate
    down. Later entries for the same key override earlier ones.
    """
    entries: Dict[str, Dict] = {}
    try:
        handle = open(_LEDGER_PATH, "r", encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return entries
    with handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(entry, dict):
                continue
            key = entry.get("idempotency_key")
            if isinstance(key, str):
                entries[key] = entry
    return entries


def _ledger_append(entry: Dict):
    """Append one entry to the ledger as a JSON line, creating parent dirs."""
    _LEDGER_PATH.parent.mkdir(parents=True, exist_ok=True)
    with open(_LEDGER_PATH, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry, ensure_ascii=True) + "\n")


# ---------------------------------------------------------------------------
# Gitea-side idempotency probes
# ---------------------------------------------------------------------------

def _parse_gitea_time(value: Any) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp into an aware datetime, or return None."""
    if not isinstance(value, str) or not value:
        return None
    try:
        # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _probe_issue_by_title(repo: str, title: str, window_hours: int = 24) -> Optional[Dict]:
    """Search recent issues for a duplicate title to avoid double-create.

    Any open issue (first 20 returned) with the same title counts as a
    duplicate. A closed issue (first 10 returned) counts only if it was created
    within the last ``window_hours`` hours. A closed issue whose creation time
    cannot be determined is treated as a duplicate, the conservative choice
    for a gate that must not double-write.

    Returns:
        The matching issue dict, or ``None`` if no duplicate was found or the
        probe itself failed (best-effort; the failure is logged).
    """
    try:
        open_issues = _api_get(f"/repos/{repo}/issues?state=open&limit=20")
        for issue in open_issues:
            if issue.get("title") == title:
                return issue

        # Also check recently closed
        cutoff = datetime.now(timezone.utc) - timedelta(hours=window_hours)
        closed_issues = _api_get(f"/repos/{repo}/issues?state=closed&limit=10")
        for issue in closed_issues:
            if issue.get("title") != title:
                continue
            created = _parse_gitea_time(issue.get("created_at"))
            if created is None or created >= cutoff:
                return issue
    except Exception as exc:
        _log.warning("Duplicate-title probe failed for %s: %s", repo, exc)
    return None


def _probe_comment_by_fingerprint(repo: str, issue_num: int, fingerprint: str) -> Optional[Dict]:
    """Scan recent comments for the idempotency fingerprint.

    Returns:
        The first of the 50 fetched comments whose body contains
        ``fingerprint``, or ``None`` if there is none or the probe failed
        (best-effort; the failure is logged).
    """
    try:
        comments = _api_get(f"/repos/{repo}/issues/{issue_num}/comments?limit=50")
        for comment in comments:
            body = comment.get("body") or ""
            if fingerprint in body:
                return comment
    except Exception as exc:
        _log.warning("Comment fingerprint probe failed for %s#%s: %s", repo, issue_num, exc)
    return None


# ---------------------------------------------------------------------------
# Gate execution
# ---------------------------------------------------------------------------

class GiteaGate:
    """The single canonical write path."""

    def execute(self, cmd: Command) -> Ack:
        """Run ``cmd`` through the gate and return an acknowledgement.

        Order of operations:
          1. A local ledger hit returns the recorded result without touching Gitea.
          2. Otherwise the ``_handle_<action>`` method is dispatched; an unknown
             action or a handler exception yields ``success=False``.
          3. A successful result is appended to the ledger. A ledger write
             failure is logged and noted in the message, but does not turn a
             mutation that already happened into a reported failure.
        """
        key = cmd.idempotency_key()
        ledger = _ledger_load()

        # 1. Local ledger hit
        entry = ledger.get(key)
        if entry is not None:
            return Ack(
                success=True,
                idempotency_key=key,
                gitea_url=entry.get("gitea_url"),
                gitea_number=entry.get("gitea_number"),
                message=entry.get("message", "Replay: already executed."),
                prior_execution=True,
                executed_at=entry.get("executed_at"),
            )

        # 2. Execute
        handler = getattr(self, f"_handle_{cmd.action}", None)
        if handler is None:
            return Ack(
                success=False,
                idempotency_key=key,
                gitea_url=None,
                gitea_number=None,
                message=f"Unsupported action: {cmd.action}",
                prior_execution=False,
            )

        try:
            ack = handler(cmd, key)
        except Exception as e:
            return Ack(
                success=False,
                idempotency_key=key,
                gitea_url=None,
                gitea_number=None,
                message=f"Execution error: {e}",
                prior_execution=False,
            )

        if not ack.success:
            # Never ledger a failure: a replay would report it as success.
            return ack

        # 3. Record proof of execution
        executed_at = _utc_now_iso()
        if not ack.prior_execution and ack.executed_at is None:
            ack.executed_at = executed_at
        try:
            _ledger_append({
                "idempotency_key": ack.idempotency_key,
                "action": cmd.action,
                "repo": cmd.repo,
                "gitea_url": ack.gitea_url,
                "gitea_number": ack.gitea_number,
                "message": ack.message,
                "executed_at": executed_at,
            })
        except (OSError, TypeError, ValueError) as e:
            # The Gitea mutation already happened; reporting failure here would
            # invite a retry and a double write.
            _log.warning("Ledger write failed for key %s: %s", key, e)
            ack.message = f"{ack.message} (warning: ledger write failed: {e})"
        return ack

    # -------------------------------------------------------------------
    # Action handlers
    # -------------------------------------------------------------------

    def _handle_create_issue(self, cmd: Command, key: str) -> Ack:
        """Create an issue unless one with the same title already exists.

        Reads ``title``, ``body``, ``assignees`` and ``labels`` from the payload.
        The gate key is embedded in the issue body as proof of execution.
        """
        title = cmd.payload.get("title", "")
        body = cmd.payload.get("body", "")
        assignees = cmd.payload.get("assignees")
        labels = cmd.payload.get("labels")

        # Probe Gitea for duplicate title
        dup = _probe_issue_by_title(cmd.repo, title)
        if dup:
            return Ack(
                success=True,
                idempotency_key=key,
                gitea_url=dup.get("html_url"),
                gitea_number=dup.get("number"),
                message=f"Duplicate detected: issue #{dup['number']} already exists.",
                prior_execution=True,
            )

        data = {
            "title": title,
            "body": f"{body}\n\n---\n*Operator ingress via {cmd.source} — gate-key: `{key}`*",
        }
        if assignees:
            data["assignees"] = assignees
        if labels:
            data["labels"] = labels

        result = _api_request("POST", f"/repos/{cmd.repo}/issues", data)
        url = result.get("html_url", f"{GITEA_URL}/{cmd.repo}/issues/{result['number']}")
        return Ack(
            success=True,
            idempotency_key=key,
            gitea_url=url,
            gitea_number=result.get("number"),
            message=f"Created issue #{result['number']}: {result['title']}",
            prior_execution=False,
        )

    def _handle_add_comment(self, cmd: Command, key: str) -> Ack:
        """Post a comment unless one carrying this gate key is already present.

        Reads ``issue_num`` and ``body`` from the payload (both required).
        """
        issue_num = int(cmd.payload["issue_num"])
        body = cmd.payload["body"]

        # Probe for fingerprint
        dup = _probe_comment_by_fingerprint(cmd.repo, issue_num, key)
        if dup:
            return Ack(
                success=True,
                idempotency_key=key,
                gitea_url=dup.get("html_url"),
                gitea_number=issue_num,
                message="Duplicate detected: comment already posted.",
                prior_execution=True,
            )

        result = _api_request(
            "POST",
            f"/repos/{cmd.repo}/issues/{issue_num}/comments",
            {"body": f"{body}\n\n---\n*gate-key: `{key}`*"},
        )
        url = result.get(
            "html_url",
            f"{GITEA_URL}/{cmd.repo}/issues/{issue_num}#issuecomment-{result.get('id', '')}",
        )
        return Ack(
            success=True,
            idempotency_key=key,
            gitea_url=url,
            gitea_number=issue_num,
            message=f"Added comment to issue #{issue_num}",
            prior_execution=False,
        )

    def _handle_close_issue(self, cmd: Command, key: str) -> Ack:
        """Close an issue unless it is already closed.

        Reads ``issue_num`` from the payload (required). After closing, a
        proof comment carrying the gate key is posted on a best-effort basis.
        """
        issue_num = int(cmd.payload["issue_num"])

        # Check if already closed
        issue = _api_get(f"/repos/{cmd.repo}/issues/{issue_num}")
        if issue.get("state") == "closed":
            return Ack(
                success=True,
                idempotency_key=key,
                gitea_url=issue.get("html_url"),
                gitea_number=issue_num,
                message=f"Issue #{issue_num} already closed.",
                prior_execution=True,
            )

        result = _api_request(
            "PATCH",
            f"/repos/{cmd.repo}/issues/{issue_num}",
            {"state": "closed"},
        )
        # Add closure proof comment. Best-effort: the close itself succeeded.
        try:
            _api_request(
                "POST",
                f"/repos/{cmd.repo}/issues/{issue_num}/comments",
                {"body": f"Closed via operator ingress ({cmd.source}) — gate-key: `{key}`"},
            )
        except Exception as exc:
            _log.warning("Closure proof comment failed for %s#%s: %s", cmd.repo, issue_num, exc)

        return Ack(
            success=True,
            idempotency_key=key,
            gitea_url=result.get("html_url"),
            gitea_number=issue_num,
            message=f"Closed issue #{issue_num}",
            prior_execution=False,
        )

    def _handle_assign_issue(self, cmd: Command, key: str) -> Ack:
        """Set the assignees of an issue.

        Reads ``issue_num`` (required) and ``assignees`` (default empty list)
        from the payload. No Gitea-side duplicate probe is performed.
        """
        issue_num = int(cmd.payload["issue_num"])
        assignees = cmd.payload.get("assignees", [])

        result = _api_request(
            "PATCH",
            f"/repos/{cmd.repo}/issues/{issue_num}",
            {"assignees": assignees},
        )
        return Ack(
            success=True,
            idempotency_key=key,
            gitea_url=result.get("html_url"),
            gitea_number=issue_num,
            message=f"Assigned issue #{issue_num} to {', '.join(assignees)}",
            prior_execution=False,
        )

    def _handle_merge_pr(self, cmd: Command, key: str) -> Ack:
        """Merge a pull request unless it is already merged.

        Reads ``pr_num`` from the payload (required). The source branch is
        kept after the merge.
        """
        pr_num = int(cmd.payload["pr_num"])

        # Check if already merged
        pr = _api_get(f"/repos/{cmd.repo}/pulls/{pr_num}")
        if pr.get("merged"):
            return Ack(
                success=True,
                idempotency_key=key,
                gitea_url=pr.get("html_url"),
                gitea_number=pr_num,
                message=f"PR #{pr_num} already merged.",
                prior_execution=True,
            )

        # The response is not needed; a failed merge surfaces as an exception.
        _api_request(
            "POST",
            f"/repos/{cmd.repo}/pulls/{pr_num}/merge",
            {"Do": "merge", "delete_branch_after_merge": False},
        )
        return Ack(
            success=True,
            idempotency_key=key,
            gitea_url=pr.get("html_url"),
            gitea_number=pr_num,
            message=f"Merged PR #{pr_num}",
            prior_execution=False,
        )


# ---------------------------------------------------------------------------
# Convenience helpers for adapters
# ---------------------------------------------------------------------------

def create_issue(source: str, repo: str, title: str, body: str = "",
                 assignees: Optional[List[str]] = None,
                 labels: Optional[List[str]] = None) -> Ack:
    """Create an issue in ``repo`` through the gate."""
    return GiteaGate().execute(Command(
        source=source,
        action="create_issue",
        repo=repo,
        payload={"title": title, "body": body,
                 "assignees": assignees or [], "labels": labels or []},
        timestamp_utc=_utc_now_iso(),
    ))


def add_comment(source: str, repo: str, issue_num: int, body: str) -> Ack:
    """Add a comment to issue ``issue_num`` in ``repo`` through the gate."""
    return GiteaGate().execute(Command(
        source=source,
        action="add_comment",
        repo=repo,
        payload={"issue_num": issue_num, "body": body},
        timestamp_utc=_utc_now_iso(),
    ))


def close_issue(source: str, repo: str, issue_num: int) -> Ack:
    """Close issue ``issue_num`` in ``repo`` through the gate."""
    return GiteaGate().execute(Command(
        source=source,
        action="close_issue",
        repo=repo,
        payload={"issue_num": issue_num},
        timestamp_utc=_utc_now_iso(),
    ))


def assign_issue(source: str, repo: str, issue_num: int, assignees: List[str]) -> Ack:
    """Assign issue ``issue_num`` in ``repo`` to ``assignees`` through the gate."""
    return GiteaGate().execute(Command(
        source=source,
        action="assign_issue",
        repo=repo,
        payload={"issue_num": issue_num, "assignees": assignees},
        timestamp_utc=_utc_now_iso(),
    ))


def merge_pr(source: str, repo: str, pr_num: int) -> Ack:
    """Merge pull request ``pr_num`` in ``repo`` through the gate."""
    return GiteaGate().execute(Command(
        source=source,
        action="merge_pr",
        repo=repo,
        payload={"pr_num": pr_num},
        timestamp_utc=_utc_now_iso(),
    ))