540 lines
17 KiB
Python
540 lines
17 KiB
Python
"""
|
|
Gitea API Client — typed, sovereign, zero-dependency.
|
|
|
|
Replaces raw curl calls scattered across 41 bash scripts.
|
|
Uses only stdlib (urllib) so it works on any Python install.
|
|
|
|
Usage:
|
|
from tools.gitea_client import GiteaClient
|
|
|
|
client = GiteaClient() # reads token from ~/.hermes/gitea_token
|
|
issues = client.list_issues("Timmy_Foundation/the-nexus", state="open")
|
|
client.create_comment("Timmy_Foundation/the-nexus", 42, "PR created.")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import urllib.request
|
|
import urllib.error
|
|
import urllib.parse
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _read_token() -> str:
|
|
"""Read Gitea token from standard locations."""
|
|
for path in [
|
|
Path.home() / ".hermes" / "gitea_token",
|
|
Path.home() / ".hermes" / "gitea_token_vps",
|
|
Path.home() / ".config" / "gitea" / "token",
|
|
]:
|
|
if path.exists():
|
|
return path.read_text().strip()
|
|
raise FileNotFoundError(
|
|
"No Gitea token found. Checked: ~/.hermes/gitea_token, "
|
|
"~/.hermes/gitea_token_vps, ~/.config/gitea/token"
|
|
)
|
|
|
|
|
|
def _read_base_url() -> str:
|
|
"""Read Gitea base URL. Defaults to the VPS."""
|
|
env = os.environ.get("GITEA_URL")
|
|
if env:
|
|
return env.rstrip("/")
|
|
return "http://143.198.27.163:3000"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes — typed responses
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class User:
|
|
id: int
|
|
login: str
|
|
full_name: str = ""
|
|
email: str = ""
|
|
|
|
@classmethod
|
|
def from_dict(cls, d: dict) -> "User":
|
|
return cls(
|
|
id=d.get("id", 0),
|
|
login=d.get("login", ""),
|
|
full_name=d.get("full_name", ""),
|
|
email=d.get("email", ""),
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class Label:
|
|
id: int
|
|
name: str
|
|
color: str = ""
|
|
|
|
@classmethod
|
|
def from_dict(cls, d: dict) -> "Label":
|
|
return cls(id=d.get("id", 0), name=d.get("name", ""), color=d.get("color", ""))
|
|
|
|
|
|
@dataclass
|
|
class Issue:
|
|
number: int
|
|
title: str
|
|
body: str
|
|
state: str
|
|
user: User
|
|
assignees: list[User] = field(default_factory=list)
|
|
labels: list[Label] = field(default_factory=list)
|
|
created_at: str = ""
|
|
updated_at: str = ""
|
|
comments: int = 0
|
|
|
|
@classmethod
|
|
def from_dict(cls, d: dict) -> "Issue":
|
|
return cls(
|
|
number=d.get("number", 0),
|
|
title=d.get("title", ""),
|
|
body=d.get("body", "") or "",
|
|
state=d.get("state", ""),
|
|
user=User.from_dict(d.get("user", {})),
|
|
assignees=[User.from_dict(a) for a in d.get("assignees", []) or []],
|
|
labels=[Label.from_dict(lb) for lb in d.get("labels", []) or []],
|
|
created_at=d.get("created_at", ""),
|
|
updated_at=d.get("updated_at", ""),
|
|
comments=d.get("comments", 0),
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class Comment:
|
|
id: int
|
|
body: str
|
|
user: User
|
|
created_at: str = ""
|
|
|
|
@classmethod
|
|
def from_dict(cls, d: dict) -> "Comment":
|
|
return cls(
|
|
id=d.get("id", 0),
|
|
body=d.get("body", "") or "",
|
|
user=User.from_dict(d.get("user", {})),
|
|
created_at=d.get("created_at", ""),
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class PullRequest:
|
|
number: int
|
|
title: str
|
|
body: str
|
|
state: str
|
|
user: User
|
|
head_branch: str = ""
|
|
base_branch: str = ""
|
|
mergeable: bool = False
|
|
merged: bool = False
|
|
changed_files: int = 0
|
|
|
|
@classmethod
|
|
def from_dict(cls, d: dict) -> "PullRequest":
|
|
head = d.get("head", {}) or {}
|
|
base = d.get("base", {}) or {}
|
|
return cls(
|
|
number=d.get("number", 0),
|
|
title=d.get("title", ""),
|
|
body=d.get("body", "") or "",
|
|
state=d.get("state", ""),
|
|
user=User.from_dict(d.get("user", {})),
|
|
head_branch=head.get("ref", ""),
|
|
base_branch=base.get("ref", ""),
|
|
mergeable=d.get("mergeable", False),
|
|
merged=d.get("merged", False) or False,
|
|
changed_files=d.get("changed_files", 0),
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class PRFile:
|
|
filename: str
|
|
status: str # added, modified, deleted
|
|
additions: int = 0
|
|
deletions: int = 0
|
|
|
|
@classmethod
|
|
def from_dict(cls, d: dict) -> "PRFile":
|
|
return cls(
|
|
filename=d.get("filename", ""),
|
|
status=d.get("status", ""),
|
|
additions=d.get("additions", 0),
|
|
deletions=d.get("deletions", 0),
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Client
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class GiteaError(Exception):
|
|
"""Gitea API error with status code."""
|
|
def __init__(self, status: int, message: str, url: str = ""):
|
|
self.status = status
|
|
self.url = url
|
|
super().__init__(f"Gitea {status}: {message} [{url}]")
|
|
|
|
|
|
class GiteaClient:
|
|
"""
|
|
Typed Gitea API client. Sovereign, zero-dependency.
|
|
|
|
Covers all operations the agent loops need:
|
|
- Issues: list, get, create, update, close, assign, label, comment
|
|
- PRs: list, get, create, merge, update, close, files
|
|
- Repos: list org repos
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
base_url: Optional[str] = None,
|
|
token: Optional[str] = None,
|
|
):
|
|
self.base_url = base_url or _read_base_url()
|
|
self.token = token or _read_token()
|
|
self.api = f"{self.base_url}/api/v1"
|
|
|
|
# -- HTTP layer ----------------------------------------------------------
|
|
|
|
def _request(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
data: Optional[dict] = None,
|
|
params: Optional[dict] = None,
|
|
) -> Any:
|
|
"""Make an authenticated API request. Returns parsed JSON."""
|
|
url = f"{self.api}{path}"
|
|
if params:
|
|
url += "?" + urllib.parse.urlencode(params)
|
|
|
|
body = json.dumps(data).encode() if data else None
|
|
req = urllib.request.Request(url, data=body, method=method)
|
|
req.add_header("Authorization", f"token {self.token}")
|
|
req.add_header("Content-Type", "application/json")
|
|
req.add_header("Accept", "application/json")
|
|
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
raw = resp.read().decode()
|
|
if not raw:
|
|
return {}
|
|
return json.loads(raw)
|
|
except urllib.error.HTTPError as e:
|
|
body_text = ""
|
|
try:
|
|
body_text = e.read().decode()
|
|
except Exception:
|
|
pass
|
|
raise GiteaError(e.code, body_text, url) from e
|
|
|
|
def _get(self, path: str, **params) -> Any:
|
|
# Filter out None values
|
|
clean = {k: v for k, v in params.items() if v is not None}
|
|
return self._request("GET", path, params=clean)
|
|
|
|
def _post(self, path: str, data: dict) -> Any:
|
|
return self._request("POST", path, data=data)
|
|
|
|
def _patch(self, path: str, data: dict) -> Any:
|
|
return self._request("PATCH", path, data=data)
|
|
|
|
def _delete(self, path: str) -> Any:
|
|
return self._request("DELETE", path)
|
|
|
|
def _repo_path(self, repo: str) -> str:
|
|
"""Convert 'owner/name' to '/repos/owner/name'."""
|
|
return f"/repos/{repo}"
|
|
|
|
# -- Health --------------------------------------------------------------
|
|
|
|
def ping(self) -> bool:
|
|
"""Check if Gitea is responding."""
|
|
try:
|
|
self._get("/version")
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
# -- Repos ---------------------------------------------------------------
|
|
|
|
def list_org_repos(self, org: str, limit: int = 50) -> list[dict]:
|
|
"""List repos in an organization."""
|
|
return self._get(f"/orgs/{org}/repos", limit=limit)
|
|
|
|
# -- Issues --------------------------------------------------------------
|
|
|
|
def list_issues(
|
|
self,
|
|
repo: str,
|
|
state: str = "open",
|
|
assignee: Optional[str] = None,
|
|
labels: Optional[str] = None,
|
|
sort: str = "created",
|
|
direction: str = "desc",
|
|
limit: int = 30,
|
|
page: int = 1,
|
|
) -> list[Issue]:
|
|
"""List issues for a repo."""
|
|
raw = self._get(
|
|
f"{self._repo_path(repo)}/issues",
|
|
state=state,
|
|
type="issues",
|
|
assignee=assignee,
|
|
labels=labels,
|
|
sort=sort,
|
|
direction=direction,
|
|
limit=limit,
|
|
page=page,
|
|
)
|
|
return [Issue.from_dict(i) for i in raw]
|
|
|
|
def get_issue(self, repo: str, number: int) -> Issue:
|
|
"""Get a single issue."""
|
|
return Issue.from_dict(
|
|
self._get(f"{self._repo_path(repo)}/issues/{number}")
|
|
)
|
|
|
|
def create_issue(
|
|
self,
|
|
repo: str,
|
|
title: str,
|
|
body: str = "",
|
|
labels: Optional[list[int]] = None,
|
|
assignees: Optional[list[str]] = None,
|
|
) -> Issue:
|
|
"""Create an issue."""
|
|
data: dict[str, Any] = {"title": title, "body": body}
|
|
if labels:
|
|
data["labels"] = labels
|
|
if assignees:
|
|
data["assignees"] = assignees
|
|
return Issue.from_dict(
|
|
self._post(f"{self._repo_path(repo)}/issues", data)
|
|
)
|
|
|
|
def update_issue(
|
|
self,
|
|
repo: str,
|
|
number: int,
|
|
title: Optional[str] = None,
|
|
body: Optional[str] = None,
|
|
state: Optional[str] = None,
|
|
assignees: Optional[list[str]] = None,
|
|
) -> Issue:
|
|
"""Update an issue (title, body, state, assignees)."""
|
|
data: dict[str, Any] = {}
|
|
if title is not None:
|
|
data["title"] = title
|
|
if body is not None:
|
|
data["body"] = body
|
|
if state is not None:
|
|
data["state"] = state
|
|
if assignees is not None:
|
|
data["assignees"] = assignees
|
|
return Issue.from_dict(
|
|
self._patch(f"{self._repo_path(repo)}/issues/{number}", data)
|
|
)
|
|
|
|
def close_issue(self, repo: str, number: int) -> Issue:
|
|
"""Close an issue."""
|
|
return self.update_issue(repo, number, state="closed")
|
|
|
|
def assign_issue(self, repo: str, number: int, assignees: list[str]) -> Issue:
|
|
"""Assign users to an issue."""
|
|
return self.update_issue(repo, number, assignees=assignees)
|
|
|
|
def add_labels(self, repo: str, number: int, label_ids: list[int]) -> list[Label]:
|
|
"""Add labels to an issue."""
|
|
raw = self._post(
|
|
f"{self._repo_path(repo)}/issues/{number}/labels",
|
|
{"labels": label_ids},
|
|
)
|
|
return [Label.from_dict(lb) for lb in raw]
|
|
|
|
# -- Comments ------------------------------------------------------------
|
|
|
|
def list_comments(
|
|
self, repo: str, number: int, since: Optional[str] = None
|
|
) -> list[Comment]:
|
|
"""List comments on an issue."""
|
|
raw = self._get(
|
|
f"{self._repo_path(repo)}/issues/{number}/comments",
|
|
since=since,
|
|
)
|
|
return [Comment.from_dict(c) for c in raw]
|
|
|
|
def create_comment(self, repo: str, number: int, body: str) -> Comment:
|
|
"""Add a comment to an issue."""
|
|
return Comment.from_dict(
|
|
self._post(
|
|
f"{self._repo_path(repo)}/issues/{number}/comments",
|
|
{"body": body},
|
|
)
|
|
)
|
|
|
|
# -- Pull Requests -------------------------------------------------------
|
|
|
|
def list_pulls(
|
|
self,
|
|
repo: str,
|
|
state: str = "open",
|
|
sort: str = "newest",
|
|
limit: int = 20,
|
|
page: int = 1,
|
|
) -> list[PullRequest]:
|
|
"""List pull requests."""
|
|
raw = self._get(
|
|
f"{self._repo_path(repo)}/pulls",
|
|
state=state,
|
|
sort=sort,
|
|
limit=limit,
|
|
page=page,
|
|
)
|
|
return [PullRequest.from_dict(p) for p in raw]
|
|
|
|
def get_pull(self, repo: str, number: int) -> PullRequest:
|
|
"""Get a single pull request."""
|
|
return PullRequest.from_dict(
|
|
self._get(f"{self._repo_path(repo)}/pulls/{number}")
|
|
)
|
|
|
|
def create_pull(
|
|
self,
|
|
repo: str,
|
|
title: str,
|
|
head: str,
|
|
base: str = "main",
|
|
body: str = "",
|
|
) -> PullRequest:
|
|
"""Create a pull request."""
|
|
return PullRequest.from_dict(
|
|
self._post(
|
|
f"{self._repo_path(repo)}/pulls",
|
|
{"title": title, "head": head, "base": base, "body": body},
|
|
)
|
|
)
|
|
|
|
def merge_pull(
|
|
self,
|
|
repo: str,
|
|
number: int,
|
|
method: str = "squash",
|
|
delete_branch: bool = True,
|
|
) -> bool:
|
|
"""Merge a pull request. Returns True on success."""
|
|
try:
|
|
self._post(
|
|
f"{self._repo_path(repo)}/pulls/{number}/merge",
|
|
{"Do": method, "delete_branch_after_merge": delete_branch},
|
|
)
|
|
return True
|
|
except GiteaError as e:
|
|
if e.status == 405: # not mergeable
|
|
return False
|
|
raise
|
|
|
|
def update_pull_branch(
|
|
self, repo: str, number: int, style: str = "rebase"
|
|
) -> bool:
|
|
"""Update a PR branch (rebase onto base). Returns True on success."""
|
|
try:
|
|
self._post(
|
|
f"{self._repo_path(repo)}/pulls/{number}/update",
|
|
{"style": style},
|
|
)
|
|
return True
|
|
except GiteaError:
|
|
return False
|
|
|
|
def close_pull(self, repo: str, number: int) -> PullRequest:
|
|
"""Close a pull request without merging."""
|
|
return PullRequest.from_dict(
|
|
self._patch(
|
|
f"{self._repo_path(repo)}/pulls/{number}",
|
|
{"state": "closed"},
|
|
)
|
|
)
|
|
|
|
def get_pull_files(self, repo: str, number: int) -> list[PRFile]:
|
|
"""Get files changed in a pull request."""
|
|
raw = self._get(f"{self._repo_path(repo)}/pulls/{number}/files")
|
|
return [PRFile.from_dict(f) for f in raw]
|
|
|
|
def find_pull_by_branch(
|
|
self, repo: str, branch: str
|
|
) -> Optional[PullRequest]:
|
|
"""Find an open PR for a given head branch."""
|
|
prs = self.list_pulls(repo, state="open", limit=50)
|
|
for pr in prs:
|
|
if pr.head_branch == branch:
|
|
return pr
|
|
return None
|
|
|
|
# -- Convenience ---------------------------------------------------------
|
|
|
|
def get_issue_with_comments(
|
|
self, repo: str, number: int, last_n: int = 5
|
|
) -> tuple[Issue, list[Comment]]:
|
|
"""Get an issue and its most recent comments."""
|
|
issue = self.get_issue(repo, number)
|
|
comments = self.list_comments(repo, number)
|
|
return issue, comments[-last_n:] if len(comments) > last_n else comments
|
|
|
|
def find_unassigned_issues(
|
|
self,
|
|
repo: str,
|
|
limit: int = 30,
|
|
exclude_labels: Optional[list[str]] = None,
|
|
exclude_title_patterns: Optional[list[str]] = None,
|
|
) -> list[Issue]:
|
|
"""Find open issues not assigned to anyone."""
|
|
issues = self.list_issues(repo, state="open", limit=limit)
|
|
result = []
|
|
for issue in issues:
|
|
if issue.assignees:
|
|
continue
|
|
if exclude_labels:
|
|
issue_label_names = {lb.name for lb in issue.labels}
|
|
if issue_label_names & set(exclude_labels):
|
|
continue
|
|
if exclude_title_patterns:
|
|
title_lower = issue.title.lower()
|
|
if any(p.lower() in title_lower for p in exclude_title_patterns):
|
|
continue
|
|
result.append(issue)
|
|
return result
|
|
|
|
def find_agent_issues(self, repo: str, agent: str, limit: int = 50) -> list[Issue]:
|
|
"""Find open issues assigned to a specific agent.
|
|
|
|
Gitea's assignee query can return stale or misleading results, so we
|
|
always post-filter on the actual assignee list in the returned issue.
|
|
"""
|
|
issues = self.list_issues(repo, state="open", assignee=agent, limit=limit)
|
|
agent_lower = agent.lower()
|
|
return [
|
|
issue for issue in issues
|
|
if any((assignee.login or "").lower() == agent_lower for assignee in issue.assignees)
|
|
]
|
|
|
|
def find_agent_pulls(self, repo: str, agent: str) -> list[PullRequest]:
|
|
"""Find open PRs created by a specific agent."""
|
|
prs = self.list_pulls(repo, state="open", limit=50)
|
|
return [pr for pr in prs if pr.user.login == agent]
|