Compare commits

..

4 Commits

Author SHA1 Message Date
Alexander Whitestone
f79899e283 chore: Acknowledge closed issue #1012
Issue #1012 (Enhancement: Integrated "Knowledge Graph" Explorer) was marked as closed due to not aligning with the harness-first strategy. No code changes were made.
2026-03-23 14:36:17 -04:00
128aa4427f [claude] Vassal Protocol — Timmy as autonomous orchestrator (#1070) (#1142)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 18:33:15 +00:00
4f8e86348c [claude] Build Timmy autonomous backlog triage loop (#1071) (#1141)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 18:32:27 +00:00
0c627f175b [gemini] refactor: Gracefully handle tool registration errors (#938) (#1132)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 18:26:40 +00:00
21 changed files with 3439 additions and 621 deletions

View File

@@ -25,19 +25,6 @@ providers:
tier: local
url: "http://localhost:11434"
models:
# ── Dual-model routing: Qwen3-8B (fast) + Qwen3-14B (quality) ──────────
# Both models fit simultaneously: ~6.6 GB + ~10.5 GB = ~17 GB combined.
# Requires OLLAMA_MAX_LOADED_MODELS=2 (set in .env) to stay hot.
# Ref: issue #1065 — Qwen3-8B/14B dual-model routing strategy
- name: qwen3:8b
context_window: 32768
capabilities: [text, tools, json, streaming, routine]
description: "Qwen3-8B Q6_K — fast router for routine tasks (~6.6 GB, 45-55 tok/s)"
- name: qwen3:14b
context_window: 40960
capabilities: [text, tools, json, streaming, complex, reasoning]
description: "Qwen3-14B Q5_K_M — complex reasoning and planning (~10.5 GB, 20-28 tok/s)"
# Text + Tools models
- name: qwen3:30b
default: true
@@ -200,20 +187,6 @@ fallback_chains:
- dolphin3 # base Dolphin 3.0 8B (uncensored, no custom system prompt)
- qwen3:30b # primary fallback — usually sufficient with a good system prompt
# ── Complexity-based routing chains (issue #1065) ───────────────────────
# Routine tasks: prefer Qwen3-8B for low latency (~45-55 tok/s)
routine:
- qwen3:8b # Primary fast model
- llama3.1:8b-instruct # Fallback fast model
- llama3.2:3b # Smallest available
# Complex tasks: prefer Qwen3-14B for quality (~20-28 tok/s)
complex:
- qwen3:14b # Primary quality model
- hermes4-14b # Native tool calling, hybrid reasoning
- qwen3:30b # Highest local quality
- qwen2.5:14b # Additional fallback
# ── Custom Models ───────────────────────────────────────────────────────────
# Register custom model weights for per-agent assignment.
# Supports GGUF (Ollama), safetensors, and HuggingFace checkpoint dirs.

View File

@@ -41,13 +41,6 @@ class Settings(BaseSettings):
# 4096 keeps memory at ~19GB. Set to 0 to use model defaults.
ollama_num_ctx: int = 4096
# Maximum models loaded simultaneously in Ollama — override with OLLAMA_MAX_LOADED_MODELS
# Set to 2 so Qwen3-8B and Qwen3-14B can stay hot concurrently (~17 GB combined).
# Requires Ollama ≥ 0.1.33. Export this to the Ollama process environment:
# OLLAMA_MAX_LOADED_MODELS=2 ollama serve
# or add it to your systemd/launchd unit before starting the harness.
ollama_max_loaded_models: int = 2
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:30b,llama3.1"
# Or edit config/providers.yaml → fallback_chains for the canonical source.
@@ -311,6 +304,16 @@ class Settings(BaseSettings):
mcp_timeout: int = 15
mcp_bridge_timeout: int = 60 # HTTP timeout for MCP bridge Ollama calls (seconds)
# ── Backlog Triage Loop ────────────────────────────────────────────
# Autonomous loop: fetch open issues, score, assign to agents.
backlog_triage_enabled: bool = False
# Seconds between triage cycles (default: 15 minutes).
backlog_triage_interval_seconds: int = 900
# When True, score and summarize but don't write to Gitea.
backlog_triage_dry_run: bool = False
# Create a daily triage summary issue/comment.
backlog_triage_daily_summary: bool = True
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
# Self-test orchestrator that probes capabilities alongside the thinking loop.
loop_qa_enabled: bool = True
@@ -318,6 +321,15 @@ class Settings(BaseSettings):
loop_qa_upgrade_threshold: int = 3 # consecutive failures → file task
loop_qa_max_per_hour: int = 12 # safety throttle
# ── Vassal Protocol (Autonomous Orchestrator) ─────────────────────
# Timmy as lead decision-maker: triage backlog, dispatch agents, monitor health.
# See timmy/vassal/ for implementation.
vassal_enabled: bool = False # off by default — enable when Qwen3-14B is loaded
vassal_cycle_interval: int = 300 # seconds between orchestration cycles (5 min)
vassal_max_dispatch_per_cycle: int = 10 # cap on new dispatches per cycle
vassal_stuck_threshold_minutes: int = 120 # minutes before agent issue is "stuck"
vassal_idle_threshold_minutes: int = 30 # minutes before agent is "idle"
# ── Paperclip AI — orchestration bridge ────────────────────────────
# URL where the Paperclip server listens.
# For VPS deployment behind nginx, use the public domain.

View File

@@ -2,7 +2,6 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .classifier import TaskComplexity, classify_task
from .history import HealthHistoryStore, get_history_store
__all__ = [
@@ -13,6 +12,4 @@ __all__ = [
"router",
"HealthHistoryStore",
"get_history_store",
"TaskComplexity",
"classify_task",
]

View File

@@ -528,34 +528,6 @@ class CascadeRouter:
return True
def _get_model_for_complexity(
self, provider: Provider, complexity: "TaskComplexity"
) -> str | None:
"""Return the best model on *provider* for the given complexity tier.
Checks fallback chains first (routine / complex), then falls back to
any model with the matching capability tag, then the provider default.
"""
from infrastructure.router.classifier import TaskComplexity
chain_key = "routine" if complexity == TaskComplexity.SIMPLE else "complex"
# Walk the capability fallback chain — first model present on this provider wins
for model_name in self.config.fallback_chains.get(chain_key, []):
if any(m["name"] == model_name for m in provider.models):
return model_name
# Direct capability lookup — only return if a model explicitly has the tag
# (do not use get_model_with_capability here as it falls back to the default)
cap_model = next(
(m["name"] for m in provider.models if chain_key in m.get("capabilities", [])),
None,
)
if cap_model:
return cap_model
return None # Caller will use provider default
async def complete(
self,
messages: list[dict],
@@ -563,7 +535,6 @@ class CascadeRouter:
temperature: float = 0.7,
max_tokens: int | None = None,
cascade_tier: str | None = None,
complexity_hint: str | None = None,
) -> dict:
"""Complete a chat conversation with automatic failover.
@@ -572,48 +543,24 @@ class CascadeRouter:
- Falls back to vision-capable models when needed
- Supports image URLs, paths, and base64 encoding
Complexity-based routing (issue #1065):
- ``complexity_hint="simple"`` → routes to Qwen3-8B (low-latency)
- ``complexity_hint="complex"`` → routes to Qwen3-14B (quality)
- ``complexity_hint=None`` (default) → auto-classifies from messages
Args:
messages: List of message dicts with role and content
model: Preferred model (tries this first; complexity routing is
skipped when an explicit model is given)
model: Preferred model (tries this first, then provider defaults)
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
cascade_tier: If specified, filters providers by this tier.
- "frontier_required": Uses only Anthropic provider for top-tier models.
complexity_hint: "simple", "complex", or None (auto-detect).
Returns:
Dict with content, provider_used, model, latency_ms,
is_fallback_model, and complexity fields.
Dict with content, provider_used, and metrics
Raises:
RuntimeError: If all providers fail
"""
from infrastructure.router.classifier import TaskComplexity, classify_task
content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
# Resolve task complexity ─────────────────────────────────────────────
# Skip complexity routing when caller explicitly specifies a model.
complexity: TaskComplexity | None = None
if model is None:
if complexity_hint is not None:
try:
complexity = TaskComplexity(complexity_hint.lower())
except ValueError:
logger.warning("Unknown complexity_hint %r, auto-classifying", complexity_hint)
complexity = classify_task(messages)
else:
complexity = classify_task(messages)
logger.debug("Task complexity: %s", complexity.value)
errors = []
providers = self.providers
@@ -626,6 +573,7 @@ class CascadeRouter:
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
for provider in providers:
if not self._is_provider_available(provider):
continue
@@ -639,21 +587,7 @@ class CascadeRouter:
)
continue
# Complexity-based model selection (only when no explicit model) ──
effective_model = model
if effective_model is None and complexity is not None:
effective_model = self._get_model_for_complexity(provider, complexity)
if effective_model:
logger.debug(
"Complexity routing [%s]: %s%s",
complexity.value,
provider.name,
effective_model,
)
selected_model, is_fallback_model = self._select_model(
provider, effective_model, content_type
)
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
@@ -676,7 +610,6 @@ class CascadeRouter:
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
"complexity": complexity.value if complexity is not None else None,
}
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

View File

@@ -1,166 +0,0 @@
"""Task complexity classifier for Qwen3 dual-model routing.
Classifies incoming tasks as SIMPLE (route to Qwen3-8B for low-latency)
or COMPLEX (route to Qwen3-14B for quality-sensitive work).
Classification is fully heuristic — no LLM inference required.
"""
import re
from enum import Enum
class TaskComplexity(Enum):
"""Task complexity tier for model routing."""
SIMPLE = "simple" # Qwen3-8B Q6_K: routine, latency-sensitive
COMPLEX = "complex" # Qwen3-14B Q5_K_M: quality-sensitive, multi-step
# Keywords strongly associated with complex tasks
_COMPLEX_KEYWORDS: frozenset[str] = frozenset(
[
"plan",
"review",
"analyze",
"analyse",
"triage",
"refactor",
"design",
"architecture",
"implement",
"compare",
"debug",
"explain",
"prioritize",
"prioritise",
"strategy",
"optimize",
"optimise",
"evaluate",
"assess",
"brainstorm",
"outline",
"summarize",
"summarise",
"generate code",
"write a",
"write the",
"code review",
"pull request",
"multi-step",
"multi step",
"step by step",
"backlog prioriti",
"issue triage",
"root cause",
"how does",
"why does",
"what are the",
]
)
# Keywords strongly associated with simple/routine tasks
_SIMPLE_KEYWORDS: frozenset[str] = frozenset(
[
"status",
"list ",
"show ",
"what is",
"how many",
"ping",
"run ",
"execute ",
"ls ",
"cat ",
"ps ",
"fetch ",
"count ",
"tail ",
"head ",
"grep ",
"find file",
"read file",
"get ",
"query ",
"check ",
"yes",
"no",
"ok",
"done",
"thanks",
]
)
# Content longer than this is treated as complex regardless of keywords
_COMPLEX_CHAR_THRESHOLD = 500
# Short content defaults to simple
_SIMPLE_CHAR_THRESHOLD = 150
# More than this many messages suggests an ongoing complex conversation
_COMPLEX_CONVERSATION_DEPTH = 6
def classify_task(messages: list[dict]) -> TaskComplexity:
"""Classify task complexity from a list of messages.
Uses heuristic rules — no LLM call required. Errs toward COMPLEX
when uncertain so that quality is preserved.
Args:
messages: List of message dicts with ``role`` and ``content`` keys.
Returns:
TaskComplexity.SIMPLE or TaskComplexity.COMPLEX
"""
if not messages:
return TaskComplexity.SIMPLE
# Concatenate all user-turn content for analysis
user_content = " ".join(
msg.get("content", "")
for msg in messages
if msg.get("role") in ("user", "human")
and isinstance(msg.get("content"), str)
).lower().strip()
if not user_content:
return TaskComplexity.SIMPLE
# Complexity signals override everything -----------------------------------
# Explicit complex keywords
for kw in _COMPLEX_KEYWORDS:
if kw in user_content:
return TaskComplexity.COMPLEX
# Numbered / multi-step instruction list: "1. do this 2. do that"
if re.search(r"\b\d+\.\s+\w", user_content):
return TaskComplexity.COMPLEX
# Code blocks embedded in messages
if "```" in user_content:
return TaskComplexity.COMPLEX
# Long content → complex reasoning likely required
if len(user_content) > _COMPLEX_CHAR_THRESHOLD:
return TaskComplexity.COMPLEX
# Deep conversation → complex ongoing task
if len(messages) > _COMPLEX_CONVERSATION_DEPTH:
return TaskComplexity.COMPLEX
# Simplicity signals -------------------------------------------------------
# Explicit simple keywords
for kw in _SIMPLE_KEYWORDS:
if kw in user_content:
return TaskComplexity.SIMPLE
# Short single-sentence messages default to simple
if len(user_content) <= _SIMPLE_CHAR_THRESHOLD:
return TaskComplexity.SIMPLE
# When uncertain, prefer quality (complex model)
return TaskComplexity.COMPLEX

759
src/timmy/backlog_triage.py Normal file
View File

@@ -0,0 +1,759 @@
"""Autonomous backlog triage loop — Timmy scans Gitea and assigns work.
Continuously fetches open issues, scores/prioritizes them, and decides
what to work on next without waiting to be asked.
Loop flow::
while true:
1. Fetch all open issues from Gitea API
2. Score/prioritize by labels, age, type, blocked status
3. Identify unassigned high-priority items
4. Decide: assign to claude, dispatch to kimi, or flag for Alex
5. Execute the assignment (comment + assign)
6. Optionally post a daily triage summary
7. Sleep for configurable interval (default 15 min)
Priority tiers:
P0 — security, data loss, blocking bugs → immediate action
P1 — core functionality, ready issues → next sprint
P2 — improvements, low-score issues → backlog
P3 — philosophy, meta → someday/never (skip in triage)
Usage::
from timmy.backlog_triage import BacklogTriageLoop
loop = BacklogTriageLoop()
await loop.run_once() # single triage cycle
await loop.start() # background daemon loop
loop.stop() # graceful shutdown
"""
from __future__ import annotations
import asyncio
import logging
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
from config import settings
logger = logging.getLogger(__name__)
# ── Constants ────────────────────────────────────────────────────────────────
# Minimum triage score to be considered "ready" for assignment
READY_THRESHOLD = 5
# Agent Gitea logins
AGENT_CLAUDE = "claude"
AGENT_KIMI = "kimi"
OWNER_LOGIN = "rockachopa" # Alex — human owner
# Labels
KIMI_READY_LABEL = "kimi-ready"
TRIAGE_DONE_LABEL = "triage-done"
# Tag sets (mirrors scripts/triage_score.py)
_BUG_TAGS = frozenset({"bug", "broken", "crash", "error", "fix", "regression", "hotfix"})
_FEATURE_TAGS = frozenset({"feature", "feat", "enhancement", "capability", "timmy-capability"})
_REFACTOR_TAGS = frozenset({"refactor", "cleanup", "tech-debt", "optimization", "perf"})
_META_TAGS = frozenset({"philosophy", "soul-gap", "discussion", "question", "rfc"})
_P0_TAGS = frozenset({"security", "data-loss", "blocking", "p0", "critical"})
_RESEARCH_TAGS = frozenset({"research", "kimi-ready", "investigation", "spike"})
_LOOP_TAG = "loop-generated"
# Regex patterns for scoring
_TAG_RE = re.compile(r"\[([^\]]+)\]")
_FILE_RE = re.compile(r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE)
_FUNC_RE = re.compile(r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE)
_ACCEPT_RE = re.compile(
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)",
re.IGNORECASE,
)
_TEST_RE = re.compile(r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE)
_BLOCKED_RE = re.compile(r"\bblock(?:ed|s|ing)\b", re.IGNORECASE)
# ── Data types ───────────────────────────────────────────────────────────────
@dataclass
class ScoredIssue:
"""A Gitea issue enriched with triage scoring."""
number: int
title: str
body: str
labels: list[str]
tags: set[str]
assignees: list[str]
created_at: datetime
issue_type: str # bug | feature | refactor | philosophy | research | unknown
score: int = 0
scope: int = 0
acceptance: int = 0
alignment: int = 0
ready: bool = False
age_days: int = 0
is_p0: bool = False
is_blocked: bool = False
@property
def is_unassigned(self) -> bool:
return len(self.assignees) == 0
@property
def needs_kimi(self) -> bool:
return bool(self.tags & _RESEARCH_TAGS) or KIMI_READY_LABEL in self.labels
@dataclass
class TriageDecision:
"""The outcome of a triage decision for a single issue."""
issue_number: int
action: str # "assign_claude" | "assign_kimi" | "flag_alex" | "skip"
reason: str
agent: str = "" # the agent assigned (login)
executed: bool = False
error: str = ""
@dataclass
class TriageCycleResult:
"""Summary of one complete triage cycle."""
timestamp: str
total_open: int
scored: int
ready: int
decisions: list[TriageDecision] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
duration_ms: int = 0
# ── Scoring ──────────────────────────────────────────────────────────────────
def _extract_tags(title: str, labels: list[str]) -> set[str]:
"""Pull tags from [bracket] title notation + Gitea label names."""
tags: set[str] = set()
for m in _TAG_RE.finditer(title):
tags.add(m.group(1).lower().strip())
for lbl in labels:
tags.add(lbl.lower().strip())
return tags
def _score_scope(title: str, body: str, tags: set[str]) -> int:
"""03: How well-scoped is this issue?"""
text = f"{title}\n{body}"
score = 0
if _FILE_RE.search(text):
score += 1
if _FUNC_RE.search(text):
score += 1
clean = _TAG_RE.sub("", title).strip()
if len(clean) < 80:
score += 1
if tags & _META_TAGS:
score = max(0, score - 2)
return min(3, score)
def _score_acceptance(title: str, body: str, tags: set[str]) -> int:
"""03: Does this have clear acceptance criteria?"""
text = f"{title}\n{body}"
score = 0
matches = len(_ACCEPT_RE.findall(text))
if matches >= 3:
score += 2
elif matches >= 1:
score += 1
if _TEST_RE.search(text):
score += 1
if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE):
score += 1
if tags & _META_TAGS:
score = max(0, score - 1)
return min(3, score)
def _score_alignment(title: str, body: str, tags: set[str]) -> int:
"""03: How aligned is this with the north star?"""
score = 0
if tags & _BUG_TAGS:
return 3
if tags & _REFACTOR_TAGS:
score += 2
if tags & _FEATURE_TAGS:
score += 2
if _LOOP_TAG in tags:
score += 1
if tags & _META_TAGS:
score = 0
return min(3, score)
def score_issue(issue: dict[str, Any]) -> ScoredIssue:
"""Score and classify a raw Gitea issue dict."""
number = issue["number"]
title = issue.get("title", "")
body = issue.get("body") or ""
label_names = [lbl["name"] for lbl in issue.get("labels", [])]
tags = _extract_tags(title, label_names)
assignees = [a["login"] for a in issue.get("assignees", [])]
# Parse created_at
raw_ts = issue.get("created_at", "")
try:
created_at = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
except (ValueError, AttributeError):
created_at = datetime.now(UTC)
age_days = (datetime.now(UTC) - created_at).days
# Scores
scope = _score_scope(title, body, tags)
acceptance = _score_acceptance(title, body, tags)
alignment = _score_alignment(title, body, tags)
total = scope + acceptance + alignment
# Classify
if tags & _BUG_TAGS:
issue_type = "bug"
elif tags & _RESEARCH_TAGS:
issue_type = "research"
elif tags & _FEATURE_TAGS:
issue_type = "feature"
elif tags & _REFACTOR_TAGS:
issue_type = "refactor"
elif tags & _META_TAGS:
issue_type = "philosophy"
else:
issue_type = "unknown"
is_p0 = bool(tags & _P0_TAGS) or issue_type == "bug"
is_blocked = bool(_BLOCKED_RE.search(title) or _BLOCKED_RE.search(body))
return ScoredIssue(
number=number,
title=_TAG_RE.sub("", title).strip(),
body=body,
labels=label_names,
tags=tags,
assignees=assignees,
created_at=created_at,
issue_type=issue_type,
score=total,
scope=scope,
acceptance=acceptance,
alignment=alignment,
ready=total >= READY_THRESHOLD,
age_days=age_days,
is_p0=is_p0,
is_blocked=is_blocked,
)
# ── Decision logic ───────────────────────────────────────────────────────────
def decide(issue: ScoredIssue) -> TriageDecision:
"""Decide what to do with an issue.
Returns a TriageDecision with action, reason, and agent.
Decision is not yet executed — call execute_decision() for that.
"""
num = issue.number
# Skip philosophy/meta — not dev-actionable
if issue.issue_type == "philosophy":
return TriageDecision(
issue_number=num,
action="skip",
reason="Philosophy/meta issue — not dev-actionable in the triage loop.",
)
# Skip already-assigned issues
if not issue.is_unassigned:
return TriageDecision(
issue_number=num,
action="skip",
reason=f"Already assigned to: {', '.join(issue.assignees)}.",
)
# Skip if not ready (low score)
if not issue.ready:
return TriageDecision(
issue_number=num,
action="skip",
reason=f"Score {issue.score} < {READY_THRESHOLD} threshold — needs more detail before assignment.",
)
# Blocked: flag for Alex
if issue.is_blocked:
return TriageDecision(
issue_number=num,
action="flag_alex",
agent=OWNER_LOGIN,
reason=(
"Issue appears blocked. Flagging for @rockachopa to unblock before autonomous assignment."
),
)
# Research / Kimi-ready
if issue.needs_kimi:
return TriageDecision(
issue_number=num,
action="assign_kimi",
agent=AGENT_KIMI,
reason=(
f"Issue type '{issue.issue_type}' with research/investigation scope. "
f"Assigning kimi-ready label for Kimi agent to pick up."
),
)
# P0 bugs and blocking issues → Claude immediately
if issue.is_p0:
return TriageDecision(
issue_number=num,
action="assign_claude",
agent=AGENT_CLAUDE,
reason=(
f"P0/{issue.issue_type} issue (score={issue.score}, age={issue.age_days}d). "
f"Assigning to Claude Code for immediate attention."
),
)
# Everything else that is ready → Claude Code
return TriageDecision(
issue_number=num,
action="assign_claude",
agent=AGENT_CLAUDE,
reason=(
f"Unassigned ready issue (type={issue.issue_type}, score={issue.score}, "
f"age={issue.age_days}d). Assigning to Claude Code."
),
)
# ── Gitea API client ─────────────────────────────────────────────────────────
def _api_headers() -> dict[str, str]:
return {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _repo_url(path: str) -> str:
owner, repo = settings.gitea_repo.split("/", 1)
return f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/{path}"
async def fetch_open_issues(client: httpx.AsyncClient) -> list[dict[str, Any]]:
"""Fetch all open issues from Gitea, paginating as needed."""
all_issues: list[dict[str, Any]] = []
page = 1
while True:
url = _repo_url(f"issues?state=open&type=issues&limit=50&page={page}")
try:
resp = await client.get(url, headers=_api_headers())
if resp.status_code != 200:
logger.warning("Gitea issues fetch failed (HTTP %s)", resp.status_code)
break
batch: list[dict[str, Any]] = resp.json()
if not batch:
break
all_issues.extend(batch)
if len(batch) < 50:
break
page += 1
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Gitea connection error fetching issues: %s", exc)
break
return all_issues
async def post_comment(
client: httpx.AsyncClient,
issue_number: int,
body: str,
) -> bool:
"""Post a comment on a Gitea issue. Returns True on success."""
url = _repo_url(f"issues/{issue_number}/comments")
try:
resp = await client.post(url, headers=_api_headers(), json={"body": body})
return resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to post comment on #%d: %s", issue_number, exc)
return False
async def assign_issue(
client: httpx.AsyncClient,
issue_number: int,
assignee: str,
) -> bool:
"""Assign an issue to a Gitea user. Returns True on success."""
url = _repo_url(f"issues/{issue_number}")
try:
resp = await client.patch(
url,
headers=_api_headers(),
json={"assignees": [assignee]},
)
return resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to assign #%d to %s: %s", issue_number, assignee, exc)
return False
async def add_label(
client: httpx.AsyncClient,
issue_number: int,
label_name: str,
) -> bool:
"""Add a label to a Gitea issue by name (auto-creates if missing). Returns True on success."""
owner, repo = settings.gitea_repo.split("/", 1)
labels_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/labels"
headers = _api_headers()
try:
# Fetch existing labels
resp = await client.get(labels_url, headers=headers)
if resp.status_code != 200:
return False
existing = {lbl["name"]: lbl["id"] for lbl in resp.json()}
if label_name in existing:
label_id = existing[label_name]
else:
# Auto-create the label
create_resp = await client.post(
labels_url,
headers=headers,
json={"name": label_name, "color": "#006b75"},
)
if create_resp.status_code not in (200, 201):
return False
label_id = create_resp.json()["id"]
# Apply to the issue
apply_url = _repo_url(f"issues/{issue_number}/labels")
apply_resp = await client.post(
apply_url, headers=headers, json={"labels": [label_id]}
)
return apply_resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to add label %r to #%d: %s", label_name, issue_number, exc)
return False
# ── Decision execution ───────────────────────────────────────────────────────
async def execute_decision(
client: httpx.AsyncClient,
decision: TriageDecision,
dry_run: bool = False,
) -> TriageDecision:
"""Execute a triage decision — comment + assign/label.
When dry_run=True, logs the decision but makes no Gitea API calls.
Returns the updated decision with executed=True on success.
"""
num = decision.issue_number
if decision.action == "skip":
logger.debug("Triage skip #%d: %s", num, decision.reason)
decision.executed = True
return decision
audit_comment = _build_audit_comment(decision)
if dry_run:
logger.info(
"[DRY RUN] #%d%s (%s): %s",
num,
decision.action,
decision.agent,
decision.reason,
)
decision.executed = True
return decision
# Post audit comment first (always, so Alex can see reasoning)
comment_ok = await post_comment(client, num, audit_comment)
if not comment_ok:
decision.error = "Failed to post audit comment"
logger.warning("Triage #%d: comment failed", num)
return decision
# Execute assignment
ok = False
if decision.action == "assign_claude":
ok = await assign_issue(client, num, AGENT_CLAUDE)
elif decision.action == "assign_kimi":
ok = await add_label(client, num, KIMI_READY_LABEL)
elif decision.action == "flag_alex":
# Comment already posted above — that's sufficient for flagging
ok = True
if ok:
decision.executed = True
logger.info("Triage #%d%s OK", num, decision.action)
else:
decision.error = f"Action {decision.action!r} failed"
logger.warning("Triage #%d: action %r failed", num, decision.action)
return decision
def _build_audit_comment(decision: TriageDecision) -> str:
"""Build the audit trail comment that Alex can read to see reasoning."""
ts = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
action_text = {
"assign_claude": f"Assigning to @{AGENT_CLAUDE} for implementation.",
"assign_kimi": f"Adding `{KIMI_READY_LABEL}` label — queuing for Kimi research agent.",
"flag_alex": f"Flagging for @{OWNER_LOGIN} — issue appears blocked or needs human decision.",
}.get(decision.action, decision.action)
return (
f"**[Timmy Triage — {ts}]**\n\n"
f"**Decision:** {action_text}\n\n"
f"**Why:** {decision.reason}\n\n"
f"*Autonomous triage by Timmy. Reply to override.*"
)
# ── Daily summary ─────────────────────────────────────────────────────────────
def _build_daily_summary(result: TriageCycleResult, scored: list[ScoredIssue]) -> str:
"""Build the daily triage summary body."""
now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
assigned = [d for d in result.decisions if d.executed and d.action != "skip"]
skipped = [d for d in result.decisions if d.action == "skip"]
lines = [
f"# Timmy Backlog Triage — {now}",
"",
f"**Open issues:** {result.total_open} | "
f"**Scored:** {result.scored} | "
f"**Ready:** {result.ready} | "
f"**Assigned this cycle:** {len(assigned)}",
"",
"## Top 10 Ready Issues (by score)",
"",
]
top = sorted([s for s in scored if s.ready], key=lambda s: (-s.score, s.number))[:10]
for s in top:
flag = "🐛" if s.issue_type == "bug" else "" if s.is_p0 else ""
lines.append(
f"- {flag} **#{s.number}** (score={s.score}, age={s.age_days}d) — {s.title[:80]}"
)
if assigned:
lines += ["", "## Actions Taken", ""]
for d in assigned:
lines.append(f"- #{d.issue_number} → `{d.action}` ({d.agent}): {d.reason[:100]}")
if skipped:
lines += ["", f"## Skipped ({len(skipped)} issues)", ""]
for d in skipped[:5]:
lines.append(f"- #{d.issue_number}: {d.reason[:80]}")
if len(skipped) > 5:
lines.append(f"- … and {len(skipped) - 5} more")
lines += [
"",
"---",
"*Auto-generated by Timmy's backlog triage loop. "
"Override any decision by reassigning or commenting.*",
]
return "\n".join(lines)
async def post_daily_summary(
client: httpx.AsyncClient,
result: TriageCycleResult,
scored: list[ScoredIssue],
dry_run: bool = False,
) -> bool:
"""Post a daily triage summary as a new Gitea issue."""
today = datetime.now(UTC).strftime("%Y-%m-%d")
title = f"[Triage] Daily backlog summary — {today}"
body = _build_daily_summary(result, scored)
if dry_run:
logger.info("[DRY RUN] Would post daily summary: %s", title)
return True
url = _repo_url("issues")
try:
resp = await client.post(
url,
headers=_api_headers(),
json={
"title": title,
"body": body,
"labels": [],
},
)
if resp.status_code in (200, 201):
issue_num = resp.json().get("number", "?")
logger.info("Daily triage summary posted as issue #%s", issue_num)
return True
logger.warning("Daily summary post failed (HTTP %s)", resp.status_code)
return False
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to post daily summary: %s", exc)
return False
# ── Main loop class ───────────────────────────────────────────────────────────
class BacklogTriageLoop:
"""Autonomous backlog triage loop.
Fetches, scores, and assigns Gitea issues on a configurable interval.
Parameters
----------
interval:
Seconds between triage cycles. Default: settings.backlog_triage_interval_seconds.
dry_run:
When True, score and log decisions but don't write to Gitea.
daily_summary:
When True, post a daily triage summary issue after each cycle.
"""
def __init__(
self,
*,
interval: float | None = None,
dry_run: bool | None = None,
daily_summary: bool | None = None,
) -> None:
self._interval = float(interval or settings.backlog_triage_interval_seconds)
self._dry_run = dry_run if dry_run is not None else settings.backlog_triage_dry_run
self._daily_summary = (
daily_summary if daily_summary is not None else settings.backlog_triage_daily_summary
)
self._running = False
self._task: asyncio.Task | None = None
self._cycle_count = 0
self._last_summary_date: str = ""
self.history: list[TriageCycleResult] = []
@property
def is_running(self) -> bool:
return self._running
@property
def cycle_count(self) -> int:
return self._cycle_count
async def run_once(self) -> TriageCycleResult:
"""Execute one full triage cycle.
1. Fetch all open Gitea issues
2. Score and prioritize
3. Decide on each unassigned ready issue
4. Execute decisions
5. Optionally post daily summary
"""
import time
self._cycle_count += 1
start = time.monotonic()
ts = datetime.now(UTC).isoformat()
result = TriageCycleResult(timestamp=ts, total_open=0, scored=0, ready=0)
if not settings.gitea_enabled or not settings.gitea_token:
logger.warning("Backlog triage: Gitea not configured — skipping cycle")
return result
async with httpx.AsyncClient(timeout=30) as client:
# 1. Fetch
raw_issues = await fetch_open_issues(client)
result.total_open = len(raw_issues)
logger.info("Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues))
# 2. Score
scored = [score_issue(i) for i in raw_issues]
result.scored = len(scored)
result.ready = sum(1 for s in scored if s.ready)
# 3 & 4. Decide and execute for each issue
for issue in scored:
decision = decide(issue)
if decision.action == "skip":
result.decisions.append(decision)
continue
decision = await execute_decision(client, decision, dry_run=self._dry_run)
result.decisions.append(decision)
# Rate-limit: short pause between API writes to avoid hammering Gitea
if not self._dry_run:
await asyncio.sleep(0.5)
# 5. Daily summary (once per UTC day)
today = datetime.now(UTC).strftime("%Y-%m-%d")
if self._daily_summary and today != self._last_summary_date:
await post_daily_summary(client, result, scored, dry_run=self._dry_run)
self._last_summary_date = today
result.duration_ms = int((time.monotonic() - start) * 1000)
self.history.append(result)
assigned_count = sum(1 for d in result.decisions if d.executed and d.action != "skip")
logger.info(
"Triage cycle #%d complete (%d ms): %d open, %d ready, %d assigned",
self._cycle_count,
result.duration_ms,
result.total_open,
result.ready,
assigned_count,
)
return result
async def start(self) -> None:
"""Start the triage loop as a background task."""
if self._running:
logger.warning("BacklogTriageLoop already running")
return
self._running = True
await self._loop()
async def _loop(self) -> None:
logger.info(
"BacklogTriageLoop started (interval=%.0fs, dry_run=%s)",
self._interval,
self._dry_run,
)
while self._running:
try:
await self.run_once()
except Exception:
logger.exception("Backlog triage cycle failed")
await asyncio.sleep(self._interval)
def stop(self) -> None:
"""Signal the loop to stop after the current cycle."""
self._running = False
logger.info("BacklogTriageLoop stop requested")

View File

@@ -462,7 +462,8 @@ def consult_grok(query: str) -> str:
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.warning("Tool execution failed (Lightning invoice): %s", exc)
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
@@ -533,7 +534,8 @@ def _register_web_fetch_tool(toolkit: Toolkit) -> None:
try:
toolkit.register(web_fetch, name="web_fetch")
except Exception as exc:
logger.warning("Tool execution failed (web_fetch registration): %s", exc)
logger.error("Failed to register web_fetch tool: %s", exc)
raise
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
@@ -565,8 +567,8 @@ def _register_grok_tool(toolkit: Toolkit) -> None:
toolkit.register(consult_grok, name="consult_grok")
logger.info("Grok consultation tool registered")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (Grok registration): %s", exc)
logger.debug("Grok tool not available")
logger.error("Failed to register Grok tool: %s", exc)
raise
def _register_memory_tools(toolkit: Toolkit) -> None:
@@ -579,8 +581,8 @@ def _register_memory_tools(toolkit: Toolkit) -> None:
toolkit.register(memory_read, name="memory_read")
toolkit.register(memory_forget, name="memory_forget")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (Memory tools registration): %s", exc)
logger.debug("Memory tools not available")
logger.error("Failed to register Memory tools: %s", exc)
raise
def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
@@ -628,8 +630,8 @@ def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
toolkit.register(plan_and_execute, name="plan_and_execute")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (plan_and_execute registration): %s", exc)
logger.debug("plan_and_execute tool not available")
logger.error("Failed to register plan_and_execute tool: %s", exc)
raise
def _register_introspection_tools(toolkit: Toolkit) -> None:
@@ -647,15 +649,16 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
toolkit.register(get_memory_status, name="get_memory_status")
toolkit.register(run_self_tests, name="run_self_tests")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (Introspection tools registration): %s", exc)
logger.debug("Introspection tools not available")
logger.error("Failed to register Introspection tools: %s", exc)
raise
try:
from timmy.mcp_tools import update_gitea_avatar
toolkit.register(update_gitea_avatar, name="update_gitea_avatar")
except (ImportError, AttributeError) as exc:
logger.debug("update_gitea_avatar tool not available: %s", exc)
logger.error("Failed to register update_gitea_avatar tool: %s", exc)
raise
try:
from timmy.session_logger import self_reflect, session_history
@@ -663,8 +666,8 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
toolkit.register(session_history, name="session_history")
toolkit.register(self_reflect, name="self_reflect")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (session_history registration): %s", exc)
logger.debug("session_history tool not available")
logger.error("Failed to register session_history tool: %s", exc)
raise
def _register_delegation_tools(toolkit: Toolkit) -> None:
@@ -676,8 +679,8 @@ def _register_delegation_tools(toolkit: Toolkit) -> None:
toolkit.register(delegate_to_kimi, name="delegate_to_kimi")
toolkit.register(list_swarm_agents, name="list_swarm_agents")
except Exception as exc:
logger.warning("Tool execution failed (Delegation tools registration): %s", exc)
logger.debug("Delegation tools not available")
logger.error("Failed to register Delegation tools: %s", exc)
raise
def _register_gematria_tool(toolkit: Toolkit) -> None:
@@ -687,8 +690,8 @@ def _register_gematria_tool(toolkit: Toolkit) -> None:
toolkit.register(gematria, name="gematria")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (Gematria registration): %s", exc)
logger.debug("Gematria tool not available")
logger.error("Failed to register Gematria tool: %s", exc)
raise
def _register_artifact_tools(toolkit: Toolkit) -> None:
@@ -699,8 +702,8 @@ def _register_artifact_tools(toolkit: Toolkit) -> None:
toolkit.register(jot_note, name="jot_note")
toolkit.register(log_decision, name="log_decision")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (Artifact tools registration): %s", exc)
logger.debug("Artifact tools not available")
logger.error("Failed to register Artifact tools: %s", exc)
raise
def _register_thinking_tools(toolkit: Toolkit) -> None:
@@ -710,8 +713,8 @@ def _register_thinking_tools(toolkit: Toolkit) -> None:
toolkit.register(search_thoughts, name="thought_search")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (Thinking tools registration): %s", exc)
logger.debug("Thinking tools not available")
logger.error("Failed to register Thinking tools: %s", exc)
raise
def create_full_toolkit(base_dir: str | Path | None = None):

View File

@@ -0,0 +1,21 @@
"""Vassal Protocol — Timmy as autonomous orchestrator.
Timmy is Alex's vassal: the lead decision-maker for development direction,
agent management, and house health. He observes the Gitea backlog, decides
priorities, dispatches work to agents (Claude, Kimi, self), monitors output,
and keeps Hermes (M3 Max) running well.
Public API
----------
from timmy.vassal import vassal_orchestrator
await vassal_orchestrator.run_cycle()
snapshot = vassal_orchestrator.get_status()
"""
from timmy.vassal.orchestration_loop import VassalOrchestrator
# Module-level singleton — import and use directly.
vassal_orchestrator = VassalOrchestrator()
__all__ = ["VassalOrchestrator", "vassal_orchestrator"]

View File

@@ -0,0 +1,296 @@
"""Vassal Protocol — agent health monitoring.
Monitors whether downstream agents (Claude, Kimi) are making progress on
their assigned issues. Detects idle and stuck agents by querying Gitea
for issues with dispatch labels and checking last-comment timestamps.
Stuck agent heuristic
---------------------
An agent is considered "stuck" on an issue if:
- The issue has been labeled ``claude-ready`` or ``kimi-ready``
- No new comment has appeared in the last ``stuck_threshold_minutes``
- The issue has not been closed
Idle agent heuristic
--------------------
An agent is "idle" if it has no currently assigned (labeled) open issues.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_AGENT_LABELS = {
"claude": "claude-ready",
"kimi": "kimi-ready",
}
_DEFAULT_STUCK_MINUTES = 120
_DEFAULT_IDLE_THRESHOLD = 30
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class AgentStatus:
"""Health snapshot for one agent at a point in time."""
agent: str # "claude" | "kimi" | "timmy"
is_idle: bool = True
active_issue_numbers: list[int] = field(default_factory=list)
stuck_issue_numbers: list[int] = field(default_factory=list)
checked_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def is_stuck(self) -> bool:
return bool(self.stuck_issue_numbers)
@property
def needs_reassignment(self) -> bool:
return self.is_stuck
@dataclass
class AgentHealthReport:
"""Combined health report for all monitored agents."""
agents: list[AgentStatus] = field(default_factory=list)
generated_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def any_stuck(self) -> bool:
return any(a.is_stuck for a in self.agents)
@property
def all_idle(self) -> bool:
return all(a.is_idle for a in self.agents)
def for_agent(self, name: str) -> AgentStatus | None:
for a in self.agents:
if a.agent == name:
return a
return None
# ---------------------------------------------------------------------------
# Gitea queries
# ---------------------------------------------------------------------------
async def _fetch_labeled_issues(
client: Any,
base_url: str,
headers: dict,
repo: str,
label: str,
) -> list[dict]:
"""Return open issues carrying a specific label."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params={"state": "open", "labels": label, "limit": 50},
)
if resp.status_code == 200:
return [i for i in resp.json() if not i.get("pull_request")]
except Exception as exc:
logger.warning("_fetch_labeled_issues: %s%s", label, exc)
return []
async def _last_comment_time(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
) -> datetime | None:
"""Return the timestamp of the most recent comment on an issue."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
params={"limit": 1},
)
if resp.status_code == 200:
comments = resp.json()
if comments:
ts = comments[-1].get("updated_at") or comments[-1].get("created_at")
if ts:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except Exception as exc:
logger.debug("_last_comment_time: issue #%d%s", issue_number, exc)
return None
async def _issue_created_time(issue: dict) -> datetime | None:
ts = issue.get("created_at")
if ts:
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
pass
return None
# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
async def check_agent_health(
agent_name: str,
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentStatus:
"""Query Gitea for issues assigned to *agent_name* and assess health.
Args:
agent_name: One of "claude", "kimi".
stuck_threshold_minutes: Minutes of silence before an issue is
considered stuck.
Returns:
AgentStatus for this agent.
"""
status = AgentStatus(agent=agent_name)
label = _AGENT_LABELS.get(agent_name)
if not label:
logger.debug("check_agent_health: unknown agent %s", agent_name)
return status
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("check_agent_health: missing dependency — %s", exc)
return status
if not settings.gitea_enabled or not settings.gitea_token:
return status
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
cutoff = datetime.now(UTC) - timedelta(minutes=stuck_threshold_minutes)
try:
async with httpx.AsyncClient(timeout=15) as client:
issues = await _fetch_labeled_issues(
client, base_url, headers, repo, label
)
for issue in issues:
num = issue.get("number", 0)
status.active_issue_numbers.append(num)
# Check last activity
last_activity = await _last_comment_time(
client, base_url, headers, repo, num
)
if last_activity is None:
last_activity = await _issue_created_time(issue)
if last_activity is not None and last_activity < cutoff:
status.stuck_issue_numbers.append(num)
logger.info(
"check_agent_health: %s issue #%d stuck since %s",
agent_name,
num,
last_activity.isoformat(),
)
except Exception as exc:
logger.warning("check_agent_health: %s query failed — %s", agent_name, exc)
status.is_idle = len(status.active_issue_numbers) == 0
return status
async def get_full_health_report(
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentHealthReport:
"""Run health checks for all monitored agents and return combined report.
Args:
stuck_threshold_minutes: Passed through to each agent check.
Returns:
AgentHealthReport with status for Claude and Kimi.
"""
import asyncio
claude_status, kimi_status = await asyncio.gather(
check_agent_health("claude", stuck_threshold_minutes),
check_agent_health("kimi", stuck_threshold_minutes),
)
return AgentHealthReport(agents=[claude_status, kimi_status])
async def nudge_stuck_agent(
agent_name: str,
issue_number: int,
) -> bool:
"""Post a nudge comment on a stuck issue to prompt the agent.
Args:
agent_name: The agent that appears stuck.
issue_number: The Gitea issue number to nudge.
Returns:
True if the comment was posted successfully.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("nudge_stuck_agent: missing dependency — %s", exc)
return False
if not settings.gitea_enabled or not settings.gitea_token:
return False
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
body = (
f"⏰ **Vassal nudge** — @{agent_name} this issue has been idle.\n\n"
"Please post a status update or close if complete."
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
json={"body": body},
)
if resp.status_code in (200, 201):
logger.info(
"nudge_stuck_agent: nudged %s on issue #%d",
agent_name,
issue_number,
)
return True
except Exception as exc:
logger.warning("nudge_stuck_agent: failed — %s", exc)
return False

281
src/timmy/vassal/backlog.py Normal file
View File

@@ -0,0 +1,281 @@
"""Vassal Protocol — Gitea backlog triage.
Fetches open issues from Gitea, scores each one for priority and agent
suitability, and returns a ranked list ready for dispatch.
Complexity scoring heuristics
------------------------------
high_complexity_keywords → route to Claude (architecture, refactor, review)
research_keywords → route to Kimi (survey, analysis, benchmark)
routine_keywords → route to Timmy/self (docs, chore, config)
otherwise → Timmy self-handles
Priority scoring
----------------
URGENT label → 100
HIGH / critical → 75
NORMAL (default) → 50
LOW / chore → 25
Already assigned → deprioritized (subtract 20)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Labels that hint at complexity level / agent suitability
_HIGH_COMPLEXITY = frozenset(
{
"architecture",
"refactor",
"code review",
"security",
"performance",
"breaking change",
"design",
"complex",
}
)
_RESEARCH_KEYWORDS = frozenset(
{
"research",
"survey",
"analysis",
"benchmark",
"comparative",
"investigation",
"deep dive",
"review",
}
)
_ROUTINE_KEYWORDS = frozenset(
{
"docs",
"documentation",
"chore",
"config",
"typo",
"rename",
"cleanup",
"trivial",
"style",
}
)
_PRIORITY_LABEL_SCORES: dict[str, int] = {
"urgent": 100,
"critical": 90,
"high": 75,
"normal": 50,
"low": 25,
"chore": 20,
}
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
class AgentTarget(StrEnum):
"""Which agent should handle this issue."""
TIMMY = "timmy" # Timmy handles locally (self)
CLAUDE = "claude" # Dispatch to Claude Code
KIMI = "kimi" # Dispatch to Kimi Code
@dataclass
class TriagedIssue:
"""A Gitea issue enriched with triage metadata."""
number: int
title: str
body: str
labels: list[str] = field(default_factory=list)
assignees: list[str] = field(default_factory=list)
priority_score: int = 50
agent_target: AgentTarget = AgentTarget.TIMMY
rationale: str = ""
url: str = ""
raw: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------
def _extract_labels(issue: dict[str, Any]) -> list[str]:
"""Return normalised label names from a raw Gitea issue dict."""
return [lbl.get("name", "").lower() for lbl in issue.get("labels", [])]
def _score_priority(labels: list[str], assignees: list[str]) -> int:
score = _PRIORITY_LABEL_SCORES.get("normal", 50)
for lbl in labels:
for key, val in _PRIORITY_LABEL_SCORES.items():
if key in lbl:
score = max(score, val)
if assignees:
score -= 20 # already assigned — lower urgency for fresh dispatch
return max(0, score)
def _choose_agent(title: str, body: str, labels: list[str]) -> tuple[AgentTarget, str]:
"""Heuristic: pick the best agent and return (target, rationale)."""
combined = f"{title} {body} {' '.join(labels)}".lower()
if any(kw in combined for kw in _HIGH_COMPLEXITY):
return AgentTarget.CLAUDE, "high-complexity keywords detected"
if any(kw in combined for kw in _RESEARCH_KEYWORDS):
return AgentTarget.KIMI, "research keywords detected"
if any(kw in combined for kw in _ROUTINE_KEYWORDS):
return AgentTarget.TIMMY, "routine task — Timmy self-handles"
return AgentTarget.TIMMY, "no specific routing signal — Timmy self-handles"
# ---------------------------------------------------------------------------
# Triage
# ---------------------------------------------------------------------------
def triage_issues(raw_issues: list[dict[str, Any]]) -> list[TriagedIssue]:
"""Score and route a list of raw Gitea issue dicts.
Returns a list sorted by priority_score descending (highest first).
Args:
raw_issues: List of issue objects from the Gitea API.
Returns:
Sorted list of TriagedIssue with routing decisions.
"""
results: list[TriagedIssue] = []
for issue in raw_issues:
number = issue.get("number", 0)
title = issue.get("title", "")
body = issue.get("body") or ""
labels = _extract_labels(issue)
assignees = [
a.get("login", "") for a in issue.get("assignees") or []
]
url = issue.get("html_url", "")
priority = _score_priority(labels, assignees)
agent, rationale = _choose_agent(title, body, labels)
results.append(
TriagedIssue(
number=number,
title=title,
body=body,
labels=labels,
assignees=assignees,
priority_score=priority,
agent_target=agent,
rationale=rationale,
url=url,
raw=issue,
)
)
results.sort(key=lambda i: i.priority_score, reverse=True)
logger.debug(
"Triage complete: %d issues → %d Claude, %d Kimi, %d Timmy",
len(results),
sum(1 for i in results if i.agent_target == AgentTarget.CLAUDE),
sum(1 for i in results if i.agent_target == AgentTarget.KIMI),
sum(1 for i in results if i.agent_target == AgentTarget.TIMMY),
)
return results
# ---------------------------------------------------------------------------
# Gitea fetch (async, gracefully degrading)
# ---------------------------------------------------------------------------
async def fetch_open_issues(
limit: int = 50,
exclude_labels: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Fetch open issues from the configured Gitea repo.
Args:
limit: Maximum number of issues to return.
exclude_labels: Labels whose issues should be skipped
(e.g. ``["kimi-ready", "wip"]``).
Returns:
List of raw issue dicts from the Gitea API,
or empty list if Gitea is unavailable.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("fetch_open_issues: missing dependency — %s", exc)
return []
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("fetch_open_issues: Gitea disabled or no token")
return []
exclude = set(lbl.lower() for lbl in (exclude_labels or []))
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
params = {"state": "open", "limit": min(limit, 50), "page": 1}
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params=params,
)
if resp.status_code != 200:
logger.warning(
"fetch_open_issues: Gitea returned %s", resp.status_code
)
return []
issues = resp.json()
# Filter out pull requests and excluded labels
filtered = []
for issue in issues:
if issue.get("pull_request"):
continue # skip PRs
labels = _extract_labels(issue)
if exclude and any(lbl in exclude for lbl in labels):
continue
filtered.append(issue)
logger.info(
"fetch_open_issues: fetched %d/%d issues (after filtering)",
len(filtered),
len(issues),
)
return filtered
except Exception as exc:
logger.warning("fetch_open_issues: Gitea request failed — %s", exc)
return []

View File

@@ -0,0 +1,213 @@
"""Vassal Protocol — agent dispatch.
Translates triage decisions into concrete Gitea actions:
- Add ``claude-ready`` or ``kimi-ready`` label to an issue
- Post a dispatch comment recording the routing rationale
- Record the dispatch in the in-memory registry so the orchestration loop
can track what was sent and when
The dispatch registry is intentionally in-memory (ephemeral). Durable
tracking is out of scope for this module — that belongs in the task queue
or a future orchestration DB.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from timmy.vassal.backlog import AgentTarget, TriagedIssue
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Label names used by the dispatch system
# ---------------------------------------------------------------------------
_LABEL_MAP: dict[AgentTarget, str] = {
AgentTarget.CLAUDE: "claude-ready",
AgentTarget.KIMI: "kimi-ready",
AgentTarget.TIMMY: "timmy-ready",
}
_LABEL_COLORS: dict[str, str] = {
"claude-ready": "#8b6f47", # warm brown
"kimi-ready": "#006b75", # dark teal
"timmy-ready": "#0075ca", # blue
}
# ---------------------------------------------------------------------------
# Dispatch registry
# ---------------------------------------------------------------------------
@dataclass
class DispatchRecord:
"""A record of one issue being dispatched to an agent."""
issue_number: int
issue_title: str
agent: AgentTarget
rationale: str
dispatched_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
label_applied: bool = False
comment_posted: bool = False
# Module-level registry: issue_number → DispatchRecord
_registry: dict[int, DispatchRecord] = {}
def get_dispatch_registry() -> dict[int, DispatchRecord]:
"""Return a copy of the current dispatch registry."""
return dict(_registry)
def clear_dispatch_registry() -> None:
"""Clear the dispatch registry (mainly for tests)."""
_registry.clear()
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
async def _get_or_create_label(
client: Any,
base_url: str,
headers: dict,
repo: str,
label_name: str,
) -> int | None:
"""Return the Gitea label ID, creating it if necessary."""
labels_url = f"{base_url}/repos/{repo}/labels"
try:
resp = await client.get(labels_url, headers=headers)
if resp.status_code == 200:
for lbl in resp.json():
if lbl.get("name") == label_name:
return lbl["id"]
except Exception as exc:
logger.warning("_get_or_create_label: list failed — %s", exc)
return None
color = _LABEL_COLORS.get(label_name, "#cccccc")
try:
resp = await client.post(
labels_url,
headers={**headers, "Content-Type": "application/json"},
json={"name": label_name, "color": color},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
except Exception as exc:
logger.warning("_get_or_create_label: create failed — %s", exc)
return None
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
"""Apply dispatch label and post a routing comment on the Gitea issue.
Gracefully degrades: if Gitea is unavailable the record is still
created and returned (with label_applied=False, comment_posted=False).
Args:
issue: A TriagedIssue with a routing decision.
Returns:
DispatchRecord summarising what was done.
"""
record = DispatchRecord(
issue_number=issue.number,
issue_title=issue.title,
agent=issue.agent_target,
rationale=issue.rationale,
)
if issue.agent_target == AgentTarget.TIMMY:
# Self-dispatch: no label needed — Timmy will handle directly.
logger.info(
"dispatch_issue: #%d '%s' → Timmy (self, no label)",
issue.number,
issue.title[:50],
)
_registry[issue.number] = record
return record
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(
client, base_url, headers, repo, label_name
)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
_registry[issue.number] = record
logger.info(
"dispatch_issue: #%d '%s'%s (label=%s comment=%s)",
issue.number,
issue.title[:50],
issue.agent_target,
record.label_applied,
record.comment_posted,
)
return record

View File

@@ -0,0 +1,222 @@
"""Vassal Protocol — Hermes house health monitoring.
Monitors system resources on the M3 Max (Hermes) and Ollama model state.
Reports warnings when resources are tight and provides cleanup utilities.
All I/O is wrapped in asyncio.to_thread() per CLAUDE.md convention.
"""
from __future__ import annotations
import asyncio
import logging
import shutil
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
_WARN_DISK_PCT = 85.0 # warn when disk is more than 85% full
_WARN_MEM_PCT = 90.0 # warn when memory is more than 90% used
_WARN_CPU_PCT = 95.0 # warn when CPU is above 95% sustained
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class DiskUsage:
path: str = "/"
total_gb: float = 0.0
used_gb: float = 0.0
free_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class MemoryUsage:
total_gb: float = 0.0
available_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class OllamaHealth:
reachable: bool = False
loaded_models: list[str] = field(default_factory=list)
error: str = ""
@dataclass
class SystemSnapshot:
"""Point-in-time snapshot of Hermes resource usage."""
disk: DiskUsage = field(default_factory=DiskUsage)
memory: MemoryUsage = field(default_factory=MemoryUsage)
ollama: OllamaHealth = field(default_factory=OllamaHealth)
warnings: list[str] = field(default_factory=list)
taken_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def healthy(self) -> bool:
return len(self.warnings) == 0
# ---------------------------------------------------------------------------
# Resource probes (sync, run in threads)
# ---------------------------------------------------------------------------
def _probe_disk(path: str = "/") -> DiskUsage:
try:
usage = shutil.disk_usage(path)
total_gb = usage.total / 1e9
used_gb = usage.used / 1e9
free_gb = usage.free / 1e9
pct = (usage.used / usage.total * 100) if usage.total > 0 else 0.0
return DiskUsage(
path=path,
total_gb=round(total_gb, 2),
used_gb=round(used_gb, 2),
free_gb=round(free_gb, 2),
percent_used=round(pct, 1),
)
except Exception as exc:
logger.debug("_probe_disk: %s", exc)
return DiskUsage(path=path)
def _probe_memory() -> MemoryUsage:
try:
import psutil # optional — gracefully degrade if absent
vm = psutil.virtual_memory()
return MemoryUsage(
total_gb=round(vm.total / 1e9, 2),
available_gb=round(vm.available / 1e9, 2),
percent_used=round(vm.percent, 1),
)
except ImportError:
logger.debug("_probe_memory: psutil not installed — skipping")
return MemoryUsage()
except Exception as exc:
logger.debug("_probe_memory: %s", exc)
return MemoryUsage()
def _probe_ollama_sync(ollama_url: str) -> OllamaHealth:
"""Synchronous Ollama health probe — run in a thread."""
try:
import urllib.request
import json
url = ollama_url.rstrip("/") + "/api/tags"
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
data = json.loads(resp.read())
models = [m.get("name", "") for m in data.get("models", [])]
return OllamaHealth(reachable=True, loaded_models=models)
except Exception as exc:
return OllamaHealth(reachable=False, error=str(exc)[:120])
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def get_system_snapshot() -> SystemSnapshot:
"""Collect a non-blocking snapshot of system resources.
Uses asyncio.to_thread() for all blocking I/O per project convention.
Returns:
SystemSnapshot with disk, memory, and Ollama status.
"""
from config import settings
disk, memory, ollama = await asyncio.gather(
asyncio.to_thread(_probe_disk, "/"),
asyncio.to_thread(_probe_memory),
asyncio.to_thread(_probe_ollama_sync, settings.normalized_ollama_url),
)
warnings: list[str] = []
if disk.percent_used >= _WARN_DISK_PCT:
warnings.append(
f"Disk {disk.path}: {disk.percent_used:.0f}% used "
f"({disk.free_gb:.1f} GB free)"
)
if memory.percent_used >= _WARN_MEM_PCT:
warnings.append(
f"Memory: {memory.percent_used:.0f}% used "
f"({memory.available_gb:.1f} GB available)"
)
if not ollama.reachable:
warnings.append(f"Ollama unreachable: {ollama.error}")
if warnings:
logger.warning("House health warnings: %s", "; ".join(warnings))
return SystemSnapshot(
disk=disk,
memory=memory,
ollama=ollama,
warnings=warnings,
)
async def cleanup_stale_files(
temp_dirs: list[str] | None = None,
max_age_days: int = 7,
) -> dict[str, Any]:
"""Remove files older than *max_age_days* from temp directories.
Only removes files under safe temp paths (never project source).
Args:
temp_dirs: Directories to scan. Defaults to ``["/tmp/timmy"]``.
max_age_days: Age threshold in days.
Returns:
Dict with ``deleted_count`` and ``errors``.
"""
import time
dirs = temp_dirs or ["/tmp/timmy"] # noqa: S108
cutoff = time.time() - max_age_days * 86400
deleted = 0
errors: list[str] = []
def _cleanup() -> None:
nonlocal deleted
for d in dirs:
p = Path(d)
if not p.exists():
continue
for f in p.rglob("*"):
if f.is_file():
try:
if f.stat().st_mtime < cutoff:
f.unlink()
deleted += 1
except Exception as exc:
errors.append(str(exc))
await asyncio.to_thread(_cleanup)
logger.info(
"cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors)
)
return {"deleted_count": deleted, "errors": errors}

View File

@@ -0,0 +1,321 @@
"""Vassal Protocol — main orchestration loop.
Ties the backlog, dispatch, agent health, and house health modules together
into a single ``VassalOrchestrator`` that can run as a background service.
Each cycle:
1. Fetch open Gitea issues
2. Triage: score priority + route to agent
3. Dispatch: apply labels / post routing comments
4. Check agent health: nudge stuck agents
5. Check house health: log warnings, trigger cleanup if needed
6. Return a VassalCycleRecord summarising the cycle
Usage::
from timmy.vassal import vassal_orchestrator
record = await vassal_orchestrator.run_cycle()
status = vassal_orchestrator.get_status()
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cycle record
# ---------------------------------------------------------------------------
@dataclass
class VassalCycleRecord:
"""Summary of one orchestration cycle."""
cycle_id: int
started_at: str
finished_at: str = ""
duration_ms: int = 0
issues_fetched: int = 0
issues_dispatched: int = 0
dispatched_to_claude: int = 0
dispatched_to_kimi: int = 0
dispatched_to_timmy: int = 0
stuck_agents: list[str] = field(default_factory=list)
nudges_sent: int = 0
house_warnings: list[str] = field(default_factory=list)
cleanup_deleted: int = 0
errors: list[str] = field(default_factory=list)
@property
def healthy(self) -> bool:
return not self.errors and not self.house_warnings
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
class VassalOrchestrator:
"""Timmy's autonomous orchestration engine.
Runs observe → triage → dispatch → monitor → house-check cycles on a
configurable interval.
Parameters
----------
cycle_interval:
Seconds between cycles. Defaults to ``settings.vassal_cycle_interval``
when available, otherwise 300 s (5 min).
max_dispatch_per_cycle:
Cap on new dispatches per cycle to avoid spamming agents.
"""
def __init__(
self,
cycle_interval: float | None = None,
max_dispatch_per_cycle: int = 10,
) -> None:
self._cycle_count = 0
self._running = False
self._task: asyncio.Task | None = None
self._max_dispatch = max_dispatch_per_cycle
self._history: list[VassalCycleRecord] = []
# Resolve interval — lazy to avoid import-time settings read
self._cycle_interval = cycle_interval
# -- public API --------------------------------------------------------
@property
def cycle_count(self) -> int:
return self._cycle_count
@property
def is_running(self) -> bool:
return self._running
@property
def history(self) -> list[VassalCycleRecord]:
return list(self._history)
def get_status(self) -> dict[str, Any]:
"""Return a JSON-serialisable status dict."""
last = self._history[-1] if self._history else None
return {
"running": self._running,
"cycle_count": self._cycle_count,
"last_cycle": {
"cycle_id": last.cycle_id,
"started_at": last.started_at,
"issues_fetched": last.issues_fetched,
"issues_dispatched": last.issues_dispatched,
"stuck_agents": last.stuck_agents,
"house_warnings": last.house_warnings,
"healthy": last.healthy,
}
if last
else None,
}
# -- single cycle ------------------------------------------------------
async def run_cycle(self) -> VassalCycleRecord:
"""Execute one full orchestration cycle.
Gracefully degrades at each step — a failure in one sub-task does
not abort the rest of the cycle.
Returns:
VassalCycleRecord summarising what happened.
"""
self._cycle_count += 1
start = time.monotonic()
record = VassalCycleRecord(
cycle_id=self._cycle_count,
started_at=datetime.now(UTC).isoformat(),
)
# 1 + 2: Fetch & triage
await self._step_backlog(record)
# 3: Agent health
await self._step_agent_health(record)
# 4: House health
await self._step_house_health(record)
# Finalise record
record.finished_at = datetime.now(UTC).isoformat()
record.duration_ms = int((time.monotonic() - start) * 1000)
self._history.append(record)
# Broadcast via WebSocket (best-effort)
await self._broadcast(record)
logger.info(
"VassalOrchestrator cycle #%d complete (%d ms): "
"fetched=%d dispatched=%d stuck=%s house_ok=%s",
record.cycle_id,
record.duration_ms,
record.issues_fetched,
record.issues_dispatched,
record.stuck_agents or "none",
not record.house_warnings,
)
return record
# -- background loop ---------------------------------------------------
async def start(self) -> None:
"""Start the recurring orchestration loop as a background task."""
if self._running:
logger.warning("VassalOrchestrator already running")
return
self._running = True
self._task = asyncio.ensure_future(self._loop())
def stop(self) -> None:
"""Signal the loop to stop after the current cycle."""
self._running = False
if self._task and not self._task.done():
self._task.cancel()
logger.info("VassalOrchestrator stop requested")
async def _loop(self) -> None:
interval = self._resolve_interval()
logger.info("VassalOrchestrator loop started (interval=%.0fs)", interval)
while self._running:
try:
await self.run_cycle()
except Exception:
logger.exception("VassalOrchestrator cycle failed")
await asyncio.sleep(interval)
# -- step: backlog -------------------------------------------------------
async def _step_backlog(self, record: VassalCycleRecord) -> None:
from timmy.vassal.backlog import fetch_open_issues, triage_issues
from timmy.vassal.dispatch import dispatch_issue, get_dispatch_registry
try:
raw_issues = await fetch_open_issues(
limit=50,
exclude_labels=["wip", "blocked", "needs-info"],
)
record.issues_fetched = len(raw_issues)
if not raw_issues:
return
triaged = triage_issues(raw_issues)
registry = get_dispatch_registry()
dispatched = 0
for issue in triaged:
if dispatched >= self._max_dispatch:
break
# Skip already-dispatched issues
if issue.number in registry:
continue
await dispatch_issue(issue)
dispatched += 1
from timmy.vassal.backlog import AgentTarget
if issue.agent_target == AgentTarget.CLAUDE:
record.dispatched_to_claude += 1
elif issue.agent_target == AgentTarget.KIMI:
record.dispatched_to_kimi += 1
else:
record.dispatched_to_timmy += 1
record.issues_dispatched = dispatched
except Exception as exc:
logger.exception("_step_backlog failed")
record.errors.append(f"backlog: {exc}")
# -- step: agent health -------------------------------------------------
async def _step_agent_health(self, record: VassalCycleRecord) -> None:
from config import settings
from timmy.vassal.agent_health import get_full_health_report, nudge_stuck_agent
try:
threshold = getattr(settings, "vassal_stuck_threshold_minutes", 120)
report = await get_full_health_report(stuck_threshold_minutes=threshold)
for agent_status in report.agents:
if agent_status.is_stuck:
record.stuck_agents.append(agent_status.agent)
for issue_num in agent_status.stuck_issue_numbers:
ok = await nudge_stuck_agent(agent_status.agent, issue_num)
if ok:
record.nudges_sent += 1
except Exception as exc:
logger.exception("_step_agent_health failed")
record.errors.append(f"agent_health: {exc}")
# -- step: house health -------------------------------------------------
async def _step_house_health(self, record: VassalCycleRecord) -> None:
from timmy.vassal.house_health import cleanup_stale_files, get_system_snapshot
try:
snapshot = await get_system_snapshot()
record.house_warnings = snapshot.warnings
# Auto-cleanup temp files when disk is getting tight
if snapshot.disk.percent_used >= 80.0:
result = await cleanup_stale_files(max_age_days=3)
record.cleanup_deleted = result.get("deleted_count", 0)
except Exception as exc:
logger.exception("_step_house_health failed")
record.errors.append(f"house_health: {exc}")
# -- helpers ------------------------------------------------------------
def _resolve_interval(self) -> float:
if self._cycle_interval is not None:
return self._cycle_interval
try:
from config import settings
return float(getattr(settings, "vassal_cycle_interval", 300))
except Exception:
return 300.0
async def _broadcast(self, record: VassalCycleRecord) -> None:
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"vassal.cycle",
{
"cycle_id": record.cycle_id,
"started_at": record.started_at,
"issues_fetched": record.issues_fetched,
"issues_dispatched": record.issues_dispatched,
"stuck_agents": record.stuck_agents,
"house_warnings": record.house_warnings,
"duration_ms": record.duration_ms,
"healthy": record.healthy,
},
)
except Exception as exc:
logger.debug("VassalOrchestrator broadcast skipped: %s", exc)

View File

@@ -968,195 +968,3 @@ class TestCascadeRouterReload:
assert router.providers[0].name == "low-priority"
assert router.providers[1].name == "high-priority"
class TestComplexityRouting:
"""Tests for Qwen3-8B / Qwen3-14B dual-model routing (issue #1065)."""
def _make_dual_model_provider(self) -> Provider:
"""Build an Ollama provider with both Qwen3 models registered."""
return Provider(
name="ollama-local",
type="ollama",
enabled=True,
priority=1,
url="http://localhost:11434",
models=[
{
"name": "qwen3:8b",
"capabilities": ["text", "tools", "json", "streaming", "routine"],
},
{
"name": "qwen3:14b",
"default": True,
"capabilities": ["text", "tools", "json", "streaming", "complex", "reasoning"],
},
],
)
def test_get_model_for_complexity_simple_returns_8b(self):
"""Simple tasks should select the model with 'routine' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model == "qwen3:8b"
def test_get_model_for_complexity_complex_returns_14b(self):
"""Complex tasks should select the model with 'complex' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.COMPLEX)
assert model == "qwen3:14b"
def test_get_model_for_complexity_returns_none_when_no_match(self):
"""Returns None when provider has no matching model in chain."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {} # empty chains
provider = Provider(
name="test",
type="ollama",
enabled=True,
priority=1,
models=[{"name": "llama3.2:3b", "default": True, "capabilities": ["text"]}],
)
# No 'routine' or 'complex' model available
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model is None
@pytest.mark.asyncio
async def test_complete_with_simple_hint_routes_to_8b(self):
"""complexity_hint='simple' should use qwen3:8b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "fast answer", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
complexity_hint="simple",
)
assert result["model"] == "qwen3:8b"
assert result["complexity"] == "simple"
@pytest.mark.asyncio
async def test_complete_with_complex_hint_routes_to_14b(self):
"""complexity_hint='complex' should use qwen3:14b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "detailed answer", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "review this PR"}],
complexity_hint="complex",
)
assert result["model"] == "qwen3:14b"
assert result["complexity"] == "complex"
@pytest.mark.asyncio
async def test_explicit_model_bypasses_complexity_routing(self):
"""When model is explicitly provided, complexity routing is skipped."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "response", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
model="qwen3:14b", # explicit override
)
# Explicit model wins — complexity field is None
assert result["model"] == "qwen3:14b"
assert result["complexity"] is None
@pytest.mark.asyncio
async def test_auto_classification_routes_simple_message(self):
"""Short, simple messages should auto-classify as SIMPLE → 8B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
# no complexity_hint — auto-classify
)
assert result["complexity"] == "simple"
assert result["model"] == "qwen3:8b"
@pytest.mark.asyncio
async def test_auto_classification_routes_complex_message(self):
"""Complex messages should auto-classify → 14B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "deep analysis", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "analyze and prioritize the backlog"}],
)
assert result["complexity"] == "complex"
assert result["model"] == "qwen3:14b"
@pytest.mark.asyncio
async def test_invalid_complexity_hint_falls_back_to_auto(self):
"""Invalid complexity_hint should log a warning and auto-classify."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
# Should not raise
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
complexity_hint="INVALID_HINT",
)
assert result["complexity"] in ("simple", "complex") # auto-classified

View File

@@ -1,134 +0,0 @@
"""Tests for Qwen3 dual-model task complexity classifier."""
import pytest
from infrastructure.router.classifier import TaskComplexity, classify_task
class TestClassifyTask:
"""Tests for classify_task heuristics."""
# ── Simple / routine tasks ──────────────────────────────────────────────
def test_empty_messages_is_simple(self):
assert classify_task([]) == TaskComplexity.SIMPLE
def test_no_user_content_is_simple(self):
messages = [{"role": "system", "content": "You are Timmy."}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_status_query_is_simple(self):
messages = [{"role": "user", "content": "status"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_list_command_is_simple(self):
messages = [{"role": "user", "content": "list all tasks"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_get_command_is_simple(self):
messages = [{"role": "user", "content": "get the latest log entry"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_message_under_threshold_is_simple(self):
messages = [{"role": "user", "content": "run the build"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_affirmation_is_simple(self):
messages = [{"role": "user", "content": "yes"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
# ── Complex / quality-sensitive tasks ──────────────────────────────────
def test_plan_keyword_is_complex(self):
messages = [{"role": "user", "content": "plan the sprint"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_review_keyword_is_complex(self):
messages = [{"role": "user", "content": "review this code"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyze_keyword_is_complex(self):
messages = [{"role": "user", "content": "analyze performance"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_triage_keyword_is_complex(self):
messages = [{"role": "user", "content": "triage the open issues"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_refactor_keyword_is_complex(self):
messages = [{"role": "user", "content": "refactor the auth module"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_explain_keyword_is_complex(self):
messages = [{"role": "user", "content": "explain how the router works"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_prioritize_keyword_is_complex(self):
messages = [{"role": "user", "content": "prioritize the backlog"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_long_message_is_complex(self):
long_msg = "do something " * 50 # > 500 chars
messages = [{"role": "user", "content": long_msg}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_numbered_list_is_complex(self):
messages = [
{
"role": "user",
"content": "1. Read the file 2. Analyze it 3. Write a report",
}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_code_block_is_complex(self):
messages = [
{"role": "user", "content": "Here is the code:\n```python\nprint('hello')\n```"}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_deep_conversation_is_complex(self):
messages = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "now do the thing"},
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyse_british_spelling_is_complex(self):
messages = [{"role": "user", "content": "analyse this dataset"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_non_string_content_is_ignored(self):
"""Non-string content should not crash the classifier."""
messages = [{"role": "user", "content": ["part1", "part2"]}]
# Should not raise; result doesn't matter — just must not blow up
result = classify_task(messages)
assert isinstance(result, TaskComplexity)
def test_system_message_not_counted_as_user(self):
"""System message alone should not trigger complex keywords."""
messages = [
{"role": "system", "content": "analyze everything carefully"},
{"role": "user", "content": "yes"},
]
# "analyze" is in system message (not user) — user says "yes" → simple
assert classify_task(messages) == TaskComplexity.SIMPLE
class TestTaskComplexityEnum:
"""Tests for TaskComplexity enum values."""
def test_simple_value(self):
assert TaskComplexity.SIMPLE.value == "simple"
def test_complex_value(self):
assert TaskComplexity.COMPLEX.value == "complex"
def test_lookup_by_value(self):
assert TaskComplexity("simple") == TaskComplexity.SIMPLE
assert TaskComplexity("complex") == TaskComplexity.COMPLEX

View File

@@ -0,0 +1,621 @@
"""Unit tests for timmy.backlog_triage — autonomous backlog triage loop."""
from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.backlog_triage import (
AGENT_CLAUDE,
AGENT_KIMI,
KIMI_READY_LABEL,
OWNER_LOGIN,
READY_THRESHOLD,
BacklogTriageLoop,
ScoredIssue,
TriageCycleResult,
TriageDecision,
_build_audit_comment,
_build_daily_summary,
_extract_tags,
_score_acceptance,
_score_alignment,
_score_scope,
decide,
score_issue,
)
# ── Fixtures ─────────────────────────────────────────────────────────────────
def _make_raw_issue(
number: int = 1,
title: str = "Fix the login bug",
body: str = "## Problem\nLogin fails on empty password.\n\n## Steps\nassert response == 200",
labels: list | None = None,
assignees: list | None = None,
created_at: str = "2026-03-20T10:00:00Z",
) -> dict:
return {
"number": number,
"title": title,
"body": body,
"labels": [{"name": lbl} for lbl in (labels or [])],
"assignees": [{"login": a} for a in (assignees or [])],
"created_at": created_at,
}
def _make_scored_issue(
number: int = 1,
title: str = "Fix login bug",
issue_type: str = "bug",
score: int = 7,
ready: bool = True,
is_p0: bool = True,
is_blocked: bool = False,
assignees: list | None = None,
tags: set | None = None,
labels: list | None = None,
age_days: int = 3,
) -> ScoredIssue:
return ScoredIssue(
number=number,
title=title,
body="",
labels=labels or [],
tags=tags or {"bug"},
assignees=assignees or [],
created_at=datetime.now(UTC),
issue_type=issue_type,
score=score,
scope=2,
acceptance=2,
alignment=3,
ready=ready,
age_days=age_days,
is_p0=is_p0,
is_blocked=is_blocked,
)
# ── _extract_tags ─────────────────────────────────────────────────────────────
class TestExtractTags:
def test_bracket_tags_in_title(self):
tags = _extract_tags("[Bug] Login fails", [])
assert "bug" in tags
def test_multiple_brackets(self):
tags = _extract_tags("[Bug][P0] Crash on startup", [])
assert "bug" in tags
assert "p0" in tags
def test_label_names(self):
tags = _extract_tags("Fix thing", ["security", "hotfix"])
assert "security" in tags
assert "hotfix" in tags
def test_labels_lowercased(self):
tags = _extract_tags("Title", ["Bug", "FEATURE"])
assert "bug" in tags
assert "feature" in tags
def test_empty_inputs(self):
tags = _extract_tags("", [])
assert tags == set()
# ── Scoring functions ─────────────────────────────────────────────────────────
class TestScoreScope:
def test_file_reference_adds_point(self):
score = _score_scope("Fix auth", "Edit src/timmy/auth.py", set())
assert score >= 1
def test_function_reference_adds_point(self):
score = _score_scope("Fix auth", "def validate_token()", set())
assert score >= 1
def test_short_title_adds_point(self):
score = _score_scope("Short title", "", set())
assert score >= 1
def test_meta_tag_penalizes(self):
score = _score_scope("Discussion about philosophy", "long body " * 5, {"philosophy"})
assert score <= 1
def test_max_score_3(self):
score = _score_scope("Fix auth", "src/auth.py\ndef login()", set())
assert score <= 3
class TestScoreAcceptance:
def test_acceptance_keywords(self):
body = "should return 200\nmust pass tests\nexpect response"
score = _score_acceptance("Title", body, set())
assert score >= 2
def test_test_reference_adds_point(self):
score = _score_acceptance("Title", "Run tox -e unit", set())
assert score >= 1
def test_structured_sections(self):
body = "## Problem\nX\n## Solution\nY"
score = _score_acceptance("Title", body, set())
assert score >= 1
def test_meta_tag_penalizes(self):
score = _score_acceptance("Title", "should do something", {"philosophy"})
# still counts but penalized
assert score <= 2
def test_empty_body(self):
score = _score_acceptance("Title", "", set())
assert score == 0
class TestScoreAlignment:
def test_bug_tags_score_max(self):
assert _score_alignment("", "", {"bug"}) == 3
def test_hotfix_tag_max(self):
assert _score_alignment("", "", {"hotfix"}) == 3
def test_refactor_tag(self):
score = _score_alignment("", "", {"refactor"})
assert score >= 2
def test_feature_tag(self):
score = _score_alignment("", "", {"feature"})
assert score >= 2
def test_meta_tags_zero(self):
assert _score_alignment("", "", {"philosophy"}) == 0
def test_loop_generated_bonus(self):
score = _score_alignment("", "", {"loop-generated"})
assert score >= 1
# ── score_issue ───────────────────────────────────────────────────────────────
class TestScoreIssue:
def test_bug_issue_classified_correctly(self):
raw = _make_raw_issue(labels=["bug"], title="[Bug] Crash on startup")
scored = score_issue(raw)
assert scored.issue_type == "bug"
assert scored.is_p0 is True
def test_feature_issue_classified(self):
raw = _make_raw_issue(labels=["feature"], title="Add voice support")
scored = score_issue(raw)
assert scored.issue_type == "feature"
def test_philosophy_issue_classified(self):
raw = _make_raw_issue(labels=["philosophy"], title="[Philosophy] Should Timmy sleep?")
scored = score_issue(raw)
assert scored.issue_type == "philosophy"
def test_research_issue_classified(self):
raw = _make_raw_issue(labels=["research"], title="Investigate model options")
scored = score_issue(raw)
assert scored.issue_type == "research"
def test_ready_flag_set_when_score_high(self):
body = (
"## Problem\nX breaks.\n## Solution\nFix src/timmy/agent.py def run()\n"
"should return True\nmust pass tox -e unit"
)
raw = _make_raw_issue(labels=["bug"], body=body)
scored = score_issue(raw)
assert scored.score >= READY_THRESHOLD
assert scored.ready is True
def test_is_blocked_detected_in_body(self):
raw = _make_raw_issue(body="This is blocked by issue #50")
scored = score_issue(raw)
assert scored.is_blocked is True
def test_is_blocked_detected_in_title(self):
raw = _make_raw_issue(title="[blocking] Cannot proceed")
scored = score_issue(raw)
# "blocking" in brackets becomes a tag
assert scored.is_blocked is True
def test_unassigned_when_no_assignees(self):
raw = _make_raw_issue(assignees=[])
scored = score_issue(raw)
assert scored.is_unassigned is True
def test_assigned_when_has_assignee(self):
raw = _make_raw_issue(assignees=["claude"])
scored = score_issue(raw)
assert scored.is_unassigned is False
def test_age_days_computed(self):
old_ts = "2026-01-01T00:00:00Z"
raw = _make_raw_issue(created_at=old_ts)
scored = score_issue(raw)
assert scored.age_days > 0
def test_needs_kimi_for_research_label(self):
raw = _make_raw_issue(labels=["kimi-ready"])
scored = score_issue(raw)
assert scored.needs_kimi is True
# ── decide ────────────────────────────────────────────────────────────────────
class TestDecide:
def test_philosophy_skipped(self):
issue = _make_scored_issue(issue_type="philosophy", tags={"philosophy"})
d = decide(issue)
assert d.action == "skip"
assert "philosophy" in d.reason.lower()
def test_assigned_issue_skipped(self):
issue = _make_scored_issue(assignees=["perplexity"])
d = decide(issue)
assert d.action == "skip"
assert "assigned" in d.reason.lower()
def test_low_score_skipped(self):
issue = _make_scored_issue(score=2, ready=False)
d = decide(issue)
assert d.action == "skip"
assert "threshold" in d.reason.lower()
def test_blocked_issue_flagged_for_alex(self):
issue = _make_scored_issue(is_blocked=True)
d = decide(issue)
assert d.action == "flag_alex"
assert d.agent == OWNER_LOGIN
def test_research_issue_assigned_kimi(self):
issue = _make_scored_issue(
issue_type="research",
tags={"research"},
is_p0=False,
is_blocked=False,
)
d = decide(issue)
assert d.action == "assign_kimi"
assert d.agent == AGENT_KIMI
def test_kimi_ready_label_assigns_kimi(self):
issue = _make_scored_issue(
issue_type="unknown",
tags={"kimi-ready"},
labels=["kimi-ready"],
is_p0=False,
is_blocked=False,
)
d = decide(issue)
assert d.action == "assign_kimi"
def test_p0_bug_assigns_claude(self):
issue = _make_scored_issue(issue_type="bug", is_p0=True, is_blocked=False)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_ready_feature_assigns_claude(self):
issue = _make_scored_issue(
issue_type="feature",
is_p0=False,
is_blocked=False,
tags={"feature"},
)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_decision_has_reason(self):
issue = _make_scored_issue()
d = decide(issue)
assert len(d.reason) > 10
# ── _build_audit_comment ──────────────────────────────────────────────────────
class TestBuildAuditComment:
def test_contains_timmy_triage_header(self):
d = TriageDecision(42, "assign_claude", "High priority bug", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert "Timmy Triage" in comment
def test_contains_issue_reason(self):
d = TriageDecision(42, "assign_claude", "Urgent P0 bug", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert "Urgent P0 bug" in comment
def test_assign_claude_mentions_agent(self):
d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert AGENT_CLAUDE in comment
def test_assign_kimi_mentions_label(self):
d = TriageDecision(42, "assign_kimi", "reason", agent=AGENT_KIMI)
comment = _build_audit_comment(d)
assert KIMI_READY_LABEL in comment
def test_flag_alex_mentions_owner(self):
d = TriageDecision(42, "flag_alex", "blocked", agent=OWNER_LOGIN)
comment = _build_audit_comment(d)
assert OWNER_LOGIN in comment
def test_contains_override_note(self):
d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert "override" in comment.lower()
# ── _build_daily_summary ──────────────────────────────────────────────────────
class TestBuildDailySummary:
def _make_result(self, decisions=None) -> TriageCycleResult:
return TriageCycleResult(
timestamp=datetime.now(UTC).isoformat(),
total_open=10,
scored=8,
ready=5,
decisions=decisions or [],
)
def test_contains_open_count(self):
result = self._make_result()
scored = [_make_scored_issue(number=i, ready=True, score=6) for i in range(1, 4)]
summary = _build_daily_summary(result, scored)
assert "10" in summary # total_open
def test_contains_ready_count(self):
result = self._make_result()
summary = _build_daily_summary(result, [])
assert "5" in summary
def test_actions_taken_section(self):
decisions = [
TriageDecision(1, "assign_claude", "P0 bug", agent="claude", executed=True),
]
result = self._make_result(decisions=decisions)
summary = _build_daily_summary(result, [])
assert "Actions Taken" in summary
assert "#1" in summary
def test_top_issues_listed(self):
scored = [_make_scored_issue(number=99, ready=True, score=8)]
result = self._make_result()
summary = _build_daily_summary(result, scored)
assert "#99" in summary
def test_footer_present(self):
summary = _build_daily_summary(self._make_result(), [])
assert "Auto-generated" in summary
# ── BacklogTriageLoop ─────────────────────────────────────────────────────────
class TestBacklogTriageLoop:
def test_default_interval_from_settings(self):
loop = BacklogTriageLoop()
from config import settings
assert loop._interval == float(settings.backlog_triage_interval_seconds)
def test_custom_interval(self):
loop = BacklogTriageLoop(interval=300)
assert loop._interval == 300.0
def test_dry_run_default(self):
loop = BacklogTriageLoop(dry_run=True)
assert loop._dry_run is True
def test_not_running_initially(self):
loop = BacklogTriageLoop()
assert loop.is_running is False
def test_stop_sets_running_false(self):
loop = BacklogTriageLoop()
loop._running = True
loop.stop()
assert loop._running is False
def test_cycle_count_starts_zero(self):
loop = BacklogTriageLoop()
assert loop.cycle_count == 0
@pytest.mark.asyncio
async def test_run_once_skips_when_no_gitea_token(self):
loop = BacklogTriageLoop()
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = False
mock_settings.backlog_triage_daily_summary = False
with patch("timmy.backlog_triage.settings", mock_settings):
result = await loop.run_once()
assert result.total_open == 0
@pytest.mark.asyncio
async def test_run_once_dry_run_no_api_writes(self):
"""In dry_run mode, decisions are made but no Gitea API writes happen."""
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
raw_issues = [
_make_raw_issue(
number=10,
title="Fix crash",
labels=["bug"],
body=(
"## Problem\nCrash on login.\n## Solution\nFix src/auth.py "
"def login()\nshould return 200\nmust pass tox tests"
),
)
]
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_client = AsyncMock()
mock_client.get.return_value = MagicMock(
status_code=200, json=MagicMock(return_value=raw_issues)
)
mock_ctx = AsyncMock()
mock_ctx.__aenter__.return_value = mock_client
mock_ctx.__aexit__.return_value = False
with (
patch("timmy.backlog_triage.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_ctx),
):
result = await loop.run_once()
# No POST/PATCH calls in dry run
mock_client.post.assert_not_called()
mock_client.patch.assert_not_called()
assert result.total_open == 1
assert loop.cycle_count == 1
assert len(loop.history) == 1
@pytest.mark.asyncio
async def test_run_once_assigns_unassigned_bug(self):
"""Unassigned ready bug should be assigned to Claude with audit comment."""
loop = BacklogTriageLoop(dry_run=False, daily_summary=False)
body = (
"## Problem\nCrash on login.\n## Solution\nFix src/auth.py "
"def login()\nshould return 200\nmust pass tox tests"
)
raw_issues = [_make_raw_issue(number=5, title="Fix crash", labels=["bug"], body=body)]
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = False
mock_settings.backlog_triage_daily_summary = False
# GET /issues returns our issue
get_issues_resp = MagicMock(status_code=200)
get_issues_resp.json.return_value = raw_issues
# POST /comments returns success
comment_resp = MagicMock(status_code=201)
comment_resp.json.return_value = {"id": 1}
# PATCH /issues/{n} (assign) returns success
assign_resp = MagicMock(status_code=200)
assign_resp.json.return_value = {"number": 5}
mock_client = AsyncMock()
mock_client.get.return_value = get_issues_resp
mock_client.post.return_value = comment_resp
mock_client.patch.return_value = assign_resp
mock_ctx = AsyncMock()
mock_ctx.__aenter__.return_value = mock_client
mock_ctx.__aexit__.return_value = False
with (
patch("timmy.backlog_triage.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_ctx),
patch("asyncio.sleep", new_callable=AsyncMock),
):
result = await loop.run_once()
assert result.total_open == 1
# Comment should have been posted
mock_client.post.assert_called()
# Assign should have been called (PATCH)
mock_client.patch.assert_called()
@pytest.mark.asyncio
async def test_run_once_skips_already_assigned(self):
"""Issues already assigned should not be acted upon."""
loop = BacklogTriageLoop(dry_run=False, daily_summary=False)
raw_issues = [
_make_raw_issue(
number=3,
labels=["bug"],
assignees=["perplexity"],
body="## Problem\nX\nmust pass tox\nshould return 200 at least 3 times",
)
]
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = False
mock_settings.backlog_triage_daily_summary = False
get_resp = MagicMock(status_code=200)
get_resp.json.return_value = raw_issues
mock_client = AsyncMock()
mock_client.get.return_value = get_resp
mock_ctx = AsyncMock()
mock_ctx.__aenter__.return_value = mock_client
mock_ctx.__aexit__.return_value = False
with (
patch("timmy.backlog_triage.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_ctx),
):
result = await loop.run_once()
# No writes for already-assigned issue
mock_client.post.assert_not_called()
mock_client.patch.assert_not_called()
assert result.decisions[0].action == "skip"
# ── ScoredIssue properties ────────────────────────────────────────────────────
class TestScoredIssueProperties:
def test_is_unassigned_true_when_no_assignees(self):
issue = _make_scored_issue(assignees=[])
assert issue.is_unassigned is True
def test_is_unassigned_false_when_assigned(self):
issue = _make_scored_issue(assignees=["claude"])
assert issue.is_unassigned is False
def test_needs_kimi_for_research_tag(self):
issue = _make_scored_issue(tags={"research"})
assert issue.needs_kimi is True
def test_needs_kimi_for_kimi_ready_label(self):
issue = _make_scored_issue(labels=["kimi-ready"], tags=set())
assert issue.needs_kimi is True
def test_needs_kimi_false_for_bug(self):
issue = _make_scored_issue(tags={"bug"}, labels=[])
assert issue.needs_kimi is False

View File

@@ -0,0 +1,103 @@
"""Unit tests for timmy.vassal.agent_health."""
from __future__ import annotations
import pytest
from timmy.vassal.agent_health import AgentHealthReport, AgentStatus
# ---------------------------------------------------------------------------
# AgentStatus
# ---------------------------------------------------------------------------
def test_agent_status_idle_default():
s = AgentStatus(agent="claude")
assert s.is_idle is True
assert s.is_stuck is False
assert s.needs_reassignment is False
def test_agent_status_active():
s = AgentStatus(agent="kimi", active_issue_numbers=[10, 11])
s.is_idle = len(s.active_issue_numbers) == 0
assert s.is_idle is False
def test_agent_status_stuck():
s = AgentStatus(
agent="claude",
active_issue_numbers=[7],
stuck_issue_numbers=[7],
is_idle=False,
)
assert s.is_stuck is True
assert s.needs_reassignment is True
# ---------------------------------------------------------------------------
# AgentHealthReport
# ---------------------------------------------------------------------------
def test_report_any_stuck():
claude = AgentStatus(agent="claude", stuck_issue_numbers=[3])
kimi = AgentStatus(agent="kimi")
report = AgentHealthReport(agents=[claude, kimi])
assert report.any_stuck is True
def test_report_all_idle():
report = AgentHealthReport(
agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
)
assert report.all_idle is True
def test_report_for_agent_found():
kimi = AgentStatus(agent="kimi", active_issue_numbers=[42])
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), kimi])
found = report.for_agent("kimi")
assert found is kimi
def test_report_for_agent_not_found():
report = AgentHealthReport(agents=[AgentStatus(agent="claude")])
assert report.for_agent("timmy") is None
# ---------------------------------------------------------------------------
# check_agent_health — no Gitea in unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_check_agent_health_unknown_agent():
"""Unknown agent name returns idle status without error."""
from timmy.vassal.agent_health import check_agent_health
status = await check_agent_health("unknown-bot")
assert status.agent == "unknown-bot"
assert status.is_idle is True
@pytest.mark.asyncio
async def test_check_agent_health_no_token():
"""Returns idle status gracefully when Gitea token is absent."""
from timmy.vassal.agent_health import check_agent_health
status = await check_agent_health("claude")
# Should not raise; returns idle (no active issues discovered)
assert isinstance(status, AgentStatus)
assert status.agent == "claude"
@pytest.mark.asyncio
async def test_get_full_health_report_returns_both_agents():
from timmy.vassal.agent_health import get_full_health_report
report = await get_full_health_report()
agent_names = {a.agent for a in report.agents}
assert "claude" in agent_names
assert "kimi" in agent_names

View File

@@ -0,0 +1,186 @@
"""Unit tests for timmy.vassal.backlog — triage and fetch helpers."""
from __future__ import annotations
import pytest
from timmy.vassal.backlog import (
AgentTarget,
TriagedIssue,
_choose_agent,
_extract_labels,
_score_priority,
triage_issues,
)
# ---------------------------------------------------------------------------
# _extract_labels
# ---------------------------------------------------------------------------
def test_extract_labels_empty():
assert _extract_labels({}) == []
def test_extract_labels_normalises_case():
issue = {"labels": [{"name": "HIGH"}, {"name": "Feature"}]}
assert _extract_labels(issue) == ["high", "feature"]
# ---------------------------------------------------------------------------
# _score_priority
# ---------------------------------------------------------------------------
def test_priority_urgent():
assert _score_priority(["urgent"], []) == 100
def test_priority_high():
assert _score_priority(["high"], []) == 75
def test_priority_normal_default():
assert _score_priority([], []) == 50
def test_priority_assigned_penalised():
# already assigned → subtract 20
score = _score_priority([], ["some-agent"])
assert score == 30
def test_priority_label_substring_match():
# "critical" contains "critical" → 90
assert _score_priority(["critical-bug"], []) == 90
# ---------------------------------------------------------------------------
# _choose_agent
# ---------------------------------------------------------------------------
def test_choose_claude_for_architecture():
target, rationale = _choose_agent("Refactor auth middleware", "", [])
assert target == AgentTarget.CLAUDE
assert "complex" in rationale or "high-complexity" in rationale
def test_choose_kimi_for_research():
target, rationale = _choose_agent("Deep research on embedding models", "", [])
assert target == AgentTarget.KIMI
def test_choose_timmy_for_docs():
target, rationale = _choose_agent("Update documentation for CLI", "", [])
assert target == AgentTarget.TIMMY
def test_choose_timmy_default():
target, rationale = _choose_agent("Fix typo in README", "simple change", [])
# Could route to timmy (docs/trivial) or default — either is valid
assert isinstance(target, AgentTarget)
def test_choose_agent_label_wins():
# "security" label → Claude
target, _ = _choose_agent("Login page", "", ["security"])
assert target == AgentTarget.CLAUDE
# ---------------------------------------------------------------------------
# triage_issues
# ---------------------------------------------------------------------------
def _make_raw_issue(
number: int,
title: str,
body: str = "",
labels: list[str] | None = None,
assignees: list[str] | None = None,
) -> dict:
return {
"number": number,
"title": title,
"body": body,
"labels": [{"name": lbl} for lbl in (labels or [])],
"assignees": [{"login": a} for a in (assignees or [])],
"html_url": f"http://gitea/issues/{number}",
}
def test_triage_returns_sorted_by_priority():
issues = [
_make_raw_issue(1, "Routine docs update", labels=["docs"]),
_make_raw_issue(2, "Critical security issue", labels=["urgent", "security"]),
_make_raw_issue(3, "Normal feature", labels=[]),
]
triaged = triage_issues(issues)
# Highest priority first
assert triaged[0].number == 2
assert triaged[0].priority_score == 100 # urgent label
def test_triage_prs_can_be_included():
# triage_issues does not filter PRs — that's fetch_open_issues's job
issues = [_make_raw_issue(10, "A PR-like issue")]
triaged = triage_issues(issues)
assert len(triaged) == 1
def test_triage_empty():
assert triage_issues([]) == []
def test_triage_routing():
issues = [
_make_raw_issue(1, "Benchmark LLM backends", body="comprehensive analysis"),
_make_raw_issue(2, "Refactor agent loader", body="architecture change"),
_make_raw_issue(3, "Fix typo in docs", labels=["docs"]),
]
triaged = {i.number: i for i in triage_issues(issues)}
assert triaged[1].agent_target == AgentTarget.KIMI
assert triaged[2].agent_target == AgentTarget.CLAUDE
assert triaged[3].agent_target == AgentTarget.TIMMY
def test_triage_preserves_url():
issues = [_make_raw_issue(42, "Some issue")]
triaged = triage_issues(issues)
assert triaged[0].url == "http://gitea/issues/42"
# ---------------------------------------------------------------------------
# fetch_open_issues — no Gitea available in unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_fetch_open_issues_returns_empty_when_disabled(monkeypatch):
"""When Gitea is disabled, fetch returns [] without raising."""
import timmy.vassal.backlog as bl
# Patch settings
class FakeSettings:
gitea_enabled = False
gitea_token = ""
gitea_url = "http://localhost:3000"
gitea_repo = "owner/repo"
monkeypatch.setattr(bl, "logger", bl.logger) # no-op just to confirm import
# We can't easily monkeypatch `from config import settings` inside the function,
# so test the no-token path via environment
import os
original = os.environ.pop("GITEA_TOKEN", None)
try:
result = await bl.fetch_open_issues()
# Should return [] gracefully (no token configured by default in test env)
assert isinstance(result, list)
finally:
if original is not None:
os.environ["GITEA_TOKEN"] = original

View File

@@ -0,0 +1,114 @@
"""Unit tests for timmy.vassal.dispatch — routing and label helpers."""
from __future__ import annotations
import pytest
from timmy.vassal.backlog import AgentTarget, TriagedIssue
from timmy.vassal.dispatch import (
DispatchRecord,
clear_dispatch_registry,
get_dispatch_registry,
)
def _make_triaged(
number: int,
title: str,
agent: AgentTarget,
priority: int = 50,
) -> TriagedIssue:
return TriagedIssue(
number=number,
title=title,
body="",
agent_target=agent,
priority_score=priority,
rationale="test rationale",
url=f"http://gitea/issues/{number}",
)
# ---------------------------------------------------------------------------
# Registry helpers
# ---------------------------------------------------------------------------
def test_registry_starts_empty():
clear_dispatch_registry()
assert get_dispatch_registry() == {}
def test_registry_returns_copy():
clear_dispatch_registry()
reg = get_dispatch_registry()
reg[999] = None # type: ignore[assignment]
assert 999 not in get_dispatch_registry()
# ---------------------------------------------------------------------------
# dispatch_issue — Timmy self-dispatch (no Gitea required)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dispatch_timmy_self_no_gitea():
"""Timmy self-dispatch records without hitting Gitea."""
clear_dispatch_registry()
issue = _make_triaged(1, "Fix docs typo", AgentTarget.TIMMY)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert isinstance(record, DispatchRecord)
assert record.issue_number == 1
assert record.agent == AgentTarget.TIMMY
assert 1 in get_dispatch_registry()
@pytest.mark.asyncio
async def test_dispatch_claude_no_gitea_token():
"""Claude dispatch gracefully degrades when Gitea token is absent."""
clear_dispatch_registry()
issue = _make_triaged(2, "Refactor auth", AgentTarget.CLAUDE)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert record.issue_number == 2
assert record.agent == AgentTarget.CLAUDE
# label/comment not applied — no token
assert record.label_applied is False
assert 2 in get_dispatch_registry()
@pytest.mark.asyncio
async def test_dispatch_kimi_no_gitea_token():
clear_dispatch_registry()
issue = _make_triaged(3, "Research embeddings", AgentTarget.KIMI)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert record.agent == AgentTarget.KIMI
assert record.label_applied is False
# ---------------------------------------------------------------------------
# DispatchRecord fields
# ---------------------------------------------------------------------------
def test_dispatch_record_defaults():
r = DispatchRecord(
issue_number=5,
issue_title="Test issue",
agent=AgentTarget.TIMMY,
rationale="because",
)
assert r.label_applied is False
assert r.comment_posted is False
assert r.dispatched_at # has a timestamp

View File

@@ -0,0 +1,116 @@
"""Unit tests for timmy.vassal.house_health."""
from __future__ import annotations
import pytest
from timmy.vassal.house_health import (
DiskUsage,
MemoryUsage,
OllamaHealth,
SystemSnapshot,
_probe_disk,
)
# ---------------------------------------------------------------------------
# Data model tests
# ---------------------------------------------------------------------------
def test_system_snapshot_healthy_when_no_warnings():
snap = SystemSnapshot()
assert snap.healthy is True
def test_system_snapshot_unhealthy_with_warnings():
snap = SystemSnapshot(warnings=["disk 90% full"])
assert snap.healthy is False
def test_disk_usage_defaults():
d = DiskUsage()
assert d.percent_used == 0.0
assert d.path == "/"
def test_memory_usage_defaults():
m = MemoryUsage()
assert m.percent_used == 0.0
def test_ollama_health_defaults():
o = OllamaHealth()
assert o.reachable is False
assert o.loaded_models == []
# ---------------------------------------------------------------------------
# _probe_disk — runs against real filesystem
# ---------------------------------------------------------------------------
def test_probe_disk_root():
result = _probe_disk("/")
assert result.total_gb > 0
assert 0.0 <= result.percent_used <= 100.0
assert result.free_gb >= 0
def test_probe_disk_bad_path():
result = _probe_disk("/nonexistent_path_xyz")
# Should not raise — returns zeroed DiskUsage
assert result.percent_used == 0.0
# ---------------------------------------------------------------------------
# get_system_snapshot — async
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_system_snapshot_returns_snapshot():
from timmy.vassal.house_health import get_system_snapshot
snap = await get_system_snapshot()
assert isinstance(snap, SystemSnapshot)
# Disk is always probed
assert snap.disk.total_gb >= 0
# Ollama is likely unreachable in test env — that's fine
assert isinstance(snap.ollama, OllamaHealth)
@pytest.mark.asyncio
async def test_get_system_snapshot_disk_warning(monkeypatch):
"""When disk is above threshold, a warning is generated."""
import timmy.vassal.house_health as hh
# Patch _probe_disk to return high usage
def _full_disk(path: str) -> DiskUsage:
return DiskUsage(
path=path,
total_gb=100.0,
used_gb=90.0,
free_gb=10.0,
percent_used=90.0,
)
monkeypatch.setattr(hh, "_probe_disk", _full_disk)
snap = await hh.get_system_snapshot()
assert any("disk" in w.lower() or "Disk" in w for w in snap.warnings)
# ---------------------------------------------------------------------------
# cleanup_stale_files — temp dir test
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cleanup_stale_files_missing_dir():
"""Should not raise when the target dir doesn't exist."""
from timmy.vassal.house_health import cleanup_stale_files
result = await cleanup_stale_files(temp_dirs=["/tmp/timmy_test_xyz_nonexistent"])
assert result["deleted_count"] == 0
assert result["errors"] == []

View File

@@ -0,0 +1,139 @@
"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator."""
from __future__ import annotations
import pytest
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------
def test_cycle_record_healthy_when_no_errors():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
)
assert r.healthy is True
def test_cycle_record_unhealthy_with_errors():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
errors=["backlog: connection refused"],
)
assert r.healthy is False
def test_cycle_record_unhealthy_with_warnings():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
house_warnings=["disk 90% full"],
)
assert r.healthy is False
# ---------------------------------------------------------------------------
# VassalOrchestrator state
# ---------------------------------------------------------------------------
def test_orchestrator_initial_state():
orch = VassalOrchestrator()
assert orch.cycle_count == 0
assert orch.is_running is False
assert orch.history == []
def test_orchestrator_get_status_no_cycles():
orch = VassalOrchestrator()
status = orch.get_status()
assert status["running"] is False
assert status["cycle_count"] == 0
assert status["last_cycle"] is None
# ---------------------------------------------------------------------------
# run_cycle — integration (no Gitea, no Ollama in test env)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_run_cycle_completes_without_services():
"""run_cycle must complete and record even when external services are down."""
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator(cycle_interval=300)
record = await orch.run_cycle()
assert isinstance(record, VassalCycleRecord)
assert record.cycle_id == 1
assert record.finished_at # was set
assert record.duration_ms >= 0
# No Gitea → fetched = 0, dispatched = 0
assert record.issues_fetched == 0
assert record.issues_dispatched == 0
# History updated
assert len(orch.history) == 1
assert orch.cycle_count == 1
@pytest.mark.asyncio
async def test_run_cycle_increments_cycle_count():
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator()
await orch.run_cycle()
await orch.run_cycle()
assert orch.cycle_count == 2
assert len(orch.history) == 2
@pytest.mark.asyncio
async def test_get_status_after_cycle():
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator()
await orch.run_cycle()
status = orch.get_status()
assert status["cycle_count"] == 1
last = status["last_cycle"]
assert last is not None
assert last["cycle_id"] == 1
assert last["issues_fetched"] == 0
# ---------------------------------------------------------------------------
# start / stop
# ---------------------------------------------------------------------------
def test_orchestrator_stop_when_not_running():
"""stop() on an idle orchestrator must not raise."""
orch = VassalOrchestrator()
orch.stop() # should be a no-op
assert orch.is_running is False
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
def test_module_singleton_exists():
from timmy.vassal import vassal_orchestrator, VassalOrchestrator
assert isinstance(vassal_orchestrator, VassalOrchestrator)