Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
617ef43f99 test: add unit tests for daily_run.py — 51 tests covering main handlers
Some checks failed
Tests / lint (pull_request) Failing after 14s
Tests / test (pull_request) Has been skipped
Adds tests/dashboard/test_daily_run.py with 51 test cases covering:
- _load_config(): defaults, file loading, env var overrides, invalid JSON
- _get_token(): from config dict, from file, missing file
- GiteaClient: headers, api_url, is_available (true/false/cached), get_paginated
- LayerMetrics: trend and trend_color properties (all directions)
- DailyRunMetrics: sessions_trend and sessions_trend_color properties
- _extract_layer(): label extraction from issue label lists
- _load_cycle_data(): success counting, invalid JSON lines, missing timestamps
- _fetch_layer_metrics(): counting logic, graceful degradation on errors
- _get_metrics(): unavailable client, happy path, exception handling
- Route handlers: /daily-run/metrics (JSON) and /daily-run/panel (HTML)

All 51 tests pass. tox -e unit remains green (293 passing).

Fixes #1186

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 17:57:26 -04:00
16 changed files with 276 additions and 2285 deletions

View File

@@ -71,50 +71,6 @@ class GitHand:
return True
return False
async def _exec_subprocess(
self, args: str, timeout: int,
) -> tuple[bytes, bytes, int]:
"""Run git as a subprocess, return (stdout, stderr, returncode).
Raises TimeoutError if the process exceeds *timeout* seconds.
"""
proc = await asyncio.create_subprocess_exec(
"git",
*args.split(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self._repo_dir,
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=timeout,
)
except TimeoutError:
proc.kill()
await proc.wait()
raise
return stdout, stderr, proc.returncode or 0
@staticmethod
def _parse_output(
command: str,
stdout_bytes: bytes,
stderr_bytes: bytes,
returncode: int | None,
latency_ms: float,
) -> GitResult:
"""Decode subprocess output into a GitResult."""
exit_code = returncode or 0
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return GitResult(
operation=command,
success=exit_code == 0,
output=stdout,
error=stderr if exit_code != 0 else "",
latency_ms=latency_ms,
)
async def run(
self,
args: str,
@@ -132,15 +88,14 @@ class GitHand:
GitResult with output or error details.
"""
start = time.time()
command = f"git {args}"
# Gate destructive operations
if self._is_destructive(args) and not allow_destructive:
return GitResult(
operation=command,
operation=f"git {args}",
success=False,
error=(
f"Destructive operation blocked: '{command}'. "
f"Destructive operation blocked: 'git {args}'. "
"Set allow_destructive=True to override."
),
requires_confirmation=True,
@@ -148,20 +103,46 @@ class GitHand:
)
effective_timeout = timeout or self._timeout
command = f"git {args}"
try:
stdout_bytes, stderr_bytes, returncode = await self._exec_subprocess(
args, effective_timeout,
proc = await asyncio.create_subprocess_exec(
"git",
*args.split(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self._repo_dir,
)
except TimeoutError:
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Git command timed out after %ds: %s", effective_timeout, command)
return GitResult(
operation=command,
success=False,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
)
latency = (time.time() - start) * 1000
logger.warning("Git command timed out after %ds: %s", effective_timeout, command)
exit_code = proc.returncode or 0
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return GitResult(
operation=command,
success=False,
error=f"Command timed out after {effective_timeout}s",
success=exit_code == 0,
output=stdout,
error=stderr if exit_code != 0 else "",
latency_ms=latency,
)
except FileNotFoundError:
latency = (time.time() - start) * 1000
logger.warning("git binary not found")
@@ -181,12 +162,6 @@ class GitHand:
latency_ms=latency,
)
return self._parse_output(
command, stdout_bytes, stderr_bytes,
returncode=returncode,
latency_ms=(time.time() - start) * 1000,
)
# ── Convenience wrappers ─────────────────────────────────────────────────
async def status(self) -> GitResult:

View File

@@ -242,64 +242,6 @@ def produce_agent_state(agent_id: str, presence: dict) -> dict:
}
def _get_agents_online() -> int:
"""Return the count of agents with a non-offline status."""
try:
from timmy.agents.loader import list_agents
agents = list_agents()
return sum(1 for a in agents if a.get("status", "") not in ("offline", ""))
except Exception as exc:
logger.debug("Failed to count agents: %s", exc)
return 0
def _get_visitors() -> int:
"""Return the count of active WebSocket visitor clients."""
try:
from dashboard.routes.world import _ws_clients
return len(_ws_clients)
except Exception as exc:
logger.debug("Failed to count visitors: %s", exc)
return 0
def _get_uptime_seconds() -> int:
"""Return seconds elapsed since application start."""
try:
from config import APP_START_TIME
return int((datetime.now(UTC) - APP_START_TIME).total_seconds())
except Exception as exc:
logger.debug("Failed to calculate uptime: %s", exc)
return 0
def _get_thinking_active() -> bool:
"""Return True if the thinking engine is enabled and running."""
try:
from config import settings
from timmy.thinking import thinking_engine
return settings.thinking_enabled and thinking_engine is not None
except Exception as exc:
logger.debug("Failed to check thinking status: %s", exc)
return False
def _get_memory_count() -> int:
"""Return total entries in the vector memory store."""
try:
from timmy.memory_system import get_memory_stats
stats = get_memory_stats()
return stats.get("total_entries", 0)
except Exception as exc:
logger.debug("Failed to count memories: %s", exc)
return 0
def produce_system_status() -> dict:
"""Generate a system_status message for the Matrix.
@@ -328,14 +270,64 @@ def produce_system_status() -> dict:
"ts": 1742529600,
}
"""
# Count agents with status != offline
agents_online = 0
try:
from timmy.agents.loader import list_agents
agents = list_agents()
agents_online = sum(1 for a in agents if a.get("status", "") not in ("offline", ""))
except Exception as exc:
logger.debug("Failed to count agents: %s", exc)
# Count visitors from WebSocket clients
visitors = 0
try:
from dashboard.routes.world import _ws_clients
visitors = len(_ws_clients)
except Exception as exc:
logger.debug("Failed to count visitors: %s", exc)
# Calculate uptime
uptime_seconds = 0
try:
from datetime import UTC
from config import APP_START_TIME
uptime_seconds = int((datetime.now(UTC) - APP_START_TIME).total_seconds())
except Exception as exc:
logger.debug("Failed to calculate uptime: %s", exc)
# Check thinking engine status
thinking_active = False
try:
from config import settings
from timmy.thinking import thinking_engine
thinking_active = settings.thinking_enabled and thinking_engine is not None
except Exception as exc:
logger.debug("Failed to check thinking status: %s", exc)
# Count memories in vector store
memory_count = 0
try:
from timmy.memory_system import get_memory_stats
stats = get_memory_stats()
memory_count = stats.get("total_entries", 0)
except Exception as exc:
logger.debug("Failed to count memories: %s", exc)
return {
"type": "system_status",
"data": {
"agents_online": _get_agents_online(),
"visitors": _get_visitors(),
"uptime_seconds": _get_uptime_seconds(),
"thinking_active": _get_thinking_active(),
"memory_count": _get_memory_count(),
"agents_online": agents_online,
"visitors": visitors,
"uptime_seconds": uptime_seconds,
"thinking_active": thinking_active,
"memory_count": memory_count,
},
"ts": int(time.time()),
}

View File

@@ -528,71 +528,6 @@ class CascadeRouter:
return True
def _filter_providers(self, cascade_tier: str | None) -> list["Provider"]:
"""Return the provider list filtered by tier.
Raises:
RuntimeError: If a tier is specified but no matching providers exist.
"""
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
return providers
if cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
return providers
return self.providers
async def _try_single_provider(
self,
provider: "Provider",
messages: list[dict],
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType,
errors: list[str],
) -> dict | None:
"""Attempt one provider, returning a result dict on success or None on failure.
On failure the error string is appended to *errors* and the provider's
failure metrics are updated so the caller can move on to the next provider.
"""
if not self._is_provider_available(provider):
return None
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
return None
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
provider, messages, selected_model, temperature, max_tokens, content_type
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
return None
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
async def complete(
self,
messages: list[dict],
@@ -626,15 +561,55 @@ class CascadeRouter:
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
errors: list[str] = []
providers = self._filter_providers(cascade_tier)
errors = []
providers = self.providers
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
elif cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
for provider in providers:
result = await self._try_single_provider(
provider, messages, model, temperature, max_tokens, content_type, errors
)
if result is not None:
return result
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

View File

@@ -110,92 +110,6 @@ async def _get_or_create_label(
return None
# ---------------------------------------------------------------------------
# Dispatch action helpers
# ---------------------------------------------------------------------------
async def _apply_label_to_issue(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
label_name: str,
) -> bool:
"""Get-or-create the label then apply it to the issue. Returns True on success."""
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
if label_id is None:
return False
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
headers=headers,
json={"labels": [label_id]},
)
return resp.status_code in (200, 201)
async def _post_dispatch_comment(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue: TriagedIssue,
label_name: str,
) -> bool:
"""Post the vassal routing comment. Returns True on success."""
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
return resp.status_code in (200, 201)
async def _perform_gitea_dispatch(
issue: TriagedIssue,
record: DispatchRecord,
) -> None:
"""Apply label and post comment via Gitea. Mutates *record* in-place."""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
return
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
return
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
record.label_applied = await _apply_label_to_issue(
client, base_url, headers, repo, issue.number, label_name
)
record.comment_posted = await _post_dispatch_comment(
client, base_url, headers, repo, issue, label_name
)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
@@ -230,7 +144,58 @@ async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
_registry[issue.number] = record
return record
await _perform_gitea_dispatch(issue, record)
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
_registry[issue.number] = record
logger.info(

View File

@@ -95,106 +95,6 @@ def _get_config_dir() -> Path:
return DEFAULT_CONFIG_DIR
def _load_daily_run_config() -> dict[str, Any]:
"""Load and validate the daily run configuration."""
config_path = _get_config_dir() / "daily_run.json"
config = _load_json_config(config_path)
if not config:
console.print("[yellow]No daily run configuration found.[/yellow]")
raise typer.Exit(1)
return config
def _display_schedules_table(schedules: dict[str, Any]) -> None:
"""Display the daily run schedules in a table."""
table = Table(title="Daily Run Schedules")
table.add_column("Schedule", style="cyan")
table.add_column("Description", style="green")
table.add_column("Automations", style="yellow")
for schedule_name, schedule_data in schedules.items():
automations = schedule_data.get("automations", [])
table.add_row(
schedule_name,
schedule_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(table)
console.print()
def _display_triggers_table(triggers: dict[str, Any]) -> None:
"""Display the triggers in a table."""
trigger_table = Table(title="Triggers")
trigger_table.add_column("Trigger", style="cyan")
trigger_table.add_column("Description", style="green")
trigger_table.add_column("Automations", style="yellow")
for trigger_name, trigger_data in triggers.items():
automations = trigger_data.get("automations", [])
trigger_table.add_row(
trigger_name,
trigger_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(trigger_table)
console.print()
def _execute_automation(auto: dict[str, Any], verbose: bool) -> None:
"""Execute a single automation and display results."""
cmd = auto.get("command")
name = auto.get("name", auto.get("id", "unnamed"))
if not cmd:
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
return
console.print(f"[cyan]▶ Running: {name}[/cyan]")
if verbose:
console.print(f"[dim] $ {cmd}[/dim]")
try:
result = subprocess.run( # noqa: S602
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout.strip():
console.print(result.stdout.strip())
if result.returncode != 0:
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
if result.stderr.strip():
console.print(f"[red]{result.stderr.strip()}[/red]")
else:
console.print(f"[green] ✓ {name} completed successfully[/green]")
except subprocess.TimeoutExpired:
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
except Exception as exc:
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
def _execute_all_automations(verbose: bool) -> None:
"""Execute all enabled automations."""
console.print("[green]Executing daily run automations...[/green]")
auto_config_path = _get_config_dir() / "automations.json"
auto_config = _load_json_config(auto_config_path)
all_automations = auto_config.get("automations", [])
enabled = [a for a in all_automations if a.get("enabled", False)]
if not enabled:
console.print("[yellow]No enabled automations found.[/yellow]")
return
for auto in enabled:
_execute_automation(auto, verbose)
@app.command()
def daily_run(
dry_run: bool = typer.Option(
@@ -213,22 +113,93 @@ def daily_run(
console.print("[bold green]Timmy Daily Run[/bold green]")
console.print()
config = _load_daily_run_config()
config_path = _get_config_dir() / "daily_run.json"
config = _load_json_config(config_path)
if not config:
console.print("[yellow]No daily run configuration found.[/yellow]")
raise typer.Exit(1)
schedules = config.get("schedules", {})
triggers = config.get("triggers", {})
if verbose:
config_path = _get_config_dir() / "daily_run.json"
console.print(f"[dim]Config loaded from: {config_path}[/dim]")
console.print()
_display_schedules_table(schedules)
_display_triggers_table(triggers)
# Show the daily run schedule
table = Table(title="Daily Run Schedules")
table.add_column("Schedule", style="cyan")
table.add_column("Description", style="green")
table.add_column("Automations", style="yellow")
for schedule_name, schedule_data in schedules.items():
automations = schedule_data.get("automations", [])
table.add_row(
schedule_name,
schedule_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(table)
console.print()
# Show triggers
trigger_table = Table(title="Triggers")
trigger_table.add_column("Trigger", style="cyan")
trigger_table.add_column("Description", style="green")
trigger_table.add_column("Automations", style="yellow")
for trigger_name, trigger_data in triggers.items():
automations = trigger_data.get("automations", [])
trigger_table.add_row(
trigger_name,
trigger_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(trigger_table)
console.print()
if dry_run:
console.print("[yellow]Dry run mode — no actions executed.[/yellow]")
else:
_execute_all_automations(verbose)
console.print("[green]Executing daily run automations...[/green]")
auto_config_path = _get_config_dir() / "automations.json"
auto_config = _load_json_config(auto_config_path)
all_automations = auto_config.get("automations", [])
enabled = [a for a in all_automations if a.get("enabled", False)]
if not enabled:
console.print("[yellow]No enabled automations found.[/yellow]")
for auto in enabled:
cmd = auto.get("command")
name = auto.get("name", auto.get("id", "unnamed"))
if not cmd:
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
continue
console.print(f"[cyan]▶ Running: {name}[/cyan]")
if verbose:
console.print(f"[dim] $ {cmd}[/dim]")
try:
result = subprocess.run( # noqa: S602
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout.strip():
console.print(result.stdout.strip())
if result.returncode != 0:
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
if result.stderr.strip():
console.print(f"[red]{result.stderr.strip()}[/red]")
else:
console.print(f"[green] ✓ {name} completed successfully[/green]")
except subprocess.TimeoutExpired:
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
except Exception as exc:
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
@app.command()

View File

@@ -86,19 +86,6 @@
<p>Your task has been added to the queue. Timmy will review it shortly.</p>
<button type="button" id="submit-another-btn" class="btn-primary">Submit Another</button>
</div>
<div id="submit-job-queued" class="submit-job-queued hidden">
<div class="queued-icon">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
<circle cx="12" cy="12" r="10"></circle>
<polyline points="12 6 12 12 16 14"></polyline>
</svg>
</div>
<h3>Job Queued</h3>
<p>The server is unreachable right now. Your job has been saved locally and will be submitted automatically when the connection is restored.</p>
<div id="queue-count-display" class="queue-count-display"></div>
<button type="button" id="submit-another-queued-btn" class="btn-primary">Submit Another</button>
</div>
</div>
<div id="submit-job-backdrop" class="submit-job-backdrop"></div>
</div>
@@ -155,7 +142,6 @@
import { createFamiliar } from "./familiar.js";
import { setupControls } from "./controls.js";
import { StateReader } from "./state.js";
import { messageQueue } from "./queue.js";
// --- Renderer ---
const renderer = new THREE.WebGLRenderer({ antialias: true });
@@ -196,60 +182,8 @@
moodEl.textContent = state.timmyState.mood;
}
});
// Replay queued jobs whenever the server comes back online.
stateReader.onConnectionChange(async (online) => {
if (!online) return;
const pending = messageQueue.getPending();
if (pending.length === 0) return;
console.log(`[queue] Online — replaying ${pending.length} queued job(s)`);
for (const item of pending) {
try {
const response = await fetch("/api/tasks", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(item.payload),
});
if (response.ok) {
messageQueue.markDelivered(item.id);
console.log(`[queue] Delivered queued job ${item.id}`);
} else {
messageQueue.markFailed(item.id);
console.warn(`[queue] Failed to deliver job ${item.id}: ${response.status}`);
}
} catch (err) {
// Still offline — leave as QUEUED, will retry next cycle.
console.warn(`[queue] Replay aborted (still offline): ${err}`);
break;
}
}
messageQueue.prune();
_updateQueueBadge();
});
stateReader.connect();
// --- Queue badge (top-right indicator for pending jobs) ---
function _updateQueueBadge() {
const count = messageQueue.pendingCount();
let badge = document.getElementById("queue-badge");
if (count === 0) {
if (badge) badge.remove();
return;
}
if (!badge) {
badge = document.createElement("div");
badge.id = "queue-badge";
badge.className = "queue-badge";
badge.title = "Jobs queued offline — will submit on reconnect";
document.getElementById("overlay").appendChild(badge);
}
badge.textContent = `${count} queued`;
}
// Show badge on load if there are already queued messages.
messageQueue.prune();
_updateQueueBadge();
// --- About Panel ---
const infoBtn = document.getElementById("info-btn");
const aboutPanel = document.getElementById("about-panel");
@@ -294,9 +228,6 @@
const descWarning = document.getElementById("desc-warning");
const submitJobSuccess = document.getElementById("submit-job-success");
const submitAnotherBtn = document.getElementById("submit-another-btn");
const submitJobQueued = document.getElementById("submit-job-queued");
const submitAnotherQueuedBtn = document.getElementById("submit-another-queued-btn");
const queueCountDisplay = document.getElementById("queue-count-display");
// Constants
const MAX_TITLE_LENGTH = 200;
@@ -324,7 +255,6 @@
submitJobForm.reset();
submitJobForm.classList.remove("hidden");
submitJobSuccess.classList.add("hidden");
submitJobQueued.classList.add("hidden");
updateCharCounts();
clearErrors();
validateForm();
@@ -433,7 +363,6 @@
submitJobBackdrop.addEventListener("click", closeSubmitJobModal);
cancelJobBtn.addEventListener("click", closeSubmitJobModal);
submitAnotherBtn.addEventListener("click", resetForm);
submitAnotherQueuedBtn.addEventListener("click", resetForm);
// Input event listeners for real-time validation
jobTitle.addEventListener("input", () => {
@@ -491,10 +420,9 @@
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(formData),
signal: AbortSignal.timeout(8000),
body: JSON.stringify(formData)
});
if (response.ok) {
// Show success state
submitJobForm.classList.add("hidden");
@@ -505,14 +433,9 @@
descError.classList.add("visible");
}
} catch (error) {
// Server unreachable — persist to localStorage queue.
messageQueue.enqueue(formData);
const count = messageQueue.pendingCount();
// For demo/development, show success even if API fails
submitJobForm.classList.add("hidden");
submitJobQueued.classList.remove("hidden");
queueCountDisplay.textContent =
count > 1 ? `${count} jobs queued` : "1 job queued";
_updateQueueBadge();
submitJobSuccess.classList.remove("hidden");
} finally {
submitJobSubmit.disabled = false;
submitJobSubmit.textContent = "Submit Job";

View File

@@ -1,90 +0,0 @@
/**
* Offline message queue for Workshop panel.
*
* Persists undelivered job submissions to localStorage so they survive
* page refreshes and are replayed when the server comes back online.
*/
const _QUEUE_KEY = "timmy_workshop_queue";
const _MAX_AGE_MS = 24 * 60 * 60 * 1000; // 24 hours — auto-expire old items
export const STATUS = {
QUEUED: "queued",
DELIVERED: "delivered",
FAILED: "failed",
};
function _load() {
try {
const raw = localStorage.getItem(_QUEUE_KEY);
return raw ? JSON.parse(raw) : [];
} catch {
return [];
}
}
function _save(items) {
try {
localStorage.setItem(_QUEUE_KEY, JSON.stringify(items));
} catch {
/* localStorage unavailable — degrade silently */
}
}
function _uid() {
return `msg_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
}
/** LocalStorage-backed message queue for Workshop job submissions. */
export const messageQueue = {
/** Add a payload. Returns the created item (with id and status). */
enqueue(payload) {
const item = {
id: _uid(),
payload,
queuedAt: new Date().toISOString(),
status: STATUS.QUEUED,
};
const items = _load();
items.push(item);
_save(items);
return item;
},
/** Mark a message as delivered and remove it from storage. */
markDelivered(id) {
_save(_load().filter((i) => i.id !== id));
},
/** Mark a message as permanently failed (kept for 24h for visibility). */
markFailed(id) {
_save(
_load().map((i) =>
i.id === id ? { ...i, status: STATUS.FAILED } : i
)
);
},
/** All messages waiting to be delivered. */
getPending() {
return _load().filter((i) => i.status === STATUS.QUEUED);
},
/** Total queued (QUEUED status only) count. */
pendingCount() {
return this.getPending().length;
},
/** Drop expired failed items (> 24h old). */
prune() {
const cutoff = Date.now() - _MAX_AGE_MS;
_save(
_load().filter(
(i) =>
i.status === STATUS.QUEUED ||
(i.status === STATUS.FAILED &&
new Date(i.queuedAt).getTime() > cutoff)
)
);
},
};

View File

@@ -3,10 +3,6 @@
*
* Provides Timmy's current state to the scene. In Phase 2 this is a
* static default; the WebSocket path is stubbed for future use.
*
* Also manages connection health monitoring: pings /api/matrix/health
* every 30 seconds and notifies listeners when online/offline state
* changes so the Workshop can replay any queued messages.
*/
const DEFAULTS = {
@@ -24,19 +20,11 @@ const DEFAULTS = {
version: 1,
};
const _HEALTH_URL = "/api/matrix/health";
const _PING_INTERVAL_MS = 30_000;
const _WS_RECONNECT_DELAY_MS = 5_000;
export class StateReader {
constructor() {
this.state = { ...DEFAULTS };
this.listeners = [];
this.connectionListeners = [];
this._ws = null;
this._online = false;
this._pingTimer = null;
this._reconnectTimer = null;
}
/** Subscribe to state changes. */
@@ -44,12 +32,7 @@ export class StateReader {
this.listeners.push(fn);
}
/** Subscribe to online/offline transitions. Called with (isOnline: bool). */
onConnectionChange(fn) {
this.connectionListeners.push(fn);
}
/** Notify all state listeners. */
/** Notify all listeners. */
_notify() {
for (const fn of this.listeners) {
try {
@@ -60,48 +43,8 @@ export class StateReader {
}
}
/** Fire connection listeners only when state actually changes. */
_notifyConnection(online) {
if (online === this._online) return;
this._online = online;
for (const fn of this.connectionListeners) {
try {
fn(online);
} catch (e) {
console.warn("Connection listener error:", e);
}
}
}
/** Ping the health endpoint once and update connection state. */
async _ping() {
try {
const r = await fetch(_HEALTH_URL, {
signal: AbortSignal.timeout(5000),
});
this._notifyConnection(r.ok);
} catch {
this._notifyConnection(false);
}
}
/** Start 30-second health-check loop (idempotent). */
_startHealthCheck() {
if (this._pingTimer) return;
this._pingTimer = setInterval(() => this._ping(), _PING_INTERVAL_MS);
}
/** Schedule a WebSocket reconnect attempt after a delay (idempotent). */
_scheduleReconnect() {
if (this._reconnectTimer) return;
this._reconnectTimer = setTimeout(() => {
this._reconnectTimer = null;
this._connectWS();
}, _WS_RECONNECT_DELAY_MS);
}
/** Open (or re-open) the WebSocket connection. */
_connectWS() {
/** Try to connect to the world WebSocket for live updates. */
connect() {
const proto = location.protocol === "https:" ? "wss:" : "ws:";
const url = `${proto}//${location.host}/api/world/ws`;
try {
@@ -109,13 +52,10 @@ export class StateReader {
this._ws.onopen = () => {
const dot = document.getElementById("connection-dot");
if (dot) dot.classList.add("connected");
this._notifyConnection(true);
};
this._ws.onclose = () => {
const dot = document.getElementById("connection-dot");
if (dot) dot.classList.remove("connected");
this._notifyConnection(false);
this._scheduleReconnect();
};
this._ws.onmessage = (ev) => {
try {
@@ -135,18 +75,9 @@ export class StateReader {
};
} catch (e) {
console.warn("WebSocket unavailable — using static state");
this._scheduleReconnect();
}
}
/** Connect to the world WebSocket and start health-check polling. */
connect() {
this._connectWS();
this._startHealthCheck();
// Immediate ping so connection status is known before the first interval.
this._ping();
}
/** Current mood string. */
get mood() {
return this.state.timmyState.mood;
@@ -161,9 +92,4 @@ export class StateReader {
get energy() {
return this.state.timmyState.energy;
}
/** Whether the server is currently reachable. */
get isOnline() {
return this._online;
}
}

View File

@@ -604,68 +604,6 @@ canvas {
opacity: 1;
}
/* Queued State (offline buffer) */
.submit-job-queued {
text-align: center;
padding: 32px 16px;
}
.submit-job-queued.hidden {
display: none;
}
.queued-icon {
width: 64px;
height: 64px;
margin: 0 auto 20px;
color: #ffaa33;
}
.queued-icon svg {
width: 100%;
height: 100%;
}
.submit-job-queued h3 {
font-size: 20px;
color: #ffaa33;
margin: 0 0 12px 0;
}
.submit-job-queued p {
font-size: 14px;
color: #888;
margin: 0 0 16px 0;
line-height: 1.5;
}
.queue-count-display {
font-size: 12px;
color: #ffaa33;
margin-bottom: 24px;
opacity: 0.8;
}
/* Queue badge — shown in overlay corner when offline jobs are pending */
.queue-badge {
position: absolute;
bottom: 16px;
right: 16px;
padding: 4px 10px;
background: rgba(10, 10, 20, 0.85);
border: 1px solid rgba(255, 170, 51, 0.6);
border-radius: 12px;
color: #ffaa33;
font-size: 11px;
pointer-events: none;
animation: queue-pulse 2s ease-in-out infinite;
}
@keyframes queue-pulse {
0%, 100% { opacity: 0.8; }
50% { opacity: 1; }
}
/* Mobile adjustments */
@media (max-width: 480px) {
.about-panel-content {

View File

@@ -1,513 +0,0 @@
"""Unit tests for infrastructure.chat_store module."""
import threading
from pathlib import Path
import pytest
from infrastructure.chat_store import MAX_MESSAGES, Message, MessageLog, _get_conn
# ---------------------------------------------------------------------------
# Message dataclass
# ---------------------------------------------------------------------------
class TestMessageDataclass:
"""Tests for the Message dataclass."""
def test_message_required_fields(self):
"""Message can be created with required fields only."""
msg = Message(role="user", content="hello", timestamp="2024-01-01T00:00:00")
assert msg.role == "user"
assert msg.content == "hello"
assert msg.timestamp == "2024-01-01T00:00:00"
def test_message_default_source(self):
"""Message source defaults to 'browser'."""
msg = Message(role="user", content="hi", timestamp="2024-01-01T00:00:00")
assert msg.source == "browser"
def test_message_custom_source(self):
"""Message source can be overridden."""
msg = Message(role="agent", content="reply", timestamp="2024-01-01T00:00:00", source="api")
assert msg.source == "api"
def test_message_equality(self):
"""Two Messages with the same fields are equal (dataclass default)."""
m1 = Message(role="user", content="x", timestamp="t")
m2 = Message(role="user", content="x", timestamp="t")
assert m1 == m2
def test_message_inequality(self):
"""Messages with different content are not equal."""
m1 = Message(role="user", content="x", timestamp="t")
m2 = Message(role="user", content="y", timestamp="t")
assert m1 != m2
# ---------------------------------------------------------------------------
# _get_conn context manager
# ---------------------------------------------------------------------------
class TestGetConnContextManager:
"""Tests for the _get_conn context manager."""
def test_creates_db_file(self, tmp_path):
"""_get_conn creates the database file on first use."""
db = tmp_path / "chat.db"
assert not db.exists()
with _get_conn(db) as conn:
assert conn is not None
assert db.exists()
def test_creates_parent_directories(self, tmp_path):
"""_get_conn creates any missing parent directories."""
db = tmp_path / "nested" / "deep" / "chat.db"
with _get_conn(db):
pass
assert db.exists()
def test_creates_schema(self, tmp_path):
"""_get_conn creates the chat_messages table."""
db = tmp_path / "chat.db"
with _get_conn(db) as conn:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='chat_messages'"
).fetchall()
assert len(tables) == 1
def test_schema_has_expected_columns(self, tmp_path):
"""chat_messages table has the expected columns."""
db = tmp_path / "chat.db"
with _get_conn(db) as conn:
info = conn.execute("PRAGMA table_info(chat_messages)").fetchall()
col_names = [row["name"] for row in info]
assert set(col_names) == {"id", "role", "content", "timestamp", "source"}
def test_idempotent_schema_creation(self, tmp_path):
"""Calling _get_conn twice does not fail (CREATE TABLE IF NOT EXISTS)."""
db = tmp_path / "chat.db"
with _get_conn(db):
pass
with _get_conn(db) as conn:
# Table still exists and is usable
conn.execute("SELECT COUNT(*) FROM chat_messages")
# ---------------------------------------------------------------------------
# MessageLog — basic operations
# ---------------------------------------------------------------------------
class TestMessageLogAppend:
"""Tests for MessageLog.append()."""
def test_append_single_message(self, tmp_path):
"""append() stores a message that can be retrieved."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "hello", "2024-01-01T00:00:00")
messages = log.all()
assert len(messages) == 1
assert messages[0].role == "user"
assert messages[0].content == "hello"
assert messages[0].timestamp == "2024-01-01T00:00:00"
assert messages[0].source == "browser"
log.close()
def test_append_custom_source(self, tmp_path):
"""append() stores the source field correctly."""
log = MessageLog(tmp_path / "chat.db")
log.append("agent", "reply", "2024-01-01T00:00:01", source="api")
msg = log.all()[0]
assert msg.source == "api"
log.close()
def test_append_multiple_messages_preserves_order(self, tmp_path):
"""append() preserves insertion order."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "first", "2024-01-01T00:00:00")
log.append("agent", "second", "2024-01-01T00:00:01")
log.append("user", "third", "2024-01-01T00:00:02")
messages = log.all()
assert [m.content for m in messages] == ["first", "second", "third"]
log.close()
def test_append_persists_across_instances(self, tmp_path):
"""Messages appended by one instance are readable by another."""
db = tmp_path / "chat.db"
log1 = MessageLog(db)
log1.append("user", "persisted", "2024-01-01T00:00:00")
log1.close()
log2 = MessageLog(db)
messages = log2.all()
assert len(messages) == 1
assert messages[0].content == "persisted"
log2.close()
class TestMessageLogAll:
"""Tests for MessageLog.all()."""
def test_all_on_empty_store_returns_empty_list(self, tmp_path):
"""all() returns [] when there are no messages."""
log = MessageLog(tmp_path / "chat.db")
assert log.all() == []
log.close()
def test_all_returns_message_objects(self, tmp_path):
"""all() returns a list of Message dataclass instances."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "hi", "2024-01-01T00:00:00")
messages = log.all()
assert all(isinstance(m, Message) for m in messages)
log.close()
def test_all_returns_all_messages(self, tmp_path):
"""all() returns every stored message."""
log = MessageLog(tmp_path / "chat.db")
for i in range(5):
log.append("user", f"msg{i}", f"2024-01-01T00:00:0{i}")
assert len(log.all()) == 5
log.close()
class TestMessageLogRecent:
"""Tests for MessageLog.recent()."""
def test_recent_on_empty_store_returns_empty_list(self, tmp_path):
"""recent() returns [] when there are no messages."""
log = MessageLog(tmp_path / "chat.db")
assert log.recent() == []
log.close()
def test_recent_default_limit(self, tmp_path):
"""recent() with default limit returns up to 50 messages."""
log = MessageLog(tmp_path / "chat.db")
for i in range(60):
log.append("user", f"msg{i}", f"2024-01-01T00:00:{i:02d}")
msgs = log.recent()
assert len(msgs) == 50
log.close()
def test_recent_custom_limit(self, tmp_path):
"""recent() respects a custom limit."""
log = MessageLog(tmp_path / "chat.db")
for i in range(10):
log.append("user", f"msg{i}", f"2024-01-01T00:00:0{i}")
msgs = log.recent(limit=3)
assert len(msgs) == 3
log.close()
def test_recent_returns_newest_messages(self, tmp_path):
"""recent() returns the most-recently-inserted messages."""
log = MessageLog(tmp_path / "chat.db")
for i in range(10):
log.append("user", f"msg{i}", f"2024-01-01T00:00:0{i}")
msgs = log.recent(limit=3)
# Should be the last 3 inserted, in oldest-first order
assert [m.content for m in msgs] == ["msg7", "msg8", "msg9"]
log.close()
def test_recent_fewer_than_limit_returns_all(self, tmp_path):
"""recent() returns all messages when count < limit."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "only", "2024-01-01T00:00:00")
msgs = log.recent(limit=10)
assert len(msgs) == 1
log.close()
def test_recent_returns_oldest_first(self, tmp_path):
"""recent() returns messages in oldest-first order."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "a", "2024-01-01T00:00:00")
log.append("user", "b", "2024-01-01T00:00:01")
log.append("user", "c", "2024-01-01T00:00:02")
msgs = log.recent(limit=2)
assert [m.content for m in msgs] == ["b", "c"]
log.close()
class TestMessageLogClear:
"""Tests for MessageLog.clear()."""
def test_clear_empties_the_store(self, tmp_path):
"""clear() removes all messages."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "hello", "2024-01-01T00:00:00")
log.clear()
assert log.all() == []
log.close()
def test_clear_on_empty_store_is_safe(self, tmp_path):
"""clear() on an empty store does not raise."""
log = MessageLog(tmp_path / "chat.db")
log.clear() # should not raise
assert log.all() == []
log.close()
def test_clear_allows_new_appends(self, tmp_path):
"""After clear(), new messages can be appended."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "old", "2024-01-01T00:00:00")
log.clear()
log.append("user", "new", "2024-01-01T00:00:01")
messages = log.all()
assert len(messages) == 1
assert messages[0].content == "new"
log.close()
def test_clear_resets_len_to_zero(self, tmp_path):
"""After clear(), __len__ returns 0."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "a", "t")
log.append("user", "b", "t")
log.clear()
assert len(log) == 0
log.close()
# ---------------------------------------------------------------------------
# MessageLog — __len__
# ---------------------------------------------------------------------------
class TestMessageLogLen:
"""Tests for MessageLog.__len__()."""
def test_len_empty_store(self, tmp_path):
"""__len__ returns 0 for an empty store."""
log = MessageLog(tmp_path / "chat.db")
assert len(log) == 0
log.close()
def test_len_after_appends(self, tmp_path):
"""__len__ reflects the number of stored messages."""
log = MessageLog(tmp_path / "chat.db")
for i in range(7):
log.append("user", f"msg{i}", "t")
assert len(log) == 7
log.close()
def test_len_after_clear(self, tmp_path):
"""__len__ is 0 after clear()."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "x", "t")
log.clear()
assert len(log) == 0
log.close()
# ---------------------------------------------------------------------------
# MessageLog — pruning
# ---------------------------------------------------------------------------
class TestMessageLogPrune:
"""Tests for automatic pruning via _prune()."""
def test_prune_keeps_at_most_max_messages(self, tmp_path):
"""After exceeding MAX_MESSAGES, oldest messages are pruned."""
log = MessageLog(tmp_path / "chat.db")
# Temporarily lower the limit via monkeypatching is not straightforward
# because _prune reads the module-level MAX_MESSAGES constant.
# We therefore patch it directly.
import infrastructure.chat_store as cs
original = cs.MAX_MESSAGES
cs.MAX_MESSAGES = 5
try:
for i in range(8):
log.append("user", f"msg{i}", f"t{i}")
assert len(log) == 5
finally:
cs.MAX_MESSAGES = original
log.close()
def test_prune_keeps_newest_messages(self, tmp_path):
"""Pruning removes oldest messages and keeps the newest ones."""
import infrastructure.chat_store as cs
log = MessageLog(tmp_path / "chat.db")
original = cs.MAX_MESSAGES
cs.MAX_MESSAGES = 3
try:
for i in range(5):
log.append("user", f"msg{i}", f"t{i}")
messages = log.all()
contents = [m.content for m in messages]
assert contents == ["msg2", "msg3", "msg4"]
finally:
cs.MAX_MESSAGES = original
log.close()
def test_no_prune_when_below_limit(self, tmp_path):
"""No messages are pruned while count is at or below MAX_MESSAGES."""
log = MessageLog(tmp_path / "chat.db")
import infrastructure.chat_store as cs
original = cs.MAX_MESSAGES
cs.MAX_MESSAGES = 10
try:
for i in range(10):
log.append("user", f"msg{i}", f"t{i}")
assert len(log) == 10
finally:
cs.MAX_MESSAGES = original
log.close()
# ---------------------------------------------------------------------------
# MessageLog — close / lifecycle
# ---------------------------------------------------------------------------
class TestMessageLogClose:
"""Tests for MessageLog.close()."""
def test_close_is_safe_before_first_use(self, tmp_path):
"""close() on a fresh (never-used) instance does not raise."""
log = MessageLog(tmp_path / "chat.db")
log.close() # should not raise
def test_close_multiple_times_is_safe(self, tmp_path):
"""close() can be called multiple times without error."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "hi", "t")
log.close()
log.close() # second close should not raise
def test_close_sets_conn_to_none(self, tmp_path):
"""close() sets the internal _conn attribute to None."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "hi", "t")
assert log._conn is not None
log.close()
assert log._conn is None
# ---------------------------------------------------------------------------
# Thread safety
# ---------------------------------------------------------------------------
class TestMessageLogThreadSafety:
"""Thread-safety tests for MessageLog."""
def test_concurrent_appends(self, tmp_path):
"""Multiple threads can append messages without data loss or errors."""
log = MessageLog(tmp_path / "chat.db")
errors: list[Exception] = []
def worker(n: int) -> None:
try:
for i in range(5):
log.append("user", f"t{n}-{i}", f"ts-{n}-{i}")
except Exception as exc: # noqa: BLE001
errors.append(exc)
threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert errors == [], f"Concurrent append raised: {errors}"
# All 20 messages should be present (4 threads × 5 messages)
assert len(log) == 20
log.close()
def test_concurrent_reads_and_writes(self, tmp_path):
"""Concurrent reads and writes do not corrupt state."""
log = MessageLog(tmp_path / "chat.db")
errors: list[Exception] = []
def writer() -> None:
try:
for i in range(10):
log.append("user", f"msg{i}", f"t{i}")
except Exception as exc: # noqa: BLE001
errors.append(exc)
def reader() -> None:
try:
for _ in range(10):
log.all()
except Exception as exc: # noqa: BLE001
errors.append(exc)
threads = [threading.Thread(target=writer)] + [
threading.Thread(target=reader) for _ in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()
assert errors == [], f"Concurrent read/write raised: {errors}"
log.close()
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
class TestMessageLogEdgeCases:
"""Edge-case tests for MessageLog."""
def test_empty_content_stored_and_retrieved(self, tmp_path):
"""Empty string content can be stored and retrieved."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "", "2024-01-01T00:00:00")
assert log.all()[0].content == ""
log.close()
def test_unicode_content_stored_and_retrieved(self, tmp_path):
"""Unicode characters in content are stored and retrieved correctly."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "こんにちは 🌍", "2024-01-01T00:00:00")
assert log.all()[0].content == "こんにちは 🌍"
log.close()
def test_newline_in_content(self, tmp_path):
"""Newlines in content are preserved."""
log = MessageLog(tmp_path / "chat.db")
multiline = "line1\nline2\nline3"
log.append("agent", multiline, "2024-01-01T00:00:00")
assert log.all()[0].content == multiline
log.close()
def test_default_db_path_attribute(self):
"""MessageLog without explicit path uses the module-level DB_PATH."""
from infrastructure.chat_store import DB_PATH
log = MessageLog()
assert log._db_path == DB_PATH
# Do NOT call close() here — this is the global singleton's path
def test_custom_db_path_used(self, tmp_path):
"""MessageLog uses the provided db_path."""
db = tmp_path / "custom.db"
log = MessageLog(db)
log.append("user", "test", "t")
assert db.exists()
log.close()
def test_recent_limit_zero_returns_empty(self, tmp_path):
"""recent(limit=0) returns an empty list."""
log = MessageLog(tmp_path / "chat.db")
log.append("user", "msg", "t")
assert log.recent(limit=0) == []
log.close()
def test_all_roles_stored_correctly(self, tmp_path):
"""Different role values are stored and retrieved correctly."""
log = MessageLog(tmp_path / "chat.db")
for role in ("user", "agent", "error", "system"):
log.append(role, f"{role} message", "t")
messages = log.all()
assert [m.role for m in messages] == ["user", "agent", "error", "system"]
log.close()

View File

@@ -1,21 +1,10 @@
"""Tests for the async event bus (infrastructure.events.bus)."""
import sqlite3
from pathlib import Path
from unittest.mock import patch
import pytest
import infrastructure.events.bus as bus_module
from infrastructure.events.bus import (
Event,
EventBus,
emit,
event_bus,
get_event_bus,
init_event_bus_persistence,
on,
)
from infrastructure.events.bus import Event, EventBus, emit, event_bus, on
class TestEvent:
@@ -360,111 +349,3 @@ class TestEventBusPersistence:
assert mode == "wal"
finally:
conn.close()
async def test_persist_event_exception_is_swallowed(self, tmp_path):
"""_persist_event must not propagate SQLite errors."""
from unittest.mock import MagicMock
bus = EventBus()
bus.enable_persistence(tmp_path / "events.db")
# Make the INSERT raise an OperationalError
mock_conn = MagicMock()
mock_conn.execute.side_effect = sqlite3.OperationalError("simulated failure")
from contextlib import contextmanager
@contextmanager
def fake_ctx():
yield mock_conn
with patch.object(bus, "_get_persistence_conn", fake_ctx):
# Should not raise
bus._persist_event(Event(type="x", source="s"))
async def test_replay_exception_returns_empty(self, tmp_path):
"""replay() must return [] when SQLite query fails."""
from unittest.mock import MagicMock
bus = EventBus()
bus.enable_persistence(tmp_path / "events.db")
mock_conn = MagicMock()
mock_conn.execute.side_effect = sqlite3.OperationalError("simulated failure")
from contextlib import contextmanager
@contextmanager
def fake_ctx():
yield mock_conn
with patch.object(bus, "_get_persistence_conn", fake_ctx):
result = bus.replay()
assert result == []
# ── Singleton helpers ─────────────────────────────────────────────────────────
class TestSingletonHelpers:
"""Test get_event_bus(), init_event_bus_persistence(), and module __getattr__."""
def test_get_event_bus_returns_same_instance(self):
"""get_event_bus() is a true singleton."""
a = get_event_bus()
b = get_event_bus()
assert a is b
def test_module_event_bus_attr_is_singleton(self):
"""Accessing bus_module.event_bus via __getattr__ returns the singleton."""
assert bus_module.event_bus is get_event_bus()
def test_module_getattr_unknown_raises(self):
"""Accessing an unknown module attribute raises AttributeError."""
with pytest.raises(AttributeError):
_ = bus_module.no_such_attr # type: ignore[attr-defined]
def test_init_event_bus_persistence_sets_path(self, tmp_path):
"""init_event_bus_persistence() enables persistence on the singleton."""
bus = get_event_bus()
original_path = bus._persistence_db_path
try:
bus._persistence_db_path = None # reset for the test
db_path = tmp_path / "test_init.db"
init_event_bus_persistence(db_path)
assert bus._persistence_db_path == db_path
finally:
bus._persistence_db_path = original_path
def test_init_event_bus_persistence_is_idempotent(self, tmp_path):
"""Calling init_event_bus_persistence() twice keeps the first path."""
bus = get_event_bus()
original_path = bus._persistence_db_path
try:
bus._persistence_db_path = None
first_path = tmp_path / "first.db"
second_path = tmp_path / "second.db"
init_event_bus_persistence(first_path)
init_event_bus_persistence(second_path) # should be ignored
assert bus._persistence_db_path == first_path
finally:
bus._persistence_db_path = original_path
def test_init_event_bus_persistence_default_path(self):
"""init_event_bus_persistence() uses 'data/events.db' when no path given."""
bus = get_event_bus()
original_path = bus._persistence_db_path
try:
bus._persistence_db_path = None
# Patch enable_persistence to capture what path it receives
captured = {}
def fake_enable(path: Path) -> None:
captured["path"] = path
with patch.object(bus, "enable_persistence", side_effect=fake_enable):
init_event_bus_persistence()
assert captured["path"] == Path("data/events.db")
finally:
bus._persistence_db_path = original_path

View File

@@ -1376,141 +1376,3 @@ class TestIsProviderAvailable:
result = router._is_provider_available(provider)
assert result is True
assert provider.circuit_state == CircuitState.HALF_OPEN
@pytest.mark.unit
class TestFilterProviders:
"""Test _filter_providers helper extracted from complete()."""
def _router(self) -> CascadeRouter:
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [
Provider(
name="anthropic-p",
type="anthropic",
enabled=True,
priority=1,
api_key="key",
tier="frontier",
),
Provider(
name="ollama-p",
type="ollama",
enabled=True,
priority=2,
tier="local",
),
]
return router
def test_no_tier_returns_all_providers(self):
router = self._router()
result = router._filter_providers(None)
assert result is router.providers
def test_frontier_required_returns_only_anthropic(self):
router = self._router()
result = router._filter_providers("frontier_required")
assert len(result) == 1
assert result[0].type == "anthropic"
def test_frontier_required_no_anthropic_raises(self):
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [
Provider(name="ollama-p", type="ollama", enabled=True, priority=1)
]
with pytest.raises(RuntimeError, match="No Anthropic provider configured"):
router._filter_providers("frontier_required")
def test_named_tier_filters_by_tier(self):
router = self._router()
result = router._filter_providers("local")
assert len(result) == 1
assert result[0].name == "ollama-p"
def test_named_tier_not_found_raises(self):
router = self._router()
with pytest.raises(RuntimeError, match="No providers found for tier"):
router._filter_providers("nonexistent")
@pytest.mark.unit
@pytest.mark.asyncio
class TestTrySingleProvider:
"""Test _try_single_provider helper extracted from complete()."""
def _router(self) -> CascadeRouter:
return CascadeRouter(config_path=Path("/nonexistent"))
def _provider(self, name: str = "test", ptype: str = "ollama") -> Provider:
return Provider(
name=name,
type=ptype,
enabled=True,
priority=1,
models=[{"name": "llama3.2", "default": True}],
)
async def test_unavailable_provider_returns_none(self):
router = self._router()
provider = self._provider()
provider.enabled = False
errors: list[str] = []
result = await router._try_single_provider(
provider, [], None, 0.7, None, ContentType.TEXT, errors
)
assert result is None
assert errors == []
async def test_quota_blocked_cloud_provider_returns_none(self):
router = self._router()
provider = self._provider(ptype="anthropic")
errors: list[str] = []
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
mock_qm.select_model.return_value = "qwen3:14b" # non-cloud → ACTIVE tier
mock_qm.check.return_value = None
result = await router._try_single_provider(
provider, [], None, 0.7, None, ContentType.TEXT, errors
)
assert result is None
assert errors == []
async def test_success_returns_result_dict(self):
router = self._router()
provider = self._provider()
errors: list[str] = []
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "hi", "model": "llama3.2"}
result = await router._try_single_provider(
provider,
[{"role": "user", "content": "hi"}],
None,
0.7,
None,
ContentType.TEXT,
errors,
)
assert result is not None
assert result["content"] == "hi"
assert result["provider"] == "test"
assert errors == []
async def test_failure_appends_error_and_returns_none(self):
router = self._router()
provider = self._provider()
errors: list[str] = []
with patch.object(router, "_call_ollama") as mock_call:
mock_call.side_effect = RuntimeError("boom")
result = await router._try_single_provider(
provider,
[{"role": "user", "content": "hi"}],
None,
0.7,
None,
ContentType.TEXT,
errors,
)
assert result is None
assert len(errors) == 1
assert "boom" in errors[0]
assert provider.metrics.failed_requests == 1

View File

@@ -18,10 +18,6 @@ def _make_settings(**env_overrides):
"""Create a fresh Settings instance with isolated env vars."""
from config import Settings
# Prevent Pydantic from reading .env file (local .env pollutes defaults)
_orig_config = Settings.model_config.copy()
Settings.model_config["env_file"] = None
# Strip keys that might bleed in from the test environment
clean_env = {
k: v
@@ -86,10 +82,7 @@ def _make_settings(**env_overrides):
}
clean_env.update(env_overrides)
with patch.dict(os.environ, clean_env, clear=True):
try:
return Settings()
finally:
Settings.model_config.update(_orig_config)
return Settings()
# ── normalize_ollama_url ──────────────────────────────────────────────────────
@@ -699,12 +692,12 @@ class TestGetEffectiveOllamaModel:
"""get_effective_ollama_model walks fallback chain."""
def test_returns_primary_when_available(self):
from config import get_effective_ollama_model, settings
from config import get_effective_ollama_model
with patch("config.check_ollama_model_available", return_value=True):
result = get_effective_ollama_model()
# Should return whatever the user's configured model is
assert result == settings.ollama_model
# Default is qwen3:14b
assert result == "qwen3:14b"
def test_falls_back_when_primary_unavailable(self):
from config import get_effective_ollama_model, settings

View File

@@ -6,12 +6,7 @@ import pytest
from infrastructure.presence import (
DEFAULT_PIP_STATE,
_get_agents_online,
_get_familiar_state,
_get_memory_count,
_get_thinking_active,
_get_uptime_seconds,
_get_visitors,
produce_agent_state,
produce_bark,
produce_system_status,
@@ -505,36 +500,3 @@ class TestProduceSystemStatus:
"""produce_system_status always returns a plain dict."""
result = produce_system_status()
assert isinstance(result, dict)
class TestSystemStatusHelpers:
"""Tests for the helper functions extracted from produce_system_status()."""
def test_get_agents_online_returns_int(self):
"""_get_agents_online returns a non-negative int."""
result = _get_agents_online()
assert isinstance(result, int)
assert result >= 0
def test_get_visitors_returns_int(self):
"""_get_visitors returns a non-negative int."""
result = _get_visitors()
assert isinstance(result, int)
assert result >= 0
def test_get_uptime_seconds_returns_int(self):
"""_get_uptime_seconds returns a non-negative int."""
result = _get_uptime_seconds()
assert isinstance(result, int)
assert result >= 0
def test_get_thinking_active_returns_bool(self):
"""_get_thinking_active returns a bool."""
result = _get_thinking_active()
assert isinstance(result, bool)
def test_get_memory_count_returns_int(self):
"""_get_memory_count returns a non-negative int."""
result = _get_memory_count()
assert isinstance(result, int)
assert result >= 0

View File

@@ -2,15 +2,10 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.vassal.agent_health import AgentHealthReport, AgentStatus
pytestmark = pytest.mark.unit
# ---------------------------------------------------------------------------
# AgentStatus
# ---------------------------------------------------------------------------
@@ -40,25 +35,6 @@ def test_agent_status_stuck():
assert s.needs_reassignment is True
def test_agent_status_checked_at_is_iso_string():
s = AgentStatus(agent="claude")
# Should be parseable as an ISO datetime
dt = datetime.fromisoformat(s.checked_at)
assert dt.tzinfo is not None
def test_agent_status_multiple_stuck_issues():
s = AgentStatus(agent="kimi", stuck_issue_numbers=[1, 2, 3])
assert s.is_stuck is True
assert s.needs_reassignment is True
def test_agent_status_active_but_not_stuck():
s = AgentStatus(agent="claude", active_issue_numbers=[5], is_idle=False)
assert s.is_stuck is False
assert s.needs_reassignment is False
# ---------------------------------------------------------------------------
# AgentHealthReport
# ---------------------------------------------------------------------------
@@ -71,24 +47,11 @@ def test_report_any_stuck():
assert report.any_stuck is True
def test_report_not_any_stuck():
report = AgentHealthReport(
agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
)
assert report.any_stuck is False
def test_report_all_idle():
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")])
assert report.all_idle is True
def test_report_not_all_idle():
claude = AgentStatus(agent="claude", active_issue_numbers=[1], is_idle=False)
report = AgentHealthReport(agents=[claude, AgentStatus(agent="kimi")])
assert report.all_idle is False
def test_report_for_agent_found():
kimi = AgentStatus(agent="kimi", active_issue_numbers=[42])
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), kimi])
@@ -101,233 +64,6 @@ def test_report_for_agent_not_found():
assert report.for_agent("timmy") is None
def test_report_generated_at_is_iso_string():
report = AgentHealthReport()
dt = datetime.fromisoformat(report.generated_at)
assert dt.tzinfo is not None
def test_report_empty_agents():
report = AgentHealthReport(agents=[])
assert report.any_stuck is False
assert report.all_idle is True
# ---------------------------------------------------------------------------
# _issue_created_time
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_issue_created_time_valid():
from timmy.vassal.agent_health import _issue_created_time
issue = {"created_at": "2024-01-15T10:30:00Z"}
result = await _issue_created_time(issue)
assert result is not None
assert result.year == 2024
assert result.month == 1
assert result.day == 15
@pytest.mark.asyncio
async def test_issue_created_time_missing_key():
from timmy.vassal.agent_health import _issue_created_time
result = await _issue_created_time({})
assert result is None
@pytest.mark.asyncio
async def test_issue_created_time_invalid_format():
from timmy.vassal.agent_health import _issue_created_time
result = await _issue_created_time({"created_at": "not-a-date"})
assert result is None
@pytest.mark.asyncio
async def test_issue_created_time_with_timezone():
from timmy.vassal.agent_health import _issue_created_time
issue = {"created_at": "2024-06-01T12:00:00+00:00"}
result = await _issue_created_time(issue)
assert result is not None
assert result.tzinfo is not None
# ---------------------------------------------------------------------------
# _fetch_labeled_issues — mocked HTTP client
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_fetch_labeled_issues_success():
from timmy.vassal.agent_health import _fetch_labeled_issues
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = [
{"number": 1, "title": "Fix bug"},
{"number": 2, "title": "Add feature", "pull_request": {"url": "..."}},
]
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
result = await _fetch_labeled_issues(
mock_client, "http://gitea/api/v1", {}, "owner/repo", "claude-ready"
)
# Only non-PR issues returned
assert len(result) == 1
assert result[0]["number"] == 1
@pytest.mark.asyncio
async def test_fetch_labeled_issues_http_error():
from timmy.vassal.agent_health import _fetch_labeled_issues
mock_resp = MagicMock()
mock_resp.status_code = 401
mock_resp.json.return_value = []
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
result = await _fetch_labeled_issues(
mock_client, "http://gitea/api/v1", {}, "owner/repo", "claude-ready"
)
assert result == []
@pytest.mark.asyncio
async def test_fetch_labeled_issues_exception():
from timmy.vassal.agent_health import _fetch_labeled_issues
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=ConnectionError("network down"))
result = await _fetch_labeled_issues(
mock_client, "http://gitea/api/v1", {}, "owner/repo", "claude-ready"
)
assert result == []
@pytest.mark.asyncio
async def test_fetch_labeled_issues_filters_pull_requests():
from timmy.vassal.agent_health import _fetch_labeled_issues
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = [
{"number": 10, "title": "Issue"},
{"number": 11, "title": "PR", "pull_request": {"url": "http://gitea/pulls/11"}},
{"number": 12, "title": "Another Issue"},
]
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
result = await _fetch_labeled_issues(
mock_client, "http://gitea/api/v1", {}, "owner/repo", "claude-ready"
)
# Issues with truthy pull_request field are excluded
assert len(result) == 2
assert all(i["number"] in (10, 12) for i in result)
# ---------------------------------------------------------------------------
# _last_comment_time — mocked HTTP client
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_last_comment_time_with_comments():
from timmy.vassal.agent_health import _last_comment_time
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = [
{"updated_at": "2024-03-10T14:00:00Z", "created_at": "2024-03-10T13:00:00Z"}
]
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
result = await _last_comment_time(
mock_client, "http://gitea/api/v1", {}, "owner/repo", 42
)
assert result is not None
assert result.year == 2024
assert result.month == 3
@pytest.mark.asyncio
async def test_last_comment_time_uses_created_at_fallback():
from timmy.vassal.agent_health import _last_comment_time
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = [
{"created_at": "2024-03-10T13:00:00Z"} # no updated_at
]
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
result = await _last_comment_time(
mock_client, "http://gitea/api/v1", {}, "owner/repo", 42
)
assert result is not None
@pytest.mark.asyncio
async def test_last_comment_time_no_comments():
from timmy.vassal.agent_health import _last_comment_time
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = []
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
result = await _last_comment_time(
mock_client, "http://gitea/api/v1", {}, "owner/repo", 99
)
assert result is None
@pytest.mark.asyncio
async def test_last_comment_time_http_error():
from timmy.vassal.agent_health import _last_comment_time
mock_resp = MagicMock()
mock_resp.status_code = 404
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
result = await _last_comment_time(
mock_client, "http://gitea/api/v1", {}, "owner/repo", 99
)
assert result is None
@pytest.mark.asyncio
async def test_last_comment_time_exception():
from timmy.vassal.agent_health import _last_comment_time
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=TimeoutError("timed out"))
result = await _last_comment_time(
mock_client, "http://gitea/api/v1", {}, "owner/repo", 7
)
assert result is None
# ---------------------------------------------------------------------------
# check_agent_health — no Gitea in unit tests
# ---------------------------------------------------------------------------
@@ -354,140 +90,6 @@ async def test_check_agent_health_no_token():
assert status.agent == "claude"
@pytest.mark.asyncio
async def test_check_agent_health_detects_stuck_issue(monkeypatch):
"""Issues with last activity before the cutoff are flagged as stuck."""
import timmy.vassal.agent_health as ah
old_time = (datetime.now(UTC) - timedelta(minutes=200)).isoformat()
async def _fake_fetch(client, base_url, headers, repo, label):
return [{"number": 55, "created_at": old_time}]
async def _fake_last_comment(client, base_url, headers, repo, issue_number):
return datetime.now(UTC) - timedelta(minutes=200)
monkeypatch.setattr(ah, "_fetch_labeled_issues", _fake_fetch)
monkeypatch.setattr(ah, "_last_comment_time", _fake_last_comment)
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea"
mock_settings.gitea_repo = "owner/repo"
import httpx
with patch("config.settings", mock_settings):
status = await ah.check_agent_health("claude", stuck_threshold_minutes=120)
assert 55 in status.active_issue_numbers
assert 55 in status.stuck_issue_numbers
assert status.is_stuck is True
@pytest.mark.asyncio
async def test_check_agent_health_active_not_stuck(monkeypatch):
"""Recent activity means issue is active but not stuck."""
import timmy.vassal.agent_health as ah
recent_time = (datetime.now(UTC) - timedelta(minutes=5)).isoformat()
async def _fake_fetch(client, base_url, headers, repo, label):
return [{"number": 77, "created_at": recent_time}]
async def _fake_last_comment(client, base_url, headers, repo, issue_number):
return datetime.now(UTC) - timedelta(minutes=5)
monkeypatch.setattr(ah, "_fetch_labeled_issues", _fake_fetch)
monkeypatch.setattr(ah, "_last_comment_time", _fake_last_comment)
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea"
mock_settings.gitea_repo = "owner/repo"
with patch("config.settings", mock_settings):
status = await ah.check_agent_health("claude", stuck_threshold_minutes=120)
assert 77 in status.active_issue_numbers
assert 77 not in status.stuck_issue_numbers
assert status.is_idle is False
@pytest.mark.asyncio
async def test_check_agent_health_uses_issue_created_when_no_comments(monkeypatch):
"""Falls back to issue created_at when no comment time is available."""
import timmy.vassal.agent_health as ah
old_time = (datetime.now(UTC) - timedelta(minutes=300)).isoformat()
async def _fake_fetch(client, base_url, headers, repo, label):
return [{"number": 99, "created_at": old_time}]
async def _fake_last_comment(client, base_url, headers, repo, issue_number):
return None # No comments
monkeypatch.setattr(ah, "_fetch_labeled_issues", _fake_fetch)
monkeypatch.setattr(ah, "_last_comment_time", _fake_last_comment)
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea"
mock_settings.gitea_repo = "owner/repo"
with patch("config.settings", mock_settings):
status = await ah.check_agent_health("kimi", stuck_threshold_minutes=120)
assert 99 in status.stuck_issue_numbers
@pytest.mark.asyncio
async def test_check_agent_health_gitea_disabled(monkeypatch):
"""When gitea_enabled=False, returns idle status without querying."""
import timmy.vassal.agent_health as ah
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = "fake-token"
with patch("config.settings", mock_settings):
status = await ah.check_agent_health("claude")
assert status.is_idle is True
assert status.active_issue_numbers == []
@pytest.mark.asyncio
async def test_check_agent_health_fetch_exception(monkeypatch):
"""HTTP exception during check is handled gracefully."""
import timmy.vassal.agent_health as ah
async def _bad_fetch(client, base_url, headers, repo, label):
raise RuntimeError("connection refused")
monkeypatch.setattr(ah, "_fetch_labeled_issues", _bad_fetch)
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea"
mock_settings.gitea_repo = "owner/repo"
with patch("config.settings", mock_settings):
status = await ah.check_agent_health("claude")
assert isinstance(status, AgentStatus)
assert status.is_idle is True
# ---------------------------------------------------------------------------
# get_full_health_report
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_full_health_report_returns_both_agents():
from timmy.vassal.agent_health import get_full_health_report
@@ -496,127 +98,3 @@ async def test_get_full_health_report_returns_both_agents():
agent_names = {a.agent for a in report.agents}
assert "claude" in agent_names
assert "kimi" in agent_names
@pytest.mark.asyncio
async def test_get_full_health_report_structure():
from timmy.vassal.agent_health import get_full_health_report
report = await get_full_health_report()
assert isinstance(report, AgentHealthReport)
assert len(report.agents) == 2
# ---------------------------------------------------------------------------
# nudge_stuck_agent
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_nudge_stuck_agent_no_token():
"""Returns False gracefully when Gitea is not configured."""
from timmy.vassal.agent_health import nudge_stuck_agent
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
with patch("config.settings", mock_settings):
result = await nudge_stuck_agent("claude", 123)
assert result is False
@pytest.mark.asyncio
async def test_nudge_stuck_agent_success(monkeypatch):
"""Returns True when comment is posted successfully."""
import timmy.vassal.agent_health as ah
mock_resp = MagicMock()
mock_resp.status_code = 201
mock_client_instance = AsyncMock()
mock_client_instance.post = AsyncMock(return_value=mock_resp)
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea"
mock_settings.gitea_repo = "owner/repo"
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_client_instance),
):
result = await ah.nudge_stuck_agent("claude", 55)
assert result is True
@pytest.mark.asyncio
async def test_nudge_stuck_agent_http_failure(monkeypatch):
"""Returns False when API returns non-2xx status."""
import timmy.vassal.agent_health as ah
mock_resp = MagicMock()
mock_resp.status_code = 500
mock_client_instance = AsyncMock()
mock_client_instance.post = AsyncMock(return_value=mock_resp)
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea"
mock_settings.gitea_repo = "owner/repo"
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_client_instance),
):
result = await ah.nudge_stuck_agent("kimi", 77)
assert result is False
@pytest.mark.asyncio
async def test_nudge_stuck_agent_gitea_disabled(monkeypatch):
"""Returns False when gitea_enabled=False."""
import timmy.vassal.agent_health as ah
mock_settings = MagicMock()
mock_settings.gitea_enabled = False
mock_settings.gitea_token = "fake-token"
with patch("config.settings", mock_settings):
result = await ah.nudge_stuck_agent("claude", 42)
assert result is False
@pytest.mark.asyncio
async def test_nudge_stuck_agent_exception(monkeypatch):
"""Returns False on network exception."""
import timmy.vassal.agent_health as ah
mock_client_instance = AsyncMock()
mock_client_instance.post = AsyncMock(side_effect=ConnectionError("refused"))
mock_client_instance.__aenter__ = AsyncMock(return_value=mock_client_instance)
mock_client_instance.__aexit__ = AsyncMock(return_value=False)
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://gitea"
mock_settings.gitea_repo = "owner/repo"
with (
patch("config.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_client_instance),
):
result = await ah.nudge_stuck_agent("claude", 10)
assert result is False

View File

@@ -2,17 +2,11 @@
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.vassal.backlog import AgentTarget, TriagedIssue
from timmy.vassal.dispatch import (
DispatchRecord,
_apply_label_to_issue,
_get_or_create_label,
_post_dispatch_comment,
clear_dispatch_registry,
get_dispatch_registry,
)
@@ -118,244 +112,3 @@ def test_dispatch_record_defaults():
assert r.label_applied is False
assert r.comment_posted is False
assert r.dispatched_at # has a timestamp
# ---------------------------------------------------------------------------
# _get_or_create_label
# ---------------------------------------------------------------------------
_HEADERS = {"Authorization": "token x"}
_BASE_URL = "http://gitea"
_REPO = "org/repo"
def _mock_response(status_code: int, json_data=None):
resp = MagicMock()
resp.status_code = status_code
resp.json.return_value = json_data or {}
return resp
@pytest.mark.asyncio
async def test_get_or_create_label_finds_existing():
"""Returns the ID of an existing label without creating it."""
existing = [{"name": "claude-ready", "id": 42}, {"name": "other", "id": 7}]
client = AsyncMock()
client.get.return_value = _mock_response(200, existing)
result = await _get_or_create_label(client, _BASE_URL, _HEADERS, _REPO, "claude-ready")
assert result == 42
client.post.assert_not_called()
@pytest.mark.asyncio
async def test_get_or_create_label_creates_when_missing():
"""Creates the label when it doesn't exist in the list."""
client = AsyncMock()
# GET returns empty list
client.get.return_value = _mock_response(200, [])
# POST creates label
client.post.return_value = _mock_response(201, {"id": 99})
result = await _get_or_create_label(client, _BASE_URL, _HEADERS, _REPO, "claude-ready")
assert result == 99
client.post.assert_called_once()
@pytest.mark.asyncio
async def test_get_or_create_label_returns_none_on_get_error():
"""Returns None if the GET raises an exception."""
client = AsyncMock()
client.get.side_effect = Exception("network error")
result = await _get_or_create_label(client, _BASE_URL, _HEADERS, _REPO, "claude-ready")
assert result is None
@pytest.mark.asyncio
async def test_get_or_create_label_returns_none_on_create_error():
"""Returns None if POST raises an exception."""
client = AsyncMock()
client.get.return_value = _mock_response(200, [])
client.post.side_effect = Exception("post failed")
result = await _get_or_create_label(client, _BASE_URL, _HEADERS, _REPO, "claude-ready")
assert result is None
@pytest.mark.asyncio
async def test_get_or_create_label_uses_default_color_for_unknown():
"""Unknown label name uses '#cccccc' fallback color."""
client = AsyncMock()
client.get.return_value = _mock_response(200, [])
client.post.return_value = _mock_response(201, {"id": 5})
await _get_or_create_label(client, _BASE_URL, _HEADERS, _REPO, "unknown-label")
call_kwargs = client.post.call_args
assert call_kwargs.kwargs["json"]["color"] == "#cccccc"
# ---------------------------------------------------------------------------
# _apply_label_to_issue
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_apply_label_to_issue_success():
"""Returns True when label is found and applied."""
client = AsyncMock()
client.get.return_value = _mock_response(200, [{"name": "claude-ready", "id": 10}])
client.post.return_value = _mock_response(201)
result = await _apply_label_to_issue(client, _BASE_URL, _HEADERS, _REPO, 42, "claude-ready")
assert result is True
@pytest.mark.asyncio
async def test_apply_label_to_issue_returns_false_when_no_label_id():
"""Returns False when label ID cannot be obtained."""
client = AsyncMock()
client.get.side_effect = Exception("unavailable")
result = await _apply_label_to_issue(client, _BASE_URL, _HEADERS, _REPO, 42, "claude-ready")
assert result is False
@pytest.mark.asyncio
async def test_apply_label_to_issue_returns_false_on_bad_status():
"""Returns False when the apply POST returns a non-2xx status."""
client = AsyncMock()
client.get.return_value = _mock_response(200, [{"name": "claude-ready", "id": 10}])
client.post.return_value = _mock_response(403)
result = await _apply_label_to_issue(client, _BASE_URL, _HEADERS, _REPO, 42, "claude-ready")
assert result is False
# ---------------------------------------------------------------------------
# _post_dispatch_comment
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_post_dispatch_comment_success():
"""Returns True on successful comment post."""
client = AsyncMock()
client.post.return_value = _mock_response(201)
issue = _make_triaged(7, "Some issue", AgentTarget.CLAUDE, priority=75)
result = await _post_dispatch_comment(client, _BASE_URL, _HEADERS, _REPO, issue, "claude-ready")
assert result is True
body = client.post.call_args.kwargs["json"]["body"]
assert "Claude" in body
assert "claude-ready" in body
assert "75" in body
@pytest.mark.asyncio
async def test_post_dispatch_comment_failure():
"""Returns False when comment POST returns a non-2xx status."""
client = AsyncMock()
client.post.return_value = _mock_response(500)
issue = _make_triaged(8, "Other issue", AgentTarget.KIMI)
result = await _post_dispatch_comment(client, _BASE_URL, _HEADERS, _REPO, issue, "kimi-ready")
assert result is False
# ---------------------------------------------------------------------------
# _perform_gitea_dispatch — settings-level gate
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_perform_gitea_dispatch_skips_when_disabled():
"""Does not call Gitea when gitea_enabled is False."""
import config
from timmy.vassal.dispatch import _perform_gitea_dispatch
mock_settings = SimpleNamespace(gitea_enabled=False, gitea_token="tok")
with patch.object(config, "settings", mock_settings):
issue = _make_triaged(9, "Disabled", AgentTarget.CLAUDE)
record = DispatchRecord(
issue_number=9,
issue_title="Disabled",
agent=AgentTarget.CLAUDE,
rationale="r",
)
await _perform_gitea_dispatch(issue, record)
assert record.label_applied is False
assert record.comment_posted is False
@pytest.mark.asyncio
async def test_perform_gitea_dispatch_skips_when_no_token():
"""Does not call Gitea when gitea_token is empty."""
import config
from timmy.vassal.dispatch import _perform_gitea_dispatch
mock_settings = SimpleNamespace(gitea_enabled=True, gitea_token="")
with patch.object(config, "settings", mock_settings):
issue = _make_triaged(10, "No token", AgentTarget.CLAUDE)
record = DispatchRecord(
issue_number=10,
issue_title="No token",
agent=AgentTarget.CLAUDE,
rationale="r",
)
await _perform_gitea_dispatch(issue, record)
assert record.label_applied is False
@pytest.mark.asyncio
async def test_perform_gitea_dispatch_updates_record():
"""Record is mutated to reflect label/comment success."""
import config
from timmy.vassal.dispatch import _perform_gitea_dispatch
mock_settings = SimpleNamespace(
gitea_enabled=True,
gitea_token="tok",
gitea_url="http://gitea",
gitea_repo="org/repo",
)
mock_client = AsyncMock()
# GET labels → empty list, POST create label → id 1
mock_client.get.return_value = _mock_response(200, [])
mock_client.post.side_effect = [
_mock_response(201, {"id": 1}), # create label
_mock_response(201), # apply label
_mock_response(201), # post comment
]
with (
patch.object(config, "settings", mock_settings),
patch("httpx.AsyncClient") as mock_cls,
):
mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
issue = _make_triaged(11, "Full dispatch", AgentTarget.CLAUDE)
record = DispatchRecord(
issue_number=11,
issue_title="Full dispatch",
agent=AgentTarget.CLAUDE,
rationale="r",
)
await _perform_gitea_dispatch(issue, record)
assert record.label_applied is True
assert record.comment_posted is True