Files
Timmy-time-dashboard/src/timmy/vassal/agent_health.py
Claude (Opus 4.6) 495c1ac2bd
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
[claude] Fix 27 ruff lint errors blocking all pushes (#1149) (#1153)
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-23 19:06:11 +00:00

289 lines
8.7 KiB
Python

"""Vassal Protocol — agent health monitoring.
Monitors whether downstream agents (Claude, Kimi) are making progress on
their assigned issues. Detects idle and stuck agents by querying Gitea
for issues with dispatch labels and checking last-comment timestamps.
Stuck agent heuristic
---------------------
An agent is considered "stuck" on an issue if:
- The issue has been labeled ``claude-ready`` or ``kimi-ready``
- No new comment has appeared in the last ``stuck_threshold_minutes``
- The issue has not been closed
Idle agent heuristic
--------------------
An agent is "idle" if it has no currently assigned (labeled) open issues.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_AGENT_LABELS = {
"claude": "claude-ready",
"kimi": "kimi-ready",
}
_DEFAULT_STUCK_MINUTES = 120
_DEFAULT_IDLE_THRESHOLD = 30
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class AgentStatus:
"""Health snapshot for one agent at a point in time."""
agent: str # "claude" | "kimi" | "timmy"
is_idle: bool = True
active_issue_numbers: list[int] = field(default_factory=list)
stuck_issue_numbers: list[int] = field(default_factory=list)
checked_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@property
def is_stuck(self) -> bool:
return bool(self.stuck_issue_numbers)
@property
def needs_reassignment(self) -> bool:
return self.is_stuck
@dataclass
class AgentHealthReport:
"""Combined health report for all monitored agents."""
agents: list[AgentStatus] = field(default_factory=list)
generated_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@property
def any_stuck(self) -> bool:
return any(a.is_stuck for a in self.agents)
@property
def all_idle(self) -> bool:
return all(a.is_idle for a in self.agents)
def for_agent(self, name: str) -> AgentStatus | None:
for a in self.agents:
if a.agent == name:
return a
return None
# ---------------------------------------------------------------------------
# Gitea queries
# ---------------------------------------------------------------------------
async def _fetch_labeled_issues(
client: Any,
base_url: str,
headers: dict,
repo: str,
label: str,
) -> list[dict]:
"""Return open issues carrying a specific label."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params={"state": "open", "labels": label, "limit": 50},
)
if resp.status_code == 200:
return [i for i in resp.json() if not i.get("pull_request")]
except Exception as exc:
logger.warning("_fetch_labeled_issues: %s%s", label, exc)
return []
async def _last_comment_time(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
) -> datetime | None:
"""Return the timestamp of the most recent comment on an issue."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
params={"limit": 1},
)
if resp.status_code == 200:
comments = resp.json()
if comments:
ts = comments[-1].get("updated_at") or comments[-1].get("created_at")
if ts:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except Exception as exc:
logger.debug("_last_comment_time: issue #%d%s", issue_number, exc)
return None
async def _issue_created_time(issue: dict) -> datetime | None:
ts = issue.get("created_at")
if ts:
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
pass
return None
# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
async def check_agent_health(
agent_name: str,
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentStatus:
"""Query Gitea for issues assigned to *agent_name* and assess health.
Args:
agent_name: One of "claude", "kimi".
stuck_threshold_minutes: Minutes of silence before an issue is
considered stuck.
Returns:
AgentStatus for this agent.
"""
status = AgentStatus(agent=agent_name)
label = _AGENT_LABELS.get(agent_name)
if not label:
logger.debug("check_agent_health: unknown agent %s", agent_name)
return status
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("check_agent_health: missing dependency — %s", exc)
return status
if not settings.gitea_enabled or not settings.gitea_token:
return status
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
cutoff = datetime.now(UTC) - timedelta(minutes=stuck_threshold_minutes)
try:
async with httpx.AsyncClient(timeout=15) as client:
issues = await _fetch_labeled_issues(client, base_url, headers, repo, label)
for issue in issues:
num = issue.get("number", 0)
status.active_issue_numbers.append(num)
# Check last activity
last_activity = await _last_comment_time(client, base_url, headers, repo, num)
if last_activity is None:
last_activity = await _issue_created_time(issue)
if last_activity is not None and last_activity < cutoff:
status.stuck_issue_numbers.append(num)
logger.info(
"check_agent_health: %s issue #%d stuck since %s",
agent_name,
num,
last_activity.isoformat(),
)
except Exception as exc:
logger.warning("check_agent_health: %s query failed — %s", agent_name, exc)
status.is_idle = len(status.active_issue_numbers) == 0
return status
async def get_full_health_report(
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentHealthReport:
"""Run health checks for all monitored agents and return combined report.
Args:
stuck_threshold_minutes: Passed through to each agent check.
Returns:
AgentHealthReport with status for Claude and Kimi.
"""
import asyncio
claude_status, kimi_status = await asyncio.gather(
check_agent_health("claude", stuck_threshold_minutes),
check_agent_health("kimi", stuck_threshold_minutes),
)
return AgentHealthReport(agents=[claude_status, kimi_status])
async def nudge_stuck_agent(
agent_name: str,
issue_number: int,
) -> bool:
"""Post a nudge comment on a stuck issue to prompt the agent.
Args:
agent_name: The agent that appears stuck.
issue_number: The Gitea issue number to nudge.
Returns:
True if the comment was posted successfully.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("nudge_stuck_agent: missing dependency — %s", exc)
return False
if not settings.gitea_enabled or not settings.gitea_token:
return False
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
body = (
f"⏰ **Vassal nudge** — @{agent_name} this issue has been idle.\n\n"
"Please post a status update or close if complete."
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
json={"body": body},
)
if resp.status_code in (200, 201):
logger.info(
"nudge_stuck_agent: nudged %s on issue #%d",
agent_name,
issue_number,
)
return True
except Exception as exc:
logger.warning("nudge_stuck_agent: failed — %s", exc)
return False