1
0

[claude] Vassal Protocol — Timmy as autonomous orchestrator (#1070) (#1142)

This commit is contained in:
2026-03-23 18:33:15 +00:00
parent 4f8e86348c
commit 128aa4427f
12 changed files with 2021 additions and 0 deletions

View File

@@ -321,6 +321,15 @@ class Settings(BaseSettings):
loop_qa_upgrade_threshold: int = 3 # consecutive failures → file task
loop_qa_max_per_hour: int = 12 # safety throttle
# ── Vassal Protocol (Autonomous Orchestrator) ─────────────────────
# Timmy as lead decision-maker: triage backlog, dispatch agents, monitor health.
# See timmy/vassal/ for implementation.
vassal_enabled: bool = False # off by default — enable when Qwen3-14B is loaded
vassal_cycle_interval: int = 300 # seconds between orchestration cycles (5 min)
vassal_max_dispatch_per_cycle: int = 10 # cap on new dispatches per cycle
vassal_stuck_threshold_minutes: int = 120 # minutes before agent issue is "stuck"
vassal_idle_threshold_minutes: int = 30 # minutes before agent is "idle"
# ── Paperclip AI — orchestration bridge ────────────────────────────
# URL where the Paperclip server listens.
# For VPS deployment behind nginx, use the public domain.

View File

@@ -0,0 +1,21 @@
"""Vassal Protocol — Timmy as autonomous orchestrator.
Timmy is Alex's vassal: the lead decision-maker for development direction,
agent management, and house health. He observes the Gitea backlog, decides
priorities, dispatches work to agents (Claude, Kimi, self), monitors output,
and keeps Hermes (M3 Max) running well.
Public API
----------
from timmy.vassal import vassal_orchestrator
await vassal_orchestrator.run_cycle()
snapshot = vassal_orchestrator.get_status()
"""
from timmy.vassal.orchestration_loop import VassalOrchestrator
# Module-level singleton — import and use directly.
vassal_orchestrator = VassalOrchestrator()
__all__ = ["VassalOrchestrator", "vassal_orchestrator"]

View File

@@ -0,0 +1,296 @@
"""Vassal Protocol — agent health monitoring.
Monitors whether downstream agents (Claude, Kimi) are making progress on
their assigned issues. Detects idle and stuck agents by querying Gitea
for issues with dispatch labels and checking last-comment timestamps.
Stuck agent heuristic
---------------------
An agent is considered "stuck" on an issue if:
- The issue has been labeled ``claude-ready`` or ``kimi-ready``
- No new comment has appeared in the last ``stuck_threshold_minutes``
- The issue has not been closed
Idle agent heuristic
--------------------
An agent is "idle" if it has no currently assigned (labeled) open issues.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_AGENT_LABELS = {
"claude": "claude-ready",
"kimi": "kimi-ready",
}
_DEFAULT_STUCK_MINUTES = 120
_DEFAULT_IDLE_THRESHOLD = 30
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class AgentStatus:
"""Health snapshot for one agent at a point in time."""
agent: str # "claude" | "kimi" | "timmy"
is_idle: bool = True
active_issue_numbers: list[int] = field(default_factory=list)
stuck_issue_numbers: list[int] = field(default_factory=list)
checked_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def is_stuck(self) -> bool:
return bool(self.stuck_issue_numbers)
@property
def needs_reassignment(self) -> bool:
return self.is_stuck
@dataclass
class AgentHealthReport:
"""Combined health report for all monitored agents."""
agents: list[AgentStatus] = field(default_factory=list)
generated_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def any_stuck(self) -> bool:
return any(a.is_stuck for a in self.agents)
@property
def all_idle(self) -> bool:
return all(a.is_idle for a in self.agents)
def for_agent(self, name: str) -> AgentStatus | None:
for a in self.agents:
if a.agent == name:
return a
return None
# ---------------------------------------------------------------------------
# Gitea queries
# ---------------------------------------------------------------------------
async def _fetch_labeled_issues(
client: Any,
base_url: str,
headers: dict,
repo: str,
label: str,
) -> list[dict]:
"""Return open issues carrying a specific label."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params={"state": "open", "labels": label, "limit": 50},
)
if resp.status_code == 200:
return [i for i in resp.json() if not i.get("pull_request")]
except Exception as exc:
logger.warning("_fetch_labeled_issues: %s%s", label, exc)
return []
async def _last_comment_time(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
) -> datetime | None:
"""Return the timestamp of the most recent comment on an issue."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
params={"limit": 1},
)
if resp.status_code == 200:
comments = resp.json()
if comments:
ts = comments[-1].get("updated_at") or comments[-1].get("created_at")
if ts:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except Exception as exc:
logger.debug("_last_comment_time: issue #%d%s", issue_number, exc)
return None
async def _issue_created_time(issue: dict) -> datetime | None:
ts = issue.get("created_at")
if ts:
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
pass
return None
# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
async def check_agent_health(
agent_name: str,
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentStatus:
"""Query Gitea for issues assigned to *agent_name* and assess health.
Args:
agent_name: One of "claude", "kimi".
stuck_threshold_minutes: Minutes of silence before an issue is
considered stuck.
Returns:
AgentStatus for this agent.
"""
status = AgentStatus(agent=agent_name)
label = _AGENT_LABELS.get(agent_name)
if not label:
logger.debug("check_agent_health: unknown agent %s", agent_name)
return status
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("check_agent_health: missing dependency — %s", exc)
return status
if not settings.gitea_enabled or not settings.gitea_token:
return status
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
cutoff = datetime.now(UTC) - timedelta(minutes=stuck_threshold_minutes)
try:
async with httpx.AsyncClient(timeout=15) as client:
issues = await _fetch_labeled_issues(
client, base_url, headers, repo, label
)
for issue in issues:
num = issue.get("number", 0)
status.active_issue_numbers.append(num)
# Check last activity
last_activity = await _last_comment_time(
client, base_url, headers, repo, num
)
if last_activity is None:
last_activity = await _issue_created_time(issue)
if last_activity is not None and last_activity < cutoff:
status.stuck_issue_numbers.append(num)
logger.info(
"check_agent_health: %s issue #%d stuck since %s",
agent_name,
num,
last_activity.isoformat(),
)
except Exception as exc:
logger.warning("check_agent_health: %s query failed — %s", agent_name, exc)
status.is_idle = len(status.active_issue_numbers) == 0
return status
async def get_full_health_report(
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentHealthReport:
"""Run health checks for all monitored agents and return combined report.
Args:
stuck_threshold_minutes: Passed through to each agent check.
Returns:
AgentHealthReport with status for Claude and Kimi.
"""
import asyncio
claude_status, kimi_status = await asyncio.gather(
check_agent_health("claude", stuck_threshold_minutes),
check_agent_health("kimi", stuck_threshold_minutes),
)
return AgentHealthReport(agents=[claude_status, kimi_status])
async def nudge_stuck_agent(
agent_name: str,
issue_number: int,
) -> bool:
"""Post a nudge comment on a stuck issue to prompt the agent.
Args:
agent_name: The agent that appears stuck.
issue_number: The Gitea issue number to nudge.
Returns:
True if the comment was posted successfully.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("nudge_stuck_agent: missing dependency — %s", exc)
return False
if not settings.gitea_enabled or not settings.gitea_token:
return False
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
body = (
f"⏰ **Vassal nudge** — @{agent_name} this issue has been idle.\n\n"
"Please post a status update or close if complete."
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
json={"body": body},
)
if resp.status_code in (200, 201):
logger.info(
"nudge_stuck_agent: nudged %s on issue #%d",
agent_name,
issue_number,
)
return True
except Exception as exc:
logger.warning("nudge_stuck_agent: failed — %s", exc)
return False

281
src/timmy/vassal/backlog.py Normal file
View File

@@ -0,0 +1,281 @@
"""Vassal Protocol — Gitea backlog triage.
Fetches open issues from Gitea, scores each one for priority and agent
suitability, and returns a ranked list ready for dispatch.
Complexity scoring heuristics
------------------------------
high_complexity_keywords → route to Claude (architecture, refactor, review)
research_keywords → route to Kimi (survey, analysis, benchmark)
routine_keywords → route to Timmy/self (docs, chore, config)
otherwise → Timmy self-handles
Priority scoring
----------------
URGENT label → 100
HIGH / critical → 75
NORMAL (default) → 50
LOW / chore → 25
Already assigned → deprioritized (subtract 20)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Labels that hint at complexity level / agent suitability
_HIGH_COMPLEXITY = frozenset(
{
"architecture",
"refactor",
"code review",
"security",
"performance",
"breaking change",
"design",
"complex",
}
)
_RESEARCH_KEYWORDS = frozenset(
{
"research",
"survey",
"analysis",
"benchmark",
"comparative",
"investigation",
"deep dive",
"review",
}
)
_ROUTINE_KEYWORDS = frozenset(
{
"docs",
"documentation",
"chore",
"config",
"typo",
"rename",
"cleanup",
"trivial",
"style",
}
)
_PRIORITY_LABEL_SCORES: dict[str, int] = {
"urgent": 100,
"critical": 90,
"high": 75,
"normal": 50,
"low": 25,
"chore": 20,
}
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
class AgentTarget(StrEnum):
"""Which agent should handle this issue."""
TIMMY = "timmy" # Timmy handles locally (self)
CLAUDE = "claude" # Dispatch to Claude Code
KIMI = "kimi" # Dispatch to Kimi Code
@dataclass
class TriagedIssue:
"""A Gitea issue enriched with triage metadata."""
number: int
title: str
body: str
labels: list[str] = field(default_factory=list)
assignees: list[str] = field(default_factory=list)
priority_score: int = 50
agent_target: AgentTarget = AgentTarget.TIMMY
rationale: str = ""
url: str = ""
raw: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------
def _extract_labels(issue: dict[str, Any]) -> list[str]:
"""Return normalised label names from a raw Gitea issue dict."""
return [lbl.get("name", "").lower() for lbl in issue.get("labels", [])]
def _score_priority(labels: list[str], assignees: list[str]) -> int:
score = _PRIORITY_LABEL_SCORES.get("normal", 50)
for lbl in labels:
for key, val in _PRIORITY_LABEL_SCORES.items():
if key in lbl:
score = max(score, val)
if assignees:
score -= 20 # already assigned — lower urgency for fresh dispatch
return max(0, score)
def _choose_agent(title: str, body: str, labels: list[str]) -> tuple[AgentTarget, str]:
"""Heuristic: pick the best agent and return (target, rationale)."""
combined = f"{title} {body} {' '.join(labels)}".lower()
if any(kw in combined for kw in _HIGH_COMPLEXITY):
return AgentTarget.CLAUDE, "high-complexity keywords detected"
if any(kw in combined for kw in _RESEARCH_KEYWORDS):
return AgentTarget.KIMI, "research keywords detected"
if any(kw in combined for kw in _ROUTINE_KEYWORDS):
return AgentTarget.TIMMY, "routine task — Timmy self-handles"
return AgentTarget.TIMMY, "no specific routing signal — Timmy self-handles"
# ---------------------------------------------------------------------------
# Triage
# ---------------------------------------------------------------------------
def triage_issues(raw_issues: list[dict[str, Any]]) -> list[TriagedIssue]:
"""Score and route a list of raw Gitea issue dicts.
Returns a list sorted by priority_score descending (highest first).
Args:
raw_issues: List of issue objects from the Gitea API.
Returns:
Sorted list of TriagedIssue with routing decisions.
"""
results: list[TriagedIssue] = []
for issue in raw_issues:
number = issue.get("number", 0)
title = issue.get("title", "")
body = issue.get("body") or ""
labels = _extract_labels(issue)
assignees = [
a.get("login", "") for a in issue.get("assignees") or []
]
url = issue.get("html_url", "")
priority = _score_priority(labels, assignees)
agent, rationale = _choose_agent(title, body, labels)
results.append(
TriagedIssue(
number=number,
title=title,
body=body,
labels=labels,
assignees=assignees,
priority_score=priority,
agent_target=agent,
rationale=rationale,
url=url,
raw=issue,
)
)
results.sort(key=lambda i: i.priority_score, reverse=True)
logger.debug(
"Triage complete: %d issues → %d Claude, %d Kimi, %d Timmy",
len(results),
sum(1 for i in results if i.agent_target == AgentTarget.CLAUDE),
sum(1 for i in results if i.agent_target == AgentTarget.KIMI),
sum(1 for i in results if i.agent_target == AgentTarget.TIMMY),
)
return results
# ---------------------------------------------------------------------------
# Gitea fetch (async, gracefully degrading)
# ---------------------------------------------------------------------------
async def fetch_open_issues(
limit: int = 50,
exclude_labels: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Fetch open issues from the configured Gitea repo.
Args:
limit: Maximum number of issues to return.
exclude_labels: Labels whose issues should be skipped
(e.g. ``["kimi-ready", "wip"]``).
Returns:
List of raw issue dicts from the Gitea API,
or empty list if Gitea is unavailable.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("fetch_open_issues: missing dependency — %s", exc)
return []
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("fetch_open_issues: Gitea disabled or no token")
return []
exclude = set(lbl.lower() for lbl in (exclude_labels or []))
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
params = {"state": "open", "limit": min(limit, 50), "page": 1}
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params=params,
)
if resp.status_code != 200:
logger.warning(
"fetch_open_issues: Gitea returned %s", resp.status_code
)
return []
issues = resp.json()
# Filter out pull requests and excluded labels
filtered = []
for issue in issues:
if issue.get("pull_request"):
continue # skip PRs
labels = _extract_labels(issue)
if exclude and any(lbl in exclude for lbl in labels):
continue
filtered.append(issue)
logger.info(
"fetch_open_issues: fetched %d/%d issues (after filtering)",
len(filtered),
len(issues),
)
return filtered
except Exception as exc:
logger.warning("fetch_open_issues: Gitea request failed — %s", exc)
return []

View File

@@ -0,0 +1,213 @@
"""Vassal Protocol — agent dispatch.
Translates triage decisions into concrete Gitea actions:
- Add ``claude-ready`` or ``kimi-ready`` label to an issue
- Post a dispatch comment recording the routing rationale
- Record the dispatch in the in-memory registry so the orchestration loop
can track what was sent and when
The dispatch registry is intentionally in-memory (ephemeral). Durable
tracking is out of scope for this module — that belongs in the task queue
or a future orchestration DB.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from timmy.vassal.backlog import AgentTarget, TriagedIssue
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Label names used by the dispatch system
# ---------------------------------------------------------------------------
_LABEL_MAP: dict[AgentTarget, str] = {
AgentTarget.CLAUDE: "claude-ready",
AgentTarget.KIMI: "kimi-ready",
AgentTarget.TIMMY: "timmy-ready",
}
_LABEL_COLORS: dict[str, str] = {
"claude-ready": "#8b6f47", # warm brown
"kimi-ready": "#006b75", # dark teal
"timmy-ready": "#0075ca", # blue
}
# ---------------------------------------------------------------------------
# Dispatch registry
# ---------------------------------------------------------------------------
@dataclass
class DispatchRecord:
"""A record of one issue being dispatched to an agent."""
issue_number: int
issue_title: str
agent: AgentTarget
rationale: str
dispatched_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
label_applied: bool = False
comment_posted: bool = False
# Module-level registry: issue_number → DispatchRecord
_registry: dict[int, DispatchRecord] = {}
def get_dispatch_registry() -> dict[int, DispatchRecord]:
"""Return a copy of the current dispatch registry."""
return dict(_registry)
def clear_dispatch_registry() -> None:
"""Clear the dispatch registry (mainly for tests)."""
_registry.clear()
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
async def _get_or_create_label(
client: Any,
base_url: str,
headers: dict,
repo: str,
label_name: str,
) -> int | None:
"""Return the Gitea label ID, creating it if necessary."""
labels_url = f"{base_url}/repos/{repo}/labels"
try:
resp = await client.get(labels_url, headers=headers)
if resp.status_code == 200:
for lbl in resp.json():
if lbl.get("name") == label_name:
return lbl["id"]
except Exception as exc:
logger.warning("_get_or_create_label: list failed — %s", exc)
return None
color = _LABEL_COLORS.get(label_name, "#cccccc")
try:
resp = await client.post(
labels_url,
headers={**headers, "Content-Type": "application/json"},
json={"name": label_name, "color": color},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
except Exception as exc:
logger.warning("_get_or_create_label: create failed — %s", exc)
return None
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
"""Apply dispatch label and post a routing comment on the Gitea issue.
Gracefully degrades: if Gitea is unavailable the record is still
created and returned (with label_applied=False, comment_posted=False).
Args:
issue: A TriagedIssue with a routing decision.
Returns:
DispatchRecord summarising what was done.
"""
record = DispatchRecord(
issue_number=issue.number,
issue_title=issue.title,
agent=issue.agent_target,
rationale=issue.rationale,
)
if issue.agent_target == AgentTarget.TIMMY:
# Self-dispatch: no label needed — Timmy will handle directly.
logger.info(
"dispatch_issue: #%d '%s' → Timmy (self, no label)",
issue.number,
issue.title[:50],
)
_registry[issue.number] = record
return record
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(
client, base_url, headers, repo, label_name
)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
_registry[issue.number] = record
logger.info(
"dispatch_issue: #%d '%s'%s (label=%s comment=%s)",
issue.number,
issue.title[:50],
issue.agent_target,
record.label_applied,
record.comment_posted,
)
return record

View File

@@ -0,0 +1,222 @@
"""Vassal Protocol — Hermes house health monitoring.
Monitors system resources on the M3 Max (Hermes) and Ollama model state.
Reports warnings when resources are tight and provides cleanup utilities.
All I/O is wrapped in asyncio.to_thread() per CLAUDE.md convention.
"""
from __future__ import annotations
import asyncio
import logging
import shutil
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
_WARN_DISK_PCT = 85.0 # warn when disk is more than 85% full
_WARN_MEM_PCT = 90.0 # warn when memory is more than 90% used
_WARN_CPU_PCT = 95.0 # warn when CPU is above 95% sustained
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class DiskUsage:
path: str = "/"
total_gb: float = 0.0
used_gb: float = 0.0
free_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class MemoryUsage:
total_gb: float = 0.0
available_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class OllamaHealth:
reachable: bool = False
loaded_models: list[str] = field(default_factory=list)
error: str = ""
@dataclass
class SystemSnapshot:
"""Point-in-time snapshot of Hermes resource usage."""
disk: DiskUsage = field(default_factory=DiskUsage)
memory: MemoryUsage = field(default_factory=MemoryUsage)
ollama: OllamaHealth = field(default_factory=OllamaHealth)
warnings: list[str] = field(default_factory=list)
taken_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def healthy(self) -> bool:
return len(self.warnings) == 0
# ---------------------------------------------------------------------------
# Resource probes (sync, run in threads)
# ---------------------------------------------------------------------------
def _probe_disk(path: str = "/") -> DiskUsage:
try:
usage = shutil.disk_usage(path)
total_gb = usage.total / 1e9
used_gb = usage.used / 1e9
free_gb = usage.free / 1e9
pct = (usage.used / usage.total * 100) if usage.total > 0 else 0.0
return DiskUsage(
path=path,
total_gb=round(total_gb, 2),
used_gb=round(used_gb, 2),
free_gb=round(free_gb, 2),
percent_used=round(pct, 1),
)
except Exception as exc:
logger.debug("_probe_disk: %s", exc)
return DiskUsage(path=path)
def _probe_memory() -> MemoryUsage:
try:
import psutil # optional — gracefully degrade if absent
vm = psutil.virtual_memory()
return MemoryUsage(
total_gb=round(vm.total / 1e9, 2),
available_gb=round(vm.available / 1e9, 2),
percent_used=round(vm.percent, 1),
)
except ImportError:
logger.debug("_probe_memory: psutil not installed — skipping")
return MemoryUsage()
except Exception as exc:
logger.debug("_probe_memory: %s", exc)
return MemoryUsage()
def _probe_ollama_sync(ollama_url: str) -> OllamaHealth:
"""Synchronous Ollama health probe — run in a thread."""
try:
import urllib.request
import json
url = ollama_url.rstrip("/") + "/api/tags"
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
data = json.loads(resp.read())
models = [m.get("name", "") for m in data.get("models", [])]
return OllamaHealth(reachable=True, loaded_models=models)
except Exception as exc:
return OllamaHealth(reachable=False, error=str(exc)[:120])
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def get_system_snapshot() -> SystemSnapshot:
"""Collect a non-blocking snapshot of system resources.
Uses asyncio.to_thread() for all blocking I/O per project convention.
Returns:
SystemSnapshot with disk, memory, and Ollama status.
"""
from config import settings
disk, memory, ollama = await asyncio.gather(
asyncio.to_thread(_probe_disk, "/"),
asyncio.to_thread(_probe_memory),
asyncio.to_thread(_probe_ollama_sync, settings.normalized_ollama_url),
)
warnings: list[str] = []
if disk.percent_used >= _WARN_DISK_PCT:
warnings.append(
f"Disk {disk.path}: {disk.percent_used:.0f}% used "
f"({disk.free_gb:.1f} GB free)"
)
if memory.percent_used >= _WARN_MEM_PCT:
warnings.append(
f"Memory: {memory.percent_used:.0f}% used "
f"({memory.available_gb:.1f} GB available)"
)
if not ollama.reachable:
warnings.append(f"Ollama unreachable: {ollama.error}")
if warnings:
logger.warning("House health warnings: %s", "; ".join(warnings))
return SystemSnapshot(
disk=disk,
memory=memory,
ollama=ollama,
warnings=warnings,
)
async def cleanup_stale_files(
temp_dirs: list[str] | None = None,
max_age_days: int = 7,
) -> dict[str, Any]:
"""Remove files older than *max_age_days* from temp directories.
Only removes files under safe temp paths (never project source).
Args:
temp_dirs: Directories to scan. Defaults to ``["/tmp/timmy"]``.
max_age_days: Age threshold in days.
Returns:
Dict with ``deleted_count`` and ``errors``.
"""
import time
dirs = temp_dirs or ["/tmp/timmy"] # noqa: S108
cutoff = time.time() - max_age_days * 86400
deleted = 0
errors: list[str] = []
def _cleanup() -> None:
nonlocal deleted
for d in dirs:
p = Path(d)
if not p.exists():
continue
for f in p.rglob("*"):
if f.is_file():
try:
if f.stat().st_mtime < cutoff:
f.unlink()
deleted += 1
except Exception as exc:
errors.append(str(exc))
await asyncio.to_thread(_cleanup)
logger.info(
"cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors)
)
return {"deleted_count": deleted, "errors": errors}

View File

@@ -0,0 +1,321 @@
"""Vassal Protocol — main orchestration loop.
Ties the backlog, dispatch, agent health, and house health modules together
into a single ``VassalOrchestrator`` that can run as a background service.
Each cycle:
1. Fetch open Gitea issues
2. Triage: score priority + route to agent
3. Dispatch: apply labels / post routing comments
4. Check agent health: nudge stuck agents
5. Check house health: log warnings, trigger cleanup if needed
6. Return a VassalCycleRecord summarising the cycle
Usage::
from timmy.vassal import vassal_orchestrator
record = await vassal_orchestrator.run_cycle()
status = vassal_orchestrator.get_status()
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cycle record
# ---------------------------------------------------------------------------
@dataclass
class VassalCycleRecord:
"""Summary of one orchestration cycle."""
cycle_id: int
started_at: str
finished_at: str = ""
duration_ms: int = 0
issues_fetched: int = 0
issues_dispatched: int = 0
dispatched_to_claude: int = 0
dispatched_to_kimi: int = 0
dispatched_to_timmy: int = 0
stuck_agents: list[str] = field(default_factory=list)
nudges_sent: int = 0
house_warnings: list[str] = field(default_factory=list)
cleanup_deleted: int = 0
errors: list[str] = field(default_factory=list)
@property
def healthy(self) -> bool:
return not self.errors and not self.house_warnings
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
class VassalOrchestrator:
"""Timmy's autonomous orchestration engine.
Runs observe → triage → dispatch → monitor → house-check cycles on a
configurable interval.
Parameters
----------
cycle_interval:
Seconds between cycles. Defaults to ``settings.vassal_cycle_interval``
when available, otherwise 300 s (5 min).
max_dispatch_per_cycle:
Cap on new dispatches per cycle to avoid spamming agents.
"""
def __init__(
self,
cycle_interval: float | None = None,
max_dispatch_per_cycle: int = 10,
) -> None:
self._cycle_count = 0
self._running = False
self._task: asyncio.Task | None = None
self._max_dispatch = max_dispatch_per_cycle
self._history: list[VassalCycleRecord] = []
# Resolve interval — lazy to avoid import-time settings read
self._cycle_interval = cycle_interval
# -- public API --------------------------------------------------------
@property
def cycle_count(self) -> int:
return self._cycle_count
@property
def is_running(self) -> bool:
return self._running
@property
def history(self) -> list[VassalCycleRecord]:
return list(self._history)
def get_status(self) -> dict[str, Any]:
"""Return a JSON-serialisable status dict."""
last = self._history[-1] if self._history else None
return {
"running": self._running,
"cycle_count": self._cycle_count,
"last_cycle": {
"cycle_id": last.cycle_id,
"started_at": last.started_at,
"issues_fetched": last.issues_fetched,
"issues_dispatched": last.issues_dispatched,
"stuck_agents": last.stuck_agents,
"house_warnings": last.house_warnings,
"healthy": last.healthy,
}
if last
else None,
}
# -- single cycle ------------------------------------------------------
async def run_cycle(self) -> VassalCycleRecord:
"""Execute one full orchestration cycle.
Gracefully degrades at each step — a failure in one sub-task does
not abort the rest of the cycle.
Returns:
VassalCycleRecord summarising what happened.
"""
self._cycle_count += 1
start = time.monotonic()
record = VassalCycleRecord(
cycle_id=self._cycle_count,
started_at=datetime.now(UTC).isoformat(),
)
# 1 + 2: Fetch & triage
await self._step_backlog(record)
# 3: Agent health
await self._step_agent_health(record)
# 4: House health
await self._step_house_health(record)
# Finalise record
record.finished_at = datetime.now(UTC).isoformat()
record.duration_ms = int((time.monotonic() - start) * 1000)
self._history.append(record)
# Broadcast via WebSocket (best-effort)
await self._broadcast(record)
logger.info(
"VassalOrchestrator cycle #%d complete (%d ms): "
"fetched=%d dispatched=%d stuck=%s house_ok=%s",
record.cycle_id,
record.duration_ms,
record.issues_fetched,
record.issues_dispatched,
record.stuck_agents or "none",
not record.house_warnings,
)
return record
# -- background loop ---------------------------------------------------
async def start(self) -> None:
"""Start the recurring orchestration loop as a background task."""
if self._running:
logger.warning("VassalOrchestrator already running")
return
self._running = True
self._task = asyncio.ensure_future(self._loop())
def stop(self) -> None:
"""Signal the loop to stop after the current cycle."""
self._running = False
if self._task and not self._task.done():
self._task.cancel()
logger.info("VassalOrchestrator stop requested")
async def _loop(self) -> None:
interval = self._resolve_interval()
logger.info("VassalOrchestrator loop started (interval=%.0fs)", interval)
while self._running:
try:
await self.run_cycle()
except Exception:
logger.exception("VassalOrchestrator cycle failed")
await asyncio.sleep(interval)
# -- step: backlog -------------------------------------------------------
async def _step_backlog(self, record: VassalCycleRecord) -> None:
from timmy.vassal.backlog import fetch_open_issues, triage_issues
from timmy.vassal.dispatch import dispatch_issue, get_dispatch_registry
try:
raw_issues = await fetch_open_issues(
limit=50,
exclude_labels=["wip", "blocked", "needs-info"],
)
record.issues_fetched = len(raw_issues)
if not raw_issues:
return
triaged = triage_issues(raw_issues)
registry = get_dispatch_registry()
dispatched = 0
for issue in triaged:
if dispatched >= self._max_dispatch:
break
# Skip already-dispatched issues
if issue.number in registry:
continue
await dispatch_issue(issue)
dispatched += 1
from timmy.vassal.backlog import AgentTarget
if issue.agent_target == AgentTarget.CLAUDE:
record.dispatched_to_claude += 1
elif issue.agent_target == AgentTarget.KIMI:
record.dispatched_to_kimi += 1
else:
record.dispatched_to_timmy += 1
record.issues_dispatched = dispatched
except Exception as exc:
logger.exception("_step_backlog failed")
record.errors.append(f"backlog: {exc}")
# -- step: agent health -------------------------------------------------
async def _step_agent_health(self, record: VassalCycleRecord) -> None:
from config import settings
from timmy.vassal.agent_health import get_full_health_report, nudge_stuck_agent
try:
threshold = getattr(settings, "vassal_stuck_threshold_minutes", 120)
report = await get_full_health_report(stuck_threshold_minutes=threshold)
for agent_status in report.agents:
if agent_status.is_stuck:
record.stuck_agents.append(agent_status.agent)
for issue_num in agent_status.stuck_issue_numbers:
ok = await nudge_stuck_agent(agent_status.agent, issue_num)
if ok:
record.nudges_sent += 1
except Exception as exc:
logger.exception("_step_agent_health failed")
record.errors.append(f"agent_health: {exc}")
# -- step: house health -------------------------------------------------
async def _step_house_health(self, record: VassalCycleRecord) -> None:
from timmy.vassal.house_health import cleanup_stale_files, get_system_snapshot
try:
snapshot = await get_system_snapshot()
record.house_warnings = snapshot.warnings
# Auto-cleanup temp files when disk is getting tight
if snapshot.disk.percent_used >= 80.0:
result = await cleanup_stale_files(max_age_days=3)
record.cleanup_deleted = result.get("deleted_count", 0)
except Exception as exc:
logger.exception("_step_house_health failed")
record.errors.append(f"house_health: {exc}")
# -- helpers ------------------------------------------------------------
def _resolve_interval(self) -> float:
if self._cycle_interval is not None:
return self._cycle_interval
try:
from config import settings
return float(getattr(settings, "vassal_cycle_interval", 300))
except Exception:
return 300.0
async def _broadcast(self, record: VassalCycleRecord) -> None:
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"vassal.cycle",
{
"cycle_id": record.cycle_id,
"started_at": record.started_at,
"issues_fetched": record.issues_fetched,
"issues_dispatched": record.issues_dispatched,
"stuck_agents": record.stuck_agents,
"house_warnings": record.house_warnings,
"duration_ms": record.duration_ms,
"healthy": record.healthy,
},
)
except Exception as exc:
logger.debug("VassalOrchestrator broadcast skipped: %s", exc)

View File

@@ -0,0 +1,103 @@
"""Unit tests for timmy.vassal.agent_health."""
from __future__ import annotations
import pytest
from timmy.vassal.agent_health import AgentHealthReport, AgentStatus
# ---------------------------------------------------------------------------
# AgentStatus
# ---------------------------------------------------------------------------
def test_agent_status_idle_default():
s = AgentStatus(agent="claude")
assert s.is_idle is True
assert s.is_stuck is False
assert s.needs_reassignment is False
def test_agent_status_active():
s = AgentStatus(agent="kimi", active_issue_numbers=[10, 11])
s.is_idle = len(s.active_issue_numbers) == 0
assert s.is_idle is False
def test_agent_status_stuck():
s = AgentStatus(
agent="claude",
active_issue_numbers=[7],
stuck_issue_numbers=[7],
is_idle=False,
)
assert s.is_stuck is True
assert s.needs_reassignment is True
# ---------------------------------------------------------------------------
# AgentHealthReport
# ---------------------------------------------------------------------------
def test_report_any_stuck():
claude = AgentStatus(agent="claude", stuck_issue_numbers=[3])
kimi = AgentStatus(agent="kimi")
report = AgentHealthReport(agents=[claude, kimi])
assert report.any_stuck is True
def test_report_all_idle():
report = AgentHealthReport(
agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
)
assert report.all_idle is True
def test_report_for_agent_found():
kimi = AgentStatus(agent="kimi", active_issue_numbers=[42])
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), kimi])
found = report.for_agent("kimi")
assert found is kimi
def test_report_for_agent_not_found():
report = AgentHealthReport(agents=[AgentStatus(agent="claude")])
assert report.for_agent("timmy") is None
# ---------------------------------------------------------------------------
# check_agent_health — no Gitea in unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_check_agent_health_unknown_agent():
"""Unknown agent name returns idle status without error."""
from timmy.vassal.agent_health import check_agent_health
status = await check_agent_health("unknown-bot")
assert status.agent == "unknown-bot"
assert status.is_idle is True
@pytest.mark.asyncio
async def test_check_agent_health_no_token():
"""Returns idle status gracefully when Gitea token is absent."""
from timmy.vassal.agent_health import check_agent_health
status = await check_agent_health("claude")
# Should not raise; returns idle (no active issues discovered)
assert isinstance(status, AgentStatus)
assert status.agent == "claude"
@pytest.mark.asyncio
async def test_get_full_health_report_returns_both_agents():
from timmy.vassal.agent_health import get_full_health_report
report = await get_full_health_report()
agent_names = {a.agent for a in report.agents}
assert "claude" in agent_names
assert "kimi" in agent_names

View File

@@ -0,0 +1,186 @@
"""Unit tests for timmy.vassal.backlog — triage and fetch helpers."""
from __future__ import annotations
import pytest
from timmy.vassal.backlog import (
AgentTarget,
TriagedIssue,
_choose_agent,
_extract_labels,
_score_priority,
triage_issues,
)
# ---------------------------------------------------------------------------
# _extract_labels
# ---------------------------------------------------------------------------
def test_extract_labels_empty():
assert _extract_labels({}) == []
def test_extract_labels_normalises_case():
issue = {"labels": [{"name": "HIGH"}, {"name": "Feature"}]}
assert _extract_labels(issue) == ["high", "feature"]
# ---------------------------------------------------------------------------
# _score_priority
# ---------------------------------------------------------------------------
def test_priority_urgent():
assert _score_priority(["urgent"], []) == 100
def test_priority_high():
assert _score_priority(["high"], []) == 75
def test_priority_normal_default():
assert _score_priority([], []) == 50
def test_priority_assigned_penalised():
# already assigned → subtract 20
score = _score_priority([], ["some-agent"])
assert score == 30
def test_priority_label_substring_match():
# "critical" contains "critical" → 90
assert _score_priority(["critical-bug"], []) == 90
# ---------------------------------------------------------------------------
# _choose_agent
# ---------------------------------------------------------------------------
def test_choose_claude_for_architecture():
target, rationale = _choose_agent("Refactor auth middleware", "", [])
assert target == AgentTarget.CLAUDE
assert "complex" in rationale or "high-complexity" in rationale
def test_choose_kimi_for_research():
target, rationale = _choose_agent("Deep research on embedding models", "", [])
assert target == AgentTarget.KIMI
def test_choose_timmy_for_docs():
target, rationale = _choose_agent("Update documentation for CLI", "", [])
assert target == AgentTarget.TIMMY
def test_choose_timmy_default():
target, rationale = _choose_agent("Fix typo in README", "simple change", [])
# Could route to timmy (docs/trivial) or default — either is valid
assert isinstance(target, AgentTarget)
def test_choose_agent_label_wins():
# "security" label → Claude
target, _ = _choose_agent("Login page", "", ["security"])
assert target == AgentTarget.CLAUDE
# ---------------------------------------------------------------------------
# triage_issues
# ---------------------------------------------------------------------------
def _make_raw_issue(
number: int,
title: str,
body: str = "",
labels: list[str] | None = None,
assignees: list[str] | None = None,
) -> dict:
return {
"number": number,
"title": title,
"body": body,
"labels": [{"name": lbl} for lbl in (labels or [])],
"assignees": [{"login": a} for a in (assignees or [])],
"html_url": f"http://gitea/issues/{number}",
}
def test_triage_returns_sorted_by_priority():
issues = [
_make_raw_issue(1, "Routine docs update", labels=["docs"]),
_make_raw_issue(2, "Critical security issue", labels=["urgent", "security"]),
_make_raw_issue(3, "Normal feature", labels=[]),
]
triaged = triage_issues(issues)
# Highest priority first
assert triaged[0].number == 2
assert triaged[0].priority_score == 100 # urgent label
def test_triage_prs_can_be_included():
# triage_issues does not filter PRs — that's fetch_open_issues's job
issues = [_make_raw_issue(10, "A PR-like issue")]
triaged = triage_issues(issues)
assert len(triaged) == 1
def test_triage_empty():
assert triage_issues([]) == []
def test_triage_routing():
issues = [
_make_raw_issue(1, "Benchmark LLM backends", body="comprehensive analysis"),
_make_raw_issue(2, "Refactor agent loader", body="architecture change"),
_make_raw_issue(3, "Fix typo in docs", labels=["docs"]),
]
triaged = {i.number: i for i in triage_issues(issues)}
assert triaged[1].agent_target == AgentTarget.KIMI
assert triaged[2].agent_target == AgentTarget.CLAUDE
assert triaged[3].agent_target == AgentTarget.TIMMY
def test_triage_preserves_url():
issues = [_make_raw_issue(42, "Some issue")]
triaged = triage_issues(issues)
assert triaged[0].url == "http://gitea/issues/42"
# ---------------------------------------------------------------------------
# fetch_open_issues — no Gitea available in unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_fetch_open_issues_returns_empty_when_disabled(monkeypatch):
"""When Gitea is disabled, fetch returns [] without raising."""
import timmy.vassal.backlog as bl
# Patch settings
class FakeSettings:
gitea_enabled = False
gitea_token = ""
gitea_url = "http://localhost:3000"
gitea_repo = "owner/repo"
monkeypatch.setattr(bl, "logger", bl.logger) # no-op just to confirm import
# We can't easily monkeypatch `from config import settings` inside the function,
# so test the no-token path via environment
import os
original = os.environ.pop("GITEA_TOKEN", None)
try:
result = await bl.fetch_open_issues()
# Should return [] gracefully (no token configured by default in test env)
assert isinstance(result, list)
finally:
if original is not None:
os.environ["GITEA_TOKEN"] = original

View File

@@ -0,0 +1,114 @@
"""Unit tests for timmy.vassal.dispatch — routing and label helpers."""
from __future__ import annotations
import pytest
from timmy.vassal.backlog import AgentTarget, TriagedIssue
from timmy.vassal.dispatch import (
DispatchRecord,
clear_dispatch_registry,
get_dispatch_registry,
)
def _make_triaged(
number: int,
title: str,
agent: AgentTarget,
priority: int = 50,
) -> TriagedIssue:
return TriagedIssue(
number=number,
title=title,
body="",
agent_target=agent,
priority_score=priority,
rationale="test rationale",
url=f"http://gitea/issues/{number}",
)
# ---------------------------------------------------------------------------
# Registry helpers
# ---------------------------------------------------------------------------
def test_registry_starts_empty():
clear_dispatch_registry()
assert get_dispatch_registry() == {}
def test_registry_returns_copy():
clear_dispatch_registry()
reg = get_dispatch_registry()
reg[999] = None # type: ignore[assignment]
assert 999 not in get_dispatch_registry()
# ---------------------------------------------------------------------------
# dispatch_issue — Timmy self-dispatch (no Gitea required)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dispatch_timmy_self_no_gitea():
"""Timmy self-dispatch records without hitting Gitea."""
clear_dispatch_registry()
issue = _make_triaged(1, "Fix docs typo", AgentTarget.TIMMY)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert isinstance(record, DispatchRecord)
assert record.issue_number == 1
assert record.agent == AgentTarget.TIMMY
assert 1 in get_dispatch_registry()
@pytest.mark.asyncio
async def test_dispatch_claude_no_gitea_token():
"""Claude dispatch gracefully degrades when Gitea token is absent."""
clear_dispatch_registry()
issue = _make_triaged(2, "Refactor auth", AgentTarget.CLAUDE)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert record.issue_number == 2
assert record.agent == AgentTarget.CLAUDE
# label/comment not applied — no token
assert record.label_applied is False
assert 2 in get_dispatch_registry()
@pytest.mark.asyncio
async def test_dispatch_kimi_no_gitea_token():
clear_dispatch_registry()
issue = _make_triaged(3, "Research embeddings", AgentTarget.KIMI)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert record.agent == AgentTarget.KIMI
assert record.label_applied is False
# ---------------------------------------------------------------------------
# DispatchRecord fields
# ---------------------------------------------------------------------------
def test_dispatch_record_defaults():
r = DispatchRecord(
issue_number=5,
issue_title="Test issue",
agent=AgentTarget.TIMMY,
rationale="because",
)
assert r.label_applied is False
assert r.comment_posted is False
assert r.dispatched_at # has a timestamp

View File

@@ -0,0 +1,116 @@
"""Unit tests for timmy.vassal.house_health."""
from __future__ import annotations
import pytest
from timmy.vassal.house_health import (
DiskUsage,
MemoryUsage,
OllamaHealth,
SystemSnapshot,
_probe_disk,
)
# ---------------------------------------------------------------------------
# Data model tests
# ---------------------------------------------------------------------------
def test_system_snapshot_healthy_when_no_warnings():
snap = SystemSnapshot()
assert snap.healthy is True
def test_system_snapshot_unhealthy_with_warnings():
snap = SystemSnapshot(warnings=["disk 90% full"])
assert snap.healthy is False
def test_disk_usage_defaults():
d = DiskUsage()
assert d.percent_used == 0.0
assert d.path == "/"
def test_memory_usage_defaults():
m = MemoryUsage()
assert m.percent_used == 0.0
def test_ollama_health_defaults():
o = OllamaHealth()
assert o.reachable is False
assert o.loaded_models == []
# ---------------------------------------------------------------------------
# _probe_disk — runs against real filesystem
# ---------------------------------------------------------------------------
def test_probe_disk_root():
result = _probe_disk("/")
assert result.total_gb > 0
assert 0.0 <= result.percent_used <= 100.0
assert result.free_gb >= 0
def test_probe_disk_bad_path():
result = _probe_disk("/nonexistent_path_xyz")
# Should not raise — returns zeroed DiskUsage
assert result.percent_used == 0.0
# ---------------------------------------------------------------------------
# get_system_snapshot — async
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_system_snapshot_returns_snapshot():
from timmy.vassal.house_health import get_system_snapshot
snap = await get_system_snapshot()
assert isinstance(snap, SystemSnapshot)
# Disk is always probed
assert snap.disk.total_gb >= 0
# Ollama is likely unreachable in test env — that's fine
assert isinstance(snap.ollama, OllamaHealth)
@pytest.mark.asyncio
async def test_get_system_snapshot_disk_warning(monkeypatch):
"""When disk is above threshold, a warning is generated."""
import timmy.vassal.house_health as hh
# Patch _probe_disk to return high usage
def _full_disk(path: str) -> DiskUsage:
return DiskUsage(
path=path,
total_gb=100.0,
used_gb=90.0,
free_gb=10.0,
percent_used=90.0,
)
monkeypatch.setattr(hh, "_probe_disk", _full_disk)
snap = await hh.get_system_snapshot()
assert any("disk" in w.lower() or "Disk" in w for w in snap.warnings)
# ---------------------------------------------------------------------------
# cleanup_stale_files — temp dir test
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cleanup_stale_files_missing_dir():
"""Should not raise when the target dir doesn't exist."""
from timmy.vassal.house_health import cleanup_stale_files
result = await cleanup_stale_files(temp_dirs=["/tmp/timmy_test_xyz_nonexistent"])
assert result["deleted_count"] == 0
assert result["errors"] == []

View File

@@ -0,0 +1,139 @@
"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator."""
from __future__ import annotations
import pytest
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------
def test_cycle_record_healthy_when_no_errors():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
)
assert r.healthy is True
def test_cycle_record_unhealthy_with_errors():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
errors=["backlog: connection refused"],
)
assert r.healthy is False
def test_cycle_record_unhealthy_with_warnings():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
house_warnings=["disk 90% full"],
)
assert r.healthy is False
# ---------------------------------------------------------------------------
# VassalOrchestrator state
# ---------------------------------------------------------------------------
def test_orchestrator_initial_state():
orch = VassalOrchestrator()
assert orch.cycle_count == 0
assert orch.is_running is False
assert orch.history == []
def test_orchestrator_get_status_no_cycles():
orch = VassalOrchestrator()
status = orch.get_status()
assert status["running"] is False
assert status["cycle_count"] == 0
assert status["last_cycle"] is None
# ---------------------------------------------------------------------------
# run_cycle — integration (no Gitea, no Ollama in test env)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_run_cycle_completes_without_services():
"""run_cycle must complete and record even when external services are down."""
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator(cycle_interval=300)
record = await orch.run_cycle()
assert isinstance(record, VassalCycleRecord)
assert record.cycle_id == 1
assert record.finished_at # was set
assert record.duration_ms >= 0
# No Gitea → fetched = 0, dispatched = 0
assert record.issues_fetched == 0
assert record.issues_dispatched == 0
# History updated
assert len(orch.history) == 1
assert orch.cycle_count == 1
@pytest.mark.asyncio
async def test_run_cycle_increments_cycle_count():
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator()
await orch.run_cycle()
await orch.run_cycle()
assert orch.cycle_count == 2
assert len(orch.history) == 2
@pytest.mark.asyncio
async def test_get_status_after_cycle():
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator()
await orch.run_cycle()
status = orch.get_status()
assert status["cycle_count"] == 1
last = status["last_cycle"]
assert last is not None
assert last["cycle_id"] == 1
assert last["issues_fetched"] == 0
# ---------------------------------------------------------------------------
# start / stop
# ---------------------------------------------------------------------------
def test_orchestrator_stop_when_not_running():
"""stop() on an idle orchestrator must not raise."""
orch = VassalOrchestrator()
orch.stop() # should be a no-op
assert orch.is_running is False
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
def test_module_singleton_exists():
from timmy.vassal import vassal_orchestrator, VassalOrchestrator
assert isinstance(vassal_orchestrator, VassalOrchestrator)