Adds the canonical write path for all operator-originated Gitea mutations. Core: - gitea_gate.py with deterministic idempotency keys - Local ledger + Gitea-side probing for replay safety - Actions: create_issue, add_comment, close_issue, assign_issue, merge_pr Adapter: - nostur_adapter.py for Nostr DM ingress Tests: - idempotency, deduplication, and normalizer unit tests Docs: - README with architecture invariants and usage examples
423 lines
15 KiB
Python
423 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Operator Ingress → Gitea Mutation Gate
|
|
|
|
Transport-agnostic, idempotent gate for all operator-originated Gitea mutations.
|
|
This is the single canonical write path for create/comment/assign/merge/close actions.
|
|
|
|
Design invariants:
|
|
1. Every command gets a deterministic idempotency key.
|
|
2. Before executing, the gate checks Gitea for evidence of prior execution.
|
|
3. After executing, the gate records proof of execution (via comment trail or local ledger).
|
|
4. Replay of the same command returns the previously generated result without double-writing.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import urllib.request
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
|
TOKEN_PATHS = ["/root/.gitea_token", os.path.expanduser("~/.gitea_token")]
|
|
_LEDGER_PATH = Path(os.environ.get("GATE_LEDGER_PATH", "/root/.operator_gate_ledger.jsonl"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data models
|
|
# ---------------------------------------------------------------------------
|
|
@dataclass(frozen=True)
|
|
class Command:
|
|
source: str # e.g. "nostr:npub10trqk..." or "matrix:@alex:..."
|
|
action: str # "create_issue" | "add_comment" | "close_issue" | "merge_pr" | "assign_issue"
|
|
repo: str # "Timmy_Foundation/timmy-config"
|
|
payload: Dict[str, Any]
|
|
timestamp_utc: str # ISO-8601, used for idempotency windowing
|
|
|
|
def idempotency_key(self) -> str:
|
|
"""Deterministic key: replay-safe, collision-resistant."""
|
|
payload_str = json.dumps(self.payload, sort_keys=True, ensure_ascii=True)
|
|
raw = f"{self.source}:{self.action}:{self.repo}:{payload_str}"
|
|
return hashlib.sha256(raw.encode()).hexdigest()[:32]
|
|
|
|
|
|
@dataclass
|
|
class Ack:
|
|
success: bool
|
|
idempotency_key: str
|
|
gitea_url: Optional[str]
|
|
gitea_number: Optional[int]
|
|
message: str
|
|
prior_execution: bool
|
|
executed_at: Optional[str] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Low-level Gitea API
|
|
# ---------------------------------------------------------------------------
|
|
def _load_token() -> str:
|
|
for path in TOKEN_PATHS:
|
|
try:
|
|
with open(path) as f:
|
|
token = f.read().strip()
|
|
if token:
|
|
return token
|
|
except FileNotFoundError:
|
|
pass
|
|
raise RuntimeError("Gitea token not found")
|
|
|
|
|
|
def _headers() -> Dict[str, str]:
|
|
return {"Authorization": f"token {_load_token()}", "Content-Type": "application/json"}
|
|
|
|
|
|
def _api_request(method: str, path: str, data: Optional[Dict] = None) -> Dict[str, Any]:
|
|
url = f"{GITEA_URL}/api/v1{path}"
|
|
body = json.dumps(data).encode() if data else None
|
|
req = urllib.request.Request(url, data=body, headers=_headers(), method=method)
|
|
with urllib.request.urlopen(req, timeout=20) as resp:
|
|
text = resp.read().decode()
|
|
return json.loads(text) if text else {"status": resp.status}
|
|
|
|
|
|
def _api_get(path: str) -> Any:
|
|
url = f"{GITEA_URL}/api/v1{path}"
|
|
req = urllib.request.Request(url, headers=_headers())
|
|
with urllib.request.urlopen(req, timeout=20) as resp:
|
|
return json.loads(resp.read().decode())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ledger (local idempotency cache)
|
|
# ---------------------------------------------------------------------------
|
|
def _ledger_load() -> Dict[str, Dict]:
|
|
if not _LEDGER_PATH.exists():
|
|
return {}
|
|
entries = {}
|
|
with open(_LEDGER_PATH, "r") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
entry = json.loads(line)
|
|
entries[entry["idempotency_key"]] = entry
|
|
except (json.JSONDecodeError, KeyError):
|
|
continue
|
|
return entries
|
|
|
|
|
|
def _ledger_append(entry: Dict):
|
|
_LEDGER_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(_LEDGER_PATH, "a") as f:
|
|
f.write(json.dumps(entry, ensure_ascii=True) + "\n")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gitea-side idempotency probes
|
|
# ---------------------------------------------------------------------------
|
|
def _probe_issue_by_title(repo: str, title: str, window_hours: int = 24) -> Optional[Dict]:
|
|
"""Search recent open issues for a duplicate title to avoid double-create."""
|
|
try:
|
|
issues = _api_get(f"/repos/{repo}/issues?state=open&limit=20")
|
|
for issue in issues:
|
|
if issue.get("title") == title:
|
|
return issue
|
|
# Also check recently closed
|
|
issues = _api_get(f"/repos/{repo}/issues?state=closed&limit=10")
|
|
for issue in issues:
|
|
if issue.get("title") == title:
|
|
created = issue.get("created_at", "")
|
|
# naive window check
|
|
if created and "T" in created:
|
|
return issue
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _probe_comment_by_fingerprint(repo: str, issue_num: int, fingerprint: str) -> Optional[Dict]:
|
|
"""Scan recent comments for the idempotency fingerprint."""
|
|
try:
|
|
comments = _api_get(f"/repos/{repo}/issues/{issue_num}/comments?limit=50")
|
|
for comment in comments:
|
|
body = comment.get("body", "")
|
|
if fingerprint in body:
|
|
return comment
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gate execution
|
|
# ---------------------------------------------------------------------------
|
|
class GiteaGate:
|
|
"""The single canonical write path."""
|
|
|
|
def execute(self, cmd: Command) -> Ack:
|
|
key = cmd.idempotency_key()
|
|
ledger = _ledger_load()
|
|
|
|
# 1. Local ledger hit
|
|
if key in ledger:
|
|
entry = ledger[key]
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=entry.get("gitea_url"),
|
|
gitea_number=entry.get("gitea_number"),
|
|
message=entry.get("message", "Replay: already executed."),
|
|
prior_execution=True,
|
|
executed_at=entry.get("executed_at"),
|
|
)
|
|
|
|
# 2. Execute
|
|
handler = getattr(self, f"_handle_{cmd.action}", None)
|
|
if handler is None:
|
|
return Ack(
|
|
success=False,
|
|
idempotency_key=key,
|
|
gitea_url=None,
|
|
gitea_number=None,
|
|
message=f"Unsupported action: {cmd.action}",
|
|
prior_execution=False,
|
|
)
|
|
|
|
try:
|
|
ack = handler(cmd, key)
|
|
_ledger_append({
|
|
"idempotency_key": ack.idempotency_key,
|
|
"action": cmd.action,
|
|
"repo": cmd.repo,
|
|
"gitea_url": ack.gitea_url,
|
|
"gitea_number": ack.gitea_number,
|
|
"message": ack.message,
|
|
"executed_at": datetime.utcnow().isoformat() + "Z",
|
|
})
|
|
return ack
|
|
except Exception as e:
|
|
return Ack(
|
|
success=False,
|
|
idempotency_key=key,
|
|
gitea_url=None,
|
|
gitea_number=None,
|
|
message=f"Execution error: {e}",
|
|
prior_execution=False,
|
|
)
|
|
|
|
# -------------------------------------------------------------------
|
|
# Action handlers
|
|
# -------------------------------------------------------------------
|
|
def _handle_create_issue(self, cmd: Command, key: str) -> Ack:
|
|
title = cmd.payload.get("title", "")
|
|
body = cmd.payload.get("body", "")
|
|
assignees = cmd.payload.get("assignees")
|
|
labels = cmd.payload.get("labels")
|
|
|
|
# Probe Gitea for duplicate title
|
|
dup = _probe_issue_by_title(cmd.repo, title)
|
|
if dup:
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=dup.get("html_url"),
|
|
gitea_number=dup.get("number"),
|
|
message=f"Duplicate detected: issue #{dup['number']} already exists.",
|
|
prior_execution=True,
|
|
)
|
|
|
|
data = {
|
|
"title": title,
|
|
"body": f"{body}\n\n---\n*Operator ingress via {cmd.source} — gate-key: `{key}`*",
|
|
}
|
|
if assignees:
|
|
data["assignees"] = assignees
|
|
if labels:
|
|
data["labels"] = labels
|
|
|
|
result = _api_request("POST", f"/repos/{cmd.repo}/issues", data)
|
|
url = result.get("html_url", f"{GITEA_URL}/{cmd.repo}/issues/{result['number']}")
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=url,
|
|
gitea_number=result.get("number"),
|
|
message=f"Created issue #{result['number']}: {result['title']}",
|
|
prior_execution=False,
|
|
)
|
|
|
|
def _handle_add_comment(self, cmd: Command, key: str) -> Ack:
|
|
issue_num = int(cmd.payload["issue_num"])
|
|
body = cmd.payload["body"]
|
|
|
|
# Probe for fingerprint
|
|
dup = _probe_comment_by_fingerprint(cmd.repo, issue_num, key)
|
|
if dup:
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=dup.get("html_url"),
|
|
gitea_number=issue_num,
|
|
message=f"Duplicate detected: comment already posted.",
|
|
prior_execution=True,
|
|
)
|
|
|
|
result = _api_request(
|
|
"POST",
|
|
f"/repos/{cmd.repo}/issues/{issue_num}/comments",
|
|
{"body": f"{body}\n\n---\n*gate-key: `{key}`*"},
|
|
)
|
|
url = result.get("html_url", f"{GITEA_URL}/{cmd.repo}/issues/{issue_num}#issuecomment-{result.get('id', '')}")
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=url,
|
|
gitea_number=issue_num,
|
|
message=f"Added comment to issue #{issue_num}",
|
|
prior_execution=False,
|
|
)
|
|
|
|
def _handle_close_issue(self, cmd: Command, key: str) -> Ack:
|
|
issue_num = int(cmd.payload["issue_num"])
|
|
# Check if already closed
|
|
issue = _api_get(f"/repos/{cmd.repo}/issues/{issue_num}")
|
|
if issue.get("state") == "closed":
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=issue.get("html_url"),
|
|
gitea_number=issue_num,
|
|
message=f"Issue #{issue_num} already closed.",
|
|
prior_execution=True,
|
|
)
|
|
|
|
result = _api_request(
|
|
"PATCH",
|
|
f"/repos/{cmd.repo}/issues/{issue_num}",
|
|
{"state": "closed"},
|
|
)
|
|
# Add closure proof comment
|
|
try:
|
|
_api_request(
|
|
"POST",
|
|
f"/repos/{cmd.repo}/issues/{issue_num}/comments",
|
|
{"body": f"Closed via operator ingress ({cmd.source}) — gate-key: `{key}`"},
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=result.get("html_url"),
|
|
gitea_number=issue_num,
|
|
message=f"Closed issue #{issue_num}",
|
|
prior_execution=False,
|
|
)
|
|
|
|
def _handle_assign_issue(self, cmd: Command, key: str) -> Ack:
|
|
issue_num = int(cmd.payload["issue_num"])
|
|
assignees = cmd.payload.get("assignees", [])
|
|
result = _api_request(
|
|
"PATCH",
|
|
f"/repos/{cmd.repo}/issues/{issue_num}",
|
|
{"assignees": assignees},
|
|
)
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=result.get("html_url"),
|
|
gitea_number=issue_num,
|
|
message=f"Assigned issue #{issue_num} to {', '.join(assignees)}",
|
|
prior_execution=False,
|
|
)
|
|
|
|
def _handle_merge_pr(self, cmd: Command, key: str) -> Ack:
|
|
pr_num = int(cmd.payload["pr_num"])
|
|
# Check if already merged
|
|
pr = _api_get(f"/repos/{cmd.repo}/pulls/{pr_num}")
|
|
if pr.get("merged"):
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=pr.get("html_url"),
|
|
gitea_number=pr_num,
|
|
message=f"PR #{pr_num} already merged.",
|
|
prior_execution=True,
|
|
)
|
|
|
|
result = _api_request(
|
|
"POST",
|
|
f"/repos/{cmd.repo}/pulls/{pr_num}/merge",
|
|
{"Do": "merge", "delete_branch_after_merge": False},
|
|
)
|
|
return Ack(
|
|
success=True,
|
|
idempotency_key=key,
|
|
gitea_url=pr.get("html_url"),
|
|
gitea_number=pr_num,
|
|
message=f"Merged PR #{pr_num}",
|
|
prior_execution=False,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Convenience helpers for adapters
|
|
# ---------------------------------------------------------------------------
|
|
def create_issue(source: str, repo: str, title: str, body: str = "", assignees: Optional[List[str]] = None, labels: Optional[List[str]] = None) -> Ack:
|
|
return GiteaGate().execute(Command(
|
|
source=source,
|
|
action="create_issue",
|
|
repo=repo,
|
|
payload={"title": title, "body": body, "assignees": assignees or [], "labels": labels or []},
|
|
timestamp_utc=datetime.utcnow().isoformat() + "Z",
|
|
))
|
|
|
|
|
|
def add_comment(source: str, repo: str, issue_num: int, body: str) -> Ack:
|
|
return GiteaGate().execute(Command(
|
|
source=source,
|
|
action="add_comment",
|
|
repo=repo,
|
|
payload={"issue_num": issue_num, "body": body},
|
|
timestamp_utc=datetime.utcnow().isoformat() + "Z",
|
|
))
|
|
|
|
|
|
def close_issue(source: str, repo: str, issue_num: int) -> Ack:
|
|
return GiteaGate().execute(Command(
|
|
source=source,
|
|
action="close_issue",
|
|
repo=repo,
|
|
payload={"issue_num": issue_num},
|
|
timestamp_utc=datetime.utcnow().isoformat() + "Z",
|
|
))
|
|
|
|
|
|
def assign_issue(source: str, repo: str, issue_num: int, assignees: List[str]) -> Ack:
|
|
return GiteaGate().execute(Command(
|
|
source=source,
|
|
action="assign_issue",
|
|
repo=repo,
|
|
payload={"issue_num": issue_num, "assignees": assignees},
|
|
timestamp_utc=datetime.utcnow().isoformat() + "Z",
|
|
))
|
|
|
|
|
|
def merge_pr(source: str, repo: str, pr_num: int) -> Ack:
|
|
return GiteaGate().execute(Command(
|
|
source=source,
|
|
action="merge_pr",
|
|
repo=repo,
|
|
payload={"pr_num": pr_num},
|
|
timestamp_utc=datetime.utcnow().isoformat() + "Z",
|
|
))
|