Files
timmy-config/operator-gate/gitea_gate.py
Timmy Foundation Ops 9c717f36ee feat(operator-gate): transport-agnostic idempotent Gitea mutation gate (#186)
Adds the canonical write path for all operator-originated Gitea mutations.

Core:
- gitea_gate.py with deterministic idempotency keys
- Local ledger + Gitea-side probing for replay safety
- Actions: create_issue, add_comment, close_issue, assign_issue, merge_pr

Adapter:
- nostur_adapter.py for Nostr DM ingress

Tests:
- idempotency, deduplication, and normalizer unit tests

Docs:
- README with architecture invariants and usage examples
2026-04-06 15:03:03 +00:00

423 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Operator Ingress → Gitea Mutation Gate
Transport-agnostic, idempotent gate for all operator-originated Gitea mutations.
This is the single canonical write path for create/comment/assign/merge/close actions.
Design invariants:
1. Every command gets a deterministic idempotency key.
2. Before executing, the gate checks Gitea for evidence of prior execution.
3. After executing, the gate records proof of execution (via comment trail or local ledger).
4. Replay of the same command returns the previously generated result without double-writing.
"""
import hashlib
import json
import os
import urllib.request
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
TOKEN_PATHS = ["/root/.gitea_token", os.path.expanduser("~/.gitea_token")]
_LEDGER_PATH = Path(os.environ.get("GATE_LEDGER_PATH", "/root/.operator_gate_ledger.jsonl"))
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class Command:
source: str # e.g. "nostr:npub10trqk..." or "matrix:@alex:..."
action: str # "create_issue" | "add_comment" | "close_issue" | "merge_pr" | "assign_issue"
repo: str # "Timmy_Foundation/timmy-config"
payload: Dict[str, Any]
timestamp_utc: str # ISO-8601, used for idempotency windowing
def idempotency_key(self) -> str:
"""Deterministic key: replay-safe, collision-resistant."""
payload_str = json.dumps(self.payload, sort_keys=True, ensure_ascii=True)
raw = f"{self.source}:{self.action}:{self.repo}:{payload_str}"
return hashlib.sha256(raw.encode()).hexdigest()[:32]
@dataclass
class Ack:
success: bool
idempotency_key: str
gitea_url: Optional[str]
gitea_number: Optional[int]
message: str
prior_execution: bool
executed_at: Optional[str] = None
# ---------------------------------------------------------------------------
# Low-level Gitea API
# ---------------------------------------------------------------------------
def _load_token() -> str:
for path in TOKEN_PATHS:
try:
with open(path) as f:
token = f.read().strip()
if token:
return token
except FileNotFoundError:
pass
raise RuntimeError("Gitea token not found")
def _headers() -> Dict[str, str]:
return {"Authorization": f"token {_load_token()}", "Content-Type": "application/json"}
def _api_request(method: str, path: str, data: Optional[Dict] = None) -> Dict[str, Any]:
url = f"{GITEA_URL}/api/v1{path}"
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=_headers(), method=method)
with urllib.request.urlopen(req, timeout=20) as resp:
text = resp.read().decode()
return json.loads(text) if text else {"status": resp.status}
def _api_get(path: str) -> Any:
url = f"{GITEA_URL}/api/v1{path}"
req = urllib.request.Request(url, headers=_headers())
with urllib.request.urlopen(req, timeout=20) as resp:
return json.loads(resp.read().decode())
# ---------------------------------------------------------------------------
# Ledger (local idempotency cache)
# ---------------------------------------------------------------------------
def _ledger_load() -> Dict[str, Dict]:
if not _LEDGER_PATH.exists():
return {}
entries = {}
with open(_LEDGER_PATH, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
entries[entry["idempotency_key"]] = entry
except (json.JSONDecodeError, KeyError):
continue
return entries
def _ledger_append(entry: Dict):
_LEDGER_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(_LEDGER_PATH, "a") as f:
f.write(json.dumps(entry, ensure_ascii=True) + "\n")
# ---------------------------------------------------------------------------
# Gitea-side idempotency probes
# ---------------------------------------------------------------------------
def _probe_issue_by_title(repo: str, title: str, window_hours: int = 24) -> Optional[Dict]:
"""Search recent open issues for a duplicate title to avoid double-create."""
try:
issues = _api_get(f"/repos/{repo}/issues?state=open&limit=20")
for issue in issues:
if issue.get("title") == title:
return issue
# Also check recently closed
issues = _api_get(f"/repos/{repo}/issues?state=closed&limit=10")
for issue in issues:
if issue.get("title") == title:
created = issue.get("created_at", "")
# naive window check
if created and "T" in created:
return issue
except Exception:
pass
return None
def _probe_comment_by_fingerprint(repo: str, issue_num: int, fingerprint: str) -> Optional[Dict]:
"""Scan recent comments for the idempotency fingerprint."""
try:
comments = _api_get(f"/repos/{repo}/issues/{issue_num}/comments?limit=50")
for comment in comments:
body = comment.get("body", "")
if fingerprint in body:
return comment
except Exception:
pass
return None
# ---------------------------------------------------------------------------
# Gate execution
# ---------------------------------------------------------------------------
class GiteaGate:
"""The single canonical write path."""
def execute(self, cmd: Command) -> Ack:
key = cmd.idempotency_key()
ledger = _ledger_load()
# 1. Local ledger hit
if key in ledger:
entry = ledger[key]
return Ack(
success=True,
idempotency_key=key,
gitea_url=entry.get("gitea_url"),
gitea_number=entry.get("gitea_number"),
message=entry.get("message", "Replay: already executed."),
prior_execution=True,
executed_at=entry.get("executed_at"),
)
# 2. Execute
handler = getattr(self, f"_handle_{cmd.action}", None)
if handler is None:
return Ack(
success=False,
idempotency_key=key,
gitea_url=None,
gitea_number=None,
message=f"Unsupported action: {cmd.action}",
prior_execution=False,
)
try:
ack = handler(cmd, key)
_ledger_append({
"idempotency_key": ack.idempotency_key,
"action": cmd.action,
"repo": cmd.repo,
"gitea_url": ack.gitea_url,
"gitea_number": ack.gitea_number,
"message": ack.message,
"executed_at": datetime.utcnow().isoformat() + "Z",
})
return ack
except Exception as e:
return Ack(
success=False,
idempotency_key=key,
gitea_url=None,
gitea_number=None,
message=f"Execution error: {e}",
prior_execution=False,
)
# -------------------------------------------------------------------
# Action handlers
# -------------------------------------------------------------------
def _handle_create_issue(self, cmd: Command, key: str) -> Ack:
title = cmd.payload.get("title", "")
body = cmd.payload.get("body", "")
assignees = cmd.payload.get("assignees")
labels = cmd.payload.get("labels")
# Probe Gitea for duplicate title
dup = _probe_issue_by_title(cmd.repo, title)
if dup:
return Ack(
success=True,
idempotency_key=key,
gitea_url=dup.get("html_url"),
gitea_number=dup.get("number"),
message=f"Duplicate detected: issue #{dup['number']} already exists.",
prior_execution=True,
)
data = {
"title": title,
"body": f"{body}\n\n---\n*Operator ingress via {cmd.source} — gate-key: `{key}`*",
}
if assignees:
data["assignees"] = assignees
if labels:
data["labels"] = labels
result = _api_request("POST", f"/repos/{cmd.repo}/issues", data)
url = result.get("html_url", f"{GITEA_URL}/{cmd.repo}/issues/{result['number']}")
return Ack(
success=True,
idempotency_key=key,
gitea_url=url,
gitea_number=result.get("number"),
message=f"Created issue #{result['number']}: {result['title']}",
prior_execution=False,
)
def _handle_add_comment(self, cmd: Command, key: str) -> Ack:
issue_num = int(cmd.payload["issue_num"])
body = cmd.payload["body"]
# Probe for fingerprint
dup = _probe_comment_by_fingerprint(cmd.repo, issue_num, key)
if dup:
return Ack(
success=True,
idempotency_key=key,
gitea_url=dup.get("html_url"),
gitea_number=issue_num,
message=f"Duplicate detected: comment already posted.",
prior_execution=True,
)
result = _api_request(
"POST",
f"/repos/{cmd.repo}/issues/{issue_num}/comments",
{"body": f"{body}\n\n---\n*gate-key: `{key}`*"},
)
url = result.get("html_url", f"{GITEA_URL}/{cmd.repo}/issues/{issue_num}#issuecomment-{result.get('id', '')}")
return Ack(
success=True,
idempotency_key=key,
gitea_url=url,
gitea_number=issue_num,
message=f"Added comment to issue #{issue_num}",
prior_execution=False,
)
def _handle_close_issue(self, cmd: Command, key: str) -> Ack:
issue_num = int(cmd.payload["issue_num"])
# Check if already closed
issue = _api_get(f"/repos/{cmd.repo}/issues/{issue_num}")
if issue.get("state") == "closed":
return Ack(
success=True,
idempotency_key=key,
gitea_url=issue.get("html_url"),
gitea_number=issue_num,
message=f"Issue #{issue_num} already closed.",
prior_execution=True,
)
result = _api_request(
"PATCH",
f"/repos/{cmd.repo}/issues/{issue_num}",
{"state": "closed"},
)
# Add closure proof comment
try:
_api_request(
"POST",
f"/repos/{cmd.repo}/issues/{issue_num}/comments",
{"body": f"Closed via operator ingress ({cmd.source}) — gate-key: `{key}`"},
)
except Exception:
pass
return Ack(
success=True,
idempotency_key=key,
gitea_url=result.get("html_url"),
gitea_number=issue_num,
message=f"Closed issue #{issue_num}",
prior_execution=False,
)
def _handle_assign_issue(self, cmd: Command, key: str) -> Ack:
issue_num = int(cmd.payload["issue_num"])
assignees = cmd.payload.get("assignees", [])
result = _api_request(
"PATCH",
f"/repos/{cmd.repo}/issues/{issue_num}",
{"assignees": assignees},
)
return Ack(
success=True,
idempotency_key=key,
gitea_url=result.get("html_url"),
gitea_number=issue_num,
message=f"Assigned issue #{issue_num} to {', '.join(assignees)}",
prior_execution=False,
)
def _handle_merge_pr(self, cmd: Command, key: str) -> Ack:
pr_num = int(cmd.payload["pr_num"])
# Check if already merged
pr = _api_get(f"/repos/{cmd.repo}/pulls/{pr_num}")
if pr.get("merged"):
return Ack(
success=True,
idempotency_key=key,
gitea_url=pr.get("html_url"),
gitea_number=pr_num,
message=f"PR #{pr_num} already merged.",
prior_execution=True,
)
result = _api_request(
"POST",
f"/repos/{cmd.repo}/pulls/{pr_num}/merge",
{"Do": "merge", "delete_branch_after_merge": False},
)
return Ack(
success=True,
idempotency_key=key,
gitea_url=pr.get("html_url"),
gitea_number=pr_num,
message=f"Merged PR #{pr_num}",
prior_execution=False,
)
# ---------------------------------------------------------------------------
# Convenience helpers for adapters
# ---------------------------------------------------------------------------
def create_issue(source: str, repo: str, title: str, body: str = "", assignees: Optional[List[str]] = None, labels: Optional[List[str]] = None) -> Ack:
return GiteaGate().execute(Command(
source=source,
action="create_issue",
repo=repo,
payload={"title": title, "body": body, "assignees": assignees or [], "labels": labels or []},
timestamp_utc=datetime.utcnow().isoformat() + "Z",
))
def add_comment(source: str, repo: str, issue_num: int, body: str) -> Ack:
return GiteaGate().execute(Command(
source=source,
action="add_comment",
repo=repo,
payload={"issue_num": issue_num, "body": body},
timestamp_utc=datetime.utcnow().isoformat() + "Z",
))
def close_issue(source: str, repo: str, issue_num: int) -> Ack:
return GiteaGate().execute(Command(
source=source,
action="close_issue",
repo=repo,
payload={"issue_num": issue_num},
timestamp_utc=datetime.utcnow().isoformat() + "Z",
))
def assign_issue(source: str, repo: str, issue_num: int, assignees: List[str]) -> Ack:
return GiteaGate().execute(Command(
source=source,
action="assign_issue",
repo=repo,
payload={"issue_num": issue_num, "assignees": assignees},
timestamp_utc=datetime.utcnow().isoformat() + "Z",
))
def merge_pr(source: str, repo: str, pr_num: int) -> Ack:
return GiteaGate().execute(Command(
source=source,
action="merge_pr",
repo=repo,
payload={"pr_num": pr_num},
timestamp_utc=datetime.utcnow().isoformat() + "Z",
))