Files
timmy-config/gitea_client.py

540 lines
17 KiB
Python

"""
Gitea API Client — typed, sovereign, zero-dependency.
Replaces raw curl calls scattered across 41 bash scripts.
Uses only stdlib (urllib) so it works on any Python install.
Usage:
from tools.gitea_client import GiteaClient
client = GiteaClient() # reads token from ~/.hermes/gitea_token
issues = client.list_issues("Timmy_Foundation/the-nexus", state="open")
client.create_comment("Timmy_Foundation/the-nexus", 42, "PR created.")
"""
from __future__ import annotations
import json
import os
import urllib.request
import urllib.error
import urllib.parse
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _read_token() -> str:
"""Read Gitea token from standard locations."""
for path in [
Path.home() / ".hermes" / "gitea_token",
Path.home() / ".hermes" / "gitea_token_vps",
Path.home() / ".config" / "gitea" / "token",
]:
if path.exists():
return path.read_text().strip()
raise FileNotFoundError(
"No Gitea token found. Checked: ~/.hermes/gitea_token, "
"~/.hermes/gitea_token_vps, ~/.config/gitea/token"
)
def _read_base_url() -> str:
"""Read Gitea base URL. Defaults to the VPS."""
env = os.environ.get("GITEA_URL")
if env:
return env.rstrip("/")
return "http://143.198.27.163:3000"
# ---------------------------------------------------------------------------
# Data classes — typed responses
# ---------------------------------------------------------------------------
@dataclass
class User:
id: int
login: str
full_name: str = ""
email: str = ""
@classmethod
def from_dict(cls, d: dict) -> "User":
return cls(
id=d.get("id", 0),
login=d.get("login", ""),
full_name=d.get("full_name", ""),
email=d.get("email", ""),
)
@dataclass
class Label:
id: int
name: str
color: str = ""
@classmethod
def from_dict(cls, d: dict) -> "Label":
return cls(id=d.get("id", 0), name=d.get("name", ""), color=d.get("color", ""))
@dataclass
class Issue:
number: int
title: str
body: str
state: str
user: User
assignees: list[User] = field(default_factory=list)
labels: list[Label] = field(default_factory=list)
created_at: str = ""
updated_at: str = ""
comments: int = 0
@classmethod
def from_dict(cls, d: dict) -> "Issue":
return cls(
number=d.get("number", 0),
title=d.get("title", ""),
body=d.get("body", "") or "",
state=d.get("state", ""),
user=User.from_dict(d.get("user", {})),
assignees=[User.from_dict(a) for a in d.get("assignees", []) or []],
labels=[Label.from_dict(lb) for lb in d.get("labels", []) or []],
created_at=d.get("created_at", ""),
updated_at=d.get("updated_at", ""),
comments=d.get("comments", 0),
)
@dataclass
class Comment:
id: int
body: str
user: User
created_at: str = ""
@classmethod
def from_dict(cls, d: dict) -> "Comment":
return cls(
id=d.get("id", 0),
body=d.get("body", "") or "",
user=User.from_dict(d.get("user", {})),
created_at=d.get("created_at", ""),
)
@dataclass
class PullRequest:
number: int
title: str
body: str
state: str
user: User
head_branch: str = ""
base_branch: str = ""
mergeable: bool = False
merged: bool = False
changed_files: int = 0
@classmethod
def from_dict(cls, d: dict) -> "PullRequest":
head = d.get("head", {}) or {}
base = d.get("base", {}) or {}
return cls(
number=d.get("number", 0),
title=d.get("title", ""),
body=d.get("body", "") or "",
state=d.get("state", ""),
user=User.from_dict(d.get("user", {})),
head_branch=head.get("ref", ""),
base_branch=base.get("ref", ""),
mergeable=d.get("mergeable", False),
merged=d.get("merged", False) or False,
changed_files=d.get("changed_files", 0),
)
@dataclass
class PRFile:
filename: str
status: str # added, modified, deleted
additions: int = 0
deletions: int = 0
@classmethod
def from_dict(cls, d: dict) -> "PRFile":
return cls(
filename=d.get("filename", ""),
status=d.get("status", ""),
additions=d.get("additions", 0),
deletions=d.get("deletions", 0),
)
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class GiteaError(Exception):
"""Gitea API error with status code."""
def __init__(self, status: int, message: str, url: str = ""):
self.status = status
self.url = url
super().__init__(f"Gitea {status}: {message} [{url}]")
class GiteaClient:
"""
Typed Gitea API client. Sovereign, zero-dependency.
Covers all operations the agent loops need:
- Issues: list, get, create, update, close, assign, label, comment
- PRs: list, get, create, merge, update, close, files
- Repos: list org repos
"""
def __init__(
self,
base_url: Optional[str] = None,
token: Optional[str] = None,
):
self.base_url = base_url or _read_base_url()
self.token = token or _read_token()
self.api = f"{self.base_url}/api/v1"
# -- HTTP layer ----------------------------------------------------------
def _request(
self,
method: str,
path: str,
data: Optional[dict] = None,
params: Optional[dict] = None,
) -> Any:
"""Make an authenticated API request. Returns parsed JSON."""
url = f"{self.api}{path}"
if params:
url += "?" + urllib.parse.urlencode(params)
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, method=method)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read().decode()
if not raw:
return {}
return json.loads(raw)
except urllib.error.HTTPError as e:
body_text = ""
try:
body_text = e.read().decode()
except Exception:
pass
raise GiteaError(e.code, body_text, url) from e
def _get(self, path: str, **params) -> Any:
# Filter out None values
clean = {k: v for k, v in params.items() if v is not None}
return self._request("GET", path, params=clean)
def _post(self, path: str, data: dict) -> Any:
return self._request("POST", path, data=data)
def _patch(self, path: str, data: dict) -> Any:
return self._request("PATCH", path, data=data)
def _delete(self, path: str) -> Any:
return self._request("DELETE", path)
def _repo_path(self, repo: str) -> str:
"""Convert 'owner/name' to '/repos/owner/name'."""
return f"/repos/{repo}"
# -- Health --------------------------------------------------------------
def ping(self) -> bool:
"""Check if Gitea is responding."""
try:
self._get("/version")
return True
except Exception:
return False
# -- Repos ---------------------------------------------------------------
def list_org_repos(self, org: str, limit: int = 50) -> list[dict]:
"""List repos in an organization."""
return self._get(f"/orgs/{org}/repos", limit=limit)
# -- Issues --------------------------------------------------------------
def list_issues(
self,
repo: str,
state: str = "open",
assignee: Optional[str] = None,
labels: Optional[str] = None,
sort: str = "created",
direction: str = "desc",
limit: int = 30,
page: int = 1,
) -> list[Issue]:
"""List issues for a repo."""
raw = self._get(
f"{self._repo_path(repo)}/issues",
state=state,
type="issues",
assignee=assignee,
labels=labels,
sort=sort,
direction=direction,
limit=limit,
page=page,
)
return [Issue.from_dict(i) for i in raw]
def get_issue(self, repo: str, number: int) -> Issue:
"""Get a single issue."""
return Issue.from_dict(
self._get(f"{self._repo_path(repo)}/issues/{number}")
)
def create_issue(
self,
repo: str,
title: str,
body: str = "",
labels: Optional[list[int]] = None,
assignees: Optional[list[str]] = None,
) -> Issue:
"""Create an issue."""
data: dict[str, Any] = {"title": title, "body": body}
if labels:
data["labels"] = labels
if assignees:
data["assignees"] = assignees
return Issue.from_dict(
self._post(f"{self._repo_path(repo)}/issues", data)
)
def update_issue(
self,
repo: str,
number: int,
title: Optional[str] = None,
body: Optional[str] = None,
state: Optional[str] = None,
assignees: Optional[list[str]] = None,
) -> Issue:
"""Update an issue (title, body, state, assignees)."""
data: dict[str, Any] = {}
if title is not None:
data["title"] = title
if body is not None:
data["body"] = body
if state is not None:
data["state"] = state
if assignees is not None:
data["assignees"] = assignees
return Issue.from_dict(
self._patch(f"{self._repo_path(repo)}/issues/{number}", data)
)
def close_issue(self, repo: str, number: int) -> Issue:
"""Close an issue."""
return self.update_issue(repo, number, state="closed")
def assign_issue(self, repo: str, number: int, assignees: list[str]) -> Issue:
"""Assign users to an issue."""
return self.update_issue(repo, number, assignees=assignees)
def add_labels(self, repo: str, number: int, label_ids: list[int]) -> list[Label]:
"""Add labels to an issue."""
raw = self._post(
f"{self._repo_path(repo)}/issues/{number}/labels",
{"labels": label_ids},
)
return [Label.from_dict(lb) for lb in raw]
# -- Comments ------------------------------------------------------------
def list_comments(
self, repo: str, number: int, since: Optional[str] = None
) -> list[Comment]:
"""List comments on an issue."""
raw = self._get(
f"{self._repo_path(repo)}/issues/{number}/comments",
since=since,
)
return [Comment.from_dict(c) for c in raw]
def create_comment(self, repo: str, number: int, body: str) -> Comment:
"""Add a comment to an issue."""
return Comment.from_dict(
self._post(
f"{self._repo_path(repo)}/issues/{number}/comments",
{"body": body},
)
)
# -- Pull Requests -------------------------------------------------------
def list_pulls(
self,
repo: str,
state: str = "open",
sort: str = "newest",
limit: int = 20,
page: int = 1,
) -> list[PullRequest]:
"""List pull requests."""
raw = self._get(
f"{self._repo_path(repo)}/pulls",
state=state,
sort=sort,
limit=limit,
page=page,
)
return [PullRequest.from_dict(p) for p in raw]
def get_pull(self, repo: str, number: int) -> PullRequest:
"""Get a single pull request."""
return PullRequest.from_dict(
self._get(f"{self._repo_path(repo)}/pulls/{number}")
)
def create_pull(
self,
repo: str,
title: str,
head: str,
base: str = "main",
body: str = "",
) -> PullRequest:
"""Create a pull request."""
return PullRequest.from_dict(
self._post(
f"{self._repo_path(repo)}/pulls",
{"title": title, "head": head, "base": base, "body": body},
)
)
def merge_pull(
self,
repo: str,
number: int,
method: str = "squash",
delete_branch: bool = True,
) -> bool:
"""Merge a pull request. Returns True on success."""
try:
self._post(
f"{self._repo_path(repo)}/pulls/{number}/merge",
{"Do": method, "delete_branch_after_merge": delete_branch},
)
return True
except GiteaError as e:
if e.status == 405: # not mergeable
return False
raise
def update_pull_branch(
self, repo: str, number: int, style: str = "rebase"
) -> bool:
"""Update a PR branch (rebase onto base). Returns True on success."""
try:
self._post(
f"{self._repo_path(repo)}/pulls/{number}/update",
{"style": style},
)
return True
except GiteaError:
return False
def close_pull(self, repo: str, number: int) -> PullRequest:
"""Close a pull request without merging."""
return PullRequest.from_dict(
self._patch(
f"{self._repo_path(repo)}/pulls/{number}",
{"state": "closed"},
)
)
def get_pull_files(self, repo: str, number: int) -> list[PRFile]:
"""Get files changed in a pull request."""
raw = self._get(f"{self._repo_path(repo)}/pulls/{number}/files")
return [PRFile.from_dict(f) for f in raw]
def find_pull_by_branch(
self, repo: str, branch: str
) -> Optional[PullRequest]:
"""Find an open PR for a given head branch."""
prs = self.list_pulls(repo, state="open", limit=50)
for pr in prs:
if pr.head_branch == branch:
return pr
return None
# -- Convenience ---------------------------------------------------------
def get_issue_with_comments(
self, repo: str, number: int, last_n: int = 5
) -> tuple[Issue, list[Comment]]:
"""Get an issue and its most recent comments."""
issue = self.get_issue(repo, number)
comments = self.list_comments(repo, number)
return issue, comments[-last_n:] if len(comments) > last_n else comments
def find_unassigned_issues(
self,
repo: str,
limit: int = 30,
exclude_labels: Optional[list[str]] = None,
exclude_title_patterns: Optional[list[str]] = None,
) -> list[Issue]:
"""Find open issues not assigned to anyone."""
issues = self.list_issues(repo, state="open", limit=limit)
result = []
for issue in issues:
if issue.assignees:
continue
if exclude_labels:
issue_label_names = {lb.name for lb in issue.labels}
if issue_label_names & set(exclude_labels):
continue
if exclude_title_patterns:
title_lower = issue.title.lower()
if any(p.lower() in title_lower for p in exclude_title_patterns):
continue
result.append(issue)
return result
def find_agent_issues(self, repo: str, agent: str, limit: int = 50) -> list[Issue]:
"""Find open issues assigned to a specific agent.
Gitea's assignee query can return stale or misleading results, so we
always post-filter on the actual assignee list in the returned issue.
"""
issues = self.list_issues(repo, state="open", assignee=agent, limit=limit)
agent_lower = agent.lower()
return [
issue for issue in issues
if any((assignee.login or "").lower() == agent_lower for assignee in issue.assignees)
]
def find_agent_pulls(self, repo: str, agent: str) -> list[PullRequest]:
"""Find open PRs created by a specific agent."""
prs = self.list_pulls(repo, state="open", limit=50)
return [pr for pr in prs if pr.user.login == agent]