Files
timmy-config/gitea_client.py
Step35 1b97aa395d
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
Architecture Lint / Linter Tests (pull_request) Successful in 22s
Validate Config / YAML Lint (pull_request) Failing after 15s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 57s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m0s
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 11s
Validate Config / Playbook Schema Validation (pull_request) Successful in 22s
Architecture Lint / Lint Repository (pull_request) Failing after 22s
PR Checklist / pr-checklist (pull_request) Successful in 4m20s
[AUDIT][RISK] Enforce branch protection — agents merge before review
Add set_branch_protection() and related methods to GiteaClient.

Fixes #482 via three-way enforcement:
1. gitea_client.py: Added get/set/delete_branch_protection() API wrappers
2. bin/enable-branch-protection.py: Idempotent script protects main branches
   of all Timmy_Foundation core repos with:
   ── required_approvals: 1 (at least one human review)
   ── require_status_checks: true (CI must pass)
   ── restrict_merge: true (only admins + reviewers can merge)
3. tests/test_gitea_client_core.py: Added TestBranchProtection suite

Usage: after merging, run:
  bin/enable-branch-protection.py --dry-run  # verify
  bin/enable-branch-protection.py             # apply to all core repos

This prevents agents from merging their own PRs before human review.
2026-04-26 12:00:23 -04:00

630 lines
20 KiB
Python

"""
Gitea API Client — typed, sovereign, zero-dependency.
Replaces raw curl calls scattered across 41 bash scripts.
Uses only stdlib (urllib) so it works on any Python install.
Usage:
from gitea_client import GiteaClient
client = GiteaClient() # reads token from standard local paths
issues = client.list_issues("Timmy_Foundation/the-nexus", state="open")
client.create_comment("Timmy_Foundation/the-nexus", 42, "PR created.")
"""
from __future__ import annotations
import json
import os
import urllib.request
import urllib.error
import urllib.parse
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _read_token() -> str:
    """Return the Gitea API token from the first usable token file.

    Candidate files are checked in this order: ``~/.hermes/gitea_token``,
    ``~/.hermes/gitea_token_vps``, ``~/.config/gitea/token``. A candidate that
    is missing, is not a regular file, or holds only whitespace is skipped, so
    an empty file cannot mask a valid token stored in a later location.

    Returns:
        The token text with surrounding whitespace removed.

    Raises:
        FileNotFoundError: If no candidate yields a non-empty token.
    """
    home = Path.home()
    candidates = (
        home / ".hermes" / "gitea_token",
        home / ".hermes" / "gitea_token_vps",
        home / ".config" / "gitea" / "token",
    )
    for candidate in candidates:
        # is_file() (rather than exists()) also rejects a directory that
        # happens to sit at a candidate path.
        if not candidate.is_file():
            continue
        token = candidate.read_text().strip()
        if token:
            return token
    raise FileNotFoundError(
        "No Gitea token found. Checked: ~/.hermes/gitea_token, "
        "~/.hermes/gitea_token_vps, ~/.config/gitea/token"
    )
def _read_base_url() -> str:
    """Return the Gitea base URL with trailing slashes removed.

    A non-empty ``GITEA_URL`` environment variable takes precedence;
    otherwise the hard-coded VPS address is returned.
    """
    configured = os.environ.get("GITEA_URL", "")
    if not configured:
        return "http://143.198.27.163:3000"
    return configured.rstrip("/")
# ---------------------------------------------------------------------------
# Data classes — typed responses
# ---------------------------------------------------------------------------
@dataclass
class User:
    """Subset of a Gitea user object kept by this client."""

    id: int
    login: str
    full_name: str = ""
    email: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> "User":
        """Build a ``User`` from an API dict; absent keys fall back to ``0`` / ``""``."""
        fetch = d.get
        return cls(
            fetch("id", 0),
            fetch("login", ""),
            fetch("full_name", ""),
            fetch("email", ""),
        )
@dataclass
class Label:
    """Subset of a Gitea label object kept by this client."""

    id: int
    name: str
    color: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> "Label":
        """Build a ``Label`` from an API dict; absent keys fall back to ``0`` / ``""``."""
        fallbacks = {"id": 0, "name": "", "color": ""}
        return cls(**{key: d.get(key, default) for key, default in fallbacks.items()})
@dataclass
class Issue:
    """Typed view of the issue fields this client reads from the Gitea API."""

    number: int
    title: str
    body: str
    state: str
    user: User
    assignees: list[User] = field(default_factory=list)
    labels: list[Label] = field(default_factory=list)
    created_at: str = ""
    updated_at: str = ""
    comments: int = 0

    @classmethod
    def from_dict(cls, d: dict) -> "Issue":
        """Build an ``Issue`` from an API dict.

        Keys that are absent *or* explicitly ``null`` are replaced by empty
        defaults for ``body``, ``user``, ``assignees``, ``labels``, the
        timestamps and ``comments``. ``d.get(key, default)`` alone only covers
        the absent case, so a ``"user": null`` payload would otherwise reach
        ``User.from_dict(None)`` and raise ``AttributeError``.
        """
        return cls(
            number=d.get("number", 0),
            title=d.get("title", ""),
            body=d.get("body") or "",
            state=d.get("state", ""),
            user=User.from_dict(d.get("user") or {}),
            assignees=[User.from_dict(a) for a in d.get("assignees") or []],
            labels=[Label.from_dict(lb) for lb in d.get("labels") or []],
            created_at=d.get("created_at") or "",
            updated_at=d.get("updated_at") or "",
            comments=d.get("comments") or 0,
        )
@dataclass
class Comment:
    """Typed view of the comment fields this client reads from the Gitea API."""

    id: int
    body: str
    user: User
    created_at: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> "Comment":
        """Build a ``Comment`` from an API dict.

        ``body``, ``user`` and ``created_at`` tolerate both a missing key and
        an explicit ``null``; a ``"user": null`` payload would otherwise make
        ``User.from_dict`` fail with ``AttributeError``.
        """
        return cls(
            id=d.get("id", 0),
            body=d.get("body") or "",
            user=User.from_dict(d.get("user") or {}),
            created_at=d.get("created_at") or "",
        )
@dataclass
class PullRequest:
    """Typed view of the pull-request fields this client reads from the Gitea API."""

    number: int
    title: str
    body: str
    state: str
    user: User
    head_branch: str = ""
    base_branch: str = ""
    mergeable: bool = False
    merged: bool = False
    changed_files: int = 0
    additions: int = 0
    deletions: int = 0
    created_at: str = ""
    updated_at: str = ""
    closed_at: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> "PullRequest":
        """Build a ``PullRequest`` from an API dict.

        Every optional field accepts a missing key or an explicit ``null``.
        This matters for ``closed_at`` (``null`` while the PR is open), which
        would otherwise land as ``None`` in a ``str`` field, and for ``user``,
        where ``null`` would make ``User.from_dict`` raise ``AttributeError``.
        """
        head = d.get("head") or {}
        base = d.get("base") or {}
        return cls(
            number=d.get("number", 0),
            title=d.get("title", ""),
            body=d.get("body") or "",
            state=d.get("state", ""),
            user=User.from_dict(d.get("user") or {}),
            head_branch=head.get("ref") or "",
            base_branch=base.get("ref") or "",
            mergeable=bool(d.get("mergeable")),
            merged=bool(d.get("merged")),
            changed_files=d.get("changed_files") or 0,
            additions=d.get("additions") or 0,
            deletions=d.get("deletions") or 0,
            created_at=d.get("created_at") or "",
            updated_at=d.get("updated_at") or "",
            closed_at=d.get("closed_at") or "",
        )
@dataclass
class PRFile:
    """One changed file in a pull request."""

    filename: str
    status: str  # added, modified, deleted
    additions: int = 0
    deletions: int = 0

    @classmethod
    def from_dict(cls, d: dict) -> "PRFile":
        """Build a ``PRFile`` from an API dict; absent keys fall back to ``""`` / ``0``."""
        name = d.get("filename", "")
        change_kind = d.get("status", "")
        added = d.get("additions", 0)
        removed = d.get("deletions", 0)
        return cls(name, change_kind, added, removed)
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class GiteaError(Exception):
    """Gitea API failure carrying the status code and request URL.

    The ``status`` and ``url`` constructor arguments are stored as attributes
    of the same name; the exception text is
    ``"Gitea {status}: {message} [{url}]"``.
    """

    def __init__(self, status: int, message: str, url: str = ""):
        rendered = f"Gitea {status}: {message} [{url}]"
        super().__init__(rendered)
        self.url = url
        self.status = status
class GiteaClient:
    """
    Typed Gitea API client. Sovereign, zero-dependency.

    Covers all operations the agent loops need:
    - Issues: list, get, create, update, close, assign, label, comment
    - PRs: list, get, create, merge, update, close, files
    - Repos: list org repos
    - Branch protection: get, create/update, delete
    """

    def __init__(
        self,
        base_url: Optional[str] = None,
        token: Optional[str] = None,
    ):
        """Configure the client.

        Args:
            base_url: Gitea root URL. When omitted or empty,
                ``_read_base_url()`` supplies it.
            token: API token. When omitted or empty, ``_read_token()``
                supplies it.
        """
        # Strip trailing slashes so an explicit "http://host/" does not turn
        # into "http://host//api/v1".
        self.base_url = (base_url or _read_base_url()).rstrip("/")
        self.token = token or _read_token()
        self.api = f"{self.base_url}/api/v1"

    # -- HTTP layer ----------------------------------------------------------

    def _request(
        self,
        method: str,
        path: str,
        data: Optional[dict] = None,
        params: Optional[dict] = None,
        retries: int = 3,
        backoff: float = 1.5,
    ) -> Any:
        """Make an authenticated API request with exponential backoff retries.

        Args:
            method: HTTP method name.
            path: Path below ``/api/v1`` (starting with ``/``).
            data: JSON-serialisable request body; ``None`` sends no body.
            params: Query parameters, URL-encoded onto the path when non-empty.
            retries: Maximum number of attempts (at least one is always made).
            backoff: Base of the sleep between attempts
                (``backoff ** attempt`` seconds).

        Returns:
            The decoded JSON response, or ``{}`` for an empty response body.

        Raises:
            GiteaError: Immediately for 4xx responses other than 429 (message
                is the response body); after the final attempt for any other
                HTTP error; and after the final attempt, with status 500, for
                ``URLError`` / ``TimeoutError``.
        """
        url = f"{self.api}{path}"
        if params:
            url += "?" + urllib.parse.urlencode(params)
        # Test against None, not truthiness: an explicit empty dict must still
        # be sent as the JSON body "{}".
        body = json.dumps(data).encode() if data is not None else None
        # Guarantee one attempt; with retries <= 0 the loop would otherwise be
        # skipped and None returned without ever contacting the server.
        attempts = max(1, retries)
        for attempt in range(attempts):
            req = urllib.request.Request(url, data=body, method=method)
            req.add_header("Authorization", f"token {self.token}")
            req.add_header("Content-Type", "application/json")
            req.add_header("Accept", "application/json")
            try:
                with urllib.request.urlopen(req, timeout=30) as resp:
                    raw = resp.read().decode()
                    if not raw:
                        return {}
                    return json.loads(raw)
            except urllib.error.HTTPError as e:
                # Don't retry client errors (4xx) except 429
                if 400 <= e.code < 500 and e.code != 429:
                    body_text = ""
                    try:
                        body_text = e.read().decode()
                    except Exception:
                        # Best effort: the status code alone is still useful.
                        pass
                    raise GiteaError(e.code, body_text, url) from e
                if attempt == attempts - 1:
                    raise GiteaError(e.code, str(e), url) from e
                time.sleep(backoff ** attempt)
            except (urllib.error.URLError, TimeoutError) as e:
                if attempt == attempts - 1:
                    raise GiteaError(500, str(e), url) from e
                time.sleep(backoff ** attempt)

    def _get(self, path: str, **params) -> Any:
        """GET ``path``; keyword arguments become query parameters (``None`` values dropped)."""
        clean = {k: v for k, v in params.items() if v is not None}
        return self._request("GET", path, params=clean)

    def _post(self, path: str, data: dict) -> Any:
        """POST ``data`` as JSON to ``path``."""
        return self._request("POST", path, data=data)

    def _patch(self, path: str, data: dict) -> Any:
        """PATCH ``path`` with ``data`` as JSON."""
        return self._request("PATCH", path, data=data)

    def _delete(self, path: str) -> Any:
        """DELETE ``path``."""
        return self._request("DELETE", path)

    def _repo_path(self, repo: str) -> str:
        """Convert 'owner/name' to '/repos/owner/name'."""
        return f"/repos/{repo}"

    def _branch_protection_path(self, repo: str, branch: Optional[str] = None) -> str:
        """Return the branch-protection collection path, or one rule's path.

        The rule name is percent-encoded with ``safe=""`` so that a branch
        such as ``release/1.0`` stays a single path segment.
        """
        collection = f"{self._repo_path(repo)}/branch_protections"
        if branch is None:
            return collection
        return f"{collection}/{urllib.parse.quote(branch, safe='')}"

    # -- Health --------------------------------------------------------------

    def ping(self) -> bool:
        """Check if Gitea is responding (``GET /version`` succeeds)."""
        try:
            self._get("/version")
            return True
        except Exception:
            # Deliberately broad: a health probe reports any failure as "down".
            return False

    # -- Repos ---------------------------------------------------------------

    def list_org_repos(self, org: str, limit: int = 50, page: int = 1) -> list[dict]:
        """List repos in an organization (one page of raw API dicts)."""
        return self._get(f"/orgs/{org}/repos", limit=limit, page=page)

    # -- Issues --------------------------------------------------------------

    def list_issues(
        self,
        repo: str,
        state: str = "open",
        assignee: Optional[str] = None,
        labels: Optional[str] = None,
        sort: str = "created",
        direction: str = "desc",
        limit: int = 30,
        page: int = 1,
        since: Optional[str] = None,
    ) -> list[Issue]:
        """List issues for a repo (pull requests excluded via ``type=issues``).

        ``assignee`` is sent as Gitea's ``assigned_by`` query parameter, which
        is the name the issue-list endpoint documents for this filter.
        """
        raw = self._get(
            f"{self._repo_path(repo)}/issues",
            state=state,
            type="issues",
            assigned_by=assignee,
            labels=labels,
            sort=sort,
            direction=direction,
            limit=limit,
            page=page,
            since=since,
        )
        return [Issue.from_dict(i) for i in raw]

    def get_issue(self, repo: str, number: int) -> Issue:
        """Get a single issue."""
        return Issue.from_dict(
            self._get(f"{self._repo_path(repo)}/issues/{number}")
        )

    def create_issue(
        self,
        repo: str,
        title: str,
        body: str = "",
        labels: Optional[list[int]] = None,
        assignees: Optional[list[str]] = None,
    ) -> Issue:
        """Create an issue; ``labels`` are label ids, ``assignees`` are logins."""
        data: dict[str, Any] = {"title": title, "body": body}
        if labels:
            data["labels"] = labels
        if assignees:
            data["assignees"] = assignees
        return Issue.from_dict(
            self._post(f"{self._repo_path(repo)}/issues", data)
        )

    def update_issue(
        self,
        repo: str,
        number: int,
        title: Optional[str] = None,
        body: Optional[str] = None,
        state: Optional[str] = None,
        assignees: Optional[list[str]] = None,
    ) -> Issue:
        """Update an issue (title, body, state, assignees).

        Only arguments that are not ``None`` are sent, so an empty list for
        ``assignees`` is transmitted while ``None`` leaves them untouched.
        """
        candidates = {
            "title": title,
            "body": body,
            "state": state,
            "assignees": assignees,
        }
        data: dict[str, Any] = {
            key: value for key, value in candidates.items() if value is not None
        }
        return Issue.from_dict(
            self._patch(f"{self._repo_path(repo)}/issues/{number}", data)
        )

    def close_issue(self, repo: str, number: int) -> Issue:
        """Close an issue."""
        return self.update_issue(repo, number, state="closed")

    def assign_issue(self, repo: str, number: int, assignees: list[str]) -> Issue:
        """Assign users to an issue."""
        return self.update_issue(repo, number, assignees=assignees)

    def add_labels(self, repo: str, number: int, label_ids: list[int]) -> list[Label]:
        """Add labels to an issue and return the labels from the response."""
        raw = self._post(
            f"{self._repo_path(repo)}/issues/{number}/labels",
            {"labels": label_ids},
        )
        return [Label.from_dict(lb) for lb in raw]

    # -- Comments ------------------------------------------------------------

    def list_comments(
        self, repo: str, number: int, since: Optional[str] = None
    ) -> list[Comment]:
        """List comments on an issue, optionally only those updated after ``since``."""
        raw = self._get(
            f"{self._repo_path(repo)}/issues/{number}/comments",
            since=since,
        )
        return [Comment.from_dict(c) for c in raw]

    def create_comment(self, repo: str, number: int, body: str) -> Comment:
        """Add a comment to an issue."""
        return Comment.from_dict(
            self._post(
                f"{self._repo_path(repo)}/issues/{number}/comments",
                {"body": body},
            )
        )

    # -- Pull Requests -------------------------------------------------------

    def list_pulls(
        self,
        repo: str,
        state: str = "open",
        sort: str = "newest",
        limit: int = 20,
        page: int = 1,
    ) -> list[PullRequest]:
        """List pull requests (one page)."""
        raw = self._get(
            f"{self._repo_path(repo)}/pulls",
            state=state,
            sort=sort,
            limit=limit,
            page=page,
        )
        return [PullRequest.from_dict(p) for p in raw]

    def get_pull(self, repo: str, number: int) -> PullRequest:
        """Get a single pull request."""
        return PullRequest.from_dict(
            self._get(f"{self._repo_path(repo)}/pulls/{number}")
        )

    def create_pull(
        self,
        repo: str,
        title: str,
        head: str,
        base: str = "main",
        body: str = "",
    ) -> PullRequest:
        """Create a pull request from ``head`` into ``base``."""
        return PullRequest.from_dict(
            self._post(
                f"{self._repo_path(repo)}/pulls",
                {"title": title, "head": head, "base": base, "body": body},
            )
        )

    def merge_pull(
        self,
        repo: str,
        number: int,
        method: str = "squash",
        delete_branch: bool = True,
    ) -> bool:
        """Merge a pull request. Returns True on success.

        Returns False when Gitea answers 405 (not mergeable); any other
        ``GiteaError`` propagates.
        """
        try:
            self._post(
                f"{self._repo_path(repo)}/pulls/{number}/merge",
                {"Do": method, "delete_branch_after_merge": delete_branch},
            )
            return True
        except GiteaError as e:
            if e.status == 405:  # not mergeable
                return False
            raise

    def update_pull_branch(
        self, repo: str, number: int, style: str = "rebase"
    ) -> bool:
        """Update a PR branch (rebase onto base). Returns True on success.

        Best effort: any ``GiteaError`` is reported as False.
        """
        try:
            self._post(
                f"{self._repo_path(repo)}/pulls/{number}/update",
                {"style": style},
            )
            return True
        except GiteaError:
            return False

    def close_pull(self, repo: str, number: int) -> PullRequest:
        """Close a pull request without merging."""
        return PullRequest.from_dict(
            self._patch(
                f"{self._repo_path(repo)}/pulls/{number}",
                {"state": "closed"},
            )
        )

    def get_pull_files(self, repo: str, number: int) -> list[PRFile]:
        """Get files changed in a pull request."""
        raw = self._get(f"{self._repo_path(repo)}/pulls/{number}/files")
        return [PRFile.from_dict(f) for f in raw]

    def find_pull_by_branch(
        self, repo: str, branch: str
    ) -> Optional[PullRequest]:
        """Find an open PR for a given head branch.

        Only the first 50 open PRs returned by ``list_pulls`` are searched.
        """
        prs = self.list_pulls(repo, state="open", limit=50)
        return next((pr for pr in prs if pr.head_branch == branch), None)

    # -- Branch Protection ----------------------------------------------------

    def get_branch_protection(
        self, repo: str, branch: str
    ) -> Optional[dict[str, Any]]:
        """Get branch protection settings for a branch. Returns None if no protection.

        A 404 from Gitea is mapped to ``None``; other errors propagate.
        """
        try:
            return self._get(self._branch_protection_path(repo, branch))
        except GiteaError as e:
            if e.status == 404:
                return None
            raise

    def set_branch_protection(
        self,
        repo: str,
        branch: str,
        required_approvals: int = 1,
        require_status_checks: bool = True,
        restrict_merge: bool = True,
        dismiss_stale_reviews: bool = False,
        require_owner_approval: bool = False,
        allow_force_pushes: bool = False,
        allow_deletions: bool = False,
    ) -> dict[str, Any]:
        """Create or update branch protection for a branch.

        Looks up the existing rule first: when none exists the rule is created
        with ``POST .../branch_protections``; otherwise it is edited with
        ``PATCH .../branch_protections/{name}``.

        The keyword arguments are translated to the field names used by
        Gitea's branch-protection options:

        - ``required_approvals``     -> ``required_approvals``
        - ``require_status_checks``  -> ``enable_status_check``
        - ``restrict_merge``         -> ``enable_merge_whitelist``
        - ``dismiss_stale_reviews``  -> ``dismiss_stale_approvals``
        - ``allow_force_pushes``     -> ``enable_force_push``

        ``require_owner_approval`` and ``allow_deletions`` have no counterpart
        among those options; they are accepted for backward compatibility,
        are not transmitted, and trigger a warning when set to True.
        NOTE(review): ``enable_force_push`` is only honoured by newer Gitea
        releases — confirm against the deployed server version.

        Returns the protection rule dict from Gitea.
        """
        requested_unsupported = [
            name
            for name, enabled in (
                ("require_owner_approval", require_owner_approval),
                ("allow_deletions", allow_deletions),
            )
            if enabled
        ]
        if requested_unsupported:
            import warnings

            # Surface the gap instead of silently dropping a security setting.
            warnings.warn(
                "Gitea branch protection has no equivalent for: "
                + ", ".join(requested_unsupported)
                + "; the option is ignored.",
                stacklevel=2,
            )
        data: dict[str, Any] = {
            "required_approvals": required_approvals,
            "enable_status_check": require_status_checks,
            "enable_merge_whitelist": restrict_merge,
            "dismiss_stale_approvals": dismiss_stale_reviews,
            "enable_force_push": allow_force_pushes,
        }
        if self.get_branch_protection(repo, branch) is None:
            # Send both names: ``rule_name`` is the current field and
            # ``branch_name`` is its deprecated predecessor.
            return self._post(
                self._branch_protection_path(repo),
                {"rule_name": branch, "branch_name": branch, **data},
            )
        return self._patch(self._branch_protection_path(repo, branch), data)

    def delete_branch_protection(self, repo: str, branch: str) -> bool:
        """Remove branch protection from a branch. Returns True if deleted.

        Returns False when Gitea answers 404; other errors propagate.
        """
        try:
            self._delete(self._branch_protection_path(repo, branch))
            return True
        except GiteaError as e:
            if e.status == 404:
                return False
            raise

    # -- Convenience ---------------------------------------------------------

    def get_issue_with_comments(
        self, repo: str, number: int, last_n: int = 5
    ) -> tuple[Issue, list[Comment]]:
        """Get an issue and its most recent comments.

        Returns the issue and at most ``last_n`` trailing comments;
        ``last_n <= 0`` yields an empty comment list.
        """
        issue = self.get_issue(repo, number)
        comments = self.list_comments(repo, number)
        # Guard the slice: comments[-0:] would be the *whole* list.
        recent = comments[-last_n:] if last_n > 0 else []
        return issue, recent

    def find_unassigned_issues(
        self,
        repo: str,
        limit: int = 30,
        exclude_labels: Optional[list[str]] = None,
        exclude_title_patterns: Optional[list[str]] = None,
    ) -> list[Issue]:
        """Find open issues not assigned to anyone.

        Issues carrying any label in ``exclude_labels`` or whose title
        contains (case-insensitively) any of ``exclude_title_patterns`` are
        skipped. Only the first ``limit`` open issues are examined.
        """
        # Normalise the filters once instead of on every iteration.
        excluded_labels = set(exclude_labels or ())
        excluded_patterns = [p.lower() for p in exclude_title_patterns or ()]
        result = []
        for issue in self.list_issues(repo, state="open", limit=limit):
            if issue.assignees:
                continue
            if excluded_labels and excluded_labels & {lb.name for lb in issue.labels}:
                continue
            if excluded_patterns:
                title_lower = issue.title.lower()
                if any(p in title_lower for p in excluded_patterns):
                    continue
            result.append(issue)
        return result

    def find_agent_issues(self, repo: str, agent: str, limit: int = 50) -> list[Issue]:
        """Find open issues assigned to a specific agent.

        Gitea's assignee query can return stale or misleading results, so we
        always post-filter on the actual assignee list in the returned issue.
        The login comparison is case-insensitive.
        """
        issues = self.list_issues(repo, state="open", assignee=agent, limit=limit)
        agent_lower = agent.lower()
        return [
            issue for issue in issues
            if any((assignee.login or "").lower() == agent_lower for assignee in issue.assignees)
        ]

    def find_agent_pulls(self, repo: str, agent: str) -> list[PullRequest]:
        """Find open PRs created by a specific agent.

        The login comparison is case-insensitive, matching
        ``find_agent_issues``. Only the first 50 open PRs are examined.
        """
        prs = self.list_pulls(repo, state="open", limit=50)
        agent_lower = agent.lower()
        return [pr for pr in prs if (pr.user.login or "").lower() == agent_lower]