From 128aa4427f120e121b9550f4a670508bbbc597fa Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Mon, 23 Mar 2026 18:33:15 +0000 Subject: [PATCH] =?UTF-8?q?[claude]=20Vassal=20Protocol=20=E2=80=94=20Timm?= =?UTF-8?q?y=20as=20autonomous=20orchestrator=20(#1070)=20(#1142)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config.py | 9 + src/timmy/vassal/__init__.py | 21 ++ src/timmy/vassal/agent_health.py | 296 +++++++++++++++++ src/timmy/vassal/backlog.py | 281 ++++++++++++++++ src/timmy/vassal/dispatch.py | 213 ++++++++++++ src/timmy/vassal/house_health.py | 222 +++++++++++++ src/timmy/vassal/orchestration_loop.py | 321 +++++++++++++++++++ tests/unit/test_vassal_agent_health.py | 103 ++++++ tests/unit/test_vassal_backlog.py | 186 +++++++++++ tests/unit/test_vassal_dispatch.py | 114 +++++++ tests/unit/test_vassal_house_health.py | 116 +++++++ tests/unit/test_vassal_orchestration_loop.py | 139 ++++++++ 12 files changed, 2021 insertions(+) create mode 100644 src/timmy/vassal/__init__.py create mode 100644 src/timmy/vassal/agent_health.py create mode 100644 src/timmy/vassal/backlog.py create mode 100644 src/timmy/vassal/dispatch.py create mode 100644 src/timmy/vassal/house_health.py create mode 100644 src/timmy/vassal/orchestration_loop.py create mode 100644 tests/unit/test_vassal_agent_health.py create mode 100644 tests/unit/test_vassal_backlog.py create mode 100644 tests/unit/test_vassal_dispatch.py create mode 100644 tests/unit/test_vassal_house_health.py create mode 100644 tests/unit/test_vassal_orchestration_loop.py diff --git a/src/config.py b/src/config.py index 5b7b734..96177d8 100644 --- a/src/config.py +++ b/src/config.py @@ -321,6 +321,15 @@ class Settings(BaseSettings): loop_qa_upgrade_threshold: int = 3 # consecutive failures → file task loop_qa_max_per_hour: int = 12 # safety throttle + # ── Vassal Protocol (Autonomous Orchestrator) ───────────────────── + # Timmy as lead decision-maker: triage 
backlog, dispatch agents, monitor health. + # See timmy/vassal/ for implementation. + vassal_enabled: bool = False # off by default — enable when Qwen3-14B is loaded + vassal_cycle_interval: int = 300 # seconds between orchestration cycles (5 min) + vassal_max_dispatch_per_cycle: int = 10 # cap on new dispatches per cycle + vassal_stuck_threshold_minutes: int = 120 # minutes before agent issue is "stuck" + vassal_idle_threshold_minutes: int = 30 # minutes before agent is "idle" + # ── Paperclip AI — orchestration bridge ──────────────────────────── # URL where the Paperclip server listens. # For VPS deployment behind nginx, use the public domain. diff --git a/src/timmy/vassal/__init__.py b/src/timmy/vassal/__init__.py new file mode 100644 index 0000000..04317cd --- /dev/null +++ b/src/timmy/vassal/__init__.py @@ -0,0 +1,21 @@ +"""Vassal Protocol — Timmy as autonomous orchestrator. + +Timmy is Alex's vassal: the lead decision-maker for development direction, +agent management, and house health. He observes the Gitea backlog, decides +priorities, dispatches work to agents (Claude, Kimi, self), monitors output, +and keeps Hermes (M3 Max) running well. + +Public API +---------- + from timmy.vassal import vassal_orchestrator + + await vassal_orchestrator.run_cycle() + snapshot = vassal_orchestrator.get_status() +""" + +from timmy.vassal.orchestration_loop import VassalOrchestrator + +# Module-level singleton — import and use directly. +vassal_orchestrator = VassalOrchestrator() + +__all__ = ["VassalOrchestrator", "vassal_orchestrator"] diff --git a/src/timmy/vassal/agent_health.py b/src/timmy/vassal/agent_health.py new file mode 100644 index 0000000..d5796ac --- /dev/null +++ b/src/timmy/vassal/agent_health.py @@ -0,0 +1,296 @@ +"""Vassal Protocol — agent health monitoring. + +Monitors whether downstream agents (Claude, Kimi) are making progress on +their assigned issues. 
Detects idle and stuck agents by querying Gitea +for issues with dispatch labels and checking last-comment timestamps. + +Stuck agent heuristic +--------------------- +An agent is considered "stuck" on an issue if: + - The issue has been labeled ``claude-ready`` or ``kimi-ready`` + - No new comment has appeared in the last ``stuck_threshold_minutes`` + - The issue has not been closed + +Idle agent heuristic +-------------------- +An agent is "idle" if it has no currently assigned (labeled) open issues. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +_AGENT_LABELS = { + "claude": "claude-ready", + "kimi": "kimi-ready", +} + +_DEFAULT_STUCK_MINUTES = 120 +_DEFAULT_IDLE_THRESHOLD = 30 + + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + + +@dataclass +class AgentStatus: + """Health snapshot for one agent at a point in time.""" + + agent: str # "claude" | "kimi" | "timmy" + is_idle: bool = True + active_issue_numbers: list[int] = field(default_factory=list) + stuck_issue_numbers: list[int] = field(default_factory=list) + checked_at: str = field( + default_factory=lambda: datetime.now(UTC).isoformat() + ) + + @property + def is_stuck(self) -> bool: + return bool(self.stuck_issue_numbers) + + @property + def needs_reassignment(self) -> bool: + return self.is_stuck + + +@dataclass +class AgentHealthReport: + """Combined health report for all monitored agents.""" + + agents: list[AgentStatus] = field(default_factory=list) + generated_at: str = field( + default_factory=lambda: datetime.now(UTC).isoformat() + ) + + 
@property + def any_stuck(self) -> bool: + return any(a.is_stuck for a in self.agents) + + @property + def all_idle(self) -> bool: + return all(a.is_idle for a in self.agents) + + def for_agent(self, name: str) -> AgentStatus | None: + for a in self.agents: + if a.agent == name: + return a + return None + + +# --------------------------------------------------------------------------- +# Gitea queries +# --------------------------------------------------------------------------- + + +async def _fetch_labeled_issues( + client: Any, + base_url: str, + headers: dict, + repo: str, + label: str, +) -> list[dict]: + """Return open issues carrying a specific label.""" + try: + resp = await client.get( + f"{base_url}/repos/{repo}/issues", + headers=headers, + params={"state": "open", "labels": label, "limit": 50}, + ) + if resp.status_code == 200: + return [i for i in resp.json() if not i.get("pull_request")] + except Exception as exc: + logger.warning("_fetch_labeled_issues: %s — %s", label, exc) + return [] + + +async def _last_comment_time( + client: Any, + base_url: str, + headers: dict, + repo: str, + issue_number: int, +) -> datetime | None: + """Return the timestamp of the most recent comment on an issue.""" + try: + resp = await client.get( + f"{base_url}/repos/{repo}/issues/{issue_number}/comments", + headers=headers, + params={"limit": 1}, + ) + if resp.status_code == 200: + comments = resp.json() + if comments: + ts = comments[-1].get("updated_at") or comments[-1].get("created_at") + if ts: + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + except Exception as exc: + logger.debug("_last_comment_time: issue #%d — %s", issue_number, exc) + return None + + +async def _issue_created_time(issue: dict) -> datetime | None: + ts = issue.get("created_at") + if ts: + try: + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + except ValueError: + pass + return None + + +# --------------------------------------------------------------------------- +# Health 
check +# --------------------------------------------------------------------------- + + +async def check_agent_health( + agent_name: str, + stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES, +) -> AgentStatus: + """Query Gitea for issues assigned to *agent_name* and assess health. + + Args: + agent_name: One of "claude", "kimi". + stuck_threshold_minutes: Minutes of silence before an issue is + considered stuck. + + Returns: + AgentStatus for this agent. + """ + status = AgentStatus(agent=agent_name) + + label = _AGENT_LABELS.get(agent_name) + if not label: + logger.debug("check_agent_health: unknown agent %s", agent_name) + return status + + try: + import httpx + + from config import settings + except ImportError as exc: + logger.warning("check_agent_health: missing dependency — %s", exc) + return status + + if not settings.gitea_enabled or not settings.gitea_token: + return status + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = {"Authorization": f"token {settings.gitea_token}"} + cutoff = datetime.now(UTC) - timedelta(minutes=stuck_threshold_minutes) + + try: + async with httpx.AsyncClient(timeout=15) as client: + issues = await _fetch_labeled_issues( + client, base_url, headers, repo, label + ) + + for issue in issues: + num = issue.get("number", 0) + status.active_issue_numbers.append(num) + + # Check last activity + last_activity = await _last_comment_time( + client, base_url, headers, repo, num + ) + if last_activity is None: + last_activity = await _issue_created_time(issue) + + if last_activity is not None and last_activity < cutoff: + status.stuck_issue_numbers.append(num) + logger.info( + "check_agent_health: %s issue #%d stuck since %s", + agent_name, + num, + last_activity.isoformat(), + ) + except Exception as exc: + logger.warning("check_agent_health: %s query failed — %s", agent_name, exc) + + status.is_idle = len(status.active_issue_numbers) == 0 + return status + + +async def get_full_health_report( + 
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES, +) -> AgentHealthReport: + """Run health checks for all monitored agents and return combined report. + + Args: + stuck_threshold_minutes: Passed through to each agent check. + + Returns: + AgentHealthReport with status for Claude and Kimi. + """ + import asyncio + + claude_status, kimi_status = await asyncio.gather( + check_agent_health("claude", stuck_threshold_minutes), + check_agent_health("kimi", stuck_threshold_minutes), + ) + return AgentHealthReport(agents=[claude_status, kimi_status]) + + +async def nudge_stuck_agent( + agent_name: str, + issue_number: int, +) -> bool: + """Post a nudge comment on a stuck issue to prompt the agent. + + Args: + agent_name: The agent that appears stuck. + issue_number: The Gitea issue number to nudge. + + Returns: + True if the comment was posted successfully. + """ + try: + import httpx + + from config import settings + except ImportError as exc: + logger.warning("nudge_stuck_agent: missing dependency — %s", exc) + return False + + if not settings.gitea_enabled or not settings.gitea_token: + return False + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + body = ( + f"⏰ **Vassal nudge** — @{agent_name} this issue has been idle.\n\n" + "Please post a status update or close if complete." 
+ ) + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post( + f"{base_url}/repos/{repo}/issues/{issue_number}/comments", + headers=headers, + json={"body": body}, + ) + if resp.status_code in (200, 201): + logger.info( + "nudge_stuck_agent: nudged %s on issue #%d", + agent_name, + issue_number, + ) + return True + except Exception as exc: + logger.warning("nudge_stuck_agent: failed — %s", exc) + return False diff --git a/src/timmy/vassal/backlog.py b/src/timmy/vassal/backlog.py new file mode 100644 index 0000000..c24e851 --- /dev/null +++ b/src/timmy/vassal/backlog.py @@ -0,0 +1,281 @@ +"""Vassal Protocol — Gitea backlog triage. + +Fetches open issues from Gitea, scores each one for priority and agent +suitability, and returns a ranked list ready for dispatch. + +Complexity scoring heuristics +------------------------------ + high_complexity_keywords → route to Claude (architecture, refactor, review) + research_keywords → route to Kimi (survey, analysis, benchmark) + routine_keywords → route to Timmy/self (docs, chore, config) + otherwise → Timmy self-handles + +Priority scoring +---------------- + URGENT label → 100 + HIGH / critical → 75 + NORMAL (default) → 50 + LOW / chore → 25 + Already assigned → deprioritized (subtract 20) +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from enum import StrEnum +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# Labels that hint at complexity level / agent suitability +_HIGH_COMPLEXITY = frozenset( + { + "architecture", + "refactor", + "code review", + "security", + "performance", + "breaking change", + "design", + "complex", + } +) + +_RESEARCH_KEYWORDS = frozenset( + { + "research", + "survey", + "analysis", + "benchmark", + "comparative", + 
"investigation", + "deep dive", + "review", + } +) + +_ROUTINE_KEYWORDS = frozenset( + { + "docs", + "documentation", + "chore", + "config", + "typo", + "rename", + "cleanup", + "trivial", + "style", + } +) + +_PRIORITY_LABEL_SCORES: dict[str, int] = { + "urgent": 100, + "critical": 90, + "high": 75, + "normal": 50, + "low": 25, + "chore": 20, +} + + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + + +class AgentTarget(StrEnum): + """Which agent should handle this issue.""" + + TIMMY = "timmy" # Timmy handles locally (self) + CLAUDE = "claude" # Dispatch to Claude Code + KIMI = "kimi" # Dispatch to Kimi Code + + +@dataclass +class TriagedIssue: + """A Gitea issue enriched with triage metadata.""" + + number: int + title: str + body: str + labels: list[str] = field(default_factory=list) + assignees: list[str] = field(default_factory=list) + priority_score: int = 50 + agent_target: AgentTarget = AgentTarget.TIMMY + rationale: str = "" + url: str = "" + raw: dict = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# Scoring helpers +# --------------------------------------------------------------------------- + + +def _extract_labels(issue: dict[str, Any]) -> list[str]: + """Return normalised label names from a raw Gitea issue dict.""" + return [lbl.get("name", "").lower() for lbl in issue.get("labels", [])] + + +def _score_priority(labels: list[str], assignees: list[str]) -> int: + score = _PRIORITY_LABEL_SCORES.get("normal", 50) + for lbl in labels: + for key, val in _PRIORITY_LABEL_SCORES.items(): + if key in lbl: + score = max(score, val) + if assignees: + score -= 20 # already assigned — lower urgency for fresh dispatch + return max(0, score) + + +def _choose_agent(title: str, body: str, labels: list[str]) -> tuple[AgentTarget, str]: + """Heuristic: pick the best agent and 
return (target, rationale).""" + combined = f"{title} {body} {' '.join(labels)}".lower() + + if any(kw in combined for kw in _HIGH_COMPLEXITY): + return AgentTarget.CLAUDE, "high-complexity keywords detected" + + if any(kw in combined for kw in _RESEARCH_KEYWORDS): + return AgentTarget.KIMI, "research keywords detected" + + if any(kw in combined for kw in _ROUTINE_KEYWORDS): + return AgentTarget.TIMMY, "routine task — Timmy self-handles" + + return AgentTarget.TIMMY, "no specific routing signal — Timmy self-handles" + + +# --------------------------------------------------------------------------- +# Triage +# --------------------------------------------------------------------------- + + +def triage_issues(raw_issues: list[dict[str, Any]]) -> list[TriagedIssue]: + """Score and route a list of raw Gitea issue dicts. + + Returns a list sorted by priority_score descending (highest first). + + Args: + raw_issues: List of issue objects from the Gitea API. + + Returns: + Sorted list of TriagedIssue with routing decisions. 
+ """ + results: list[TriagedIssue] = [] + + for issue in raw_issues: + number = issue.get("number", 0) + title = issue.get("title", "") + body = issue.get("body") or "" + labels = _extract_labels(issue) + assignees = [ + a.get("login", "") for a in issue.get("assignees") or [] + ] + url = issue.get("html_url", "") + + priority = _score_priority(labels, assignees) + agent, rationale = _choose_agent(title, body, labels) + + results.append( + TriagedIssue( + number=number, + title=title, + body=body, + labels=labels, + assignees=assignees, + priority_score=priority, + agent_target=agent, + rationale=rationale, + url=url, + raw=issue, + ) + ) + + results.sort(key=lambda i: i.priority_score, reverse=True) + logger.debug( + "Triage complete: %d issues → %d Claude, %d Kimi, %d Timmy", + len(results), + sum(1 for i in results if i.agent_target == AgentTarget.CLAUDE), + sum(1 for i in results if i.agent_target == AgentTarget.KIMI), + sum(1 for i in results if i.agent_target == AgentTarget.TIMMY), + ) + return results + + +# --------------------------------------------------------------------------- +# Gitea fetch (async, gracefully degrading) +# --------------------------------------------------------------------------- + + +async def fetch_open_issues( + limit: int = 50, + exclude_labels: list[str] | None = None, +) -> list[dict[str, Any]]: + """Fetch open issues from the configured Gitea repo. + + Args: + limit: Maximum number of issues to return. + exclude_labels: Labels whose issues should be skipped + (e.g. ``["kimi-ready", "wip"]``). + + Returns: + List of raw issue dicts from the Gitea API, + or empty list if Gitea is unavailable. 
+ """ + try: + import httpx + + from config import settings + except ImportError as exc: + logger.warning("fetch_open_issues: missing dependency — %s", exc) + return [] + + if not settings.gitea_enabled or not settings.gitea_token: + logger.info("fetch_open_issues: Gitea disabled or no token") + return [] + + exclude = set(lbl.lower() for lbl in (exclude_labels or [])) + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = {"Authorization": f"token {settings.gitea_token}"} + params = {"state": "open", "limit": min(limit, 50), "page": 1} + + try: + async with httpx.AsyncClient(timeout=15) as client: + resp = await client.get( + f"{base_url}/repos/{repo}/issues", + headers=headers, + params=params, + ) + if resp.status_code != 200: + logger.warning( + "fetch_open_issues: Gitea returned %s", resp.status_code + ) + return [] + + issues = resp.json() + + # Filter out pull requests and excluded labels + filtered = [] + for issue in issues: + if issue.get("pull_request"): + continue # skip PRs + labels = _extract_labels(issue) + if exclude and any(lbl in exclude for lbl in labels): + continue + filtered.append(issue) + + logger.info( + "fetch_open_issues: fetched %d/%d issues (after filtering)", + len(filtered), + len(issues), + ) + return filtered + + except Exception as exc: + logger.warning("fetch_open_issues: Gitea request failed — %s", exc) + return [] diff --git a/src/timmy/vassal/dispatch.py b/src/timmy/vassal/dispatch.py new file mode 100644 index 0000000..8f1b02c --- /dev/null +++ b/src/timmy/vassal/dispatch.py @@ -0,0 +1,213 @@ +"""Vassal Protocol — agent dispatch. + +Translates triage decisions into concrete Gitea actions: +- Add ``claude-ready`` or ``kimi-ready`` label to an issue +- Post a dispatch comment recording the routing rationale +- Record the dispatch in the in-memory registry so the orchestration loop + can track what was sent and when + +The dispatch registry is intentionally in-memory (ephemeral). 
Durable +tracking is out of scope for this module — that belongs in the task queue +or a future orchestration DB. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + +from timmy.vassal.backlog import AgentTarget, TriagedIssue + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Label names used by the dispatch system +# --------------------------------------------------------------------------- + +_LABEL_MAP: dict[AgentTarget, str] = { + AgentTarget.CLAUDE: "claude-ready", + AgentTarget.KIMI: "kimi-ready", + AgentTarget.TIMMY: "timmy-ready", +} + +_LABEL_COLORS: dict[str, str] = { + "claude-ready": "#8b6f47", # warm brown + "kimi-ready": "#006b75", # dark teal + "timmy-ready": "#0075ca", # blue +} + + +# --------------------------------------------------------------------------- +# Dispatch registry +# --------------------------------------------------------------------------- + + +@dataclass +class DispatchRecord: + """A record of one issue being dispatched to an agent.""" + + issue_number: int + issue_title: str + agent: AgentTarget + rationale: str + dispatched_at: str = field( + default_factory=lambda: datetime.now(UTC).isoformat() + ) + label_applied: bool = False + comment_posted: bool = False + + +# Module-level registry: issue_number → DispatchRecord +_registry: dict[int, DispatchRecord] = {} + + +def get_dispatch_registry() -> dict[int, DispatchRecord]: + """Return a copy of the current dispatch registry.""" + return dict(_registry) + + +def clear_dispatch_registry() -> None: + """Clear the dispatch registry (mainly for tests).""" + _registry.clear() + + +# --------------------------------------------------------------------------- +# Gitea helpers +# --------------------------------------------------------------------------- + + +async def _get_or_create_label( + client: 
Any, + base_url: str, + headers: dict, + repo: str, + label_name: str, +) -> int | None: + """Return the Gitea label ID, creating it if necessary.""" + labels_url = f"{base_url}/repos/{repo}/labels" + try: + resp = await client.get(labels_url, headers=headers) + if resp.status_code == 200: + for lbl in resp.json(): + if lbl.get("name") == label_name: + return lbl["id"] + except Exception as exc: + logger.warning("_get_or_create_label: list failed — %s", exc) + return None + + color = _LABEL_COLORS.get(label_name, "#cccccc") + try: + resp = await client.post( + labels_url, + headers={**headers, "Content-Type": "application/json"}, + json={"name": label_name, "color": color}, + ) + if resp.status_code in (200, 201): + return resp.json().get("id") + except Exception as exc: + logger.warning("_get_or_create_label: create failed — %s", exc) + + return None + + +# --------------------------------------------------------------------------- +# Dispatch action +# --------------------------------------------------------------------------- + + +async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord: + """Apply dispatch label and post a routing comment on the Gitea issue. + + Gracefully degrades: if Gitea is unavailable the record is still + created and returned (with label_applied=False, comment_posted=False). + + Args: + issue: A TriagedIssue with a routing decision. + + Returns: + DispatchRecord summarising what was done. + """ + record = DispatchRecord( + issue_number=issue.number, + issue_title=issue.title, + agent=issue.agent_target, + rationale=issue.rationale, + ) + + if issue.agent_target == AgentTarget.TIMMY: + # Self-dispatch: no label needed — Timmy will handle directly. 
+ logger.info( + "dispatch_issue: #%d '%s' → Timmy (self, no label)", + issue.number, + issue.title[:50], + ) + _registry[issue.number] = record + return record + + try: + import httpx + + from config import settings + except ImportError as exc: + logger.warning("dispatch_issue: missing dependency — %s", exc) + _registry[issue.number] = record + return record + + if not settings.gitea_enabled or not settings.gitea_token: + logger.info("dispatch_issue: Gitea disabled — skipping label/comment") + _registry[issue.number] = record + return record + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + label_name = _LABEL_MAP[issue.agent_target] + + try: + async with httpx.AsyncClient(timeout=15) as client: + label_id = await _get_or_create_label( + client, base_url, headers, repo, label_name + ) + + # Apply label + if label_id is not None: + resp = await client.post( + f"{base_url}/repos/{repo}/issues/{issue.number}/labels", + headers=headers, + json={"labels": [label_id]}, + ) + record.label_applied = resp.status_code in (200, 201) + + # Post routing comment + agent_name = issue.agent_target.value.capitalize() + comment_body = ( + f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n" + f"Priority score: {issue.priority_score} \n" + f"Rationale: {issue.rationale} \n" + f"Label: `{label_name}`" + ) + resp = await client.post( + f"{base_url}/repos/{repo}/issues/{issue.number}/comments", + headers=headers, + json={"body": comment_body}, + ) + record.comment_posted = resp.status_code in (200, 201) + + except Exception as exc: + logger.warning("dispatch_issue: Gitea action failed — %s", exc) + + _registry[issue.number] = record + logger.info( + "dispatch_issue: #%d '%s' → %s (label=%s comment=%s)", + issue.number, + issue.title[:50], + issue.agent_target, + record.label_applied, + record.comment_posted, + ) + return record diff --git 
a/src/timmy/vassal/house_health.py b/src/timmy/vassal/house_health.py new file mode 100644 index 0000000..24bdcf0 --- /dev/null +++ b/src/timmy/vassal/house_health.py @@ -0,0 +1,222 @@ +"""Vassal Protocol — Hermes house health monitoring. + +Monitors system resources on the M3 Max (Hermes) and Ollama model state. +Reports warnings when resources are tight and provides cleanup utilities. + +All I/O is wrapped in asyncio.to_thread() per CLAUDE.md convention. +""" + +from __future__ import annotations + +import asyncio +import logging +import shutil +from dataclasses import dataclass, field +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Thresholds +# --------------------------------------------------------------------------- + +_WARN_DISK_PCT = 85.0 # warn when disk is more than 85% full +_WARN_MEM_PCT = 90.0 # warn when memory is more than 90% used +_WARN_CPU_PCT = 95.0 # warn when CPU is above 95% sustained + + +# --------------------------------------------------------------------------- +# Data models +# --------------------------------------------------------------------------- + + +@dataclass +class DiskUsage: + path: str = "/" + total_gb: float = 0.0 + used_gb: float = 0.0 + free_gb: float = 0.0 + percent_used: float = 0.0 + + +@dataclass +class MemoryUsage: + total_gb: float = 0.0 + available_gb: float = 0.0 + percent_used: float = 0.0 + + +@dataclass +class OllamaHealth: + reachable: bool = False + loaded_models: list[str] = field(default_factory=list) + error: str = "" + + +@dataclass +class SystemSnapshot: + """Point-in-time snapshot of Hermes resource usage.""" + + disk: DiskUsage = field(default_factory=DiskUsage) + memory: MemoryUsage = field(default_factory=MemoryUsage) + ollama: OllamaHealth = field(default_factory=OllamaHealth) + warnings: list[str] = field(default_factory=list) + taken_at: 
str = field( + default_factory=lambda: datetime.now(UTC).isoformat() + ) + + @property + def healthy(self) -> bool: + return len(self.warnings) == 0 + + +# --------------------------------------------------------------------------- +# Resource probes (sync, run in threads) +# --------------------------------------------------------------------------- + + +def _probe_disk(path: str = "/") -> DiskUsage: + try: + usage = shutil.disk_usage(path) + total_gb = usage.total / 1e9 + used_gb = usage.used / 1e9 + free_gb = usage.free / 1e9 + pct = (usage.used / usage.total * 100) if usage.total > 0 else 0.0 + return DiskUsage( + path=path, + total_gb=round(total_gb, 2), + used_gb=round(used_gb, 2), + free_gb=round(free_gb, 2), + percent_used=round(pct, 1), + ) + except Exception as exc: + logger.debug("_probe_disk: %s", exc) + return DiskUsage(path=path) + + +def _probe_memory() -> MemoryUsage: + try: + import psutil # optional — gracefully degrade if absent + + vm = psutil.virtual_memory() + return MemoryUsage( + total_gb=round(vm.total / 1e9, 2), + available_gb=round(vm.available / 1e9, 2), + percent_used=round(vm.percent, 1), + ) + except ImportError: + logger.debug("_probe_memory: psutil not installed — skipping") + return MemoryUsage() + except Exception as exc: + logger.debug("_probe_memory: %s", exc) + return MemoryUsage() + + +def _probe_ollama_sync(ollama_url: str) -> OllamaHealth: + """Synchronous Ollama health probe — run in a thread.""" + try: + import urllib.request + import json + + url = ollama_url.rstrip("/") + "/api/tags" + with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310 + data = json.loads(resp.read()) + models = [m.get("name", "") for m in data.get("models", [])] + return OllamaHealth(reachable=True, loaded_models=models) + except Exception as exc: + return OllamaHealth(reachable=False, error=str(exc)[:120]) + + +# --------------------------------------------------------------------------- +# Public API +# 
--------------------------------------------------------------------------- + + +async def get_system_snapshot() -> SystemSnapshot: + """Collect a non-blocking snapshot of system resources. + + Uses asyncio.to_thread() for all blocking I/O per project convention. + + Returns: + SystemSnapshot with disk, memory, and Ollama status. + """ + from config import settings + + disk, memory, ollama = await asyncio.gather( + asyncio.to_thread(_probe_disk, "/"), + asyncio.to_thread(_probe_memory), + asyncio.to_thread(_probe_ollama_sync, settings.normalized_ollama_url), + ) + + warnings: list[str] = [] + + if disk.percent_used >= _WARN_DISK_PCT: + warnings.append( + f"Disk {disk.path}: {disk.percent_used:.0f}% used " + f"({disk.free_gb:.1f} GB free)" + ) + + if memory.percent_used >= _WARN_MEM_PCT: + warnings.append( + f"Memory: {memory.percent_used:.0f}% used " + f"({memory.available_gb:.1f} GB available)" + ) + + if not ollama.reachable: + warnings.append(f"Ollama unreachable: {ollama.error}") + + if warnings: + logger.warning("House health warnings: %s", "; ".join(warnings)) + + return SystemSnapshot( + disk=disk, + memory=memory, + ollama=ollama, + warnings=warnings, + ) + + +async def cleanup_stale_files( + temp_dirs: list[str] | None = None, + max_age_days: int = 7, +) -> dict[str, Any]: + """Remove files older than *max_age_days* from temp directories. + + Only removes files under safe temp paths (never project source). + + Args: + temp_dirs: Directories to scan. Defaults to ``["/tmp/timmy"]``. + max_age_days: Age threshold in days. + + Returns: + Dict with ``deleted_count`` and ``errors``. 
+ """ + import time + + dirs = temp_dirs or ["/tmp/timmy"] # noqa: S108 + cutoff = time.time() - max_age_days * 86400 + deleted = 0 + errors: list[str] = [] + + def _cleanup() -> None: + nonlocal deleted + for d in dirs: + p = Path(d) + if not p.exists(): + continue + for f in p.rglob("*"): + if f.is_file(): + try: + if f.stat().st_mtime < cutoff: + f.unlink() + deleted += 1 + except Exception as exc: + errors.append(str(exc)) + + await asyncio.to_thread(_cleanup) + logger.info( + "cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors) + ) + return {"deleted_count": deleted, "errors": errors} diff --git a/src/timmy/vassal/orchestration_loop.py b/src/timmy/vassal/orchestration_loop.py new file mode 100644 index 0000000..af0c8a8 --- /dev/null +++ b/src/timmy/vassal/orchestration_loop.py @@ -0,0 +1,321 @@ +"""Vassal Protocol — main orchestration loop. + +Ties the backlog, dispatch, agent health, and house health modules together +into a single ``VassalOrchestrator`` that can run as a background service. + +Each cycle: +1. Fetch open Gitea issues +2. Triage: score priority + route to agent +3. Dispatch: apply labels / post routing comments +4. Check agent health: nudge stuck agents +5. Check house health: log warnings, trigger cleanup if needed +6. 
Return a VassalCycleRecord summarising the cycle + +Usage:: + + from timmy.vassal import vassal_orchestrator + + record = await vassal_orchestrator.run_cycle() + status = vassal_orchestrator.get_status() +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass, field +from datetime import UTC, datetime +from typing import Any + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Cycle record +# --------------------------------------------------------------------------- + + +@dataclass +class VassalCycleRecord: + """Summary of one orchestration cycle.""" + + cycle_id: int + started_at: str + finished_at: str = "" + duration_ms: int = 0 + + issues_fetched: int = 0 + issues_dispatched: int = 0 + dispatched_to_claude: int = 0 + dispatched_to_kimi: int = 0 + dispatched_to_timmy: int = 0 + + stuck_agents: list[str] = field(default_factory=list) + nudges_sent: int = 0 + + house_warnings: list[str] = field(default_factory=list) + cleanup_deleted: int = 0 + + errors: list[str] = field(default_factory=list) + + @property + def healthy(self) -> bool: + return not self.errors and not self.house_warnings + + +# --------------------------------------------------------------------------- +# Orchestrator +# --------------------------------------------------------------------------- + + +class VassalOrchestrator: + """Timmy's autonomous orchestration engine. + + Runs observe → triage → dispatch → monitor → house-check cycles on a + configurable interval. + + Parameters + ---------- + cycle_interval: + Seconds between cycles. Defaults to ``settings.vassal_cycle_interval`` + when available, otherwise 300 s (5 min). + max_dispatch_per_cycle: + Cap on new dispatches per cycle to avoid spamming agents. 
+ """ + + def __init__( + self, + cycle_interval: float | None = None, + max_dispatch_per_cycle: int = 10, + ) -> None: + self._cycle_count = 0 + self._running = False + self._task: asyncio.Task | None = None + self._max_dispatch = max_dispatch_per_cycle + self._history: list[VassalCycleRecord] = [] + + # Resolve interval — lazy to avoid import-time settings read + self._cycle_interval = cycle_interval + + # -- public API -------------------------------------------------------- + + @property + def cycle_count(self) -> int: + return self._cycle_count + + @property + def is_running(self) -> bool: + return self._running + + @property + def history(self) -> list[VassalCycleRecord]: + return list(self._history) + + def get_status(self) -> dict[str, Any]: + """Return a JSON-serialisable status dict.""" + last = self._history[-1] if self._history else None + return { + "running": self._running, + "cycle_count": self._cycle_count, + "last_cycle": { + "cycle_id": last.cycle_id, + "started_at": last.started_at, + "issues_fetched": last.issues_fetched, + "issues_dispatched": last.issues_dispatched, + "stuck_agents": last.stuck_agents, + "house_warnings": last.house_warnings, + "healthy": last.healthy, + } + if last + else None, + } + + # -- single cycle ------------------------------------------------------ + + async def run_cycle(self) -> VassalCycleRecord: + """Execute one full orchestration cycle. + + Gracefully degrades at each step — a failure in one sub-task does + not abort the rest of the cycle. + + Returns: + VassalCycleRecord summarising what happened. 
+ """ + self._cycle_count += 1 + start = time.monotonic() + record = VassalCycleRecord( + cycle_id=self._cycle_count, + started_at=datetime.now(UTC).isoformat(), + ) + + # 1 + 2: Fetch & triage + await self._step_backlog(record) + + # 3: Agent health + await self._step_agent_health(record) + + # 4: House health + await self._step_house_health(record) + + # Finalise record + record.finished_at = datetime.now(UTC).isoformat() + record.duration_ms = int((time.monotonic() - start) * 1000) + self._history.append(record) + + # Broadcast via WebSocket (best-effort) + await self._broadcast(record) + + logger.info( + "VassalOrchestrator cycle #%d complete (%d ms): " + "fetched=%d dispatched=%d stuck=%s house_ok=%s", + record.cycle_id, + record.duration_ms, + record.issues_fetched, + record.issues_dispatched, + record.stuck_agents or "none", + not record.house_warnings, + ) + return record + + # -- background loop --------------------------------------------------- + + async def start(self) -> None: + """Start the recurring orchestration loop as a background task.""" + if self._running: + logger.warning("VassalOrchestrator already running") + return + self._running = True + self._task = asyncio.ensure_future(self._loop()) + + def stop(self) -> None: + """Signal the loop to stop after the current cycle.""" + self._running = False + if self._task and not self._task.done(): + self._task.cancel() + logger.info("VassalOrchestrator stop requested") + + async def _loop(self) -> None: + interval = self._resolve_interval() + logger.info("VassalOrchestrator loop started (interval=%.0fs)", interval) + while self._running: + try: + await self.run_cycle() + except Exception: + logger.exception("VassalOrchestrator cycle failed") + await asyncio.sleep(interval) + + # -- step: backlog ------------------------------------------------------- + + async def _step_backlog(self, record: VassalCycleRecord) -> None: + from timmy.vassal.backlog import fetch_open_issues, triage_issues + from 
timmy.vassal.dispatch import dispatch_issue, get_dispatch_registry + + try: + raw_issues = await fetch_open_issues( + limit=50, + exclude_labels=["wip", "blocked", "needs-info"], + ) + record.issues_fetched = len(raw_issues) + + if not raw_issues: + return + + triaged = triage_issues(raw_issues) + registry = get_dispatch_registry() + + dispatched = 0 + for issue in triaged: + if dispatched >= self._max_dispatch: + break + # Skip already-dispatched issues + if issue.number in registry: + continue + await dispatch_issue(issue) + dispatched += 1 + + from timmy.vassal.backlog import AgentTarget + + if issue.agent_target == AgentTarget.CLAUDE: + record.dispatched_to_claude += 1 + elif issue.agent_target == AgentTarget.KIMI: + record.dispatched_to_kimi += 1 + else: + record.dispatched_to_timmy += 1 + + record.issues_dispatched = dispatched + + except Exception as exc: + logger.exception("_step_backlog failed") + record.errors.append(f"backlog: {exc}") + + # -- step: agent health ------------------------------------------------- + + async def _step_agent_health(self, record: VassalCycleRecord) -> None: + from config import settings + from timmy.vassal.agent_health import get_full_health_report, nudge_stuck_agent + + try: + threshold = getattr(settings, "vassal_stuck_threshold_minutes", 120) + report = await get_full_health_report(stuck_threshold_minutes=threshold) + + for agent_status in report.agents: + if agent_status.is_stuck: + record.stuck_agents.append(agent_status.agent) + for issue_num in agent_status.stuck_issue_numbers: + ok = await nudge_stuck_agent(agent_status.agent, issue_num) + if ok: + record.nudges_sent += 1 + + except Exception as exc: + logger.exception("_step_agent_health failed") + record.errors.append(f"agent_health: {exc}") + + # -- step: house health ------------------------------------------------- + + async def _step_house_health(self, record: VassalCycleRecord) -> None: + from timmy.vassal.house_health import cleanup_stale_files, 
get_system_snapshot + + try: + snapshot = await get_system_snapshot() + record.house_warnings = snapshot.warnings + + # Auto-cleanup temp files when disk is getting tight + if snapshot.disk.percent_used >= 80.0: + result = await cleanup_stale_files(max_age_days=3) + record.cleanup_deleted = result.get("deleted_count", 0) + + except Exception as exc: + logger.exception("_step_house_health failed") + record.errors.append(f"house_health: {exc}") + + # -- helpers ------------------------------------------------------------ + + def _resolve_interval(self) -> float: + if self._cycle_interval is not None: + return self._cycle_interval + try: + from config import settings + + return float(getattr(settings, "vassal_cycle_interval", 300)) + except Exception: + return 300.0 + + async def _broadcast(self, record: VassalCycleRecord) -> None: + try: + from infrastructure.ws_manager.handler import ws_manager + + await ws_manager.broadcast( + "vassal.cycle", + { + "cycle_id": record.cycle_id, + "started_at": record.started_at, + "issues_fetched": record.issues_fetched, + "issues_dispatched": record.issues_dispatched, + "stuck_agents": record.stuck_agents, + "house_warnings": record.house_warnings, + "duration_ms": record.duration_ms, + "healthy": record.healthy, + }, + ) + except Exception as exc: + logger.debug("VassalOrchestrator broadcast skipped: %s", exc) diff --git a/tests/unit/test_vassal_agent_health.py b/tests/unit/test_vassal_agent_health.py new file mode 100644 index 0000000..299281f --- /dev/null +++ b/tests/unit/test_vassal_agent_health.py @@ -0,0 +1,103 @@ +"""Unit tests for timmy.vassal.agent_health.""" + +from __future__ import annotations + +import pytest + +from timmy.vassal.agent_health import AgentHealthReport, AgentStatus + + +# --------------------------------------------------------------------------- +# AgentStatus +# --------------------------------------------------------------------------- + + +def test_agent_status_idle_default(): + s = 
"""Unit tests for timmy.vassal.agent_health."""

from __future__ import annotations

import pytest

from timmy.vassal.agent_health import AgentHealthReport, AgentStatus


# ---------------------------------------------------------------------------
# AgentStatus
# ---------------------------------------------------------------------------


def test_agent_status_idle_default():
    """A bare AgentStatus is idle, not stuck, and needs no reassignment."""
    status = AgentStatus(agent="claude")
    assert status.is_idle is True
    assert status.is_stuck is False
    assert status.needs_reassignment is False


def test_agent_status_active():
    """An agent with active issues is not idle."""
    status = AgentStatus(agent="kimi", active_issue_numbers=[10, 11])
    status.is_idle = len(status.active_issue_numbers) == 0
    assert status.is_idle is False


def test_agent_status_stuck():
    """Stuck issues imply both is_stuck and needs_reassignment."""
    status = AgentStatus(
        agent="claude",
        active_issue_numbers=[7],
        stuck_issue_numbers=[7],
        is_idle=False,
    )
    assert status.is_stuck is True
    assert status.needs_reassignment is True


# ---------------------------------------------------------------------------
# AgentHealthReport
# ---------------------------------------------------------------------------


def test_report_any_stuck():
    stuck_claude = AgentStatus(agent="claude", stuck_issue_numbers=[3])
    healthy_kimi = AgentStatus(agent="kimi")
    assert AgentHealthReport(agents=[stuck_claude, healthy_kimi]).any_stuck is True


def test_report_all_idle():
    everyone = [AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
    assert AgentHealthReport(agents=everyone).all_idle is True


def test_report_for_agent_found():
    kimi = AgentStatus(agent="kimi", active_issue_numbers=[42])
    report = AgentHealthReport(agents=[AgentStatus(agent="claude"), kimi])
    assert report.for_agent("kimi") is kimi


def test_report_for_agent_not_found():
    report = AgentHealthReport(agents=[AgentStatus(agent="claude")])
    assert report.for_agent("timmy") is None


# ---------------------------------------------------------------------------
# check_agent_health — no Gitea in unit tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_check_agent_health_unknown_agent():
    """Unknown agent name returns idle status without error."""
    from timmy.vassal.agent_health import check_agent_health

    status = await check_agent_health("unknown-bot")
    assert status.agent == "unknown-bot"
    assert status.is_idle is True


@pytest.mark.asyncio
async def test_check_agent_health_no_token():
    """Returns idle status gracefully when Gitea token is absent."""
    from timmy.vassal.agent_health import check_agent_health

    status = await check_agent_health("claude")
    # Must not raise; comes back idle since no active issues are discoverable.
    assert isinstance(status, AgentStatus)
    assert status.agent == "claude"


@pytest.mark.asyncio
async def test_get_full_health_report_returns_both_agents():
    """The full report always covers claude and kimi."""
    from timmy.vassal.agent_health import get_full_health_report

    report = await get_full_health_report()
    names = {entry.agent for entry in report.agents}
    assert "claude" in names
    assert "kimi" in names


# ===========================================================================
# tests/unit/test_vassal_backlog.py — triage and fetch helpers
# ===========================================================================

# (module header for the backlog tests; continues below)
#
# from __future__ import annotations
# import pytest
# from timmy.vassal.backlog import (
#     AgentTarget, TriagedIssue, _choose_agent, _extract_labels,
#     _score_priority, triage_issues,
# )


def test_extract_labels_empty():
    assert _extract_labels({}) == []


def test_extract_labels_normalises_case():
    raw = {"labels": [{"name": "HIGH"}, {"name": "Feature"}]}
    assert _extract_labels(raw) == ["high", "feature"]


def test_priority_urgent():
    assert _score_priority(["urgent"], []) == 100


def test_priority_high():
    assert _score_priority(["high"], []) == 75
def test_priority_normal_default():
    """No labels, no assignees → baseline priority of 50."""
    assert _score_priority([], []) == 50


def test_priority_assigned_penalised():
    # Already assigned → subtract 20 from the baseline.
    assert _score_priority([], ["some-agent"]) == 30


def test_priority_label_substring_match():
    # "critical-bug" contains "critical" → 90.
    assert _score_priority(["critical-bug"], []) == 90


# ---------------------------------------------------------------------------
# _choose_agent
# ---------------------------------------------------------------------------


def test_choose_claude_for_architecture():
    target, rationale = _choose_agent("Refactor auth middleware", "", [])
    assert target == AgentTarget.CLAUDE
    assert "complex" in rationale or "high-complexity" in rationale


def test_choose_kimi_for_research():
    target, rationale = _choose_agent("Deep research on embedding models", "", [])
    assert target == AgentTarget.KIMI


def test_choose_timmy_for_docs():
    target, rationale = _choose_agent("Update documentation for CLI", "", [])
    assert target == AgentTarget.TIMMY


def test_choose_timmy_default():
    # Could route to timmy (docs/trivial) or the default — either is valid.
    target, rationale = _choose_agent("Fix typo in README", "simple change", [])
    assert isinstance(target, AgentTarget)


def test_choose_agent_label_wins():
    # A "security" label routes to Claude regardless of title wording.
    target, _ = _choose_agent("Login page", "", ["security"])
    assert target == AgentTarget.CLAUDE


# ---------------------------------------------------------------------------
# triage_issues
# ---------------------------------------------------------------------------


def _make_raw_issue(
    number: int,
    title: str,
    body: str = "",
    labels: list[str] | None = None,
    assignees: list[str] | None = None,
) -> dict:
    """Build a minimal Gitea-shaped issue dict for triage tests."""
    return {
        "number": number,
        "title": title,
        "body": body,
        "labels": [{"name": lbl} for lbl in (labels or [])],
        "assignees": [{"login": a} for a in (assignees or [])],
        "html_url": f"http://gitea/issues/{number}",
    }


def test_triage_returns_sorted_by_priority():
    issues = [
        _make_raw_issue(1, "Routine docs update", labels=["docs"]),
        _make_raw_issue(2, "Critical security issue", labels=["urgent", "security"]),
        _make_raw_issue(3, "Normal feature", labels=[]),
    ]
    triaged = triage_issues(issues)
    # Highest priority first
    assert triaged[0].number == 2
    assert triaged[0].priority_score == 100  # urgent label


def test_triage_prs_can_be_included():
    # triage_issues does not filter PRs — that's fetch_open_issues's job.
    triaged = triage_issues([_make_raw_issue(10, "A PR-like issue")])
    assert len(triaged) == 1


def test_triage_empty():
    assert triage_issues([]) == []


def test_triage_routing():
    issues = [
        _make_raw_issue(1, "Benchmark LLM backends", body="comprehensive analysis"),
        _make_raw_issue(2, "Refactor agent loader", body="architecture change"),
        _make_raw_issue(3, "Fix typo in docs", labels=["docs"]),
    ]
    triaged = {i.number: i for i in triage_issues(issues)}

    assert triaged[1].agent_target == AgentTarget.KIMI
    assert triaged[2].agent_target == AgentTarget.CLAUDE
    assert triaged[3].agent_target == AgentTarget.TIMMY


def test_triage_preserves_url():
    triaged = triage_issues([_make_raw_issue(42, "Some issue")])
    assert triaged[0].url == "http://gitea/issues/42"


# ---------------------------------------------------------------------------
# fetch_open_issues — no Gitea available in unit tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fetch_open_issues_returns_empty_when_disabled():
    """When no Gitea token is configured, fetch returns [] without raising.

    Cleaned up: the previous version carried a dead ``FakeSettings`` class
    that was never used, a no-op ``monkeypatch.setattr(bl, "logger",
    bl.logger)`` call, and therefore an unused ``monkeypatch`` fixture —
    all removed. ``from config import settings`` happens inside
    ``fetch_open_issues``, so the no-token path is exercised via the
    environment instead of patching.
    """
    import os

    import timmy.vassal.backlog as bl

    original = os.environ.pop("GITEA_TOKEN", None)
    try:
        result = await bl.fetch_open_issues()
        # Should return [] gracefully (no token configured in the test env).
        assert isinstance(result, list)
    finally:
        if original is not None:
            os.environ["GITEA_TOKEN"] = original


# ===========================================================================
# tests/unit/test_vassal_dispatch.py — routing and label helpers
# ===========================================================================

# (module header for the dispatch tests)
#
# from __future__ import annotations
# import pytest
# from timmy.vassal.backlog import AgentTarget, TriagedIssue
# from timmy.vassal.dispatch import (
#     DispatchRecord, clear_dispatch_registry, get_dispatch_registry,
# )


def _make_triaged(
    number: int,
    title: str,
    agent: AgentTarget,
    priority: int = 50,
) -> TriagedIssue:
    """Build a TriagedIssue for dispatch tests."""
    return TriagedIssue(
        number=number,
        title=title,
        body="",
        agent_target=agent,
        priority_score=priority,
        rationale="test rationale",
        url=f"http://gitea/issues/{number}",
    )


# ---------------------------------------------------------------------------
# Registry helpers
# ---------------------------------------------------------------------------


def test_registry_starts_empty():
    clear_dispatch_registry()
    assert get_dispatch_registry() == {}


def test_registry_returns_copy():
    clear_dispatch_registry()
    reg = get_dispatch_registry()
    reg[999] = None  # type: ignore[assignment]
    # Mutating the returned dict must not leak into the real registry.
    assert 999 not in get_dispatch_registry()
test_dispatch_timmy_self_no_gitea(): + """Timmy self-dispatch records without hitting Gitea.""" + clear_dispatch_registry() + + issue = _make_triaged(1, "Fix docs typo", AgentTarget.TIMMY) + from timmy.vassal.dispatch import dispatch_issue + + record = await dispatch_issue(issue) + + assert isinstance(record, DispatchRecord) + assert record.issue_number == 1 + assert record.agent == AgentTarget.TIMMY + assert 1 in get_dispatch_registry() + + +@pytest.mark.asyncio +async def test_dispatch_claude_no_gitea_token(): + """Claude dispatch gracefully degrades when Gitea token is absent.""" + clear_dispatch_registry() + + issue = _make_triaged(2, "Refactor auth", AgentTarget.CLAUDE) + from timmy.vassal.dispatch import dispatch_issue + + record = await dispatch_issue(issue) + + assert record.issue_number == 2 + assert record.agent == AgentTarget.CLAUDE + # label/comment not applied — no token + assert record.label_applied is False + assert 2 in get_dispatch_registry() + + +@pytest.mark.asyncio +async def test_dispatch_kimi_no_gitea_token(): + clear_dispatch_registry() + + issue = _make_triaged(3, "Research embeddings", AgentTarget.KIMI) + from timmy.vassal.dispatch import dispatch_issue + + record = await dispatch_issue(issue) + + assert record.agent == AgentTarget.KIMI + assert record.label_applied is False + + +# --------------------------------------------------------------------------- +# DispatchRecord fields +# --------------------------------------------------------------------------- + + +def test_dispatch_record_defaults(): + r = DispatchRecord( + issue_number=5, + issue_title="Test issue", + agent=AgentTarget.TIMMY, + rationale="because", + ) + assert r.label_applied is False + assert r.comment_posted is False + assert r.dispatched_at # has a timestamp diff --git a/tests/unit/test_vassal_house_health.py b/tests/unit/test_vassal_house_health.py new file mode 100644 index 0000000..a9241e6 --- /dev/null +++ b/tests/unit/test_vassal_house_health.py @@ -0,0 +1,116 @@ 
+"""Unit tests for timmy.vassal.house_health.""" + +from __future__ import annotations + +import pytest + +from timmy.vassal.house_health import ( + DiskUsage, + MemoryUsage, + OllamaHealth, + SystemSnapshot, + _probe_disk, +) + + +# --------------------------------------------------------------------------- +# Data model tests +# --------------------------------------------------------------------------- + + +def test_system_snapshot_healthy_when_no_warnings(): + snap = SystemSnapshot() + assert snap.healthy is True + + +def test_system_snapshot_unhealthy_with_warnings(): + snap = SystemSnapshot(warnings=["disk 90% full"]) + assert snap.healthy is False + + +def test_disk_usage_defaults(): + d = DiskUsage() + assert d.percent_used == 0.0 + assert d.path == "/" + + +def test_memory_usage_defaults(): + m = MemoryUsage() + assert m.percent_used == 0.0 + + +def test_ollama_health_defaults(): + o = OllamaHealth() + assert o.reachable is False + assert o.loaded_models == [] + + +# --------------------------------------------------------------------------- +# _probe_disk — runs against real filesystem +# --------------------------------------------------------------------------- + + +def test_probe_disk_root(): + result = _probe_disk("/") + assert result.total_gb > 0 + assert 0.0 <= result.percent_used <= 100.0 + assert result.free_gb >= 0 + + +def test_probe_disk_bad_path(): + result = _probe_disk("/nonexistent_path_xyz") + # Should not raise — returns zeroed DiskUsage + assert result.percent_used == 0.0 + + +# --------------------------------------------------------------------------- +# get_system_snapshot — async +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_system_snapshot_returns_snapshot(): + from timmy.vassal.house_health import get_system_snapshot + + snap = await get_system_snapshot() + assert isinstance(snap, SystemSnapshot) + # Disk is always probed + assert snap.disk.total_gb >= 0 
+ # Ollama is likely unreachable in test env — that's fine + assert isinstance(snap.ollama, OllamaHealth) + + +@pytest.mark.asyncio +async def test_get_system_snapshot_disk_warning(monkeypatch): + """When disk is above threshold, a warning is generated.""" + import timmy.vassal.house_health as hh + + # Patch _probe_disk to return high usage + def _full_disk(path: str) -> DiskUsage: + return DiskUsage( + path=path, + total_gb=100.0, + used_gb=90.0, + free_gb=10.0, + percent_used=90.0, + ) + + monkeypatch.setattr(hh, "_probe_disk", _full_disk) + + snap = await hh.get_system_snapshot() + assert any("disk" in w.lower() or "Disk" in w for w in snap.warnings) + + +# --------------------------------------------------------------------------- +# cleanup_stale_files — temp dir test +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_cleanup_stale_files_missing_dir(): + """Should not raise when the target dir doesn't exist.""" + from timmy.vassal.house_health import cleanup_stale_files + + result = await cleanup_stale_files(temp_dirs=["/tmp/timmy_test_xyz_nonexistent"]) + assert result["deleted_count"] == 0 + assert result["errors"] == [] diff --git a/tests/unit/test_vassal_orchestration_loop.py b/tests/unit/test_vassal_orchestration_loop.py new file mode 100644 index 0000000..c6dd659 --- /dev/null +++ b/tests/unit/test_vassal_orchestration_loop.py @@ -0,0 +1,139 @@ +"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator.""" + +from __future__ import annotations + +import pytest + +from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator + + +# --------------------------------------------------------------------------- +# VassalCycleRecord +# --------------------------------------------------------------------------- + + +def test_cycle_record_healthy_when_no_errors(): + r = VassalCycleRecord( + cycle_id=1, + started_at="2026-01-01T00:00:00+00:00", + ) + assert 
"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator."""

from __future__ import annotations

import pytest

from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator


# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------


def test_cycle_record_healthy_when_no_errors():
    rec = VassalCycleRecord(cycle_id=1, started_at="2026-01-01T00:00:00+00:00")
    assert rec.healthy is True


def test_cycle_record_unhealthy_with_errors():
    rec = VassalCycleRecord(
        cycle_id=1,
        started_at="2026-01-01T00:00:00+00:00",
        errors=["backlog: connection refused"],
    )
    assert rec.healthy is False


def test_cycle_record_unhealthy_with_warnings():
    rec = VassalCycleRecord(
        cycle_id=1,
        started_at="2026-01-01T00:00:00+00:00",
        house_warnings=["disk 90% full"],
    )
    assert rec.healthy is False


# ---------------------------------------------------------------------------
# VassalOrchestrator state
# ---------------------------------------------------------------------------


def test_orchestrator_initial_state():
    orchestrator = VassalOrchestrator()
    assert orchestrator.cycle_count == 0
    assert orchestrator.is_running is False
    assert orchestrator.history == []


def test_orchestrator_get_status_no_cycles():
    snapshot = VassalOrchestrator().get_status()
    assert snapshot["running"] is False
    assert snapshot["cycle_count"] == 0
    assert snapshot["last_cycle"] is None


# ---------------------------------------------------------------------------
# run_cycle — integration (no Gitea, no Ollama in test env)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_run_cycle_completes_without_services():
    """run_cycle must complete and record even when external services are down."""
    from timmy.vassal.dispatch import clear_dispatch_registry

    clear_dispatch_registry()
    orchestrator = VassalOrchestrator(cycle_interval=300)

    record = await orchestrator.run_cycle()

    assert isinstance(record, VassalCycleRecord)
    assert record.cycle_id == 1
    assert record.finished_at  # was set
    assert record.duration_ms >= 0
    # No Gitea → nothing fetched, nothing dispatched.
    assert record.issues_fetched == 0
    assert record.issues_dispatched == 0
    # History updated.
    assert len(orchestrator.history) == 1
    assert orchestrator.cycle_count == 1


@pytest.mark.asyncio
async def test_run_cycle_increments_cycle_count():
    from timmy.vassal.dispatch import clear_dispatch_registry

    clear_dispatch_registry()
    orchestrator = VassalOrchestrator()

    await orchestrator.run_cycle()
    await orchestrator.run_cycle()

    assert orchestrator.cycle_count == 2
    assert len(orchestrator.history) == 2


@pytest.mark.asyncio
async def test_get_status_after_cycle():
    from timmy.vassal.dispatch import clear_dispatch_registry

    clear_dispatch_registry()
    orchestrator = VassalOrchestrator()

    await orchestrator.run_cycle()
    snapshot = orchestrator.get_status()

    assert snapshot["cycle_count"] == 1
    latest = snapshot["last_cycle"]
    assert latest is not None
    assert latest["cycle_id"] == 1
    assert latest["issues_fetched"] == 0


# ---------------------------------------------------------------------------
# start / stop
# ---------------------------------------------------------------------------


def test_orchestrator_stop_when_not_running():
    """stop() on an idle orchestrator must not raise."""
    orchestrator = VassalOrchestrator()
    orchestrator.stop()  # no-op by design
    assert orchestrator.is_running is False


# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------


def test_module_singleton_exists():
    from timmy.vassal import VassalOrchestrator, vassal_orchestrator

    assert isinstance(vassal_orchestrator, VassalOrchestrator)