1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
617ef43f99 test: add unit tests for daily_run.py — 51 tests covering main handlers
Adds tests/dashboard/test_daily_run.py with 51 test cases covering:
- _load_config(): defaults, file loading, env var overrides, invalid JSON
- _get_token(): from config dict, from file, missing file
- GiteaClient: headers, api_url, is_available (true/false/cached), get_paginated
- LayerMetrics: trend and trend_color properties (all directions)
- DailyRunMetrics: sessions_trend and sessions_trend_color properties
- _extract_layer(): label extraction from issue label lists
- _load_cycle_data(): success counting, invalid JSON lines, missing timestamps
- _fetch_layer_metrics(): counting logic, graceful degradation on errors
- _get_metrics(): unavailable client, happy path, exception handling
- Route handlers: /daily-run/metrics (JSON) and /daily-run/panel (HTML)

All 51 tests pass. tox -e unit remains green (293 passing).

Fixes #1186

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 17:57:26 -04:00
6 changed files with 706 additions and 522 deletions

View File

@@ -528,71 +528,6 @@ class CascadeRouter:
return True
def _filter_providers(self, cascade_tier: str | None) -> list["Provider"]:
"""Return the provider list filtered by tier.
Raises:
RuntimeError: If a tier is specified but no matching providers exist.
"""
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
return providers
if cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
return providers
return self.providers
async def _try_single_provider(
self,
provider: "Provider",
messages: list[dict],
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType,
errors: list[str],
) -> dict | None:
"""Attempt one provider, returning a result dict on success or None on failure.
On failure the error string is appended to *errors* and the provider's
failure metrics are updated so the caller can move on to the next provider.
"""
if not self._is_provider_available(provider):
return None
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
return None
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
provider, messages, selected_model, temperature, max_tokens, content_type
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
return None
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
async def complete(
self,
messages: list[dict],
@@ -626,15 +561,55 @@ class CascadeRouter:
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
errors: list[str] = []
providers = self._filter_providers(cascade_tier)
errors = []
providers = self.providers
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
elif cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
for provider in providers:
result = await self._try_single_provider(
provider, messages, model, temperature, max_tokens, content_type, errors
)
if result is not None:
return result
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

View File

@@ -110,92 +110,6 @@ async def _get_or_create_label(
return None
# ---------------------------------------------------------------------------
# Dispatch action helpers
# ---------------------------------------------------------------------------
async def _apply_label_to_issue(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
label_name: str,
) -> bool:
"""Get-or-create the label then apply it to the issue. Returns True on success."""
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
if label_id is None:
return False
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
headers=headers,
json={"labels": [label_id]},
)
return resp.status_code in (200, 201)
async def _post_dispatch_comment(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue: TriagedIssue,
label_name: str,
) -> bool:
"""Post the vassal routing comment. Returns True on success."""
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
return resp.status_code in (200, 201)
async def _perform_gitea_dispatch(
issue: TriagedIssue,
record: DispatchRecord,
) -> None:
"""Apply label and post comment via Gitea. Mutates *record* in-place."""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
return
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
return
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
record.label_applied = await _apply_label_to_issue(
client, base_url, headers, repo, issue.number, label_name
)
record.comment_posted = await _post_dispatch_comment(
client, base_url, headers, repo, issue, label_name
)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
@@ -230,7 +144,58 @@ async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
_registry[issue.number] = record
return record
await _perform_gitea_dispatch(issue, record)
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
_registry[issue.number] = record
logger.info(

View File

@@ -95,106 +95,6 @@ def _get_config_dir() -> Path:
return DEFAULT_CONFIG_DIR
def _load_daily_run_config() -> dict[str, Any]:
"""Load and validate the daily run configuration."""
config_path = _get_config_dir() / "daily_run.json"
config = _load_json_config(config_path)
if not config:
console.print("[yellow]No daily run configuration found.[/yellow]")
raise typer.Exit(1)
return config
def _display_schedules_table(schedules: dict[str, Any]) -> None:
"""Display the daily run schedules in a table."""
table = Table(title="Daily Run Schedules")
table.add_column("Schedule", style="cyan")
table.add_column("Description", style="green")
table.add_column("Automations", style="yellow")
for schedule_name, schedule_data in schedules.items():
automations = schedule_data.get("automations", [])
table.add_row(
schedule_name,
schedule_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(table)
console.print()
def _display_triggers_table(triggers: dict[str, Any]) -> None:
"""Display the triggers in a table."""
trigger_table = Table(title="Triggers")
trigger_table.add_column("Trigger", style="cyan")
trigger_table.add_column("Description", style="green")
trigger_table.add_column("Automations", style="yellow")
for trigger_name, trigger_data in triggers.items():
automations = trigger_data.get("automations", [])
trigger_table.add_row(
trigger_name,
trigger_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(trigger_table)
console.print()
def _execute_automation(auto: dict[str, Any], verbose: bool) -> None:
"""Execute a single automation and display results."""
cmd = auto.get("command")
name = auto.get("name", auto.get("id", "unnamed"))
if not cmd:
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
return
console.print(f"[cyan]▶ Running: {name}[/cyan]")
if verbose:
console.print(f"[dim] $ {cmd}[/dim]")
try:
result = subprocess.run( # noqa: S602
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout.strip():
console.print(result.stdout.strip())
if result.returncode != 0:
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
if result.stderr.strip():
console.print(f"[red]{result.stderr.strip()}[/red]")
else:
console.print(f"[green] ✓ {name} completed successfully[/green]")
except subprocess.TimeoutExpired:
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
except Exception as exc:
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
def _execute_all_automations(verbose: bool) -> None:
"""Execute all enabled automations."""
console.print("[green]Executing daily run automations...[/green]")
auto_config_path = _get_config_dir() / "automations.json"
auto_config = _load_json_config(auto_config_path)
all_automations = auto_config.get("automations", [])
enabled = [a for a in all_automations if a.get("enabled", False)]
if not enabled:
console.print("[yellow]No enabled automations found.[/yellow]")
return
for auto in enabled:
_execute_automation(auto, verbose)
@app.command()
def daily_run(
dry_run: bool = typer.Option(
@@ -213,22 +113,93 @@ def daily_run(
console.print("[bold green]Timmy Daily Run[/bold green]")
console.print()
config = _load_daily_run_config()
config_path = _get_config_dir() / "daily_run.json"
config = _load_json_config(config_path)
if not config:
console.print("[yellow]No daily run configuration found.[/yellow]")
raise typer.Exit(1)
schedules = config.get("schedules", {})
triggers = config.get("triggers", {})
if verbose:
config_path = _get_config_dir() / "daily_run.json"
console.print(f"[dim]Config loaded from: {config_path}[/dim]")
console.print()
_display_schedules_table(schedules)
_display_triggers_table(triggers)
# Show the daily run schedule
table = Table(title="Daily Run Schedules")
table.add_column("Schedule", style="cyan")
table.add_column("Description", style="green")
table.add_column("Automations", style="yellow")
for schedule_name, schedule_data in schedules.items():
automations = schedule_data.get("automations", [])
table.add_row(
schedule_name,
schedule_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(table)
console.print()
# Show triggers
trigger_table = Table(title="Triggers")
trigger_table.add_column("Trigger", style="cyan")
trigger_table.add_column("Description", style="green")
trigger_table.add_column("Automations", style="yellow")
for trigger_name, trigger_data in triggers.items():
automations = trigger_data.get("automations", [])
trigger_table.add_row(
trigger_name,
trigger_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(trigger_table)
console.print()
if dry_run:
console.print("[yellow]Dry run mode — no actions executed.[/yellow]")
else:
_execute_all_automations(verbose)
console.print("[green]Executing daily run automations...[/green]")
auto_config_path = _get_config_dir() / "automations.json"
auto_config = _load_json_config(auto_config_path)
all_automations = auto_config.get("automations", [])
enabled = [a for a in all_automations if a.get("enabled", False)]
if not enabled:
console.print("[yellow]No enabled automations found.[/yellow]")
for auto in enabled:
cmd = auto.get("command")
name = auto.get("name", auto.get("id", "unnamed"))
if not cmd:
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
continue
console.print(f"[cyan]▶ Running: {name}[/cyan]")
if verbose:
console.print(f"[dim] $ {cmd}[/dim]")
try:
result = subprocess.run( # noqa: S602
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout.strip():
console.print(result.stdout.strip())
if result.returncode != 0:
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
if result.stderr.strip():
console.print(f"[red]{result.stderr.strip()}[/red]")
else:
console.print(f"[green] ✓ {name} completed successfully[/green]")
except subprocess.TimeoutExpired:
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
except Exception as exc:
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
@app.command()

View File

@@ -0,0 +1,530 @@
"""Unit tests for dashboard/routes/daily_run.py."""
from __future__ import annotations
import json
import os
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from urllib.error import HTTPError, URLError
import pytest
from dashboard.routes.daily_run import (
DEFAULT_CONFIG,
LAYER_LABELS,
DailyRunMetrics,
GiteaClient,
LayerMetrics,
_extract_layer,
_fetch_layer_metrics,
_get_metrics,
_get_token,
_load_config,
_load_cycle_data,
)
# ---------------------------------------------------------------------------
# _load_config
# ---------------------------------------------------------------------------
def test_load_config_returns_defaults():
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
mock_path.exists.return_value = False
config = _load_config()
assert config["gitea_api"] == DEFAULT_CONFIG["gitea_api"]
assert config["repo_slug"] == DEFAULT_CONFIG["repo_slug"]
def test_load_config_merges_file_orchestrator_section(tmp_path):
config_file = tmp_path / "daily_run.json"
config_file.write_text(
json.dumps({"orchestrator": {"repo_slug": "custom/repo", "gitea_api": "http://custom:3000/api/v1"}})
)
with patch("dashboard.routes.daily_run.CONFIG_PATH", config_file):
config = _load_config()
assert config["repo_slug"] == "custom/repo"
assert config["gitea_api"] == "http://custom:3000/api/v1"
def test_load_config_ignores_invalid_json(tmp_path):
config_file = tmp_path / "daily_run.json"
config_file.write_text("not valid json{{")
with patch("dashboard.routes.daily_run.CONFIG_PATH", config_file):
config = _load_config()
assert config["repo_slug"] == DEFAULT_CONFIG["repo_slug"]
def test_load_config_env_overrides(monkeypatch):
monkeypatch.setenv("TIMMY_GITEA_API", "http://envapi:3000/api/v1")
monkeypatch.setenv("TIMMY_REPO_SLUG", "env/repo")
monkeypatch.setenv("TIMMY_GITEA_TOKEN", "env-token-123")
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
mock_path.exists.return_value = False
config = _load_config()
assert config["gitea_api"] == "http://envapi:3000/api/v1"
assert config["repo_slug"] == "env/repo"
assert config["token"] == "env-token-123"
def test_load_config_no_env_overrides_without_vars(monkeypatch):
monkeypatch.delenv("TIMMY_GITEA_API", raising=False)
monkeypatch.delenv("TIMMY_REPO_SLUG", raising=False)
monkeypatch.delenv("TIMMY_GITEA_TOKEN", raising=False)
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
mock_path.exists.return_value = False
config = _load_config()
assert "token" not in config
# ---------------------------------------------------------------------------
# _get_token
# ---------------------------------------------------------------------------
def test_get_token_from_config_dict():
config = {"token": "direct-token", "token_file": "~/.hermes/gitea_token"}
assert _get_token(config) == "direct-token"
def test_get_token_from_file(tmp_path):
token_file = tmp_path / "token.txt"
token_file.write_text(" file-token \n")
config = {"token_file": str(token_file)}
assert _get_token(config) == "file-token"
def test_get_token_returns_none_when_file_missing(tmp_path):
config = {"token_file": str(tmp_path / "nonexistent_token")}
assert _get_token(config) is None
# ---------------------------------------------------------------------------
# GiteaClient
# ---------------------------------------------------------------------------
def _make_client(**kwargs) -> GiteaClient:
config = {**DEFAULT_CONFIG, **kwargs}
return GiteaClient(config, token="test-token")
def test_gitea_client_headers_include_auth():
client = _make_client()
headers = client._headers()
assert headers["Authorization"] == "token test-token"
assert headers["Accept"] == "application/json"
def test_gitea_client_headers_no_token():
config = {**DEFAULT_CONFIG}
client = GiteaClient(config, token=None)
headers = client._headers()
assert "Authorization" not in headers
def test_gitea_client_api_url():
client = _make_client()
url = client._api_url("issues")
assert url == f"{DEFAULT_CONFIG['gitea_api']}/repos/{DEFAULT_CONFIG['repo_slug']}/issues"
def test_gitea_client_api_url_strips_trailing_slash():
config = {**DEFAULT_CONFIG, "gitea_api": "http://localhost:3000/api/v1/"}
client = GiteaClient(config, token=None)
url = client._api_url("issues")
assert "//" not in url.replace("http://", "")
def test_gitea_client_is_available_true():
client = _make_client()
mock_resp = MagicMock()
mock_resp.status = 200
mock_resp.__enter__ = lambda s: mock_resp
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
assert client.is_available() is True
def test_gitea_client_is_available_cached():
client = _make_client()
client._available = True
# Should not call urlopen at all
with patch("dashboard.routes.daily_run.urlopen") as mock_urlopen:
assert client.is_available() is True
mock_urlopen.assert_not_called()
def test_gitea_client_is_available_false_on_url_error():
client = _make_client()
with patch("dashboard.routes.daily_run.urlopen", side_effect=URLError("refused")):
assert client.is_available() is False
def test_gitea_client_is_available_false_on_timeout():
client = _make_client()
with patch("dashboard.routes.daily_run.urlopen", side_effect=TimeoutError()):
assert client.is_available() is False
def test_gitea_client_get_paginated_single_page():
client = _make_client()
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps([{"id": 1}, {"id": 2}]).encode()
mock_resp.__enter__ = lambda s: mock_resp
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
result = client.get_paginated("issues")
assert len(result) == 2
assert result[0]["id"] == 1
def test_gitea_client_get_paginated_empty():
client = _make_client()
mock_resp = MagicMock()
mock_resp.read.return_value = b"[]"
mock_resp.__enter__ = lambda s: mock_resp
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
result = client.get_paginated("issues")
assert result == []
# ---------------------------------------------------------------------------
# LayerMetrics.trend
# ---------------------------------------------------------------------------
def test_layer_metrics_trend_no_previous_no_current():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=0, previous_count=0)
assert lm.trend == ""
def test_layer_metrics_trend_no_previous_with_current():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=5, previous_count=0)
assert lm.trend == ""
def test_layer_metrics_trend_big_increase():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=130, previous_count=100)
assert lm.trend == "↑↑"
def test_layer_metrics_trend_small_increase():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=108, previous_count=100)
assert lm.trend == ""
def test_layer_metrics_trend_stable():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=100, previous_count=100)
assert lm.trend == ""
def test_layer_metrics_trend_small_decrease():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=92, previous_count=100)
assert lm.trend == ""
def test_layer_metrics_trend_big_decrease():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=70, previous_count=100)
assert lm.trend == "↓↓"
def test_layer_metrics_trend_color_up():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=200, previous_count=100)
assert lm.trend_color == "var(--green)"
def test_layer_metrics_trend_color_down():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=50, previous_count=100)
assert lm.trend_color == "var(--amber)"
def test_layer_metrics_trend_color_stable():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=100, previous_count=100)
assert lm.trend_color == "var(--text-dim)"
# ---------------------------------------------------------------------------
# DailyRunMetrics.sessions_trend
# ---------------------------------------------------------------------------
def _make_daily_metrics(**kwargs) -> DailyRunMetrics:
defaults = dict(
sessions_completed=10,
sessions_previous=8,
layers=[],
total_touched_current=20,
total_touched_previous=15,
lookback_days=7,
generated_at=datetime.now(UTC).isoformat(),
)
defaults.update(kwargs)
return DailyRunMetrics(**defaults)
def test_daily_metrics_sessions_trend_big_increase():
m = _make_daily_metrics(sessions_completed=130, sessions_previous=100)
assert m.sessions_trend == "↑↑"
def test_daily_metrics_sessions_trend_stable():
m = _make_daily_metrics(sessions_completed=100, sessions_previous=100)
assert m.sessions_trend == ""
def test_daily_metrics_sessions_trend_no_previous_zero_completed():
m = _make_daily_metrics(sessions_completed=0, sessions_previous=0)
assert m.sessions_trend == ""
def test_daily_metrics_sessions_trend_no_previous_with_completed():
m = _make_daily_metrics(sessions_completed=5, sessions_previous=0)
assert m.sessions_trend == ""
def test_daily_metrics_sessions_trend_color_green():
m = _make_daily_metrics(sessions_completed=200, sessions_previous=100)
assert m.sessions_trend_color == "var(--green)"
def test_daily_metrics_sessions_trend_color_amber():
m = _make_daily_metrics(sessions_completed=50, sessions_previous=100)
assert m.sessions_trend_color == "var(--amber)"
# ---------------------------------------------------------------------------
# _extract_layer
# ---------------------------------------------------------------------------
def test_extract_layer_finds_layer_label():
labels = [{"name": "bug"}, {"name": "layer:triage"}, {"name": "urgent"}]
assert _extract_layer(labels) == "triage"
def test_extract_layer_returns_none_when_no_layer():
labels = [{"name": "bug"}, {"name": "feature"}]
assert _extract_layer(labels) is None
def test_extract_layer_empty_labels():
assert _extract_layer([]) is None
def test_extract_layer_first_match_wins():
labels = [{"name": "layer:micro-fix"}, {"name": "layer:tests"}]
assert _extract_layer(labels) == "micro-fix"
# ---------------------------------------------------------------------------
# _load_cycle_data
# ---------------------------------------------------------------------------
def test_load_cycle_data_missing_file(tmp_path):
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=14)
assert result == {"current": 0, "previous": 0}
def test_load_cycle_data_counts_successful_sessions(tmp_path):
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
retro_file = retro_dir / "cycles.jsonl"
now = datetime.now(UTC)
recent_ts = (now - timedelta(days=3)).isoformat()
older_ts = (now - timedelta(days=10)).isoformat()
old_ts = (now - timedelta(days=20)).isoformat()
lines = [
json.dumps({"timestamp": recent_ts, "success": True}),
json.dumps({"timestamp": recent_ts, "success": False}), # not counted
json.dumps({"timestamp": older_ts, "success": True}),
json.dumps({"timestamp": old_ts, "success": True}), # outside window
]
retro_file.write_text("\n".join(lines))
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=7)
assert result["current"] == 1
assert result["previous"] == 1
def test_load_cycle_data_skips_invalid_json_lines(tmp_path):
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
retro_file = retro_dir / "cycles.jsonl"
now = datetime.now(UTC)
recent_ts = (now - timedelta(days=1)).isoformat()
retro_file.write_text(
f'not valid json\n{json.dumps({"timestamp": recent_ts, "success": True})}\n'
)
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=7)
assert result["current"] == 1
def test_load_cycle_data_skips_entries_with_no_timestamp(tmp_path):
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
retro_file = retro_dir / "cycles.jsonl"
retro_file.write_text(json.dumps({"success": True}))
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=7)
assert result == {"current": 0, "previous": 0}
# ---------------------------------------------------------------------------
# _fetch_layer_metrics
# ---------------------------------------------------------------------------
def _make_issue(updated_offset_days: int) -> dict:
ts = (datetime.now(UTC) - timedelta(days=updated_offset_days)).isoformat()
return {"updated_at": ts, "labels": [{"name": "layer:triage"}]}
def test_fetch_layer_metrics_counts_current_and_previous():
client = _make_client()
client._available = True
recent_issue = _make_issue(updated_offset_days=3)
older_issue = _make_issue(updated_offset_days=10)
with patch.object(client, "get_paginated", return_value=[recent_issue, older_issue]):
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days=7)
# Should have one entry per LAYER_LABELS
assert len(layers) == len(LAYER_LABELS)
triage = next(lm for lm in layers if lm.name == "triage")
assert triage.current_count == 1
assert triage.previous_count == 1
def test_fetch_layer_metrics_degrades_on_http_error():
client = _make_client()
client._available = True
with patch.object(client, "get_paginated", side_effect=URLError("network")):
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days=7)
assert len(layers) == len(LAYER_LABELS)
for lm in layers:
assert lm.current_count == 0
assert lm.previous_count == 0
assert total_current == 0
assert total_previous == 0
# ---------------------------------------------------------------------------
# _get_metrics
# ---------------------------------------------------------------------------
def test_get_metrics_returns_none_when_gitea_unavailable():
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
with patch("dashboard.routes.daily_run._get_token", return_value=None):
with patch.object(GiteaClient, "is_available", return_value=False):
result = _get_metrics()
assert result is None
def test_get_metrics_returns_daily_run_metrics():
mock_layers = [
LayerMetrics(name="triage", label="layer:triage", current_count=5, previous_count=3)
]
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
with patch("dashboard.routes.daily_run._get_token", return_value="tok"):
with patch.object(GiteaClient, "is_available", return_value=True):
with patch(
"dashboard.routes.daily_run._fetch_layer_metrics",
return_value=(mock_layers, 5, 3),
):
with patch(
"dashboard.routes.daily_run._load_cycle_data",
return_value={"current": 10, "previous": 8},
):
result = _get_metrics(lookback_days=7)
assert result is not None
assert result.sessions_completed == 10
assert result.sessions_previous == 8
assert result.lookback_days == 7
assert result.layers == mock_layers
def test_get_metrics_returns_none_on_exception():
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
with patch("dashboard.routes.daily_run._get_token", return_value="tok"):
with patch.object(GiteaClient, "is_available", return_value=True):
with patch(
"dashboard.routes.daily_run._fetch_layer_metrics",
side_effect=Exception("unexpected"),
):
result = _get_metrics()
assert result is None
# ---------------------------------------------------------------------------
# Route handlers (FastAPI)
# ---------------------------------------------------------------------------
def test_daily_run_metrics_api_unavailable(client):
with patch("dashboard.routes.daily_run._get_metrics", return_value=None):
resp = client.get("/daily-run/metrics")
assert resp.status_code == 503
data = resp.json()
assert data["status"] == "unavailable"
def test_daily_run_metrics_api_returns_json(client):
mock_metrics = _make_daily_metrics(
layers=[
LayerMetrics(name="triage", label="layer:triage", current_count=3, previous_count=2)
]
)
with patch("dashboard.routes.daily_run._get_metrics", return_value=mock_metrics):
with patch(
"dashboard.routes.quests.check_daily_run_quests",
return_value=[],
create=True,
):
resp = client.get("/daily-run/metrics?lookback_days=7")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["lookback_days"] == 7
assert "sessions" in data
assert "layers" in data
assert "totals" in data
assert len(data["layers"]) == 1
assert data["layers"][0]["name"] == "triage"
def test_daily_run_panel_returns_html(client):
mock_metrics = _make_daily_metrics()
with patch("dashboard.routes.daily_run._get_metrics", return_value=mock_metrics):
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
resp = client.get("/daily-run/panel")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
def test_daily_run_panel_when_unavailable(client):
with patch("dashboard.routes.daily_run._get_metrics", return_value=None):
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
resp = client.get("/daily-run/panel")
assert resp.status_code == 200

View File

@@ -1,21 +1,10 @@
"""Tests for the async event bus (infrastructure.events.bus)."""
import sqlite3
from pathlib import Path
from unittest.mock import patch
import pytest
import infrastructure.events.bus as bus_module
from infrastructure.events.bus import (
Event,
EventBus,
emit,
event_bus,
get_event_bus,
init_event_bus_persistence,
on,
)
from infrastructure.events.bus import Event, EventBus, emit, event_bus, on
class TestEvent:
@@ -360,111 +349,3 @@ class TestEventBusPersistence:
assert mode == "wal"
finally:
conn.close()
async def test_persist_event_exception_is_swallowed(self, tmp_path):
"""_persist_event must not propagate SQLite errors."""
from unittest.mock import MagicMock
bus = EventBus()
bus.enable_persistence(tmp_path / "events.db")
# Make the INSERT raise an OperationalError
mock_conn = MagicMock()
mock_conn.execute.side_effect = sqlite3.OperationalError("simulated failure")
from contextlib import contextmanager
@contextmanager
def fake_ctx():
yield mock_conn
with patch.object(bus, "_get_persistence_conn", fake_ctx):
# Should not raise
bus._persist_event(Event(type="x", source="s"))
async def test_replay_exception_returns_empty(self, tmp_path):
"""replay() must return [] when SQLite query fails."""
from unittest.mock import MagicMock
bus = EventBus()
bus.enable_persistence(tmp_path / "events.db")
mock_conn = MagicMock()
mock_conn.execute.side_effect = sqlite3.OperationalError("simulated failure")
from contextlib import contextmanager
@contextmanager
def fake_ctx():
yield mock_conn
with patch.object(bus, "_get_persistence_conn", fake_ctx):
result = bus.replay()
assert result == []
# ── Singleton helpers ─────────────────────────────────────────────────────────
class TestSingletonHelpers:
"""Test get_event_bus(), init_event_bus_persistence(), and module __getattr__."""
def test_get_event_bus_returns_same_instance(self):
"""get_event_bus() is a true singleton."""
a = get_event_bus()
b = get_event_bus()
assert a is b
def test_module_event_bus_attr_is_singleton(self):
"""Accessing bus_module.event_bus via __getattr__ returns the singleton."""
assert bus_module.event_bus is get_event_bus()
def test_module_getattr_unknown_raises(self):
"""Accessing an unknown module attribute raises AttributeError."""
with pytest.raises(AttributeError):
_ = bus_module.no_such_attr # type: ignore[attr-defined]
def test_init_event_bus_persistence_sets_path(self, tmp_path):
"""init_event_bus_persistence() enables persistence on the singleton."""
bus = get_event_bus()
original_path = bus._persistence_db_path
try:
bus._persistence_db_path = None # reset for the test
db_path = tmp_path / "test_init.db"
init_event_bus_persistence(db_path)
assert bus._persistence_db_path == db_path
finally:
bus._persistence_db_path = original_path
def test_init_event_bus_persistence_is_idempotent(self, tmp_path):
"""Calling init_event_bus_persistence() twice keeps the first path."""
bus = get_event_bus()
original_path = bus._persistence_db_path
try:
bus._persistence_db_path = None
first_path = tmp_path / "first.db"
second_path = tmp_path / "second.db"
init_event_bus_persistence(first_path)
init_event_bus_persistence(second_path) # should be ignored
assert bus._persistence_db_path == first_path
finally:
bus._persistence_db_path = original_path
def test_init_event_bus_persistence_default_path(self):
"""init_event_bus_persistence() uses 'data/events.db' when no path given."""
bus = get_event_bus()
original_path = bus._persistence_db_path
try:
bus._persistence_db_path = None
# Patch enable_persistence to capture what path it receives
captured = {}
def fake_enable(path: Path) -> None:
captured["path"] = path
with patch.object(bus, "enable_persistence", side_effect=fake_enable):
init_event_bus_persistence()
assert captured["path"] == Path("data/events.db")
finally:
bus._persistence_db_path = original_path

View File

@@ -1376,141 +1376,3 @@ class TestIsProviderAvailable:
result = router._is_provider_available(provider)
assert result is True
assert provider.circuit_state == CircuitState.HALF_OPEN
@pytest.mark.unit
class TestFilterProviders:
"""Test _filter_providers helper extracted from complete()."""
def _router(self) -> CascadeRouter:
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [
Provider(
name="anthropic-p",
type="anthropic",
enabled=True,
priority=1,
api_key="key",
tier="frontier",
),
Provider(
name="ollama-p",
type="ollama",
enabled=True,
priority=2,
tier="local",
),
]
return router
def test_no_tier_returns_all_providers(self):
router = self._router()
result = router._filter_providers(None)
assert result is router.providers
def test_frontier_required_returns_only_anthropic(self):
router = self._router()
result = router._filter_providers("frontier_required")
assert len(result) == 1
assert result[0].type == "anthropic"
def test_frontier_required_no_anthropic_raises(self):
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [
Provider(name="ollama-p", type="ollama", enabled=True, priority=1)
]
with pytest.raises(RuntimeError, match="No Anthropic provider configured"):
router._filter_providers("frontier_required")
def test_named_tier_filters_by_tier(self):
router = self._router()
result = router._filter_providers("local")
assert len(result) == 1
assert result[0].name == "ollama-p"
def test_named_tier_not_found_raises(self):
router = self._router()
with pytest.raises(RuntimeError, match="No providers found for tier"):
router._filter_providers("nonexistent")
@pytest.mark.unit
@pytest.mark.asyncio
class TestTrySingleProvider:
"""Test _try_single_provider helper extracted from complete()."""
def _router(self) -> CascadeRouter:
return CascadeRouter(config_path=Path("/nonexistent"))
def _provider(self, name: str = "test", ptype: str = "ollama") -> Provider:
return Provider(
name=name,
type=ptype,
enabled=True,
priority=1,
models=[{"name": "llama3.2", "default": True}],
)
async def test_unavailable_provider_returns_none(self):
router = self._router()
provider = self._provider()
provider.enabled = False
errors: list[str] = []
result = await router._try_single_provider(
provider, [], None, 0.7, None, ContentType.TEXT, errors
)
assert result is None
assert errors == []
async def test_quota_blocked_cloud_provider_returns_none(self):
router = self._router()
provider = self._provider(ptype="anthropic")
errors: list[str] = []
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
mock_qm.select_model.return_value = "qwen3:14b" # non-cloud → ACTIVE tier
mock_qm.check.return_value = None
result = await router._try_single_provider(
provider, [], None, 0.7, None, ContentType.TEXT, errors
)
assert result is None
assert errors == []
async def test_success_returns_result_dict(self):
router = self._router()
provider = self._provider()
errors: list[str] = []
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "hi", "model": "llama3.2"}
result = await router._try_single_provider(
provider,
[{"role": "user", "content": "hi"}],
None,
0.7,
None,
ContentType.TEXT,
errors,
)
assert result is not None
assert result["content"] == "hi"
assert result["provider"] == "test"
assert errors == []
async def test_failure_appends_error_and_returns_none(self):
router = self._router()
provider = self._provider()
errors: list[str] = []
with patch.object(router, "_call_ollama") as mock_call:
mock_call.side_effect = RuntimeError("boom")
result = await router._try_single_provider(
provider,
[{"role": "user", "content": "hi"}],
None,
0.7,
None,
ContentType.TEXT,
errors,
)
assert result is None
assert len(errors) == 1
assert "boom" in errors[0]
assert provider.metrics.failed_requests == 1