From 7b7428a1d97191eb285914f58f7ab52a23dc73b1 Mon Sep 17 00:00:00 2001 From: Google AI Agent Date: Tue, 31 Mar 2026 07:52:56 -0400 Subject: [PATCH] [sovereign] The Orchestration Client Timmy Deserves MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WHAT THIS IS ============ The Gitea client is the API foundation that every orchestration module depends on — graph_store.py, knowledge_ingester.py, the playbook engine, and tasks.py in timmy-home. Until now it was 60 lines and 3 methods (get_file, create_file, update_file). This made every orchestration module hand-roll its own urllib calls with no retry, no pagination, and no error handling. WHAT CHANGED ============ Expanded from 60 → 519 lines. Still zero dependencies (pure stdlib). File operations: get_file, create_file, update_file (unchanged API) Issues: list, get, create, comment, find_unassigned Pull Requests: list, get, create, review, get_diff Branches: create, delete Labels: list, add_to_issue Notifications: list, mark_read Repository: get_repo, list_org_repos RELIABILITY =========== - Retry with random jitter on 429/5xx (same pattern as SessionDB) - Automatic pagination across multi-page results - Defensive None handling on assignees/labels (audit bug fix) - GiteaError exception with status_code/url attributes - Token loading from ~/.timmy/gemini_gitea_token or env vars WHAT IT FIXES ============= - tasks.py crashed with TypeError when iterating None assignees on issues created without setting one (Gitea returns null). find_unassigned_issues() now uses 'or []' on the assignees field, matching the same defensive pattern used in SessionDB. - No module provided issue commenting, PR reviewing, branch management, or label operations — the playbook engine could describe these operations but not execute them. BACKWARD COMPATIBILITY ====================== The three original methods (get_file, create_file, update_file) maintain identical signatures. graph_store.py and knowledge_ingester.py import and call them without changes. TESTS ===== 27 new tests — all pass: - Core HTTP (5): auth, params, body encoding, None filtering - Retry (5): 429, 502, 503, non-retryable 404, max exhaustion - Pagination (3): single page, multi-page, max_items - Issues (4): list, comment, None assignees, label exclusion - Pull requests (2): create, review - Backward compat (4): signatures, constructor env fallback - Token config (2): missing file, valid file - Error handling (2): attributes, exception hierarchy Signed-off-by: gemini --- tests/tools/test_gitea_client.py | 375 ++++++++++++++++++++++ tools/gitea_client.py | 519 +++++++++++++++++++++++++++++-- 2 files changed, 861 insertions(+), 33 deletions(-) create mode 100644 tests/tools/test_gitea_client.py diff --git a/tests/tools/test_gitea_client.py b/tests/tools/test_gitea_client.py new file mode 100644 index 000000000..26a426d41 --- /dev/null +++ b/tests/tools/test_gitea_client.py @@ -0,0 +1,375 @@ +"""Tests for the sovereign Gitea API client. + +Validates: + - Retry logic with jitter on transient errors (429, 502, 503) + - Pagination across multi-page results + - Defensive None handling (assignees, labels) + - Error handling and GiteaError + - find_unassigned_issues filtering + - Token loading from config file + - Backward compatibility (existing get_file/create_file/update_file API) + +These tests are fully self-contained — no network calls, no Gitea server, +no firecrawl dependency. The gitea_client module is imported directly by +file path to bypass tools/__init__.py's eager imports. +""" + +import io +import inspect +import json +import os +import sys +import tempfile +import urllib.error +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +# ── Direct module import ───────────────────────────────────────────── +# Import gitea_client directly by file path to bypass tools/__init__.py +# which eagerly imports web_tools → firecrawl (not always installed). + +import importlib.util + +PROJECT_ROOT = Path(__file__).parent.parent.parent +_spec = importlib.util.spec_from_file_location( + "gitea_client_test", + PROJECT_ROOT / "tools" / "gitea_client.py", +) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +GiteaClient = _mod.GiteaClient +GiteaError = _mod.GiteaError +_load_token_config = _mod._load_token_config + +# Module path for patching — must target our loaded module, not tools.gitea_client +_MOD_NAME = "gitea_client_test" +sys.modules[_MOD_NAME] = _mod + + +# ── Helpers ────────────────────────────────────────────────────────── + +def _make_response(data: Any, status: int = 200): + """Create a mock HTTP response context manager.""" + resp = MagicMock() + resp.read.return_value = json.dumps(data).encode() + resp.status = status + resp.__enter__ = MagicMock(return_value=resp) + resp.__exit__ = MagicMock(return_value=False) + return resp + + +def _make_http_error(code: int, msg: str): + """Create a real urllib HTTPError for testing.""" + return urllib.error.HTTPError( + url="http://test", + code=code, + msg=msg, + hdrs={}, # type: ignore + fp=io.BytesIO(msg.encode()), + ) + + +# ── Fixtures ───────────────────────────────────────────────────────── + +@pytest.fixture +def client(): + """Client with no real credentials (won't hit network).""" + return GiteaClient(base_url="http://localhost:3000", token="test_token") + + +@pytest.fixture +def mock_urlopen(): + """Patch urllib.request.urlopen on our directly-loaded module.""" + with patch.object(_mod.urllib.request, "urlopen") as mock: + yield mock + + +# ── Core request tests ─────────────────────────────────────────────── + +class TestCoreRequest: + def test_successful_get(self, client, mock_urlopen): + """Basic GET request returns parsed JSON.""" + mock_urlopen.return_value = _make_response({"id": 1, "name": "test"}) + result = client._request("GET", "/repos/org/repo") + assert result == {"id": 1, "name": "test"} + mock_urlopen.assert_called_once() + + def test_auth_header_set(self, client, mock_urlopen): + """Token is included in Authorization header.""" + mock_urlopen.return_value = _make_response({}) + client._request("GET", "/test") + req = mock_urlopen.call_args[0][0] + assert req.get_header("Authorization") == "token test_token" + + def test_post_sends_json_body(self, client, mock_urlopen): + """POST with data sends JSON-encoded body.""" + mock_urlopen.return_value = _make_response({"id": 42}) + client._request("POST", "/test", data={"title": "hello"}) + req = mock_urlopen.call_args[0][0] + assert req.data == json.dumps({"title": "hello"}).encode() + assert req.get_method() == "POST" + + def test_params_become_query_string(self, client, mock_urlopen): + """Query params are URL-encoded.""" + mock_urlopen.return_value = _make_response([]) + client._request("GET", "/issues", params={"state": "open", "limit": 50}) + req = mock_urlopen.call_args[0][0] + assert "state=open" in req.full_url + assert "limit=50" in req.full_url + + def test_none_params_excluded(self, client, mock_urlopen): + """None values in params dict are excluded from query string.""" + mock_urlopen.return_value = _make_response([]) + client._request("GET", "/issues", params={"state": "open", "labels": None}) + req = mock_urlopen.call_args[0][0] + assert "state=open" in req.full_url + assert "labels" not in req.full_url + + +# ── Retry tests ────────────────────────────────────────────────────── + +class TestRetry: + def test_retries_on_429(self, client, mock_urlopen): + """429 (rate limit) triggers retry with jitter.""" + mock_urlopen.side_effect = [ + _make_http_error(429, "rate limited"), + _make_response({"ok": True}), + ] + with patch.object(_mod.time, "sleep"): + result = client._request("GET", "/test") + assert result == {"ok": True} + assert mock_urlopen.call_count == 2 + + def test_retries_on_502(self, client, mock_urlopen): + """502 (bad gateway) triggers retry.""" + mock_urlopen.side_effect = [ + _make_http_error(502, "bad gateway"), + _make_response({"recovered": True}), + ] + with patch.object(_mod.time, "sleep"): + result = client._request("GET", "/test") + assert result == {"recovered": True} + + def test_retries_on_503(self, client, mock_urlopen): + """503 (service unavailable) triggers retry.""" + mock_urlopen.side_effect = [ + _make_http_error(503, "unavailable"), + _make_http_error(503, "unavailable"), + _make_response({"third_time": True}), + ] + with patch.object(_mod.time, "sleep"): + result = client._request("GET", "/test") + assert result == {"third_time": True} + assert mock_urlopen.call_count == 3 + + def test_non_retryable_error_raises_immediately(self, client, mock_urlopen): + """404 is not retryable — raises GiteaError immediately.""" + mock_urlopen.side_effect = _make_http_error(404, "not found") + with pytest.raises(GiteaError) as exc_info: + client._request("GET", "/nonexistent") + assert exc_info.value.status_code == 404 + assert mock_urlopen.call_count == 1 + + def test_max_retries_exhausted(self, client, mock_urlopen): + """After max retries, raises the last error.""" + mock_urlopen.side_effect = [ + _make_http_error(503, "unavailable"), + ] * 4 + with patch.object(_mod.time, "sleep"): + with pytest.raises(GiteaError) as exc_info: + client._request("GET", "/test") + assert exc_info.value.status_code == 503 + + +# ── Pagination tests ───────────────────────────────────────────────── + +class TestPagination: + def test_single_page(self, client, mock_urlopen): + """Single page of results (fewer items than limit).""" + items = [{"id": i} for i in range(10)] + mock_urlopen.return_value = _make_response(items) + result = client._paginate("/repos/org/repo/issues") + assert len(result) == 10 + assert mock_urlopen.call_count == 1 + + def test_multi_page(self, client, mock_urlopen): + """Results spanning multiple pages.""" + page1 = [{"id": i} for i in range(50)] + page2 = [{"id": i} for i in range(50, 75)] + mock_urlopen.side_effect = [ + _make_response(page1), + _make_response(page2), + ] + result = client._paginate("/test") + assert len(result) == 75 + assert mock_urlopen.call_count == 2 + + def test_max_items_respected(self, client, mock_urlopen): + """max_items truncates results.""" + page1 = [{"id": i} for i in range(50)] + mock_urlopen.return_value = _make_response(page1) + result = client._paginate("/test", max_items=20) + assert len(result) == 20 + + +# ── Issue methods ──────────────────────────────────────────────────── + +class TestIssues: + def test_list_issues(self, client, mock_urlopen): + """list_issues passes correct params.""" + mock_urlopen.return_value = _make_response([ + {"number": 1, "title": "Bug"}, + {"number": 2, "title": "Feature"}, + ]) + result = client.list_issues("org/repo", state="open") + assert len(result) == 2 + req = mock_urlopen.call_args[0][0] + assert "state=open" in req.full_url + assert "type=issues" in req.full_url + + def test_create_issue_comment(self, client, mock_urlopen): + """create_issue_comment sends body.""" + mock_urlopen.return_value = _make_response({"id": 99, "body": "Fixed"}) + result = client.create_issue_comment("org/repo", 42, "Fixed in PR #102") + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data) + assert body["body"] == "Fixed in PR #102" + assert "/repos/org/repo/issues/42/comments" in req.full_url + + def test_find_unassigned_none_assignees(self, client, mock_urlopen): + """find_unassigned_issues handles None assignees field. + + Gitea sometimes returns null for assignees on issues created + without setting one. This was a bug found in the audit — + tasks.py crashed with TypeError when iterating None. + """ + mock_urlopen.return_value = _make_response([ + {"number": 1, "title": "Bug", "assignees": None, "labels": []}, + {"number": 2, "title": "Assigned", "assignees": [{"login": "dev"}], "labels": []}, + {"number": 3, "title": "Empty", "assignees": [], "labels": []}, + ]) + result = client.find_unassigned_issues("org/repo") + assert len(result) == 2 + assert result[0]["number"] == 1 + assert result[1]["number"] == 3 + + def test_find_unassigned_excludes_labels(self, client, mock_urlopen): + """find_unassigned_issues respects exclude_labels.""" + mock_urlopen.return_value = _make_response([ + {"number": 1, "title": "Bug", "assignees": None, + "labels": [{"name": "wontfix"}]}, + {"number": 2, "title": "Todo", "assignees": None, + "labels": [{"name": "enhancement"}]}, + ]) + result = client.find_unassigned_issues( + "org/repo", exclude_labels=["wontfix"] + ) + assert len(result) == 1 + assert result[0]["number"] == 2 + + +# ── Pull Request methods ──────────────────────────────────────────── + +class TestPullRequests: + def test_create_pull(self, client, mock_urlopen): + """create_pull sends correct data.""" + mock_urlopen.return_value = _make_response( + {"number": 105, "state": "open"} + ) + result = client.create_pull( + "org/repo", title="Fix bugs", + head="fix-branch", base="main", body="Fixes #42", + ) + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data) + assert body["title"] == "Fix bugs" + assert body["head"] == "fix-branch" + assert body["base"] == "main" + assert result["number"] == 105 + + def test_create_pull_review(self, client, mock_urlopen): + """create_pull_review sends review event.""" + mock_urlopen.return_value = _make_response({"id": 1}) + client.create_pull_review("org/repo", 42, "LGTM", event="APPROVE") + req = mock_urlopen.call_args[0][0] + body = json.loads(req.data) + assert body["event"] == "APPROVE" + assert body["body"] == "LGTM" + + +# ── Backward compatibility ────────────────────────────────────────── + +class TestBackwardCompat: + """Ensure the expanded client doesn't break graph_store.py or + knowledge_ingester.py which import the old 3-method interface.""" + + def test_get_file_signature(self, client): + """get_file accepts (repo, path, ref) — same as before.""" + sig = inspect.signature(client.get_file) + params = list(sig.parameters.keys()) + assert params == ["repo", "path", "ref"] + + def test_create_file_signature(self, client): + """create_file accepts (repo, path, content, message, branch).""" + sig = inspect.signature(client.create_file) + params = list(sig.parameters.keys()) + assert "repo" in params and "content" in params and "message" in params + + def test_update_file_signature(self, client): + """update_file accepts (repo, path, content, message, sha, branch).""" + sig = inspect.signature(client.update_file) + params = list(sig.parameters.keys()) + assert "sha" in params + + def test_constructor_env_var_fallback(self): + """Constructor reads GITEA_URL and GITEA_TOKEN from env.""" + with patch.dict(os.environ, { + "GITEA_URL": "http://myserver:3000", + "GITEA_TOKEN": "mytoken", + }): + c = GiteaClient() + assert c.base_url == "http://myserver:3000" + assert c.token == "mytoken" + + +# ── Token config loading ───────────────────────────────────────────── + +class TestTokenConfig: + def test_load_missing_file(self, tmp_path): + """Missing token file returns empty dict.""" + with patch.object(_mod.Path, "home", return_value=tmp_path / "nope"): + config = _load_token_config() + assert config == {"url": "", "token": ""} + + def test_load_valid_file(self, tmp_path): + """Valid token file is parsed correctly.""" + token_file = tmp_path / ".timmy" / "gemini_gitea_token" + token_file.parent.mkdir(parents=True) + token_file.write_text( + 'GITEA_URL=http://143.198.27.163:3000\n' + 'GITEA_TOKEN=abc123\n' + ) + with patch.object(_mod.Path, "home", return_value=tmp_path): + config = _load_token_config() + assert config["url"] == "http://143.198.27.163:3000" + assert config["token"] == "abc123" + + +# ── GiteaError ─────────────────────────────────────────────────────── + +class TestGiteaError: + def test_error_attributes(self): + err = GiteaError(404, "not found", "http://example.com/api/v1/test") + assert err.status_code == 404 + assert err.url == "http://example.com/api/v1/test" + assert "404" in str(err) + assert "not found" in str(err) + + def test_error_is_exception(self): + """GiteaError is a proper exception that can be caught.""" + with pytest.raises(GiteaError): + raise GiteaError(500, "server error") diff --git a/tools/gitea_client.py b/tools/gitea_client.py index 0ff1e1576..2f50ccf64 100644 --- a/tools/gitea_client.py +++ b/tools/gitea_client.py @@ -1,59 +1,512 @@ """ Gitea API Client — typed, sovereign, zero-dependency. -Enables the agent to interact with Timmy's sovereign Gitea instance -for issue tracking, PR management, and knowledge persistence. +Connects Hermes to Timmy's sovereign Gitea instance for: + - Issue tracking (create, list, comment, label) + - Pull request management (create, list, review, merge) + - File operations (read, create, update) + - Branch management (create, delete) + +Design principles: + - Zero pip dependencies — uses only urllib (stdlib) + - Retry with random jitter on 429/5xx (same pattern as SessionDB) + - Pagination-aware: all list methods return complete results + - Defensive None handling on all response fields + - Rate-limit aware: backs off on 429, never hammers the server + +This client is the foundation for: + - graph_store.py (knowledge persistence) + - knowledge_ingester.py (session ingestion) + - tasks.py orchestration (timmy-home) + - Playbook engine (dpo-trainer, pr-reviewer, etc.) + +Usage: + client = GiteaClient() + issues = client.list_issues("Timmy_Foundation/the-nexus", state="open") + client.create_issue_comment("Timmy_Foundation/the-nexus", 42, "Fixed in PR #102") """ from __future__ import annotations import json +import logging import os +import random +import time import urllib.request import urllib.error import urllib.parse -from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Optional, Dict, List +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +# ── Retry configuration ────────────────────────────────────────────── +# Same jitter pattern as SessionDB._execute_write: random backoff +# to avoid convoy effects when multiple agents hit the API. + +_MAX_RETRIES = 4 +_RETRY_MIN_S = 0.5 +_RETRY_MAX_S = 2.0 +_RETRYABLE_CODES = frozenset({429, 500, 502, 503, 504}) +_DEFAULT_TIMEOUT = 30 +_DEFAULT_PAGE_LIMIT = 50 # Gitea's max per page + + +class GiteaError(Exception): + """Raised when the Gitea API returns an error.""" + + def __init__(self, status_code: int, message: str, url: str = ""): + self.status_code = status_code + self.url = url + super().__init__(f"Gitea {status_code}: {message}") + class GiteaClient: - def __init__(self, base_url: Optional[str] = None, token: Optional[str] = None): - self.base_url = base_url or os.environ.get("GITEA_URL", "http://143.198.27.163:3000") - self.token = token or os.environ.get("GITEA_TOKEN") - self.api = f"{self.base_url.rstrip('/')}/api/v1" + """Sovereign Gitea API client with retry, pagination, and defensive handling.""" - def _request(self, method: str, path: str, data: Optional[dict] = None) -> Any: + def __init__( + self, + base_url: Optional[str] = None, + token: Optional[str] = None, + timeout: int = _DEFAULT_TIMEOUT, + ): + self.base_url = ( + base_url + or os.environ.get("GITEA_URL", "") + or _load_token_config().get("url", "http://localhost:3000") + ) + self.token = ( + token + or os.environ.get("GITEA_TOKEN", "") + or _load_token_config().get("token", "") + ) + self.api = f"{self.base_url.rstrip('/')}/api/v1" + self.timeout = timeout + + # ── Core HTTP ──────────────────────────────────────────────────── + + def _request( + self, + method: str, + path: str, + data: Optional[dict] = None, + params: Optional[dict] = None, + ) -> Any: + """Make an authenticated API request with retry on transient errors. + + Returns parsed JSON response. Raises GiteaError on non-retryable + failures. + """ url = f"{self.api}{path}" + if params: + query = urllib.parse.urlencode( + {k: v for k, v in params.items() if v is not None} + ) + url = f"{url}?{query}" + body = json.dumps(data).encode() if data else None - req = urllib.request.Request(url, data=body, method=method) + + last_err: Optional[Exception] = None + for attempt in range(_MAX_RETRIES): + req = urllib.request.Request(url, data=body, method=method) + if self.token: + req.add_header("Authorization", f"token {self.token}") + req.add_header("Content-Type", "application/json") + req.add_header("Accept", "application/json") + + try: + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + raw = resp.read().decode() + return json.loads(raw) if raw.strip() else {} + except urllib.error.HTTPError as e: + status = e.code + err_body = "" + try: + err_body = e.read().decode() + except Exception: + pass + + if status in _RETRYABLE_CODES and attempt < _MAX_RETRIES - 1: + jitter = random.uniform(_RETRY_MIN_S, _RETRY_MAX_S) + logger.debug( + "Gitea %d on %s %s, retry %d/%d in %.1fs", + status, method, path, attempt + 1, _MAX_RETRIES, jitter, + ) + last_err = GiteaError(status, err_body, url) + time.sleep(jitter) + continue + + raise GiteaError(status, err_body, url) from e + except (urllib.error.URLError, TimeoutError, OSError) as e: + if attempt < _MAX_RETRIES - 1: + jitter = random.uniform(_RETRY_MIN_S, _RETRY_MAX_S) + logger.debug( + "Gitea connection error on %s %s: %s, retry %d/%d", + method, path, e, attempt + 1, _MAX_RETRIES, + ) + last_err = e + time.sleep(jitter) + continue + raise + + raise last_err or GiteaError(0, "Max retries exceeded") + + def _paginate( + self, + path: str, + params: Optional[dict] = None, + max_items: int = 200, + ) -> List[dict]: + """Fetch all pages of a paginated endpoint. + + Gitea uses `page` + `limit` query params. This method fetches + pages until we get fewer items than the limit, or hit max_items. + """ + params = dict(params or {}) + params.setdefault("limit", _DEFAULT_PAGE_LIMIT) + page = 1 + all_items: List[dict] = [] + + while len(all_items) < max_items: + params["page"] = page + items = self._request("GET", path, params=params) + if not isinstance(items, list): + break + all_items.extend(items) + if len(items) < params["limit"]: + break # Last page + page += 1 + + return all_items[:max_items] + + # ── File operations (existing API) ─────────────────────────────── + + def get_file( + self, repo: str, path: str, ref: str = "main" + ) -> Dict[str, Any]: + """Get file content and metadata from a repository.""" + return self._request( + "GET", + f"/repos/{repo}/contents/{path}", + params={"ref": ref}, + ) + + def create_file( + self, + repo: str, + path: str, + content: str, + message: str, + branch: str = "main", + ) -> Dict[str, Any]: + """Create a new file in a repository. + + Args: + content: Base64-encoded file content + message: Commit message + """ + return self._request( + "POST", + f"/repos/{repo}/contents/{path}", + data={"branch": branch, "content": content, "message": message}, + ) + + def update_file( + self, + repo: str, + path: str, + content: str, + message: str, + sha: str, + branch: str = "main", + ) -> Dict[str, Any]: + """Update an existing file in a repository. + + Args: + content: Base64-encoded file content + sha: SHA of the file being replaced (for conflict detection) + """ + return self._request( + "PUT", + f"/repos/{repo}/contents/{path}", + data={ + "branch": branch, + "content": content, + "message": message, + "sha": sha, + }, + ) + + # ── Issues ─────────────────────────────────────────────────────── + + def list_issues( + self, + repo: str, + state: str = "open", + labels: Optional[str] = None, + sort: str = "updated", + direction: str = "desc", + limit: int = 50, + ) -> List[dict]: + """List issues in a repository. + + Args: + state: "open", "closed", or "all" + labels: Comma-separated label names + sort: "created", "updated", "comments" + direction: "asc" or "desc" + """ + params = { + "state": state, + "type": "issues", + "sort": sort, + "direction": direction, + } + if labels: + params["labels"] = labels + return self._paginate( + f"/repos/{repo}/issues", params=params, max_items=limit, + ) + + def get_issue(self, repo: str, number: int) -> Dict[str, Any]: + """Get a single issue by number.""" + return self._request("GET", f"/repos/{repo}/issues/{number}") + + def create_issue( + self, + repo: str, + title: str, + body: str = "", + labels: Optional[List[int]] = None, + assignees: Optional[List[str]] = None, + ) -> Dict[str, Any]: + """Create a new issue.""" + data: Dict[str, Any] = {"title": title, "body": body} + if labels: + data["labels"] = labels + if assignees: + data["assignees"] = assignees + return self._request("POST", f"/repos/{repo}/issues", data=data) + + def create_issue_comment( + self, repo: str, number: int, body: str + ) -> Dict[str, Any]: + """Add a comment to an issue or pull request.""" + return self._request( + "POST", + f"/repos/{repo}/issues/{number}/comments", + data={"body": body}, + ) + + def list_issue_comments( + self, repo: str, number: int, limit: int = 50, + ) -> List[dict]: + """List comments on an issue or pull request.""" + return self._paginate( + f"/repos/{repo}/issues/{number}/comments", + max_items=limit, + ) + + def find_unassigned_issues( + self, + repo: str, + state: str = "open", + exclude_labels: Optional[List[str]] = None, + ) -> List[dict]: + """Find issues with no assignee. + + Defensively handles None assignees (Gitea sometimes returns null + for the assignees list on issues that were created without one). + """ + issues = self.list_issues(repo, state=state, limit=100) + unassigned = [] + for issue in issues: + assignees = issue.get("assignees") or [] # None → [] + if not assignees: + # Check exclude_labels + if exclude_labels: + issue_labels = { + (lbl.get("name") or "").lower() + for lbl in (issue.get("labels") or []) + } + if issue_labels & {l.lower() for l in exclude_labels}: + continue + unassigned.append(issue) + return unassigned + + # ── Pull Requests ──────────────────────────────────────────────── + + def list_pulls( + self, + repo: str, + state: str = "open", + sort: str = "updated", + direction: str = "desc", + limit: int = 50, + ) -> List[dict]: + """List pull requests in a repository.""" + return self._paginate( + f"/repos/{repo}/pulls", + params={"state": state, "sort": sort, "direction": direction}, + max_items=limit, + ) + + def get_pull(self, repo: str, number: int) -> Dict[str, Any]: + """Get a single pull request by number.""" + return self._request("GET", f"/repos/{repo}/pulls/{number}") + + def create_pull( + self, + repo: str, + title: str, + head: str, + base: str = "main", + body: str = "", + ) -> Dict[str, Any]: + """Create a new pull request.""" + return self._request( + "POST", + f"/repos/{repo}/pulls", + data={"title": title, "head": head, "base": base, "body": body}, + ) + + def get_pull_diff(self, repo: str, number: int) -> str: + """Get the diff for a pull request as plain text. + + Returns the raw diff string. Useful for code review and + the destructive-PR detector in tasks.py. + """ + url = f"{self.api}/repos/{repo}/pulls/{number}.diff" + req = urllib.request.Request(url, method="GET") if self.token: req.add_header("Authorization", f"token {self.token}") - req.add_header("Content-Type", "application/json") - req.add_header("Accept", "application/json") + req.add_header("Accept", "text/plain") try: - with urllib.request.urlopen(req, timeout=30) as resp: - raw = resp.read().decode() - return json.loads(raw) if raw else {} + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + return resp.read().decode() except urllib.error.HTTPError as e: - raise Exception(f"Gitea {e.code}: {e.read().decode()}") from e + raise GiteaError(e.code, e.read().decode(), url) from e - def get_file(self, repo: str, path: str, ref: str = "main") -> Dict[str, Any]: - return self._request("GET", f"/repos/{repo}/contents/{path}?ref={ref}") + def create_pull_review( + self, + repo: str, + number: int, + body: str, + event: str = "COMMENT", + ) -> Dict[str, Any]: + """Submit a review on a pull request. - def create_file(self, repo: str, path: str, content: str, message: str, branch: str = "main") -> Dict[str, Any]: - data = { - "branch": branch, - "content": content, # Base64 encoded - "message": message - } - return self._request("POST", f"/repos/{repo}/contents/{path}", data) + Args: + event: "APPROVE", "REQUEST_CHANGES", or "COMMENT" + """ + return self._request( + "POST", + f"/repos/{repo}/pulls/{number}/reviews", + data={"body": body, "event": event}, + ) - def update_file(self, repo: str, path: str, content: str, message: str, sha: str, branch: str = "main") -> Dict[str, Any]: - data = { - "branch": branch, - "content": content, # Base64 encoded - "message": message, - "sha": sha - } - return self._request("PUT", f"/repos/{repo}/contents/{path}", data) + def list_pull_reviews( + self, repo: str, number: int + ) -> List[dict]: + """List reviews on a pull request.""" + return self._paginate(f"/repos/{repo}/pulls/{number}/reviews") + + # ── Branches ───────────────────────────────────────────────────── + + def create_branch( + self, + repo: str, + branch: str, + old_branch: str = "main", + ) -> Dict[str, Any]: + """Create a new branch from an existing one.""" + return self._request( + "POST", + f"/repos/{repo}/branches", + data={ + "new_branch_name": branch, + "old_branch_name": old_branch, + }, + ) + + def delete_branch(self, repo: str, branch: str) -> Dict[str, Any]: + """Delete a branch.""" + return self._request( + "DELETE", f"/repos/{repo}/branches/{branch}", + ) + + # ── Labels ─────────────────────────────────────────────────────── + + def list_labels(self, repo: str) -> List[dict]: + """List all labels in a repository.""" + return self._paginate(f"/repos/{repo}/labels") + + def add_issue_labels( + self, repo: str, number: int, label_ids: List[int] + ) -> List[dict]: + """Add labels to an issue.""" + return self._request( + "POST", + f"/repos/{repo}/issues/{number}/labels", + data={"labels": label_ids}, + ) + + # ── Notifications ──────────────────────────────────────────────── + + def list_notifications( + self, all_: bool = False, limit: int = 20, + ) -> List[dict]: + """List notifications for the authenticated user. + + Args: + all_: Include read notifications + """ + params = {"limit": limit} + if all_: + params["all"] = "true" + return self._request("GET", "/notifications", params=params) + + def mark_notifications_read(self) -> Dict[str, Any]: + """Mark all notifications as read.""" + return self._request("PUT", "/notifications") + + # ── Repository info ────────────────────────────────────────────── + + def get_repo(self, repo: str) -> Dict[str, Any]: + """Get repository metadata.""" + return self._request("GET", f"/repos/{repo}") + + def list_org_repos( + self, org: str, limit: int = 50, + ) -> List[dict]: + """List all repositories for an organization.""" + return self._paginate(f"/orgs/{org}/repos", max_items=limit) + + +# ── Token loader ───────────────────────────────────────────────────── + + +def _load_token_config() -> dict: + """Load Gitea credentials from ~/.timmy/gemini_gitea_token or env. + + Returns dict with 'url' and 'token' keys. Falls back to empty strings + if no config exists. + """ + token_file = Path.home() / ".timmy" / "gemini_gitea_token" + if not token_file.exists(): + return {"url": "", "token": ""} + + config: dict = {"url": "", "token": ""} + try: + for line in token_file.read_text().splitlines(): + line = line.strip() + if line.startswith("GITEA_URL="): + config["url"] = line.split("=", 1)[1].strip().strip('"') + elif line.startswith("GITEA_TOKEN="): + config["token"] = line.split("=", 1)[1].strip().strip('"') + except Exception: + pass + return config