diff --git a/operator-gate/README.md b/operator-gate/README.md new file mode 100644 index 00000000..3810298e --- /dev/null +++ b/operator-gate/README.md @@ -0,0 +1,87 @@ +# Operator Ingress Gate + +Transport-agnostic, idempotent write gate for all operator-originated Gitea mutations. + +## Purpose + +This is the **single canonical write path** for any external command that creates, updates, or merges Gitea issues/PRs. Whether the command arrives via Nostr DM, Matrix, Telegram, or a local script, it must pass through this gate. + +## Design Invariants + +1. **Deterministic idempotency key** — SHA-256 of `(source, action, repo, sorted_payload)`. +2. **Local ledger deduplication** — First line of defense; replayed commands return the original ACK instantly. +3. **Gitea-side probing** — Second line of defense; scans existing issues/comments for duplicates before mutating. +4. **Proof-of-execution** — Every mutation embeds the gate-key in the Gitea comment/issue body, creating an audit trail. +5. **Replay safety** — Running the same command twice never double-writes. + +## Structure + +``` +operator-gate/ +├── gitea_gate.py # Core gate logic (Command, Ack, GiteaGate) +├── adapters/ +│ └── nostur_adapter.py # Nostr DM adapter +├── tests/ +│ └── test_gate_idempotency.py +└── README.md # This file +``` + +## Supported Actions + +| Action | Payload keys | +|---------------|-------------------------------------------| +| `create_issue`| `title`, `body`, `assignees[]`, `labels[]`| +| `add_comment` | `issue_num`, `body` | +| `close_issue` | `issue_num` | +| `assign_issue`| `issue_num`, `assignees[]` | +| `merge_pr` | `pr_num` | + +## Usage + +```python +from gitea_gate import create_issue, add_comment + +ack = create_issue( + source="nostr:npub10trqk...", + repo="Timmy_Foundation/timmy-home", + title="Fix relay timeout", + body="The nostr relay drops connections after 30s.", + assignees=["allegro"], +) + +print(ack.gitea_url) # Canonical URL of the created issue +print(ack.idempotency_key) # 32-char fingerprint +``` + +## Nostur Adapter + +The Nostur adapter polls the local Nostr relay (`ws://localhost:2929`) for Kind-4 DMs from authorized operator pubkeys. It normalizes DM text into gate commands: + +- `status` → queue summary for `the-nexus` +- `create ` → create issue +- `comment <repo> #<num> <text>` → add comment +- `close <repo> #<num>` → close issue +- `assign <repo> #<num> <user1,user2>` → assign issue +- `merge <repo> #<num>` → merge PR + +Run it: + +```bash +python3 operator-gate/adapters/nostur_adapter.py +``` + +## Running Tests + +```bash +cd operator-gate +python3 -m unittest tests.test_gate_idempotency -v +``` + +## Future Adapters + +- Matrix bot adapter +- Telegram bot adapter +- Local CLI adapter (`hermes operator-do ...`) + +--- +*Built for the Timmy Foundation fleet. Sovereignty and service always.* diff --git a/operator-gate/__init__.py b/operator-gate/__init__.py new file mode 100644 index 00000000..a30db2ec --- /dev/null +++ b/operator-gate/__init__.py @@ -0,0 +1,22 @@ +"""Operator Ingress Gate for Gitea mutations.""" +from .gitea_gate import ( + Command, + Ack, + GiteaGate, + create_issue, + add_comment, + close_issue, + assign_issue, + merge_pr, +) + +__all__ = [ + "Command", + "Ack", + "GiteaGate", + "create_issue", + "add_comment", + "close_issue", + "assign_issue", + "merge_pr", +] diff --git a/operator-gate/adapters/nostur_adapter.py b/operator-gate/adapters/nostur_adapter.py new file mode 100644 index 00000000..92d79b49 --- /dev/null +++ b/operator-gate/adapters/nostur_adapter.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +""" +Nostur (Nostr DM) Adapter for the Operator Ingress Gate + +Reads Kind-4 DMs from authorized operator pubkeys, normalizes them into +Gate Commands, and executes via gitea_gate. + +This adapter is deliberately thin: all idempotency, deduplication, and +mutation logic lives in the gate. +""" + +import asyncio +import json +import os +import sys +from datetime import datetime, timedelta +from pathlib import Path + +# Add parent to path for gate import +sys.path.insert(0, str(Path(__file__).parent.parent)) + +try: + from nostr_sdk import Keys, Client, Filter, Kind, NostrSigner, Timestamp, PublicKey +except ImportError as e: + print(f"[WARN] nostr_sdk not available: {e}") + Keys = Client = Filter = Kind = NostrSigner = Timestamp = PublicKey = None + +from gitea_gate import create_issue, add_comment, close_issue, assign_issue, merge_pr + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- +RELAY_URL = os.environ.get("NOSTR_RELAY", "ws://localhost:2929") +KEYSTORE_PATH = os.environ.get("KEYSTORE_PATH", "/root/nostr-relay/keystore.json") +ALLOWED_SOURCES = os.environ.get("ALLOWED_OPERATOR_NPUBS", "").split(",") +POLL_INTERVAL = int(os.environ.get("NOSTUR_POLL_INTERVAL", "60")) + + +def _load_keystore() -> dict: + with open(KEYSTORE_PATH) as f: + return json.load(f) + + +def _allowed_pubkeys() -> list: + ks = _load_keystore() + out = [] + if "alexander" in ks: + out.append(ks["alexander"].get("pubkey", "")) + out.append(ks["alexander"].get("hex_public", "")) + for s in ALLOWED_SOURCES: + s = s.strip() + if s: + out.append(s) + return [p for p in out if p] + + +# --------------------------------------------------------------------------- +# DM → Command normalizer +# --------------------------------------------------------------------------- +def _normalize_dm(content: str) -> dict: + content = content.strip() + lines = content.splitlines() + first = lines[0].strip().lower() + + if first == "status" or first.startswith("status"): + return {"action": "status", "repo": "Timmy_Foundation/the-nexus"} + + if first.startswith("create "): + rest = content[7:] + parts = rest.split(" ", 1) + if len(parts) >= 2: + repo = parts[0] if "/" in parts[0] else f"Timmy_Foundation/{parts[0]}" + title = parts[1] + body = "\n".join(lines[1:]) if len(lines) > 1 else "" + return {"action": "create_issue", "repo": repo, "title": title, "body": body} + + if first.startswith("comment "): + rest = content[8:] + parts = rest.split(" ", 2) + if len(parts) >= 3: + repo = parts[0] if "/" in parts[0] else f"Timmy_Foundation/{parts[0]}" + issue_ref = parts[1] + body = parts[2] + if issue_ref.startswith("#"): + return {"action": "add_comment", "repo": repo, "issue_num": int(issue_ref[1:]), "body": body} + + if first.startswith("close "): + rest = content[6:] + parts = rest.split(" ", 1) + if len(parts) == 2: + repo = parts[0] if "/" in parts[0] else f"Timmy_Foundation/{parts[0]}" + issue_ref = parts[1] + if issue_ref.startswith("#"): + return {"action": "close_issue", "repo": repo, "issue_num": int(issue_ref[1:])} + + if first.startswith("assign "): + rest = content[7:] + parts = rest.split(" ", 2) + if len(parts) >= 3: + repo = parts[0] if "/" in parts[0] else f"Timmy_Foundation/{parts[0]}" + issue_ref = parts[1] + users = [u.strip() for u in parts[2].split(",")] + if issue_ref.startswith("#"): + return {"action": "assign_issue", "repo": repo, "issue_num": int(issue_ref[1:]), "assignees": users} + + if first.startswith("merge "): + rest = content[6:] + parts = rest.split(" ", 1) + if len(parts) == 2: + repo = parts[0] if "/" in parts[0] else f"Timmy_Foundation/{parts[0]}" + pr_ref = parts[1] + if pr_ref.startswith("#"): + return {"action": "merge_pr", "repo": repo, "pr_num": int(pr_ref[1:])} + + return {"action": "unknown", "raw": content} + + +# --------------------------------------------------------------------------- +# Status helper (read-only, bypasses gate) +# --------------------------------------------------------------------------- +def _status_summary(repo: str) -> str: + import urllib.request + from gitea_gate import _api_get + try: + issues = _api_get(f"/repos/{repo}/issues?state=open&limit=20") + unassigned = [i for i in issues if not i.get("assignee")] + blockers = [i for i in issues if any(l["name"] == "blocker" for l in i.get("labels", []))] + summary = f"Queue Status for {repo}\nOpen: {len(issues)} | Unassigned: {len(unassigned)} | Blockers: {len(blockers)}\n" + if unassigned[:3]: + summary += "Top unassigned:\n" + for i in unassigned[:3]: + summary += f"- #{i['number']}: {i['title'][:50]}...\n" + return summary + except Exception as e: + return f"Error fetching status: {e}" + + +# --------------------------------------------------------------------------- +# DM execution +# --------------------------------------------------------------------------- +def _execute_normalized(source_npub: str, cmd: dict) -> dict: + action = cmd.get("action") + + if action == "status": + return {"success": True, "message": _status_summary(cmd["repo"])} + + if action == "create_issue": + ack = create_issue(source_npub, cmd["repo"], cmd["title"], cmd.get("body", "")) + return {"success": ack.success, "message": ack.message, "url": ack.gitea_url} + + if action == "add_comment": + ack = add_comment(source_npub, cmd["repo"], cmd["issue_num"], cmd["body"]) + return {"success": ack.success, "message": ack.message, "url": ack.gitea_url} + + if action == "close_issue": + ack = close_issue(source_npub, cmd["repo"], cmd["issue_num"]) + return {"success": ack.success, "message": ack.message, "url": ack.gitea_url} + + if action == "assign_issue": + ack = assign_issue(source_npub, cmd["repo"], cmd["issue_num"], cmd["assignees"]) + return {"success": ack.success, "message": ack.message, "url": ack.gitea_url} + + if action == "merge_pr": + ack = merge_pr(source_npub, cmd["repo"], cmd["pr_num"]) + return {"success": ack.success, "message": ack.message, "url": ack.gitea_url} + + return {"success": False, "message": f"Unknown command. Supported: status, create, comment, close, assign, merge"} + + +# --------------------------------------------------------------------------- +# Async polling loop +# --------------------------------------------------------------------------- +async def poll_loop(): + if Client is None: + raise RuntimeError("nostr_sdk not installed; cannot run Nostur adapter") + + signer = NostrSigner.keys(Keys.generate()) + client = Client(signer) + await client.add_relay(RELAY_URL) + await client.connect() + + allowed = _allowed_pubkeys() + since = Timestamp.now() + print(f"[NOSTUR-ADAPTER] Connected to {RELAY_URL}") + print(f"[NOSTUR-ADAPTER] Allowed operators: {len(allowed)}") + + while True: + await asyncio.sleep(POLL_INTERVAL) + try: + filter_dm = Filter().kind(Kind(4)).since(since) + events = await client.fetch_events(filter_dm, timedelta(seconds=5)) + since = Timestamp.now() + + for event in events.to_vec(): + author_hex = event.author().to_hex() + author_npub = event.author().to_bech32() + if author_hex not in allowed and author_npub not in allowed: + print(f"[SKIP] Unauthorized: {author_npub[:20]}...") + continue + + content = event.content() + cmd = _normalize_dm(content) + print(f"[DM] {author_npub[:20]}... -> {cmd.get('action', 'unknown')}") + result = _execute_normalized(author_npub, cmd) + print(f"[ACK] {result['message'][:120]}") + except Exception as e: + print(f"[ERROR] Poll loop: {e}") + + +if __name__ == "__main__": + asyncio.run(poll_loop()) diff --git a/operator-gate/gitea_gate.py b/operator-gate/gitea_gate.py new file mode 100644 index 00000000..5276c34d --- /dev/null +++ b/operator-gate/gitea_gate.py @@ -0,0 +1,422 @@ +#!/usr/bin/env python3 +""" +Operator Ingress → Gitea Mutation Gate + +Transport-agnostic, idempotent gate for all operator-originated Gitea mutations. +This is the single canonical write path for create/comment/assign/merge/close actions. + +Design invariants: +1. Every command gets a deterministic idempotency key. +2. Before executing, the gate checks Gitea for evidence of prior execution. +3. After executing, the gate records proof of execution (via comment trail or local ledger). +4. Replay of the same command returns the previously generated result without double-writing. +""" + +import hashlib +import json +import os +import urllib.request +from dataclasses import dataclass, asdict +from datetime import datetime +from pathlib import Path +from typing import Optional, Dict, Any, List + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- +GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com") +TOKEN_PATHS = ["/root/.gitea_token", os.path.expanduser("~/.gitea_token")] +_LEDGER_PATH = Path(os.environ.get("GATE_LEDGER_PATH", "/root/.operator_gate_ledger.jsonl")) + + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- +@dataclass(frozen=True) +class Command: + source: str # e.g. "nostr:npub10trqk..." or "matrix:@alex:..." + action: str # "create_issue" | "add_comment" | "close_issue" | "merge_pr" | "assign_issue" + repo: str # "Timmy_Foundation/timmy-config" + payload: Dict[str, Any] + timestamp_utc: str # ISO-8601, used for idempotency windowing + + def idempotency_key(self) -> str: + """Deterministic key: replay-safe, collision-resistant.""" + payload_str = json.dumps(self.payload, sort_keys=True, ensure_ascii=True) + raw = f"{self.source}:{self.action}:{self.repo}:{payload_str}" + return hashlib.sha256(raw.encode()).hexdigest()[:32] + + +@dataclass +class Ack: + success: bool + idempotency_key: str + gitea_url: Optional[str] + gitea_number: Optional[int] + message: str + prior_execution: bool + executed_at: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Low-level Gitea API +# --------------------------------------------------------------------------- +def _load_token() -> str: + for path in TOKEN_PATHS: + try: + with open(path) as f: + token = f.read().strip() + if token: + return token + except FileNotFoundError: + pass + raise RuntimeError("Gitea token not found") + + +def _headers() -> Dict[str, str]: + return {"Authorization": f"token {_load_token()}", "Content-Type": "application/json"} + + +def _api_request(method: str, path: str, data: Optional[Dict] = None) -> Dict[str, Any]: + url = f"{GITEA_URL}/api/v1{path}" + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=_headers(), method=method) + with urllib.request.urlopen(req, timeout=20) as resp: + text = resp.read().decode() + return json.loads(text) if text else {"status": resp.status} + + +def _api_get(path: str) -> Any: + url = f"{GITEA_URL}/api/v1{path}" + req = urllib.request.Request(url, headers=_headers()) + with urllib.request.urlopen(req, timeout=20) as resp: + return json.loads(resp.read().decode()) + + +# --------------------------------------------------------------------------- +# Ledger (local idempotency cache) +# --------------------------------------------------------------------------- +def _ledger_load() -> Dict[str, Dict]: + if not _LEDGER_PATH.exists(): + return {} + entries = {} + with open(_LEDGER_PATH, "r") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + entries[entry["idempotency_key"]] = entry + except (json.JSONDecodeError, KeyError): + continue + return entries + + +def _ledger_append(entry: Dict): + _LEDGER_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(_LEDGER_PATH, "a") as f: + f.write(json.dumps(entry, ensure_ascii=True) + "\n") + + +# --------------------------------------------------------------------------- +# Gitea-side idempotency probes +# --------------------------------------------------------------------------- +def _probe_issue_by_title(repo: str, title: str, window_hours: int = 24) -> Optional[Dict]: + """Search recent open issues for a duplicate title to avoid double-create.""" + try: + issues = _api_get(f"/repos/{repo}/issues?state=open&limit=20") + for issue in issues: + if issue.get("title") == title: + return issue + # Also check recently closed + issues = _api_get(f"/repos/{repo}/issues?state=closed&limit=10") + for issue in issues: + if issue.get("title") == title: + created = issue.get("created_at", "") + # naive window check + if created and "T" in created: + return issue + except Exception: + pass + return None + + +def _probe_comment_by_fingerprint(repo: str, issue_num: int, fingerprint: str) -> Optional[Dict]: + """Scan recent comments for the idempotency fingerprint.""" + try: + comments = _api_get(f"/repos/{repo}/issues/{issue_num}/comments?limit=50") + for comment in comments: + body = comment.get("body", "") + if fingerprint in body: + return comment + except Exception: + pass + return None + + +# --------------------------------------------------------------------------- +# Gate execution +# --------------------------------------------------------------------------- +class GiteaGate: + """The single canonical write path.""" + + def execute(self, cmd: Command) -> Ack: + key = cmd.idempotency_key() + ledger = _ledger_load() + + # 1. Local ledger hit + if key in ledger: + entry = ledger[key] + return Ack( + success=True, + idempotency_key=key, + gitea_url=entry.get("gitea_url"), + gitea_number=entry.get("gitea_number"), + message=entry.get("message", "Replay: already executed."), + prior_execution=True, + executed_at=entry.get("executed_at"), + ) + + # 2. Execute + handler = getattr(self, f"_handle_{cmd.action}", None) + if handler is None: + return Ack( + success=False, + idempotency_key=key, + gitea_url=None, + gitea_number=None, + message=f"Unsupported action: {cmd.action}", + prior_execution=False, + ) + + try: + ack = handler(cmd, key) + _ledger_append({ + "idempotency_key": ack.idempotency_key, + "action": cmd.action, + "repo": cmd.repo, + "gitea_url": ack.gitea_url, + "gitea_number": ack.gitea_number, + "message": ack.message, + "executed_at": datetime.utcnow().isoformat() + "Z", + }) + return ack + except Exception as e: + return Ack( + success=False, + idempotency_key=key, + gitea_url=None, + gitea_number=None, + message=f"Execution error: {e}", + prior_execution=False, + ) + + # ------------------------------------------------------------------- + # Action handlers + # ------------------------------------------------------------------- + def _handle_create_issue(self, cmd: Command, key: str) -> Ack: + title = cmd.payload.get("title", "") + body = cmd.payload.get("body", "") + assignees = cmd.payload.get("assignees") + labels = cmd.payload.get("labels") + + # Probe Gitea for duplicate title + dup = _probe_issue_by_title(cmd.repo, title) + if dup: + return Ack( + success=True, + idempotency_key=key, + gitea_url=dup.get("html_url"), + gitea_number=dup.get("number"), + message=f"Duplicate detected: issue #{dup['number']} already exists.", + prior_execution=True, + ) + + data = { + "title": title, + "body": f"{body}\n\n---\n*Operator ingress via {cmd.source} — gate-key: `{key}`*", + } + if assignees: + data["assignees"] = assignees + if labels: + data["labels"] = labels + + result = _api_request("POST", f"/repos/{cmd.repo}/issues", data) + url = result.get("html_url", f"{GITEA_URL}/{cmd.repo}/issues/{result['number']}") + return Ack( + success=True, + idempotency_key=key, + gitea_url=url, + gitea_number=result.get("number"), + message=f"Created issue #{result['number']}: {result['title']}", + prior_execution=False, + ) + + def _handle_add_comment(self, cmd: Command, key: str) -> Ack: + issue_num = int(cmd.payload["issue_num"]) + body = cmd.payload["body"] + + # Probe for fingerprint + dup = _probe_comment_by_fingerprint(cmd.repo, issue_num, key) + if dup: + return Ack( + success=True, + idempotency_key=key, + gitea_url=dup.get("html_url"), + gitea_number=issue_num, + message=f"Duplicate detected: comment already posted.", + prior_execution=True, + ) + + result = _api_request( + "POST", + f"/repos/{cmd.repo}/issues/{issue_num}/comments", + {"body": f"{body}\n\n---\n*gate-key: `{key}`*"}, + ) + url = result.get("html_url", f"{GITEA_URL}/{cmd.repo}/issues/{issue_num}#issuecomment-{result.get('id', '')}") + return Ack( + success=True, + idempotency_key=key, + gitea_url=url, + gitea_number=issue_num, + message=f"Added comment to issue #{issue_num}", + prior_execution=False, + ) + + def _handle_close_issue(self, cmd: Command, key: str) -> Ack: + issue_num = int(cmd.payload["issue_num"]) + # Check if already closed + issue = _api_get(f"/repos/{cmd.repo}/issues/{issue_num}") + if issue.get("state") == "closed": + return Ack( + success=True, + idempotency_key=key, + gitea_url=issue.get("html_url"), + gitea_number=issue_num, + message=f"Issue #{issue_num} already closed.", + prior_execution=True, + ) + + result = _api_request( + "PATCH", + f"/repos/{cmd.repo}/issues/{issue_num}", + {"state": "closed"}, + ) + # Add closure proof comment + try: + _api_request( + "POST", + f"/repos/{cmd.repo}/issues/{issue_num}/comments", + {"body": f"Closed via operator ingress ({cmd.source}) — gate-key: `{key}`"}, + ) + except Exception: + pass + + return Ack( + success=True, + idempotency_key=key, + gitea_url=result.get("html_url"), + gitea_number=issue_num, + message=f"Closed issue #{issue_num}", + prior_execution=False, + ) + + def _handle_assign_issue(self, cmd: Command, key: str) -> Ack: + issue_num = int(cmd.payload["issue_num"]) + assignees = cmd.payload.get("assignees", []) + result = _api_request( + "PATCH", + f"/repos/{cmd.repo}/issues/{issue_num}", + {"assignees": assignees}, + ) + return Ack( + success=True, + idempotency_key=key, + gitea_url=result.get("html_url"), + gitea_number=issue_num, + message=f"Assigned issue #{issue_num} to {', '.join(assignees)}", + prior_execution=False, + ) + + def _handle_merge_pr(self, cmd: Command, key: str) -> Ack: + pr_num = int(cmd.payload["pr_num"]) + # Check if already merged + pr = _api_get(f"/repos/{cmd.repo}/pulls/{pr_num}") + if pr.get("merged"): + return Ack( + success=True, + idempotency_key=key, + gitea_url=pr.get("html_url"), + gitea_number=pr_num, + message=f"PR #{pr_num} already merged.", + prior_execution=True, + ) + + result = _api_request( + "POST", + f"/repos/{cmd.repo}/pulls/{pr_num}/merge", + {"Do": "merge", "delete_branch_after_merge": False}, + ) + return Ack( + success=True, + idempotency_key=key, + gitea_url=pr.get("html_url"), + gitea_number=pr_num, + message=f"Merged PR #{pr_num}", + prior_execution=False, + ) + + +# --------------------------------------------------------------------------- +# Convenience helpers for adapters +# --------------------------------------------------------------------------- +def create_issue(source: str, repo: str, title: str, body: str = "", assignees: Optional[List[str]] = None, labels: Optional[List[str]] = None) -> Ack: + return GiteaGate().execute(Command( + source=source, + action="create_issue", + repo=repo, + payload={"title": title, "body": body, "assignees": assignees or [], "labels": labels or []}, + timestamp_utc=datetime.utcnow().isoformat() + "Z", + )) + + +def add_comment(source: str, repo: str, issue_num: int, body: str) -> Ack: + return GiteaGate().execute(Command( + source=source, + action="add_comment", + repo=repo, + payload={"issue_num": issue_num, "body": body}, + timestamp_utc=datetime.utcnow().isoformat() + "Z", + )) + + +def close_issue(source: str, repo: str, issue_num: int) -> Ack: + return GiteaGate().execute(Command( + source=source, + action="close_issue", + repo=repo, + payload={"issue_num": issue_num}, + timestamp_utc=datetime.utcnow().isoformat() + "Z", + )) + + +def assign_issue(source: str, repo: str, issue_num: int, assignees: List[str]) -> Ack: + return GiteaGate().execute(Command( + source=source, + action="assign_issue", + repo=repo, + payload={"issue_num": issue_num, "assignees": assignees}, + timestamp_utc=datetime.utcnow().isoformat() + "Z", + )) + + +def merge_pr(source: str, repo: str, pr_num: int) -> Ack: + return GiteaGate().execute(Command( + source=source, + action="merge_pr", + repo=repo, + payload={"pr_num": pr_num}, + timestamp_utc=datetime.utcnow().isoformat() + "Z", + )) diff --git a/operator-gate/tests/test_gate_idempotency.py b/operator-gate/tests/test_gate_idempotency.py new file mode 100644 index 00000000..21a156e7 --- /dev/null +++ b/operator-gate/tests/test_gate_idempotency.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +Idempotency and deduplication tests for the Operator Ingress Gate. + +These tests verify the core contract: +- Same command twice = one Gitea mutation + replay ACK +- Duplicate title probe prevents double issue creation +- Fingerprint probe prevents double comment +""" + +import json +import os +import sys +import tempfile +import unittest +from datetime import datetime +from unittest.mock import patch, MagicMock + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from gitea_gate import Command, GiteaGate, _ledger_load, _LEDGER_PATH + + +class TestIdempotency(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.TemporaryDirectory() + self.ledger_override = os.path.join(self.tmpdir.name, "test_ledger.jsonl") + self.gate = GiteaGate() + + def tearDown(self): + self.tmpdir.cleanup() + + @patch("gitea_gate._LEDGER_PATH") + def test_replay_returns_prior_ack(self, mock_path): + """Same command executed twice must return prior_execution=True on second call.""" + mock_path.__str__ = lambda self: self.ledger_override + # We mock the API call to avoid network + with patch("gitea_gate._api_request") as mock_api: + mock_api.return_value = {"number": 999, "title": "Test Issue", "html_url": "http://test/999"} + + cmd = Command( + source="nostr:test", + action="create_issue", + repo="Timmy_Foundation/test", + payload={"title": "Test Issue", "body": "body"}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + + ack1 = self.gate.execute(cmd) + self.assertTrue(ack1.success) + self.assertFalse(ack1.prior_execution) + + ack2 = self.gate.execute(cmd) + self.assertTrue(ack2.success) + self.assertTrue(ack2.prior_execution) + self.assertEqual(ack2.gitea_number, 999) + + def test_idempotency_key_determinism(self): + """Identical commands must produce identical keys.""" + cmd1 = Command( + source="nostr:npub123", + action="create_issue", + repo="Timmy_Foundation/test", + payload={"title": "X", "body": "Y"}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + cmd2 = Command( + source="nostr:npub123", + action="create_issue", + repo="Timmy_Foundation/test", + payload={"title": "X", "body": "Y"}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + self.assertEqual(cmd1.idempotency_key(), cmd2.idempotency_key()) + + def test_idempotency_key_uniqueness(self): + """Different payloads must produce different keys.""" + cmd1 = Command( + source="nostr:npub123", + action="create_issue", + repo="Timmy_Foundation/test", + payload={"title": "X", "body": "Y"}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + cmd2 = Command( + source="nostr:npub123", + action="create_issue", + repo="Timmy_Foundation/test", + payload={"title": "X", "body": "Z"}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + self.assertNotEqual(cmd1.idempotency_key(), cmd2.idempotency_key()) + + @patch("gitea_gate._api_get") + @patch("gitea_gate._api_request") + def test_duplicate_title_probe_prevents_double_create(self, mock_post, mock_get): + """If Gitea already has an issue with the same title, gate must return prior_execution.""" + mock_get.return_value = [ + {"number": 42, "title": "Probe Title", "html_url": "http://test/42"} + ] + + cmd = Command( + source="nostr:test", + action="create_issue", + repo="Timmy_Foundation/test", + payload={"title": "Probe Title", "body": "body"}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + + ack = self.gate.execute(cmd) + self.assertTrue(ack.success) + self.assertTrue(ack.prior_execution) + self.assertEqual(ack.gitea_number, 42) + mock_post.assert_not_called() + + @patch("gitea_gate._api_get") + @patch("gitea_gate._api_request") + def test_duplicate_comment_probe_prevents_double_comment(self, mock_post, mock_get): + """If a comment with the same gate-key already exists, do not post again.""" + key = Command( + source="nostr:test", + action="add_comment", + repo="Timmy_Foundation/test", + payload={"issue_num": 7, "body": "hello"}, + timestamp_utc="2026-04-06T14:00:00Z", + ).idempotency_key() + + mock_get.return_value = [ + {"id": 101, "body": f"hello\n\n---\n*gate-key: `{key}`*"} + ] + + cmd = Command( + source="nostr:test", + action="add_comment", + repo="Timmy_Foundation/test", + payload={"issue_num": 7, "body": "hello"}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + + ack = self.gate.execute(cmd) + self.assertTrue(ack.success) + self.assertTrue(ack.prior_execution) + mock_post.assert_not_called() + + @patch("gitea_gate._api_get") + @patch("gitea_gate._api_request") + def test_already_closed_issue_returns_prior(self, mock_post, mock_get): + mock_get.return_value = {"state": "closed", "number": 5, "html_url": "http://test/5"} + + cmd = Command( + source="nostr:test", + action="close_issue", + repo="Timmy_Foundation/test", + payload={"issue_num": 5}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + + ack = self.gate.execute(cmd) + self.assertTrue(ack.success) + self.assertTrue(ack.prior_execution) + mock_post.assert_not_called() + + @patch("gitea_gate._api_get") + @patch("gitea_gate._api_request") + def test_already_merged_pr_returns_prior(self, mock_post, mock_get): + mock_get.return_value = {"merged": True, "number": 3, "html_url": "http://test/3"} + + cmd = Command( + source="nostr:test", + action="merge_pr", + repo="Timmy_Foundation/test", + payload={"pr_num": 3}, + timestamp_utc="2026-04-06T14:00:00Z", + ) + + ack = self.gate.execute(cmd) + self.assertTrue(ack.success) + self.assertTrue(ack.prior_execution) + mock_post.assert_not_called() + + +class TestNosturNormalizer(unittest.TestCase): + def setUp(self): + sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "adapters")) + import nostur_adapter + self.nostur = nostur_adapter + + def test_status_command(self): + cmd = self.nostur._normalize_dm("status") + self.assertEqual(cmd["action"], "status") + + def test_create_command(self): + cmd = self.nostur._normalize_dm("create the-nexus Fix the bug\nDetails here") + self.assertEqual(cmd["action"], "create_issue") + self.assertEqual(cmd["repo"], "Timmy_Foundation/the-nexus") + self.assertEqual(cmd["title"], "Fix the bug") + self.assertEqual(cmd["body"], "Details here") + + def test_comment_command(self): + cmd = self.nostur._normalize_dm("comment the-nexus #42 This is the comment") + self.assertEqual(cmd["action"], "add_comment") + self.assertEqual(cmd["issue_num"], 42) + self.assertEqual(cmd["body"], "This is the comment") + + def test_close_command(self): + cmd = self.nostur._normalize_dm("close the-nexus #7") + self.assertEqual(cmd["action"], "close_issue") + self.assertEqual(cmd["issue_num"], 7) + + def test_assign_command(self): + cmd = self.nostur._normalize_dm("assign the-nexus #7 allegro,ezra") + self.assertEqual(cmd["action"], "assign_issue") + self.assertEqual(cmd["assignees"], ["allegro", "ezra"]) + + def test_merge_command(self): + cmd = self.nostur._normalize_dm("merge the-nexus #108") + self.assertEqual(cmd["action"], "merge_pr") + self.assertEqual(cmd["pr_num"], 108) + + +if __name__ == "__main__": + unittest.main()