""" Gitea API Client — typed, sovereign, zero-dependency. Replaces raw curl calls scattered across 41 bash scripts. Uses only stdlib (urllib) so it works on any Python install. Usage: from gitea_client import GiteaClient client = GiteaClient() # reads token from standard local paths issues = client.list_issues("Timmy_Foundation/the-nexus", state="open") client.create_comment("Timmy_Foundation/the-nexus", 42, "PR created.") """ from __future__ import annotations import json import os import urllib.request import urllib.error import urllib.parse from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- def _read_token() -> str: """Read Gitea token from standard locations.""" for path in [ Path.home() / ".hermes" / "gitea_token", Path.home() / ".hermes" / "gitea_token_vps", Path.home() / ".config" / "gitea" / "token", ]: if path.exists(): return path.read_text().strip() raise FileNotFoundError( "No Gitea token found. Checked: ~/.hermes/gitea_token, " "~/.hermes/gitea_token_vps, ~/.config/gitea/token" ) def _read_base_url() -> str: """Read Gitea base URL. Defaults to the VPS.""" env = os.environ.get("GITEA_URL") if env: return env.rstrip("/") return "http://143.198.27.163:3000" # --------------------------------------------------------------------------- # Data classes — typed responses # --------------------------------------------------------------------------- @dataclass class User: id: int login: str full_name: str = "" email: str = "" @classmethod def from_dict(cls, d: dict) -> "User": return cls( id=d.get("id", 0), login=d.get("login", ""), full_name=d.get("full_name", ""), email=d.get("email", ""), ) @dataclass class Label: id: int name: str color: str = "" @classmethod def from_dict(cls, d: dict) -> "Label": return cls(id=d.get("id", 0), name=d.get("name", ""), color=d.get("color", "")) @dataclass class Issue: number: int title: str body: str state: str user: User assignees: list[User] = field(default_factory=list) labels: list[Label] = field(default_factory=list) created_at: str = "" updated_at: str = "" comments: int = 0 @classmethod def from_dict(cls, d: dict) -> "Issue": return cls( number=d.get("number", 0), title=d.get("title", ""), body=d.get("body", "") or "", state=d.get("state", ""), user=User.from_dict(d.get("user", {})), assignees=[User.from_dict(a) for a in d.get("assignees", []) or []], labels=[Label.from_dict(lb) for lb in d.get("labels", []) or []], created_at=d.get("created_at", ""), updated_at=d.get("updated_at", ""), comments=d.get("comments", 0), ) @dataclass class Comment: id: int body: str user: User created_at: str = "" @classmethod def from_dict(cls, d: dict) -> "Comment": return cls( id=d.get("id", 0), body=d.get("body", "") or "", user=User.from_dict(d.get("user", {})), created_at=d.get("created_at", ""), ) @dataclass class PullRequest: number: int title: str body: str state: str user: User head_branch: str = "" base_branch: str = "" mergeable: bool = False merged: bool = False changed_files: int = 0 @classmethod def from_dict(cls, d: dict) -> "PullRequest": head = d.get("head", {}) or {} base = d.get("base", {}) or {} return cls( number=d.get("number", 0), title=d.get("title", ""), body=d.get("body", "") or "", state=d.get("state", ""), user=User.from_dict(d.get("user", {})), head_branch=head.get("ref", ""), base_branch=base.get("ref", ""), mergeable=d.get("mergeable", False), merged=d.get("merged", False) or False, changed_files=d.get("changed_files", 0), ) @dataclass class PRFile: filename: str status: str # added, modified, deleted additions: int = 0 deletions: int = 0 @classmethod def from_dict(cls, d: dict) -> "PRFile": return cls( filename=d.get("filename", ""), status=d.get("status", ""), additions=d.get("additions", 0), deletions=d.get("deletions", 0), ) # --------------------------------------------------------------------------- # Client # --------------------------------------------------------------------------- class GiteaError(Exception): """Gitea API error with status code.""" def __init__(self, status: int, message: str, url: str = ""): self.status = status self.url = url super().__init__(f"Gitea {status}: {message} [{url}]") class GiteaClient: """ Typed Gitea API client. Sovereign, zero-dependency. Covers all operations the agent loops need: - Issues: list, get, create, update, close, assign, label, comment - PRs: list, get, create, merge, update, close, files - Repos: list org repos """ def __init__( self, base_url: Optional[str] = None, token: Optional[str] = None, ): self.base_url = base_url or _read_base_url() self.token = token or _read_token() self.api = f"{self.base_url}/api/v1" # -- HTTP layer ---------------------------------------------------------- def _request( self, method: str, path: str, data: Optional[dict] = None, params: Optional[dict] = None, ) -> Any: """Make an authenticated API request. Returns parsed JSON.""" url = f"{self.api}{path}" if params: url += "?" + urllib.parse.urlencode(params) body = json.dumps(data).encode() if data else None req = urllib.request.Request(url, data=body, method=method) req.add_header("Authorization", f"token {self.token}") req.add_header("Content-Type", "application/json") req.add_header("Accept", "application/json") try: with urllib.request.urlopen(req, timeout=30) as resp: raw = resp.read().decode() if not raw: return {} return json.loads(raw) except urllib.error.HTTPError as e: body_text = "" try: body_text = e.read().decode() except Exception: pass raise GiteaError(e.code, body_text, url) from e def _get(self, path: str, **params) -> Any: # Filter out None values clean = {k: v for k, v in params.items() if v is not None} return self._request("GET", path, params=clean) def _post(self, path: str, data: dict) -> Any: return self._request("POST", path, data=data) def _patch(self, path: str, data: dict) -> Any: return self._request("PATCH", path, data=data) def _delete(self, path: str) -> Any: return self._request("DELETE", path) def _repo_path(self, repo: str) -> str: """Convert 'owner/name' to '/repos/owner/name'.""" return f"/repos/{repo}" # -- Health -------------------------------------------------------------- def ping(self) -> bool: """Check if Gitea is responding.""" try: self._get("/version") return True except Exception: return False # -- Repos --------------------------------------------------------------- def list_org_repos(self, org: str, limit: int = 50) -> list[dict]: """List repos in an organization.""" return self._get(f"/orgs/{org}/repos", limit=limit) # -- Issues -------------------------------------------------------------- def list_issues( self, repo: str, state: str = "open", assignee: Optional[str] = None, labels: Optional[str] = None, sort: str = "created", direction: str = "desc", limit: int = 30, page: int = 1, ) -> list[Issue]: """List issues for a repo.""" raw = self._get( f"{self._repo_path(repo)}/issues", state=state, type="issues", assignee=assignee, labels=labels, sort=sort, direction=direction, limit=limit, page=page, ) return [Issue.from_dict(i) for i in raw] def get_issue(self, repo: str, number: int) -> Issue: """Get a single issue.""" return Issue.from_dict( self._get(f"{self._repo_path(repo)}/issues/{number}") ) def create_issue( self, repo: str, title: str, body: str = "", labels: Optional[list[int]] = None, assignees: Optional[list[str]] = None, ) -> Issue: """Create an issue.""" data: dict[str, Any] = {"title": title, "body": body} if labels: data["labels"] = labels if assignees: data["assignees"] = assignees return Issue.from_dict( self._post(f"{self._repo_path(repo)}/issues", data) ) def update_issue( self, repo: str, number: int, title: Optional[str] = None, body: Optional[str] = None, state: Optional[str] = None, assignees: Optional[list[str]] = None, ) -> Issue: """Update an issue (title, body, state, assignees).""" data: dict[str, Any] = {} if title is not None: data["title"] = title if body is not None: data["body"] = body if state is not None: data["state"] = state if assignees is not None: data["assignees"] = assignees return Issue.from_dict( self._patch(f"{self._repo_path(repo)}/issues/{number}", data) ) def close_issue(self, repo: str, number: int) -> Issue: """Close an issue.""" return self.update_issue(repo, number, state="closed") def assign_issue(self, repo: str, number: int, assignees: list[str]) -> Issue: """Assign users to an issue.""" return self.update_issue(repo, number, assignees=assignees) def add_labels(self, repo: str, number: int, label_ids: list[int]) -> list[Label]: """Add labels to an issue.""" raw = self._post( f"{self._repo_path(repo)}/issues/{number}/labels", {"labels": label_ids}, ) return [Label.from_dict(lb) for lb in raw] # -- Comments ------------------------------------------------------------ def list_comments( self, repo: str, number: int, since: Optional[str] = None ) -> list[Comment]: """List comments on an issue.""" raw = self._get( f"{self._repo_path(repo)}/issues/{number}/comments", since=since, ) return [Comment.from_dict(c) for c in raw] def create_comment(self, repo: str, number: int, body: str) -> Comment: """Add a comment to an issue.""" return Comment.from_dict( self._post( f"{self._repo_path(repo)}/issues/{number}/comments", {"body": body}, ) ) # -- Pull Requests ------------------------------------------------------- def list_pulls( self, repo: str, state: str = "open", sort: str = "newest", limit: int = 20, page: int = 1, ) -> list[PullRequest]: """List pull requests.""" raw = self._get( f"{self._repo_path(repo)}/pulls", state=state, sort=sort, limit=limit, page=page, ) return [PullRequest.from_dict(p) for p in raw] def get_pull(self, repo: str, number: int) -> PullRequest: """Get a single pull request.""" return PullRequest.from_dict( self._get(f"{self._repo_path(repo)}/pulls/{number}") ) def create_pull( self, repo: str, title: str, head: str, base: str = "main", body: str = "", ) -> PullRequest: """Create a pull request.""" return PullRequest.from_dict( self._post( f"{self._repo_path(repo)}/pulls", {"title": title, "head": head, "base": base, "body": body}, ) ) def merge_pull( self, repo: str, number: int, method: str = "squash", delete_branch: bool = True, ) -> bool: """Merge a pull request. Returns True on success.""" try: self._post( f"{self._repo_path(repo)}/pulls/{number}/merge", {"Do": method, "delete_branch_after_merge": delete_branch}, ) return True except GiteaError as e: if e.status == 405: # not mergeable return False raise def update_pull_branch( self, repo: str, number: int, style: str = "rebase" ) -> bool: """Update a PR branch (rebase onto base). Returns True on success.""" try: self._post( f"{self._repo_path(repo)}/pulls/{number}/update", {"style": style}, ) return True except GiteaError: return False def close_pull(self, repo: str, number: int) -> PullRequest: """Close a pull request without merging.""" return PullRequest.from_dict( self._patch( f"{self._repo_path(repo)}/pulls/{number}", {"state": "closed"}, ) ) def get_pull_files(self, repo: str, number: int) -> list[PRFile]: """Get files changed in a pull request.""" raw = self._get(f"{self._repo_path(repo)}/pulls/{number}/files") return [PRFile.from_dict(f) for f in raw] def find_pull_by_branch( self, repo: str, branch: str ) -> Optional[PullRequest]: """Find an open PR for a given head branch.""" prs = self.list_pulls(repo, state="open", limit=50) for pr in prs: if pr.head_branch == branch: return pr return None # -- Convenience --------------------------------------------------------- def get_issue_with_comments( self, repo: str, number: int, last_n: int = 5 ) -> tuple[Issue, list[Comment]]: """Get an issue and its most recent comments.""" issue = self.get_issue(repo, number) comments = self.list_comments(repo, number) return issue, comments[-last_n:] if len(comments) > last_n else comments def find_unassigned_issues( self, repo: str, limit: int = 30, exclude_labels: Optional[list[str]] = None, exclude_title_patterns: Optional[list[str]] = None, ) -> list[Issue]: """Find open issues not assigned to anyone.""" issues = self.list_issues(repo, state="open", limit=limit) result = [] for issue in issues: if issue.assignees: continue if exclude_labels: issue_label_names = {lb.name for lb in issue.labels} if issue_label_names & set(exclude_labels): continue if exclude_title_patterns: title_lower = issue.title.lower() if any(p.lower() in title_lower for p in exclude_title_patterns): continue result.append(issue) return result def find_agent_issues(self, repo: str, agent: str, limit: int = 50) -> list[Issue]: """Find open issues assigned to a specific agent. Gitea's assignee query can return stale or misleading results, so we always post-filter on the actual assignee list in the returned issue. """ issues = self.list_issues(repo, state="open", assignee=agent, limit=limit) agent_lower = agent.lower() return [ issue for issue in issues if any((assignee.login or "").lower() == agent_lower for assignee in issue.assignees) ] def find_agent_pulls(self, repo: str, agent: str) -> list[PullRequest]: """Find open PRs created by a specific agent.""" prs = self.list_pulls(repo, state="open", limit=50) return [pr for pr in prs if pr.user.login == agent]