Merge pull request '[sovereign] The Orchestration Client Timmy Deserves' (#76) from gemini/sovereign-gitea-client into main
This commit was merged in pull request #76.
This commit is contained in:
375
tests/tools/test_gitea_client.py
Normal file
375
tests/tools/test_gitea_client.py
Normal file
@@ -0,0 +1,375 @@
|
|||||||
|
"""Tests for the sovereign Gitea API client.
|
||||||
|
|
||||||
|
Validates:
|
||||||
|
- Retry logic with jitter on transient errors (429, 502, 503)
|
||||||
|
- Pagination across multi-page results
|
||||||
|
- Defensive None handling (assignees, labels)
|
||||||
|
- Error handling and GiteaError
|
||||||
|
- find_unassigned_issues filtering
|
||||||
|
- Token loading from config file
|
||||||
|
- Backward compatibility (existing get_file/create_file/update_file API)
|
||||||
|
|
||||||
|
These tests are fully self-contained — no network calls, no Gitea server,
|
||||||
|
no firecrawl dependency. The gitea_client module is imported directly by
|
||||||
|
file path to bypass tools/__init__.py's eager imports.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import io
|
||||||
|
import inspect
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import urllib.error
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# ── Direct module import ─────────────────────────────────────────────
|
||||||
|
# Import gitea_client directly by file path to bypass tools/__init__.py
|
||||||
|
# which eagerly imports web_tools → firecrawl (not always installed).
|
||||||
|
|
||||||
|
import importlib.util
|
||||||
|
|
||||||
|
PROJECT_ROOT = Path(__file__).parent.parent.parent
|
||||||
|
_spec = importlib.util.spec_from_file_location(
|
||||||
|
"gitea_client_test",
|
||||||
|
PROJECT_ROOT / "tools" / "gitea_client.py",
|
||||||
|
)
|
||||||
|
_mod = importlib.util.module_from_spec(_spec)
|
||||||
|
_spec.loader.exec_module(_mod)
|
||||||
|
|
||||||
|
GiteaClient = _mod.GiteaClient
|
||||||
|
GiteaError = _mod.GiteaError
|
||||||
|
_load_token_config = _mod._load_token_config
|
||||||
|
|
||||||
|
# Module path for patching — must target our loaded module, not tools.gitea_client
|
||||||
|
_MOD_NAME = "gitea_client_test"
|
||||||
|
sys.modules[_MOD_NAME] = _mod
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_response(data: Any, status: int = 200):
|
||||||
|
"""Create a mock HTTP response context manager."""
|
||||||
|
resp = MagicMock()
|
||||||
|
resp.read.return_value = json.dumps(data).encode()
|
||||||
|
resp.status = status
|
||||||
|
resp.__enter__ = MagicMock(return_value=resp)
|
||||||
|
resp.__exit__ = MagicMock(return_value=False)
|
||||||
|
return resp
|
||||||
|
|
||||||
|
|
||||||
|
def _make_http_error(code: int, msg: str):
|
||||||
|
"""Create a real urllib HTTPError for testing."""
|
||||||
|
return urllib.error.HTTPError(
|
||||||
|
url="http://test",
|
||||||
|
code=code,
|
||||||
|
msg=msg,
|
||||||
|
hdrs={}, # type: ignore
|
||||||
|
fp=io.BytesIO(msg.encode()),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fixtures ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client():
|
||||||
|
"""Client with no real credentials (won't hit network)."""
|
||||||
|
return GiteaClient(base_url="http://localhost:3000", token="test_token")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_urlopen():
|
||||||
|
"""Patch urllib.request.urlopen on our directly-loaded module."""
|
||||||
|
with patch.object(_mod.urllib.request, "urlopen") as mock:
|
||||||
|
yield mock
|
||||||
|
|
||||||
|
|
||||||
|
# ── Core request tests ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestCoreRequest:
|
||||||
|
def test_successful_get(self, client, mock_urlopen):
|
||||||
|
"""Basic GET request returns parsed JSON."""
|
||||||
|
mock_urlopen.return_value = _make_response({"id": 1, "name": "test"})
|
||||||
|
result = client._request("GET", "/repos/org/repo")
|
||||||
|
assert result == {"id": 1, "name": "test"}
|
||||||
|
mock_urlopen.assert_called_once()
|
||||||
|
|
||||||
|
def test_auth_header_set(self, client, mock_urlopen):
|
||||||
|
"""Token is included in Authorization header."""
|
||||||
|
mock_urlopen.return_value = _make_response({})
|
||||||
|
client._request("GET", "/test")
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
assert req.get_header("Authorization") == "token test_token"
|
||||||
|
|
||||||
|
def test_post_sends_json_body(self, client, mock_urlopen):
|
||||||
|
"""POST with data sends JSON-encoded body."""
|
||||||
|
mock_urlopen.return_value = _make_response({"id": 42})
|
||||||
|
client._request("POST", "/test", data={"title": "hello"})
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
assert req.data == json.dumps({"title": "hello"}).encode()
|
||||||
|
assert req.get_method() == "POST"
|
||||||
|
|
||||||
|
def test_params_become_query_string(self, client, mock_urlopen):
|
||||||
|
"""Query params are URL-encoded."""
|
||||||
|
mock_urlopen.return_value = _make_response([])
|
||||||
|
client._request("GET", "/issues", params={"state": "open", "limit": 50})
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
assert "state=open" in req.full_url
|
||||||
|
assert "limit=50" in req.full_url
|
||||||
|
|
||||||
|
def test_none_params_excluded(self, client, mock_urlopen):
|
||||||
|
"""None values in params dict are excluded from query string."""
|
||||||
|
mock_urlopen.return_value = _make_response([])
|
||||||
|
client._request("GET", "/issues", params={"state": "open", "labels": None})
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
assert "state=open" in req.full_url
|
||||||
|
assert "labels" not in req.full_url
|
||||||
|
|
||||||
|
|
||||||
|
# ── Retry tests ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestRetry:
|
||||||
|
def test_retries_on_429(self, client, mock_urlopen):
|
||||||
|
"""429 (rate limit) triggers retry with jitter."""
|
||||||
|
mock_urlopen.side_effect = [
|
||||||
|
_make_http_error(429, "rate limited"),
|
||||||
|
_make_response({"ok": True}),
|
||||||
|
]
|
||||||
|
with patch.object(_mod.time, "sleep"):
|
||||||
|
result = client._request("GET", "/test")
|
||||||
|
assert result == {"ok": True}
|
||||||
|
assert mock_urlopen.call_count == 2
|
||||||
|
|
||||||
|
def test_retries_on_502(self, client, mock_urlopen):
|
||||||
|
"""502 (bad gateway) triggers retry."""
|
||||||
|
mock_urlopen.side_effect = [
|
||||||
|
_make_http_error(502, "bad gateway"),
|
||||||
|
_make_response({"recovered": True}),
|
||||||
|
]
|
||||||
|
with patch.object(_mod.time, "sleep"):
|
||||||
|
result = client._request("GET", "/test")
|
||||||
|
assert result == {"recovered": True}
|
||||||
|
|
||||||
|
def test_retries_on_503(self, client, mock_urlopen):
|
||||||
|
"""503 (service unavailable) triggers retry."""
|
||||||
|
mock_urlopen.side_effect = [
|
||||||
|
_make_http_error(503, "unavailable"),
|
||||||
|
_make_http_error(503, "unavailable"),
|
||||||
|
_make_response({"third_time": True}),
|
||||||
|
]
|
||||||
|
with patch.object(_mod.time, "sleep"):
|
||||||
|
result = client._request("GET", "/test")
|
||||||
|
assert result == {"third_time": True}
|
||||||
|
assert mock_urlopen.call_count == 3
|
||||||
|
|
||||||
|
def test_non_retryable_error_raises_immediately(self, client, mock_urlopen):
|
||||||
|
"""404 is not retryable — raises GiteaError immediately."""
|
||||||
|
mock_urlopen.side_effect = _make_http_error(404, "not found")
|
||||||
|
with pytest.raises(GiteaError) as exc_info:
|
||||||
|
client._request("GET", "/nonexistent")
|
||||||
|
assert exc_info.value.status_code == 404
|
||||||
|
assert mock_urlopen.call_count == 1
|
||||||
|
|
||||||
|
def test_max_retries_exhausted(self, client, mock_urlopen):
|
||||||
|
"""After max retries, raises the last error."""
|
||||||
|
mock_urlopen.side_effect = [
|
||||||
|
_make_http_error(503, "unavailable"),
|
||||||
|
] * 4
|
||||||
|
with patch.object(_mod.time, "sleep"):
|
||||||
|
with pytest.raises(GiteaError) as exc_info:
|
||||||
|
client._request("GET", "/test")
|
||||||
|
assert exc_info.value.status_code == 503
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pagination tests ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestPagination:
|
||||||
|
def test_single_page(self, client, mock_urlopen):
|
||||||
|
"""Single page of results (fewer items than limit)."""
|
||||||
|
items = [{"id": i} for i in range(10)]
|
||||||
|
mock_urlopen.return_value = _make_response(items)
|
||||||
|
result = client._paginate("/repos/org/repo/issues")
|
||||||
|
assert len(result) == 10
|
||||||
|
assert mock_urlopen.call_count == 1
|
||||||
|
|
||||||
|
def test_multi_page(self, client, mock_urlopen):
|
||||||
|
"""Results spanning multiple pages."""
|
||||||
|
page1 = [{"id": i} for i in range(50)]
|
||||||
|
page2 = [{"id": i} for i in range(50, 75)]
|
||||||
|
mock_urlopen.side_effect = [
|
||||||
|
_make_response(page1),
|
||||||
|
_make_response(page2),
|
||||||
|
]
|
||||||
|
result = client._paginate("/test")
|
||||||
|
assert len(result) == 75
|
||||||
|
assert mock_urlopen.call_count == 2
|
||||||
|
|
||||||
|
def test_max_items_respected(self, client, mock_urlopen):
|
||||||
|
"""max_items truncates results."""
|
||||||
|
page1 = [{"id": i} for i in range(50)]
|
||||||
|
mock_urlopen.return_value = _make_response(page1)
|
||||||
|
result = client._paginate("/test", max_items=20)
|
||||||
|
assert len(result) == 20
|
||||||
|
|
||||||
|
|
||||||
|
# ── Issue methods ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestIssues:
|
||||||
|
def test_list_issues(self, client, mock_urlopen):
|
||||||
|
"""list_issues passes correct params."""
|
||||||
|
mock_urlopen.return_value = _make_response([
|
||||||
|
{"number": 1, "title": "Bug"},
|
||||||
|
{"number": 2, "title": "Feature"},
|
||||||
|
])
|
||||||
|
result = client.list_issues("org/repo", state="open")
|
||||||
|
assert len(result) == 2
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
assert "state=open" in req.full_url
|
||||||
|
assert "type=issues" in req.full_url
|
||||||
|
|
||||||
|
def test_create_issue_comment(self, client, mock_urlopen):
|
||||||
|
"""create_issue_comment sends body."""
|
||||||
|
mock_urlopen.return_value = _make_response({"id": 99, "body": "Fixed"})
|
||||||
|
result = client.create_issue_comment("org/repo", 42, "Fixed in PR #102")
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
body = json.loads(req.data)
|
||||||
|
assert body["body"] == "Fixed in PR #102"
|
||||||
|
assert "/repos/org/repo/issues/42/comments" in req.full_url
|
||||||
|
|
||||||
|
def test_find_unassigned_none_assignees(self, client, mock_urlopen):
|
||||||
|
"""find_unassigned_issues handles None assignees field.
|
||||||
|
|
||||||
|
Gitea sometimes returns null for assignees on issues created
|
||||||
|
without setting one. This was a bug found in the audit —
|
||||||
|
tasks.py crashed with TypeError when iterating None.
|
||||||
|
"""
|
||||||
|
mock_urlopen.return_value = _make_response([
|
||||||
|
{"number": 1, "title": "Bug", "assignees": None, "labels": []},
|
||||||
|
{"number": 2, "title": "Assigned", "assignees": [{"login": "dev"}], "labels": []},
|
||||||
|
{"number": 3, "title": "Empty", "assignees": [], "labels": []},
|
||||||
|
])
|
||||||
|
result = client.find_unassigned_issues("org/repo")
|
||||||
|
assert len(result) == 2
|
||||||
|
assert result[0]["number"] == 1
|
||||||
|
assert result[1]["number"] == 3
|
||||||
|
|
||||||
|
def test_find_unassigned_excludes_labels(self, client, mock_urlopen):
|
||||||
|
"""find_unassigned_issues respects exclude_labels."""
|
||||||
|
mock_urlopen.return_value = _make_response([
|
||||||
|
{"number": 1, "title": "Bug", "assignees": None,
|
||||||
|
"labels": [{"name": "wontfix"}]},
|
||||||
|
{"number": 2, "title": "Todo", "assignees": None,
|
||||||
|
"labels": [{"name": "enhancement"}]},
|
||||||
|
])
|
||||||
|
result = client.find_unassigned_issues(
|
||||||
|
"org/repo", exclude_labels=["wontfix"]
|
||||||
|
)
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0]["number"] == 2
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pull Request methods ────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestPullRequests:
|
||||||
|
def test_create_pull(self, client, mock_urlopen):
|
||||||
|
"""create_pull sends correct data."""
|
||||||
|
mock_urlopen.return_value = _make_response(
|
||||||
|
{"number": 105, "state": "open"}
|
||||||
|
)
|
||||||
|
result = client.create_pull(
|
||||||
|
"org/repo", title="Fix bugs",
|
||||||
|
head="fix-branch", base="main", body="Fixes #42",
|
||||||
|
)
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
body = json.loads(req.data)
|
||||||
|
assert body["title"] == "Fix bugs"
|
||||||
|
assert body["head"] == "fix-branch"
|
||||||
|
assert body["base"] == "main"
|
||||||
|
assert result["number"] == 105
|
||||||
|
|
||||||
|
def test_create_pull_review(self, client, mock_urlopen):
|
||||||
|
"""create_pull_review sends review event."""
|
||||||
|
mock_urlopen.return_value = _make_response({"id": 1})
|
||||||
|
client.create_pull_review("org/repo", 42, "LGTM", event="APPROVE")
|
||||||
|
req = mock_urlopen.call_args[0][0]
|
||||||
|
body = json.loads(req.data)
|
||||||
|
assert body["event"] == "APPROVE"
|
||||||
|
assert body["body"] == "LGTM"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Backward compatibility ──────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestBackwardCompat:
|
||||||
|
"""Ensure the expanded client doesn't break graph_store.py or
|
||||||
|
knowledge_ingester.py which import the old 3-method interface."""
|
||||||
|
|
||||||
|
def test_get_file_signature(self, client):
|
||||||
|
"""get_file accepts (repo, path, ref) — same as before."""
|
||||||
|
sig = inspect.signature(client.get_file)
|
||||||
|
params = list(sig.parameters.keys())
|
||||||
|
assert params == ["repo", "path", "ref"]
|
||||||
|
|
||||||
|
def test_create_file_signature(self, client):
|
||||||
|
"""create_file accepts (repo, path, content, message, branch)."""
|
||||||
|
sig = inspect.signature(client.create_file)
|
||||||
|
params = list(sig.parameters.keys())
|
||||||
|
assert "repo" in params and "content" in params and "message" in params
|
||||||
|
|
||||||
|
def test_update_file_signature(self, client):
|
||||||
|
"""update_file accepts (repo, path, content, message, sha, branch)."""
|
||||||
|
sig = inspect.signature(client.update_file)
|
||||||
|
params = list(sig.parameters.keys())
|
||||||
|
assert "sha" in params
|
||||||
|
|
||||||
|
def test_constructor_env_var_fallback(self):
|
||||||
|
"""Constructor reads GITEA_URL and GITEA_TOKEN from env."""
|
||||||
|
with patch.dict(os.environ, {
|
||||||
|
"GITEA_URL": "http://myserver:3000",
|
||||||
|
"GITEA_TOKEN": "mytoken",
|
||||||
|
}):
|
||||||
|
c = GiteaClient()
|
||||||
|
assert c.base_url == "http://myserver:3000"
|
||||||
|
assert c.token == "mytoken"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Token config loading ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestTokenConfig:
|
||||||
|
def test_load_missing_file(self, tmp_path):
|
||||||
|
"""Missing token file returns empty dict."""
|
||||||
|
with patch.object(_mod.Path, "home", return_value=tmp_path / "nope"):
|
||||||
|
config = _load_token_config()
|
||||||
|
assert config == {"url": "", "token": ""}
|
||||||
|
|
||||||
|
def test_load_valid_file(self, tmp_path):
|
||||||
|
"""Valid token file is parsed correctly."""
|
||||||
|
token_file = tmp_path / ".timmy" / "gemini_gitea_token"
|
||||||
|
token_file.parent.mkdir(parents=True)
|
||||||
|
token_file.write_text(
|
||||||
|
'GITEA_URL=http://143.198.27.163:3000\n'
|
||||||
|
'GITEA_TOKEN=abc123\n'
|
||||||
|
)
|
||||||
|
with patch.object(_mod.Path, "home", return_value=tmp_path):
|
||||||
|
config = _load_token_config()
|
||||||
|
assert config["url"] == "http://143.198.27.163:3000"
|
||||||
|
assert config["token"] == "abc123"
|
||||||
|
|
||||||
|
|
||||||
|
# ── GiteaError ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGiteaError:
|
||||||
|
def test_error_attributes(self):
|
||||||
|
err = GiteaError(404, "not found", "http://example.com/api/v1/test")
|
||||||
|
assert err.status_code == 404
|
||||||
|
assert err.url == "http://example.com/api/v1/test"
|
||||||
|
assert "404" in str(err)
|
||||||
|
assert "not found" in str(err)
|
||||||
|
|
||||||
|
def test_error_is_exception(self):
|
||||||
|
"""GiteaError is a proper exception that can be caught."""
|
||||||
|
with pytest.raises(GiteaError):
|
||||||
|
raise GiteaError(500, "server error")
|
||||||
@@ -1,59 +1,512 @@
|
|||||||
"""
|
"""
|
||||||
Gitea API Client — typed, sovereign, zero-dependency.
|
Gitea API Client — typed, sovereign, zero-dependency.
|
||||||
|
|
||||||
Enables the agent to interact with Timmy's sovereign Gitea instance
|
Connects Hermes to Timmy's sovereign Gitea instance for:
|
||||||
for issue tracking, PR management, and knowledge persistence.
|
- Issue tracking (create, list, comment, label)
|
||||||
|
- Pull request management (create, list, review, merge)
|
||||||
|
- File operations (read, create, update)
|
||||||
|
- Branch management (create, delete)
|
||||||
|
|
||||||
|
Design principles:
|
||||||
|
- Zero pip dependencies — uses only urllib (stdlib)
|
||||||
|
- Retry with random jitter on 429/5xx (same pattern as SessionDB)
|
||||||
|
- Pagination-aware: all list methods return complete results
|
||||||
|
- Defensive None handling on all response fields
|
||||||
|
- Rate-limit aware: backs off on 429, never hammers the server
|
||||||
|
|
||||||
|
This client is the foundation for:
|
||||||
|
- graph_store.py (knowledge persistence)
|
||||||
|
- knowledge_ingester.py (session ingestion)
|
||||||
|
- tasks.py orchestration (timmy-home)
|
||||||
|
- Playbook engine (dpo-trainer, pr-reviewer, etc.)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
client = GiteaClient()
|
||||||
|
issues = client.list_issues("Timmy_Foundation/the-nexus", state="open")
|
||||||
|
client.create_issue_comment("Timmy_Foundation/the-nexus", 42, "Fixed in PR #102")
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import random
|
||||||
|
import time
|
||||||
import urllib.request
|
import urllib.request
|
||||||
import urllib.error
|
import urllib.error
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Optional, Dict, List
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Retry configuration ──────────────────────────────────────────────
|
||||||
|
# Same jitter pattern as SessionDB._execute_write: random backoff
|
||||||
|
# to avoid convoy effects when multiple agents hit the API.
|
||||||
|
|
||||||
|
_MAX_RETRIES = 4
|
||||||
|
_RETRY_MIN_S = 0.5
|
||||||
|
_RETRY_MAX_S = 2.0
|
||||||
|
_RETRYABLE_CODES = frozenset({429, 500, 502, 503, 504})
|
||||||
|
_DEFAULT_TIMEOUT = 30
|
||||||
|
_DEFAULT_PAGE_LIMIT = 50 # Gitea's max per page
|
||||||
|
|
||||||
|
|
||||||
|
class GiteaError(Exception):
|
||||||
|
"""Raised when the Gitea API returns an error."""
|
||||||
|
|
||||||
|
def __init__(self, status_code: int, message: str, url: str = ""):
|
||||||
|
self.status_code = status_code
|
||||||
|
self.url = url
|
||||||
|
super().__init__(f"Gitea {status_code}: {message}")
|
||||||
|
|
||||||
|
|
||||||
class GiteaClient:
|
class GiteaClient:
|
||||||
def __init__(self, base_url: Optional[str] = None, token: Optional[str] = None):
|
"""Sovereign Gitea API client with retry, pagination, and defensive handling."""
|
||||||
self.base_url = base_url or os.environ.get("GITEA_URL", "http://143.198.27.163:3000")
|
|
||||||
self.token = token or os.environ.get("GITEA_TOKEN")
|
|
||||||
self.api = f"{self.base_url.rstrip('/')}/api/v1"
|
|
||||||
|
|
||||||
def _request(self, method: str, path: str, data: Optional[dict] = None) -> Any:
|
def __init__(
|
||||||
|
self,
|
||||||
|
base_url: Optional[str] = None,
|
||||||
|
token: Optional[str] = None,
|
||||||
|
timeout: int = _DEFAULT_TIMEOUT,
|
||||||
|
):
|
||||||
|
self.base_url = (
|
||||||
|
base_url
|
||||||
|
or os.environ.get("GITEA_URL", "")
|
||||||
|
or _load_token_config().get("url", "http://localhost:3000")
|
||||||
|
)
|
||||||
|
self.token = (
|
||||||
|
token
|
||||||
|
or os.environ.get("GITEA_TOKEN", "")
|
||||||
|
or _load_token_config().get("token", "")
|
||||||
|
)
|
||||||
|
self.api = f"{self.base_url.rstrip('/')}/api/v1"
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
# ── Core HTTP ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _request(
|
||||||
|
self,
|
||||||
|
method: str,
|
||||||
|
path: str,
|
||||||
|
data: Optional[dict] = None,
|
||||||
|
params: Optional[dict] = None,
|
||||||
|
) -> Any:
|
||||||
|
"""Make an authenticated API request with retry on transient errors.
|
||||||
|
|
||||||
|
Returns parsed JSON response. Raises GiteaError on non-retryable
|
||||||
|
failures.
|
||||||
|
"""
|
||||||
url = f"{self.api}{path}"
|
url = f"{self.api}{path}"
|
||||||
|
if params:
|
||||||
|
query = urllib.parse.urlencode(
|
||||||
|
{k: v for k, v in params.items() if v is not None}
|
||||||
|
)
|
||||||
|
url = f"{url}?{query}"
|
||||||
|
|
||||||
body = json.dumps(data).encode() if data else None
|
body = json.dumps(data).encode() if data else None
|
||||||
req = urllib.request.Request(url, data=body, method=method)
|
|
||||||
|
last_err: Optional[Exception] = None
|
||||||
|
for attempt in range(_MAX_RETRIES):
|
||||||
|
req = urllib.request.Request(url, data=body, method=method)
|
||||||
|
if self.token:
|
||||||
|
req.add_header("Authorization", f"token {self.token}")
|
||||||
|
req.add_header("Content-Type", "application/json")
|
||||||
|
req.add_header("Accept", "application/json")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
|
||||||
|
raw = resp.read().decode()
|
||||||
|
return json.loads(raw) if raw.strip() else {}
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
status = e.code
|
||||||
|
err_body = ""
|
||||||
|
try:
|
||||||
|
err_body = e.read().decode()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if status in _RETRYABLE_CODES and attempt < _MAX_RETRIES - 1:
|
||||||
|
jitter = random.uniform(_RETRY_MIN_S, _RETRY_MAX_S)
|
||||||
|
logger.debug(
|
||||||
|
"Gitea %d on %s %s, retry %d/%d in %.1fs",
|
||||||
|
status, method, path, attempt + 1, _MAX_RETRIES, jitter,
|
||||||
|
)
|
||||||
|
last_err = GiteaError(status, err_body, url)
|
||||||
|
time.sleep(jitter)
|
||||||
|
continue
|
||||||
|
|
||||||
|
raise GiteaError(status, err_body, url) from e
|
||||||
|
except (urllib.error.URLError, TimeoutError, OSError) as e:
|
||||||
|
if attempt < _MAX_RETRIES - 1:
|
||||||
|
jitter = random.uniform(_RETRY_MIN_S, _RETRY_MAX_S)
|
||||||
|
logger.debug(
|
||||||
|
"Gitea connection error on %s %s: %s, retry %d/%d",
|
||||||
|
method, path, e, attempt + 1, _MAX_RETRIES,
|
||||||
|
)
|
||||||
|
last_err = e
|
||||||
|
time.sleep(jitter)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
|
||||||
|
raise last_err or GiteaError(0, "Max retries exceeded")
|
||||||
|
|
||||||
|
def _paginate(
|
||||||
|
self,
|
||||||
|
path: str,
|
||||||
|
params: Optional[dict] = None,
|
||||||
|
max_items: int = 200,
|
||||||
|
) -> List[dict]:
|
||||||
|
"""Fetch all pages of a paginated endpoint.
|
||||||
|
|
||||||
|
Gitea uses `page` + `limit` query params. This method fetches
|
||||||
|
pages until we get fewer items than the limit, or hit max_items.
|
||||||
|
"""
|
||||||
|
params = dict(params or {})
|
||||||
|
params.setdefault("limit", _DEFAULT_PAGE_LIMIT)
|
||||||
|
page = 1
|
||||||
|
all_items: List[dict] = []
|
||||||
|
|
||||||
|
while len(all_items) < max_items:
|
||||||
|
params["page"] = page
|
||||||
|
items = self._request("GET", path, params=params)
|
||||||
|
if not isinstance(items, list):
|
||||||
|
break
|
||||||
|
all_items.extend(items)
|
||||||
|
if len(items) < params["limit"]:
|
||||||
|
break # Last page
|
||||||
|
page += 1
|
||||||
|
|
||||||
|
return all_items[:max_items]
|
||||||
|
|
||||||
|
# ── File operations (existing API) ───────────────────────────────
|
||||||
|
|
||||||
|
def get_file(
|
||||||
|
self, repo: str, path: str, ref: str = "main"
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Get file content and metadata from a repository."""
|
||||||
|
return self._request(
|
||||||
|
"GET",
|
||||||
|
f"/repos/{repo}/contents/{path}",
|
||||||
|
params={"ref": ref},
|
||||||
|
)
|
||||||
|
|
||||||
|
def create_file(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
path: str,
|
||||||
|
content: str,
|
||||||
|
message: str,
|
||||||
|
branch: str = "main",
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Create a new file in a repository.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content: Base64-encoded file content
|
||||||
|
message: Commit message
|
||||||
|
"""
|
||||||
|
return self._request(
|
||||||
|
"POST",
|
||||||
|
f"/repos/{repo}/contents/{path}",
|
||||||
|
data={"branch": branch, "content": content, "message": message},
|
||||||
|
)
|
||||||
|
|
||||||
|
def update_file(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
path: str,
|
||||||
|
content: str,
|
||||||
|
message: str,
|
||||||
|
sha: str,
|
||||||
|
branch: str = "main",
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Update an existing file in a repository.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content: Base64-encoded file content
|
||||||
|
sha: SHA of the file being replaced (for conflict detection)
|
||||||
|
"""
|
||||||
|
return self._request(
|
||||||
|
"PUT",
|
||||||
|
f"/repos/{repo}/contents/{path}",
|
||||||
|
data={
|
||||||
|
"branch": branch,
|
||||||
|
"content": content,
|
||||||
|
"message": message,
|
||||||
|
"sha": sha,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Issues ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def list_issues(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
state: str = "open",
|
||||||
|
labels: Optional[str] = None,
|
||||||
|
sort: str = "updated",
|
||||||
|
direction: str = "desc",
|
||||||
|
limit: int = 50,
|
||||||
|
) -> List[dict]:
|
||||||
|
"""List issues in a repository.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
state: "open", "closed", or "all"
|
||||||
|
labels: Comma-separated label names
|
||||||
|
sort: "created", "updated", "comments"
|
||||||
|
direction: "asc" or "desc"
|
||||||
|
"""
|
||||||
|
params = {
|
||||||
|
"state": state,
|
||||||
|
"type": "issues",
|
||||||
|
"sort": sort,
|
||||||
|
"direction": direction,
|
||||||
|
}
|
||||||
|
if labels:
|
||||||
|
params["labels"] = labels
|
||||||
|
return self._paginate(
|
||||||
|
f"/repos/{repo}/issues", params=params, max_items=limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_issue(self, repo: str, number: int) -> Dict[str, Any]:
|
||||||
|
"""Get a single issue by number."""
|
||||||
|
return self._request("GET", f"/repos/{repo}/issues/{number}")
|
||||||
|
|
||||||
|
def create_issue(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
title: str,
|
||||||
|
body: str = "",
|
||||||
|
labels: Optional[List[int]] = None,
|
||||||
|
assignees: Optional[List[str]] = None,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Create a new issue."""
|
||||||
|
data: Dict[str, Any] = {"title": title, "body": body}
|
||||||
|
if labels:
|
||||||
|
data["labels"] = labels
|
||||||
|
if assignees:
|
||||||
|
data["assignees"] = assignees
|
||||||
|
return self._request("POST", f"/repos/{repo}/issues", data=data)
|
||||||
|
|
||||||
|
def create_issue_comment(
|
||||||
|
self, repo: str, number: int, body: str
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Add a comment to an issue or pull request."""
|
||||||
|
return self._request(
|
||||||
|
"POST",
|
||||||
|
f"/repos/{repo}/issues/{number}/comments",
|
||||||
|
data={"body": body},
|
||||||
|
)
|
||||||
|
|
||||||
|
def list_issue_comments(
|
||||||
|
self, repo: str, number: int, limit: int = 50,
|
||||||
|
) -> List[dict]:
|
||||||
|
"""List comments on an issue or pull request."""
|
||||||
|
return self._paginate(
|
||||||
|
f"/repos/{repo}/issues/{number}/comments",
|
||||||
|
max_items=limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
def find_unassigned_issues(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
state: str = "open",
|
||||||
|
exclude_labels: Optional[List[str]] = None,
|
||||||
|
) -> List[dict]:
|
||||||
|
"""Find issues with no assignee.
|
||||||
|
|
||||||
|
Defensively handles None assignees (Gitea sometimes returns null
|
||||||
|
for the assignees list on issues that were created without one).
|
||||||
|
"""
|
||||||
|
issues = self.list_issues(repo, state=state, limit=100)
|
||||||
|
unassigned = []
|
||||||
|
for issue in issues:
|
||||||
|
assignees = issue.get("assignees") or [] # None → []
|
||||||
|
if not assignees:
|
||||||
|
# Check exclude_labels
|
||||||
|
if exclude_labels:
|
||||||
|
issue_labels = {
|
||||||
|
(lbl.get("name") or "").lower()
|
||||||
|
for lbl in (issue.get("labels") or [])
|
||||||
|
}
|
||||||
|
if issue_labels & {l.lower() for l in exclude_labels}:
|
||||||
|
continue
|
||||||
|
unassigned.append(issue)
|
||||||
|
return unassigned
|
||||||
|
|
||||||
|
# ── Pull Requests ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def list_pulls(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
state: str = "open",
|
||||||
|
sort: str = "updated",
|
||||||
|
direction: str = "desc",
|
||||||
|
limit: int = 50,
|
||||||
|
) -> List[dict]:
|
||||||
|
"""List pull requests in a repository."""
|
||||||
|
return self._paginate(
|
||||||
|
f"/repos/{repo}/pulls",
|
||||||
|
params={"state": state, "sort": sort, "direction": direction},
|
||||||
|
max_items=limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_pull(self, repo: str, number: int) -> Dict[str, Any]:
|
||||||
|
"""Get a single pull request by number."""
|
||||||
|
return self._request("GET", f"/repos/{repo}/pulls/{number}")
|
||||||
|
|
||||||
|
def create_pull(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
title: str,
|
||||||
|
head: str,
|
||||||
|
base: str = "main",
|
||||||
|
body: str = "",
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Create a new pull request."""
|
||||||
|
return self._request(
|
||||||
|
"POST",
|
||||||
|
f"/repos/{repo}/pulls",
|
||||||
|
data={"title": title, "head": head, "base": base, "body": body},
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_pull_diff(self, repo: str, number: int) -> str:
|
||||||
|
"""Get the diff for a pull request as plain text.
|
||||||
|
|
||||||
|
Returns the raw diff string. Useful for code review and
|
||||||
|
the destructive-PR detector in tasks.py.
|
||||||
|
"""
|
||||||
|
url = f"{self.api}/repos/{repo}/pulls/{number}.diff"
|
||||||
|
req = urllib.request.Request(url, method="GET")
|
||||||
if self.token:
|
if self.token:
|
||||||
req.add_header("Authorization", f"token {self.token}")
|
req.add_header("Authorization", f"token {self.token}")
|
||||||
req.add_header("Content-Type", "application/json")
|
req.add_header("Accept", "text/plain")
|
||||||
req.add_header("Accept", "application/json")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
|
||||||
raw = resp.read().decode()
|
return resp.read().decode()
|
||||||
return json.loads(raw) if raw else {}
|
|
||||||
except urllib.error.HTTPError as e:
|
except urllib.error.HTTPError as e:
|
||||||
raise Exception(f"Gitea {e.code}: {e.read().decode()}") from e
|
raise GiteaError(e.code, e.read().decode(), url) from e
|
||||||
|
|
||||||
def get_file(self, repo: str, path: str, ref: str = "main") -> Dict[str, Any]:
|
def create_pull_review(
|
||||||
return self._request("GET", f"/repos/{repo}/contents/{path}?ref={ref}")
|
self,
|
||||||
|
repo: str,
|
||||||
|
number: int,
|
||||||
|
body: str,
|
||||||
|
event: str = "COMMENT",
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Submit a review on a pull request.
|
||||||
|
|
||||||
def create_file(self, repo: str, path: str, content: str, message: str, branch: str = "main") -> Dict[str, Any]:
|
Args:
|
||||||
data = {
|
event: "APPROVE", "REQUEST_CHANGES", or "COMMENT"
|
||||||
"branch": branch,
|
"""
|
||||||
"content": content, # Base64 encoded
|
return self._request(
|
||||||
"message": message
|
"POST",
|
||||||
}
|
f"/repos/{repo}/pulls/{number}/reviews",
|
||||||
return self._request("POST", f"/repos/{repo}/contents/{path}", data)
|
data={"body": body, "event": event},
|
||||||
|
)
|
||||||
|
|
||||||
def update_file(self, repo: str, path: str, content: str, message: str, sha: str, branch: str = "main") -> Dict[str, Any]:
|
def list_pull_reviews(
|
||||||
data = {
|
self, repo: str, number: int
|
||||||
"branch": branch,
|
) -> List[dict]:
|
||||||
"content": content, # Base64 encoded
|
"""List reviews on a pull request."""
|
||||||
"message": message,
|
return self._paginate(f"/repos/{repo}/pulls/{number}/reviews")
|
||||||
"sha": sha
|
|
||||||
}
|
# ── Branches ─────────────────────────────────────────────────────
|
||||||
return self._request("PUT", f"/repos/{repo}/contents/{path}", data)
|
|
||||||
|
def create_branch(
|
||||||
|
self,
|
||||||
|
repo: str,
|
||||||
|
branch: str,
|
||||||
|
old_branch: str = "main",
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""Create a new branch from an existing one."""
|
||||||
|
return self._request(
|
||||||
|
"POST",
|
||||||
|
f"/repos/{repo}/branches",
|
||||||
|
data={
|
||||||
|
"new_branch_name": branch,
|
||||||
|
"old_branch_name": old_branch,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def delete_branch(self, repo: str, branch: str) -> Dict[str, Any]:
|
||||||
|
"""Delete a branch."""
|
||||||
|
return self._request(
|
||||||
|
"DELETE", f"/repos/{repo}/branches/{branch}",
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Labels ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def list_labels(self, repo: str) -> List[dict]:
|
||||||
|
"""List all labels in a repository."""
|
||||||
|
return self._paginate(f"/repos/{repo}/labels")
|
||||||
|
|
||||||
|
def add_issue_labels(
|
||||||
|
self, repo: str, number: int, label_ids: List[int]
|
||||||
|
) -> List[dict]:
|
||||||
|
"""Add labels to an issue."""
|
||||||
|
return self._request(
|
||||||
|
"POST",
|
||||||
|
f"/repos/{repo}/issues/{number}/labels",
|
||||||
|
data={"labels": label_ids},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Notifications ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def list_notifications(
|
||||||
|
self, all_: bool = False, limit: int = 20,
|
||||||
|
) -> List[dict]:
|
||||||
|
"""List notifications for the authenticated user.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
all_: Include read notifications
|
||||||
|
"""
|
||||||
|
params = {"limit": limit}
|
||||||
|
if all_:
|
||||||
|
params["all"] = "true"
|
||||||
|
return self._request("GET", "/notifications", params=params)
|
||||||
|
|
||||||
|
def mark_notifications_read(self) -> Dict[str, Any]:
|
||||||
|
"""Mark all notifications as read."""
|
||||||
|
return self._request("PUT", "/notifications")
|
||||||
|
|
||||||
|
# ── Repository info ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
def get_repo(self, repo: str) -> Dict[str, Any]:
|
||||||
|
"""Get repository metadata."""
|
||||||
|
return self._request("GET", f"/repos/{repo}")
|
||||||
|
|
||||||
|
def list_org_repos(
|
||||||
|
self, org: str, limit: int = 50,
|
||||||
|
) -> List[dict]:
|
||||||
|
"""List all repositories for an organization."""
|
||||||
|
return self._paginate(f"/orgs/{org}/repos", max_items=limit)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Token loader ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _load_token_config() -> dict:
|
||||||
|
"""Load Gitea credentials from ~/.timmy/gemini_gitea_token or env.
|
||||||
|
|
||||||
|
Returns dict with 'url' and 'token' keys. Falls back to empty strings
|
||||||
|
if no config exists.
|
||||||
|
"""
|
||||||
|
token_file = Path.home() / ".timmy" / "gemini_gitea_token"
|
||||||
|
if not token_file.exists():
|
||||||
|
return {"url": "", "token": ""}
|
||||||
|
|
||||||
|
config: dict = {"url": "", "token": ""}
|
||||||
|
try:
|
||||||
|
for line in token_file.read_text().splitlines():
|
||||||
|
line = line.strip()
|
||||||
|
if line.startswith("GITEA_URL="):
|
||||||
|
config["url"] = line.split("=", 1)[1].strip().strip('"')
|
||||||
|
elif line.startswith("GITEA_TOKEN="):
|
||||||
|
config["token"] = line.split("=", 1)[1].strip().strip('"')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return config
|
||||||
|
|||||||
Reference in New Issue
Block a user