Compare commits
1 Commits
mimo/code/
...
fix/1441-b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7132fabb34 |
269
bin/gitea_safe_push.py
Normal file
269
bin/gitea_safe_push.py
Normal file
@@ -0,0 +1,269 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
gitea_safe_push.py — Safely push files to Gitea via API with branch existence checks.
|
||||||
|
|
||||||
|
Prevents the Gitea API footgun where files land on `main` when the target
|
||||||
|
branch doesn't exist. Always verifies branch existence before file operations.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 bin/gitea_safe_push.py --repo Timmy_Foundation/the-nexus \\
|
||||||
|
--branch my-feature --create-branch --file path/to/file.py --message "add file"
|
||||||
|
|
||||||
|
# Or use as a library:
|
||||||
|
from bin.gitea_safe_push import GiteaSafePush
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
push.ensure_branch("Timmy_Foundation/the-nexus", "my-branch", base="main")
|
||||||
|
push.push_file("Timmy_Foundation/the-nexus", "my-branch", "file.py", "content", "commit msg")
|
||||||
|
"""
|
||||||
|
import argparse
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
class GiteaAPIError(Exception):
|
||||||
|
"""Gitea API error with status code and response body."""
|
||||||
|
def __init__(self, status: int, message: str, body: str = ""):
|
||||||
|
self.status = status
|
||||||
|
self.body = body
|
||||||
|
super().__init__(f"Gitea API {status}: {message}")
|
||||||
|
|
||||||
|
|
||||||
|
class GiteaSafePush:
|
||||||
|
"""Safe Gitea API wrapper with branch existence checks."""
|
||||||
|
|
||||||
|
def __init__(self, base_url: str, token: str):
|
||||||
|
self.base_url = base_url.rstrip("/")
|
||||||
|
self.token = token
|
||||||
|
self._headers = {
|
||||||
|
"Authorization": f"token {token}",
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
}
|
||||||
|
|
||||||
|
def _api(self, method: str, path: str, data: dict = None, timeout: int = 30) -> dict:
|
||||||
|
"""Make a Gitea API call."""
|
||||||
|
url = f"{self.base_url}/api/v1{path}"
|
||||||
|
body = json.dumps(data).encode() if data else None
|
||||||
|
req = urllib.request.Request(url, data=body, headers=self._headers, method=method)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||||
|
return json.loads(resp.read()) if resp.status != 204 else {}
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
resp_body = e.read().decode()[:500] if hasattr(e, 'read') else ""
|
||||||
|
raise GiteaAPIError(e.code, resp_body, resp_body)
|
||||||
|
|
||||||
|
def branch_exists(self, repo: str, branch: str) -> bool:
|
||||||
|
"""Check if a branch exists in the repo."""
|
||||||
|
try:
|
||||||
|
self._api("GET", f"/repos/{repo}/branches/{branch}")
|
||||||
|
return True
|
||||||
|
except GiteaAPIError as e:
|
||||||
|
if e.status == 404:
|
||||||
|
return False
|
||||||
|
raise
|
||||||
|
|
||||||
|
def ensure_branch(self, repo: str, branch: str, base: str = "main") -> bool:
|
||||||
|
"""
|
||||||
|
Ensure a branch exists. Creates it from base if it doesn't.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if branch exists or was created, False if creation failed.
|
||||||
|
"""
|
||||||
|
if self.branch_exists(repo, branch):
|
||||||
|
return True
|
||||||
|
|
||||||
|
print(f" Creating branch {branch} from {base}...")
|
||||||
|
try:
|
||||||
|
self._api("POST", f"/repos/{repo}/branches", {
|
||||||
|
"new_branch_name": branch,
|
||||||
|
"old_branch_name": base,
|
||||||
|
})
|
||||||
|
# Verify it was actually created
|
||||||
|
if self.branch_exists(repo, branch):
|
||||||
|
print(f" Branch {branch} created.")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f" ERROR: Branch creation returned success but branch doesn't exist!")
|
||||||
|
return False
|
||||||
|
except GiteaAPIError as e:
|
||||||
|
print(f" ERROR: Failed to create branch {branch}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def push_file(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
branch: str,
|
||||||
|
path: str,
|
||||||
|
content: str,
|
||||||
|
message: str,
|
||||||
|
create_branch: bool = False,
|
||||||
|
base: str = "main",
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Push a file to a specific branch with branch existence verification.
|
||||||
|
|
||||||
|
This is the SAFE version — it never silently falls back to main.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
repo: e.g. "Timmy_Foundation/the-nexus"
|
||||||
|
branch: target branch name
|
||||||
|
path: file path in repo
|
||||||
|
content: file content (text)
|
||||||
|
message: commit message
|
||||||
|
create_branch: if True, create branch if it doesn't exist
|
||||||
|
base: base branch for branch creation
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if successful, False if failed.
|
||||||
|
"""
|
||||||
|
# Step 1: Ensure branch exists
|
||||||
|
if not self.branch_exists(repo, branch):
|
||||||
|
if create_branch:
|
||||||
|
if not self.ensure_branch(repo, branch, base):
|
||||||
|
print(f" FAIL: Cannot create branch {branch}. Aborting file push.")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
print(f" FAIL: Branch {branch} does not exist. Use --create-branch or ensure_branch() first.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Step 2: Get existing file SHA if it exists on the target branch
|
||||||
|
sha = None
|
||||||
|
try:
|
||||||
|
existing = self._api("GET", f"/repos/{repo}/contents/{path}?ref={branch}")
|
||||||
|
sha = existing.get("sha")
|
||||||
|
except GiteaAPIError as e:
|
||||||
|
if e.status != 404:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Step 3: Create or update the file
|
||||||
|
b64 = base64.b64encode(content.encode()).decode()
|
||||||
|
payload = {
|
||||||
|
"content": b64,
|
||||||
|
"message": message,
|
||||||
|
"branch_name": branch,
|
||||||
|
}
|
||||||
|
if sha:
|
||||||
|
payload["sha"] = sha
|
||||||
|
method = "PUT"
|
||||||
|
action = "Updated"
|
||||||
|
else:
|
||||||
|
method = "POST"
|
||||||
|
action = "Created"
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._api(method, f"/repos/{repo}/contents/{path}", payload)
|
||||||
|
print(f" {action} {path} on {branch}")
|
||||||
|
return True
|
||||||
|
except GiteaAPIError as e:
|
||||||
|
print(f" FAIL: Could not {action.lower()} {path} on {branch}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def push_files(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
branch: str,
|
||||||
|
files: dict[str, str],
|
||||||
|
message: str,
|
||||||
|
create_branch: bool = True,
|
||||||
|
base: str = "main",
|
||||||
|
) -> dict:
|
||||||
|
"""
|
||||||
|
Push multiple files to a branch.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
repo: e.g. "Timmy_Foundation/the-nexus"
|
||||||
|
branch: target branch
|
||||||
|
files: dict of {path: content}
|
||||||
|
message: commit message
|
||||||
|
create_branch: create branch if needed
|
||||||
|
base: base branch
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict of {path: success_bool}
|
||||||
|
"""
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
# Ensure branch exists ONCE before any file operations
|
||||||
|
if not self.ensure_branch(repo, branch, base):
|
||||||
|
print(f" FAIL: Cannot ensure branch {branch}. No files pushed.")
|
||||||
|
return {path: False for path in files}
|
||||||
|
|
||||||
|
for path, content in files.items():
|
||||||
|
results[path] = self.push_file(
|
||||||
|
repo, branch, path, content, message,
|
||||||
|
create_branch=False, # already ensured above
|
||||||
|
)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Safely push files to Gitea with branch checks")
|
||||||
|
parser.add_argument("--repo", required=True, help="Repo (e.g. Timmy_Foundation/the-nexus)")
|
||||||
|
parser.add_argument("--branch", required=True, help="Target branch name")
|
||||||
|
parser.add_argument("--base", default="main", help="Base branch for creation (default: main)")
|
||||||
|
parser.add_argument("--create-branch", action="store_true", help="Create branch if it doesn't exist")
|
||||||
|
parser.add_argument("--file", action="append", help="File to push (path:content or @filepath)")
|
||||||
|
parser.add_argument("--message", default="Automated commit", help="Commit message")
|
||||||
|
parser.add_argument("--token", default=None, help="Gitea token (or reads from ~/.config/gitea/token)")
|
||||||
|
parser.add_argument("--url", default="https://forge.alexanderwhitestone.com", help="Gitea base URL")
|
||||||
|
parser.add_argument("--check-branch", action="store_true", help="Only check if branch exists")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Get token
|
||||||
|
token = args.token
|
||||||
|
if not token:
|
||||||
|
token_path = Path.home() / ".config" / "gitea" / "token"
|
||||||
|
if token_path.exists():
|
||||||
|
token = token_path.read_text().strip()
|
||||||
|
else:
|
||||||
|
print("ERROR: No token provided and ~/.config/gitea/token not found", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
push = GiteaSafePush(args.url, token)
|
||||||
|
|
||||||
|
# Branch check mode
|
||||||
|
if args.check_branch:
|
||||||
|
exists = push.branch_exists(args.repo, args.branch)
|
||||||
|
print(f"Branch {args.branch}: {'EXISTS' if exists else 'NOT FOUND'}")
|
||||||
|
sys.exit(0 if exists else 1)
|
||||||
|
|
||||||
|
# File push mode
|
||||||
|
if not args.file:
|
||||||
|
print("ERROR: No files specified. Use --file path (reads from stdin) or --file @path", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
files = {}
|
||||||
|
for f in args.file:
|
||||||
|
if f.startswith("@"):
|
||||||
|
# Read from file
|
||||||
|
filepath = f[1:]
|
||||||
|
with open(filepath) as fh:
|
||||||
|
files[filepath] = fh.read()
|
||||||
|
elif ":" in f:
|
||||||
|
# path:content format
|
||||||
|
path, content = f.split(":", 1)
|
||||||
|
files[path] = content
|
||||||
|
else:
|
||||||
|
# Read file from disk
|
||||||
|
with open(f) as fh:
|
||||||
|
files[f] = fh.read()
|
||||||
|
|
||||||
|
results = push.push_files(
|
||||||
|
args.repo, args.branch, files, args.message,
|
||||||
|
create_branch=args.create_branch, base=args.base,
|
||||||
|
)
|
||||||
|
|
||||||
|
success = all(results.values())
|
||||||
|
print(f"\n{'All' if success else 'Some'} files pushed. Results: {results}")
|
||||||
|
sys.exit(0 if success else 1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
124
tests/test_gitea_safe_push.py
Normal file
124
tests/test_gitea_safe_push.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
"""Tests for gitea_safe_push — Branch existence checks before file operations."""
|
||||||
|
import json
|
||||||
|
from unittest.mock import MagicMock, patch, call
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import sys
|
||||||
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||||
|
|
||||||
|
from bin.gitea_safe_push import GiteaSafePush, GiteaAPIError
|
||||||
|
|
||||||
|
|
||||||
|
class TestGiteaAPIError:
|
||||||
|
def test_creation(self):
|
||||||
|
e = GiteaAPIError(404, "not found", '{"message":"not found"}')
|
||||||
|
assert e.status == 404
|
||||||
|
assert "404" in str(e)
|
||||||
|
|
||||||
|
def test_is_exception(self):
|
||||||
|
e = GiteaAPIError(500, "internal")
|
||||||
|
assert isinstance(e, Exception)
|
||||||
|
|
||||||
|
|
||||||
|
class TestBranchExists:
|
||||||
|
def test_branch_exists(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "_api", return_value={"name": "main"}):
|
||||||
|
assert push.branch_exists("owner/repo", "main") is True
|
||||||
|
|
||||||
|
def test_branch_not_exists(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "_api", side_effect=GiteaAPIError(404, "not found")):
|
||||||
|
assert push.branch_exists("owner/repo", "nonexistent") is False
|
||||||
|
|
||||||
|
def test_api_error_propagates(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "_api", side_effect=GiteaAPIError(500, "server error")):
|
||||||
|
with pytest.raises(GiteaAPIError):
|
||||||
|
push.branch_exists("owner/repo", "main")
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnsureBranch:
|
||||||
|
def test_already_exists(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "branch_exists", return_value=True):
|
||||||
|
assert push.ensure_branch("owner/repo", "my-branch") is True
|
||||||
|
|
||||||
|
def test_creates_branch(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "branch_exists", side_effect=[False, True]):
|
||||||
|
with patch.object(push, "_api", return_value={"name": "my-branch"}):
|
||||||
|
assert push.ensure_branch("owner/repo", "my-branch", base="main") is True
|
||||||
|
|
||||||
|
def test_creation_fails(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "branch_exists", return_value=False):
|
||||||
|
with patch.object(push, "_api", side_effect=GiteaAPIError(422, "invalid")):
|
||||||
|
assert push.ensure_branch("owner/repo", "bad-branch") is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestPushFile:
|
||||||
|
def test_rejects_missing_branch(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "branch_exists", return_value=False):
|
||||||
|
result = push.push_file("owner/repo", "missing", "file.py", "content", "msg")
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_creates_new_file(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "branch_exists", return_value=True):
|
||||||
|
with patch.object(push, "_api", side_effect=[
|
||||||
|
GiteaAPIError(404, "not found"), # GET existing file
|
||||||
|
{}, # POST new file
|
||||||
|
]):
|
||||||
|
result = push.push_file("owner/repo", "branch", "new.py", "content", "msg")
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
def test_updates_existing_file(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "branch_exists", return_value=True):
|
||||||
|
with patch.object(push, "_api", side_effect=[
|
||||||
|
{"sha": "abc123"}, # GET existing file
|
||||||
|
{}, # PUT update
|
||||||
|
]):
|
||||||
|
result = push.push_file("owner/repo", "branch", "existing.py", "new content", "msg")
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
def test_create_branch_when_missing(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
# Mock branch_exists: first call returns False (doesn't exist),
|
||||||
|
# second call (inside ensure_branch) returns True (created externally)
|
||||||
|
exists_calls = [False, True]
|
||||||
|
exists_idx = [0]
|
||||||
|
def mock_exists(repo, branch):
|
||||||
|
idx = min(exists_idx[0], len(exists_calls) - 1)
|
||||||
|
exists_idx[0] += 1
|
||||||
|
return exists_calls[idx]
|
||||||
|
with patch.object(push, "branch_exists", side_effect=mock_exists):
|
||||||
|
with patch.object(push, "_api") as mock_api:
|
||||||
|
mock_api.side_effect = [
|
||||||
|
GiteaAPIError(404, "not found"), # GET existing file (not found)
|
||||||
|
{"content": {"path": "f.py"}}, # POST new file
|
||||||
|
]
|
||||||
|
result = push.push_file("owner/repo", "new-branch", "f.py", "c", "m", create_branch=True)
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestPushFiles:
|
||||||
|
def test_push_multiple_files(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "ensure_branch", return_value=True):
|
||||||
|
with patch.object(push, "push_file", return_value=True):
|
||||||
|
results = push.push_files("owner/repo", "branch", {
|
||||||
|
"a.py": "content a",
|
||||||
|
"b.py": "content b",
|
||||||
|
}, "message")
|
||||||
|
assert all(results.values())
|
||||||
|
assert len(results) == 2
|
||||||
|
|
||||||
|
def test_branch_creation_fails_aborts_all(self):
|
||||||
|
push = GiteaSafePush("https://forge.example.com", "token123")
|
||||||
|
with patch.object(push, "ensure_branch", return_value=False):
|
||||||
|
results = push.push_files("owner/repo", "bad", {"a.py": "x"}, "msg")
|
||||||
|
assert all(v is False for v in results.values())
|
||||||
Reference in New Issue
Block a user