1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/infrastructure/hands/gitea.py
Trip T 350e6f54ff fix: prevent "Event loop is closed" on repeated Gitea API calls
The httpx AsyncClient was cached across asyncio.run() boundaries.
Each asyncio.run() creates and closes a new event loop, leaving the
cached client's connections on a dead loop.  Second+ calls would fail
with "Event loop is closed".

Fix: create a fresh client per request and close it in a finally block.
No more cross-loop client reuse.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 20:40:39 -04:00

296 lines
9.9 KiB
Python

"""Gitea Hand — issue tracking and self-improvement channel for Timmy.
Provides Gitea API capabilities with:
- Token auth (env var, config, or ~/.config/gitea/token fallback)
- Structured result parsing
- Dedup checks before creating issues
- Graceful degradation: log warning, return fallback, never crash
Follows project conventions:
- Config via ``from config import settings``
- Singleton pattern for module-level import
- Async httpx client (like Paperclip client pattern)
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from pathlib import Path
from config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Filesystem fallback for the API token; read by _resolve_token() when
# settings.gitea_token is empty.
_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
@dataclass
class GiteaResult:
    """Result from a Gitea API operation.

    Attributes:
        operation: Label for the call, normally ``"<METHOD> <path>"``.
        success: True when the request completed with an HTTP status < 400.
        data: Decoded JSON payload. A dict for single-object endpoints and a
            list for collection endpoints (issue and label listings); an
            empty dict when the response had no body or the call failed.
        error: Failure description; empty string on success.
        latency_ms: Elapsed request time in milliseconds (0.0 when no
            request was attempted).
    """

    operation: str
    success: bool
    # Collection endpoints return JSON arrays, so ``dict`` alone was too narrow.
    data: dict | list = field(default_factory=dict)
    error: str = ""
    latency_ms: float = 0.0
def _resolve_token() -> str:
    """Resolve the Gitea API token from settings or the filesystem fallback.

    Returns:
        ``settings.gitea_token`` when it is set; otherwise the stripped
        contents of ``_TOKEN_FILE``; otherwise ``""`` when that file is
        missing or cannot be read.
    """
    if settings.gitea_token:
        return settings.gitea_token
    try:
        return _TOKEN_FILE.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        # No fallback file is the normal "not configured" case; stay quiet.
        return ""
    except (OSError, UnicodeDecodeError) as exc:
        # This runs while the module-level GiteaHand is constructed at import
        # time, so any unreadable token file (permissions, a directory at the
        # path, undecodable bytes) must degrade to "no token", not an
        # import-time crash.
        logger.warning("Could not read Gitea token file %s: %s", _TOKEN_FILE, exc)
        return ""
def _title_similar(a: str, b: str, threshold: float = 0.6) -> bool:
"""Check if two issue titles are similar enough to be duplicates."""
return SequenceMatcher(None, a.lower(), b.lower()).ratio() > threshold
class GiteaHand:
    """Gitea API hand for Timmy.

    All methods degrade gracefully — if Gitea is unreachable, the repo
    setting is malformed, or the API call fails, the hand returns a
    ``GiteaResult(success=False)`` (or an empty / ``None`` fallback for the
    helpers) rather than raising.
    """

    def __init__(
        self,
        base_url: str | None = None,
        token: str | None = None,
        repo: str | None = None,
        timeout: int | None = None,
    ) -> None:
        """Configure the hand, falling back to ``settings`` for falsy arguments.

        Args:
            base_url: Gitea root URL; trailing slashes are stripped.
            token: API token; when falsy it is resolved via ``_resolve_token()``.
            repo: Repository in ``owner/name`` form.
            timeout: Timeout value handed to the HTTP client.
        """
        self._base_url = (base_url or settings.gitea_url).rstrip("/")
        self._token = token or _resolve_token()
        self._repo = repo or settings.gitea_repo
        self._timeout = timeout or settings.gitea_timeout
        if not self._token:
            logger.warning(
                "Gitea token not configured — set GITEA_TOKEN or place token in %s",
                _TOKEN_FILE,
            )
        else:
            logger.info(
                "GiteaHand initialised — %s/%s",
                self._base_url,
                self._repo,
            )

    @property
    def available(self) -> bool:
        """Check if Gitea integration is configured and enabled."""
        return bool(settings.gitea_enabled and self._token and self._repo)

    def _get_client(self):
        """Create a fresh async HTTP client for the current event loop.

        Always creates a new client rather than caching, because tool
        functions call us via ``asyncio.run()`` which creates a new loop
        each time — a cached client from a previous loop would raise
        "Event loop is closed".
        """
        import httpx

        return httpx.AsyncClient(
            base_url=self._base_url,
            headers={
                "Authorization": f"token {self._token}",
                "Accept": "application/json",
                "Content-Type": "application/json",
            },
            timeout=self._timeout,
        )

    def _split_repo(self) -> tuple[str, str] | None:
        """Split the configured repo into ``(owner, name)``.

        Returns None when the repo is unset or not in ``owner/name`` form, so
        callers can return a failed result instead of raising on unpacking.
        Only the first ``/`` separates owner from name.
        """
        owner, sep, name = (self._repo or "").partition("/")
        if sep and owner and name:
            return owner, name
        return None

    async def _repo_request(self, method: str, suffix: str, **kwargs) -> GiteaResult:
        """Make a request under ``/api/v1/repos/{owner}/{name}``.

        Args:
            method: HTTP method.
            suffix: Path below the repository root, e.g. ``"/issues"``.
            **kwargs: Forwarded unchanged to ``_request``.

        Returns:
            The ``_request`` result, or a failed ``GiteaResult`` (operation
            ``"<METHOD> <suffix>"``) when the repo setting cannot be split.
        """
        parts = self._split_repo()
        if parts is None:
            return GiteaResult(
                operation=f"{method.upper()} {suffix}",
                success=False,
                error=f"Gitea repo must be in 'owner/name' form, got {self._repo!r}",
            )
        owner, name = parts
        return await self._request(
            method,
            f"/api/v1/repos/{owner}/{name}{suffix}",
            **kwargs,
        )

    async def _request(self, method: str, path: str, **kwargs) -> GiteaResult:
        """Make an API request with full error handling.

        Args:
            method: HTTP method.
            path: Request path, resolved against the client's base URL.
            **kwargs: Forwarded to ``client.request`` (``json``, ``params``, ...).

        Returns:
            A ``GiteaResult``; never raises for ordinary failures. Statuses
            >= 400 and exceptions become ``success=False`` with the reason in
            ``error``. ``data`` is the decoded JSON body, or ``{}`` when the
            body is empty.
        """
        # Monotonic clock: wall-clock adjustments must not skew latency.
        start = time.perf_counter()
        operation = f"{method.upper()} {path}"
        if not self.available:
            return GiteaResult(
                operation=operation,
                success=False,
                error="Gitea not configured (missing token or repo)",
            )
        client = None
        try:
            # Created inside the try so an import or construction failure is
            # reported as a failed result as well.
            client = self._get_client()
            resp = await client.request(method, path, **kwargs)
            latency = (time.perf_counter() - start) * 1000
            if resp.status_code >= 400:
                error_body = resp.text[:500]
                logger.warning(
                    "Gitea API %s returned %d: %s",
                    operation,
                    resp.status_code,
                    error_body,
                )
                return GiteaResult(
                    operation=operation,
                    success=False,
                    error=f"HTTP {resp.status_code}: {error_body}",
                    latency_ms=latency,
                )
            return GiteaResult(
                operation=operation,
                success=True,
                data=resp.json() if resp.text else {},
                latency_ms=latency,
            )
        except Exception as exc:
            latency = (time.perf_counter() - start) * 1000
            logger.warning("Gitea API %s failed: %s", operation, exc)
            return GiteaResult(
                operation=operation,
                success=False,
                error=str(exc),
                latency_ms=latency,
            )
        finally:
            if client is not None:
                try:
                    await client.aclose()
                except Exception as exc:
                    # A close failure must not replace the result being returned.
                    logger.debug("Gitea client close failed: %s", exc)

    # ── Issue operations ─────────────────────────────────────────────────
    async def create_issue(
        self,
        title: str,
        body: str = "",
        labels: list[str] | None = None,
    ) -> GiteaResult:
        """Create an issue in the configured repository.

        Args:
            title: Issue title (required).
            body: Issue body in markdown.
            labels: Optional list of label names (must exist in repo).

        Returns:
            GiteaResult with issue data (number, html_url, etc.).
        """
        payload: dict = {"title": title, "body": body}
        parts = self._split_repo()
        # The API takes label IDs, so resolve names first; names that do not
        # resolve are left off the payload.
        if labels and parts is not None:
            label_ids = await self._resolve_label_ids(parts[0], parts[1], labels)
            if label_ids:
                payload["labels"] = label_ids
        return await self._repo_request("POST", "/issues", json=payload)

    async def list_issues(
        self,
        state: str = "open",
        labels: list[str] | None = None,
        limit: int = 50,
    ) -> GiteaResult:
        """List issues in the configured repository.

        Args:
            state: Filter by state ("open", "closed", "all").
            labels: Filter by label names.
            limit: Max issues to return.

        Returns:
            GiteaResult with list of issue dicts.
        """
        params: dict = {"state": state, "limit": limit, "type": "issues"}
        if labels:
            params["labels"] = ",".join(labels)
        return await self._repo_request("GET", "/issues", params=params)

    async def get_issue(self, number: int) -> GiteaResult:
        """Get a single issue by number."""
        return await self._repo_request("GET", f"/issues/{number}")

    async def add_comment(self, number: int, body: str) -> GiteaResult:
        """Add a comment to an issue."""
        return await self._repo_request(
            "POST",
            f"/issues/{number}/comments",
            json={"body": body},
        )

    async def close_issue(self, number: int) -> GiteaResult:
        """Close an issue."""
        return await self._repo_request(
            "PATCH",
            f"/issues/{number}",
            json={"state": "closed"},
        )

    # ── Dedup helper ─────────────────────────────────────────────────────
    async def find_duplicate(self, title: str, threshold: float = 0.6) -> dict | None:
        """Check if an open issue with a similar title already exists.

        Only the issues returned by a single ``list_issues`` call are
        examined. Returns the matching issue dict, or None if no duplicate
        is found or the listing failed.
        """
        result = await self.list_issues(state="open", limit=100)
        if not result.success or not isinstance(result.data, list):
            return None
        for issue in result.data:
            existing_title = issue.get("title") if isinstance(issue, dict) else None
            # Skip malformed entries instead of failing on ``.lower()``.
            if isinstance(existing_title, str) and _title_similar(
                title, existing_title, threshold
            ):
                return issue
        return None

    # ── Label helper ─────────────────────────────────────────────────────
    async def _resolve_label_ids(self, owner: str, repo: str, label_names: list[str]) -> list[int]:
        """Resolve label names to IDs. Returns empty list on failure.

        Matching is case-insensitive. Names with no matching repo label are
        logged and omitted from the result.
        """
        result = await self._request(
            "GET",
            f"/api/v1/repos/{owner}/{repo}/labels",
        )
        if not result.success or not isinstance(result.data, list):
            return []
        name_to_id = {
            label["name"].lower(): label["id"]
            for label in result.data
            if isinstance(label, dict)
            and isinstance(label.get("name"), str)
            and "id" in label
        }
        missing = [name for name in label_names if name.lower() not in name_to_id]
        if missing:
            logger.warning(
                "Gitea labels not found in %s/%s: %s",
                owner,
                repo,
                ", ".join(missing),
            )
        return [name_to_id[name.lower()] for name in label_names if name.lower() in name_to_id]

    # ── Status ───────────────────────────────────────────────────────────
    def info(self) -> dict:
        """Return a status summary for the dashboard."""
        return {
            "base_url": self._base_url,
            "repo": self._repo,
            "available": self.available,
            "has_token": bool(self._token),
        }
# ── Module-level singleton ──────────────────────────────────────────────────
# Shared instance built at import time with no arguments, so every value comes
# from ``settings`` (or the token file fallback). The constructor logs a
# warning here when no token is configured.
gitea_hand = GiteaHand()