diff --git a/bin/gitea_safe_push.py b/bin/gitea_safe_push.py new file mode 100644 index 00000000..1d321ecb --- /dev/null +++ b/bin/gitea_safe_push.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 +""" +gitea_safe_push.py — Safely push files to Gitea via API with branch existence checks. + +Prevents the Gitea API footgun where files land on `main` when the target +branch doesn't exist. Always verifies branch existence before file operations. + +Usage: + python3 bin/gitea_safe_push.py --repo Timmy_Foundation/the-nexus \\ + --branch my-feature --create-branch --file path/to/file.py --message "add file" + + # Or use as a library: + from bin.gitea_safe_push import GiteaSafePush + push = GiteaSafePush("https://forge.example.com", "token123") + push.ensure_branch("Timmy_Foundation/the-nexus", "my-branch", base="main") + push.push_file("Timmy_Foundation/the-nexus", "my-branch", "file.py", "content", "commit msg") +""" +import argparse +import base64 +import json +import os +import sys +import urllib.error +import urllib.request +from pathlib import Path +from typing import Optional + + +class GiteaAPIError(Exception): + """Gitea API error with status code and response body.""" + def __init__(self, status: int, message: str, body: str = ""): + self.status = status + self.body = body + super().__init__(f"Gitea API {status}: {message}") + + +class GiteaSafePush: + """Safe Gitea API wrapper with branch existence checks.""" + + def __init__(self, base_url: str, token: str): + self.base_url = base_url.rstrip("/") + self.token = token + self._headers = { + "Authorization": f"token {token}", + "Content-Type": "application/json", + } + + def _api(self, method: str, path: str, data: dict = None, timeout: int = 30) -> dict: + """Make a Gitea API call.""" + url = f"{self.base_url}/api/v1{path}" + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=self._headers, method=method) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read()) if resp.status != 204 else {} + except urllib.error.HTTPError as e: + resp_body = e.read().decode()[:500] if hasattr(e, 'read') else "" + raise GiteaAPIError(e.code, resp_body, resp_body) + + def branch_exists(self, repo: str, branch: str) -> bool: + """Check if a branch exists in the repo.""" + try: + self._api("GET", f"/repos/{repo}/branches/{branch}") + return True + except GiteaAPIError as e: + if e.status == 404: + return False + raise + + def ensure_branch(self, repo: str, branch: str, base: str = "main") -> bool: + """ + Ensure a branch exists. Creates it from base if it doesn't. + + Returns: + True if branch exists or was created, False if creation failed. + """ + if self.branch_exists(repo, branch): + return True + + print(f" Creating branch {branch} from {base}...") + try: + self._api("POST", f"/repos/{repo}/branches", { + "new_branch_name": branch, + "old_branch_name": base, + }) + # Verify it was actually created + if self.branch_exists(repo, branch): + print(f" Branch {branch} created.") + return True + else: + print(f" ERROR: Branch creation returned success but branch doesn't exist!") + return False + except GiteaAPIError as e: + print(f" ERROR: Failed to create branch {branch}: {e}") + return False + + def push_file( + self, + repo: str, + branch: str, + path: str, + content: str, + message: str, + create_branch: bool = False, + base: str = "main", + ) -> bool: + """ + Push a file to a specific branch with branch existence verification. + + This is the SAFE version — it never silently falls back to main. + + Args: + repo: e.g. "Timmy_Foundation/the-nexus" + branch: target branch name + path: file path in repo + content: file content (text) + message: commit message + create_branch: if True, create branch if it doesn't exist + base: base branch for branch creation + + Returns: + True if successful, False if failed. + """ + # Step 1: Ensure branch exists + if not self.branch_exists(repo, branch): + if create_branch: + if not self.ensure_branch(repo, branch, base): + print(f" FAIL: Cannot create branch {branch}. Aborting file push.") + return False + else: + print(f" FAIL: Branch {branch} does not exist. Use --create-branch or ensure_branch() first.") + return False + + # Step 2: Get existing file SHA if it exists on the target branch + sha = None + try: + existing = self._api("GET", f"/repos/{repo}/contents/{path}?ref={branch}") + sha = existing.get("sha") + except GiteaAPIError as e: + if e.status != 404: + raise + + # Step 3: Create or update the file + b64 = base64.b64encode(content.encode()).decode() + payload = { + "content": b64, + "message": message, + "branch_name": branch, + } + if sha: + payload["sha"] = sha + method = "PUT" + action = "Updated" + else: + method = "POST" + action = "Created" + + try: + self._api(method, f"/repos/{repo}/contents/{path}", payload) + print(f" {action} {path} on {branch}") + return True + except GiteaAPIError as e: + print(f" FAIL: Could not {action.lower()} {path} on {branch}: {e}") + return False + + def push_files( + self, + repo: str, + branch: str, + files: dict[str, str], + message: str, + create_branch: bool = True, + base: str = "main", + ) -> dict: + """ + Push multiple files to a branch. + + Args: + repo: e.g. "Timmy_Foundation/the-nexus" + branch: target branch + files: dict of {path: content} + message: commit message + create_branch: create branch if needed + base: base branch + + Returns: + dict of {path: success_bool} + """ + results = {} + + # Ensure branch exists ONCE before any file operations + if not self.ensure_branch(repo, branch, base): + print(f" FAIL: Cannot ensure branch {branch}. No files pushed.") + return {path: False for path in files} + + for path, content in files.items(): + results[path] = self.push_file( + repo, branch, path, content, message, + create_branch=False, # already ensured above + ) + + return results + + +def main(): + parser = argparse.ArgumentParser(description="Safely push files to Gitea with branch checks") + parser.add_argument("--repo", required=True, help="Repo (e.g. Timmy_Foundation/the-nexus)") + parser.add_argument("--branch", required=True, help="Target branch name") + parser.add_argument("--base", default="main", help="Base branch for creation (default: main)") + parser.add_argument("--create-branch", action="store_true", help="Create branch if it doesn't exist") + parser.add_argument("--file", action="append", help="File to push (path:content or @filepath)") + parser.add_argument("--message", default="Automated commit", help="Commit message") + parser.add_argument("--token", default=None, help="Gitea token (or reads from ~/.config/gitea/token)") + parser.add_argument("--url", default="https://forge.alexanderwhitestone.com", help="Gitea base URL") + parser.add_argument("--check-branch", action="store_true", help="Only check if branch exists") + + args = parser.parse_args() + + # Get token + token = args.token + if not token: + token_path = Path.home() / ".config" / "gitea" / "token" + if token_path.exists(): + token = token_path.read_text().strip() + else: + print("ERROR: No token provided and ~/.config/gitea/token not found", file=sys.stderr) + sys.exit(1) + + push = GiteaSafePush(args.url, token) + + # Branch check mode + if args.check_branch: + exists = push.branch_exists(args.repo, args.branch) + print(f"Branch {args.branch}: {'EXISTS' if exists else 'NOT FOUND'}") + sys.exit(0 if exists else 1) + + # File push mode + if not args.file: + print("ERROR: No files specified. Use --file path (reads from stdin) or --file @path", file=sys.stderr) + sys.exit(1) + + files = {} + for f in args.file: + if f.startswith("@"): + # Read from file + filepath = f[1:] + with open(filepath) as fh: + files[filepath] = fh.read() + elif ":" in f: + # path:content format + path, content = f.split(":", 1) + files[path] = content + else: + # Read file from disk + with open(f) as fh: + files[f] = fh.read() + + results = push.push_files( + args.repo, args.branch, files, args.message, + create_branch=args.create_branch, base=args.base, + ) + + success = all(results.values()) + print(f"\n{'All' if success else 'Some'} files pushed. Results: {results}") + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/tests/test_gitea_safe_push.py b/tests/test_gitea_safe_push.py new file mode 100644 index 00000000..e335b819 --- /dev/null +++ b/tests/test_gitea_safe_push.py @@ -0,0 +1,124 @@ +"""Tests for gitea_safe_push — Branch existence checks before file operations.""" +import json +from unittest.mock import MagicMock, patch, call +from pathlib import Path + +import pytest +import sys +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from bin.gitea_safe_push import GiteaSafePush, GiteaAPIError + + +class TestGiteaAPIError: + def test_creation(self): + e = GiteaAPIError(404, "not found", '{"message":"not found"}') + assert e.status == 404 + assert "404" in str(e) + + def test_is_exception(self): + e = GiteaAPIError(500, "internal") + assert isinstance(e, Exception) + + +class TestBranchExists: + def test_branch_exists(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "_api", return_value={"name": "main"}): + assert push.branch_exists("owner/repo", "main") is True + + def test_branch_not_exists(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "_api", side_effect=GiteaAPIError(404, "not found")): + assert push.branch_exists("owner/repo", "nonexistent") is False + + def test_api_error_propagates(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "_api", side_effect=GiteaAPIError(500, "server error")): + with pytest.raises(GiteaAPIError): + push.branch_exists("owner/repo", "main") + + +class TestEnsureBranch: + def test_already_exists(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "branch_exists", return_value=True): + assert push.ensure_branch("owner/repo", "my-branch") is True + + def test_creates_branch(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "branch_exists", side_effect=[False, True]): + with patch.object(push, "_api", return_value={"name": "my-branch"}): + assert push.ensure_branch("owner/repo", "my-branch", base="main") is True + + def test_creation_fails(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "branch_exists", return_value=False): + with patch.object(push, "_api", side_effect=GiteaAPIError(422, "invalid")): + assert push.ensure_branch("owner/repo", "bad-branch") is False + + +class TestPushFile: + def test_rejects_missing_branch(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "branch_exists", return_value=False): + result = push.push_file("owner/repo", "missing", "file.py", "content", "msg") + assert result is False + + def test_creates_new_file(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "branch_exists", return_value=True): + with patch.object(push, "_api", side_effect=[ + GiteaAPIError(404, "not found"), # GET existing file + {}, # POST new file + ]): + result = push.push_file("owner/repo", "branch", "new.py", "content", "msg") + assert result is True + + def test_updates_existing_file(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "branch_exists", return_value=True): + with patch.object(push, "_api", side_effect=[ + {"sha": "abc123"}, # GET existing file + {}, # PUT update + ]): + result = push.push_file("owner/repo", "branch", "existing.py", "new content", "msg") + assert result is True + + def test_create_branch_when_missing(self): + push = GiteaSafePush("https://forge.example.com", "token123") + # Mock branch_exists: first call returns False (doesn't exist), + # second call (inside ensure_branch) returns True (created externally) + exists_calls = [False, True] + exists_idx = [0] + def mock_exists(repo, branch): + idx = min(exists_idx[0], len(exists_calls) - 1) + exists_idx[0] += 1 + return exists_calls[idx] + with patch.object(push, "branch_exists", side_effect=mock_exists): + with patch.object(push, "_api") as mock_api: + mock_api.side_effect = [ + GiteaAPIError(404, "not found"), # GET existing file (not found) + {"content": {"path": "f.py"}}, # POST new file + ] + result = push.push_file("owner/repo", "new-branch", "f.py", "c", "m", create_branch=True) + assert result is True + + +class TestPushFiles: + def test_push_multiple_files(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "ensure_branch", return_value=True): + with patch.object(push, "push_file", return_value=True): + results = push.push_files("owner/repo", "branch", { + "a.py": "content a", + "b.py": "content b", + }, "message") + assert all(results.values()) + assert len(results) == 2 + + def test_branch_creation_fails_aborts_all(self): + push = GiteaSafePush("https://forge.example.com", "token123") + with patch.object(push, "ensure_branch", return_value=False): + results = push.push_files("owner/repo", "bad", {"a.py": "x"}, "msg") + assert all(v is False for v in results.values())