1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
617ef43f99 test: add unit tests for daily_run.py — 51 tests covering main handlers
Adds tests/dashboard/test_daily_run.py with 51 test cases covering:
- _load_config(): defaults, file loading, env var overrides, invalid JSON
- _get_token(): from config dict, from file, missing file
- GiteaClient: headers, api_url, is_available (true/false/cached), get_paginated
- LayerMetrics: trend and trend_color properties (all directions)
- DailyRunMetrics: sessions_trend and sessions_trend_color properties
- _extract_layer(): label extraction from issue label lists
- _load_cycle_data(): success counting, invalid JSON lines, missing timestamps
- _fetch_layer_metrics(): counting logic, graceful degradation on errors
- _get_metrics(): unavailable client, happy path, exception handling
- Route handlers: /daily-run/metrics (JSON) and /daily-run/panel (HTML)

All 51 tests pass. tox -e unit remains green (293 passing).

Fixes #1186

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 17:57:26 -04:00
8 changed files with 760 additions and 750 deletions

View File

@@ -242,64 +242,6 @@ def produce_agent_state(agent_id: str, presence: dict) -> dict:
}
def _get_agents_online() -> int:
"""Return the count of agents with a non-offline status."""
try:
from timmy.agents.loader import list_agents
agents = list_agents()
return sum(1 for a in agents if a.get("status", "") not in ("offline", ""))
except Exception as exc:
logger.debug("Failed to count agents: %s", exc)
return 0
def _get_visitors() -> int:
"""Return the count of active WebSocket visitor clients."""
try:
from dashboard.routes.world import _ws_clients
return len(_ws_clients)
except Exception as exc:
logger.debug("Failed to count visitors: %s", exc)
return 0
def _get_uptime_seconds() -> int:
"""Return seconds elapsed since application start."""
try:
from config import APP_START_TIME
return int((datetime.now(UTC) - APP_START_TIME).total_seconds())
except Exception as exc:
logger.debug("Failed to calculate uptime: %s", exc)
return 0
def _get_thinking_active() -> bool:
"""Return True if the thinking engine is enabled and running."""
try:
from config import settings
from timmy.thinking import thinking_engine
return settings.thinking_enabled and thinking_engine is not None
except Exception as exc:
logger.debug("Failed to check thinking status: %s", exc)
return False
def _get_memory_count() -> int:
"""Return total entries in the vector memory store."""
try:
from timmy.memory_system import get_memory_stats
stats = get_memory_stats()
return stats.get("total_entries", 0)
except Exception as exc:
logger.debug("Failed to count memories: %s", exc)
return 0
def produce_system_status() -> dict:
"""Generate a system_status message for the Matrix.
@@ -328,14 +270,64 @@ def produce_system_status() -> dict:
"ts": 1742529600,
}
"""
# Count agents with status != offline
agents_online = 0
try:
from timmy.agents.loader import list_agents
agents = list_agents()
agents_online = sum(1 for a in agents if a.get("status", "") not in ("offline", ""))
except Exception as exc:
logger.debug("Failed to count agents: %s", exc)
# Count visitors from WebSocket clients
visitors = 0
try:
from dashboard.routes.world import _ws_clients
visitors = len(_ws_clients)
except Exception as exc:
logger.debug("Failed to count visitors: %s", exc)
# Calculate uptime
uptime_seconds = 0
try:
from datetime import UTC
from config import APP_START_TIME
uptime_seconds = int((datetime.now(UTC) - APP_START_TIME).total_seconds())
except Exception as exc:
logger.debug("Failed to calculate uptime: %s", exc)
# Check thinking engine status
thinking_active = False
try:
from config import settings
from timmy.thinking import thinking_engine
thinking_active = settings.thinking_enabled and thinking_engine is not None
except Exception as exc:
logger.debug("Failed to check thinking status: %s", exc)
# Count memories in vector store
memory_count = 0
try:
from timmy.memory_system import get_memory_stats
stats = get_memory_stats()
memory_count = stats.get("total_entries", 0)
except Exception as exc:
logger.debug("Failed to count memories: %s", exc)
return {
"type": "system_status",
"data": {
"agents_online": _get_agents_online(),
"visitors": _get_visitors(),
"uptime_seconds": _get_uptime_seconds(),
"thinking_active": _get_thinking_active(),
"memory_count": _get_memory_count(),
"agents_online": agents_online,
"visitors": visitors,
"uptime_seconds": uptime_seconds,
"thinking_active": thinking_active,
"memory_count": memory_count,
},
"ts": int(time.time()),
}

View File

@@ -528,71 +528,6 @@ class CascadeRouter:
return True
def _filter_providers(self, cascade_tier: str | None) -> list["Provider"]:
"""Return the provider list filtered by tier.
Raises:
RuntimeError: If a tier is specified but no matching providers exist.
"""
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
return providers
if cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
return providers
return self.providers
async def _try_single_provider(
self,
provider: "Provider",
messages: list[dict],
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType,
errors: list[str],
) -> dict | None:
"""Attempt one provider, returning a result dict on success or None on failure.
On failure the error string is appended to *errors* and the provider's
failure metrics are updated so the caller can move on to the next provider.
"""
if not self._is_provider_available(provider):
return None
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
return None
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
provider, messages, selected_model, temperature, max_tokens, content_type
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
return None
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
async def complete(
self,
messages: list[dict],
@@ -626,15 +561,55 @@ class CascadeRouter:
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
errors: list[str] = []
providers = self._filter_providers(cascade_tier)
errors = []
providers = self.providers
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
elif cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
for provider in providers:
result = await self._try_single_provider(
provider, messages, model, temperature, max_tokens, content_type, errors
)
if result is not None:
return result
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

View File

@@ -110,92 +110,6 @@ async def _get_or_create_label(
return None
# ---------------------------------------------------------------------------
# Dispatch action helpers
# ---------------------------------------------------------------------------
async def _apply_label_to_issue(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
label_name: str,
) -> bool:
"""Get-or-create the label then apply it to the issue. Returns True on success."""
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
if label_id is None:
return False
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
headers=headers,
json={"labels": [label_id]},
)
return resp.status_code in (200, 201)
async def _post_dispatch_comment(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue: TriagedIssue,
label_name: str,
) -> bool:
"""Post the vassal routing comment. Returns True on success."""
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
return resp.status_code in (200, 201)
async def _perform_gitea_dispatch(
issue: TriagedIssue,
record: DispatchRecord,
) -> None:
"""Apply label and post comment via Gitea. Mutates *record* in-place."""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
return
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
return
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
record.label_applied = await _apply_label_to_issue(
client, base_url, headers, repo, issue.number, label_name
)
record.comment_posted = await _post_dispatch_comment(
client, base_url, headers, repo, issue, label_name
)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
@@ -230,7 +144,58 @@ async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
_registry[issue.number] = record
return record
await _perform_gitea_dispatch(issue, record)
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
_registry[issue.number] = record
logger.info(

View File

@@ -95,106 +95,6 @@ def _get_config_dir() -> Path:
return DEFAULT_CONFIG_DIR
def _load_daily_run_config() -> dict[str, Any]:
"""Load and validate the daily run configuration."""
config_path = _get_config_dir() / "daily_run.json"
config = _load_json_config(config_path)
if not config:
console.print("[yellow]No daily run configuration found.[/yellow]")
raise typer.Exit(1)
return config
def _display_schedules_table(schedules: dict[str, Any]) -> None:
"""Display the daily run schedules in a table."""
table = Table(title="Daily Run Schedules")
table.add_column("Schedule", style="cyan")
table.add_column("Description", style="green")
table.add_column("Automations", style="yellow")
for schedule_name, schedule_data in schedules.items():
automations = schedule_data.get("automations", [])
table.add_row(
schedule_name,
schedule_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(table)
console.print()
def _display_triggers_table(triggers: dict[str, Any]) -> None:
"""Display the triggers in a table."""
trigger_table = Table(title="Triggers")
trigger_table.add_column("Trigger", style="cyan")
trigger_table.add_column("Description", style="green")
trigger_table.add_column("Automations", style="yellow")
for trigger_name, trigger_data in triggers.items():
automations = trigger_data.get("automations", [])
trigger_table.add_row(
trigger_name,
trigger_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(trigger_table)
console.print()
def _execute_automation(auto: dict[str, Any], verbose: bool) -> None:
"""Execute a single automation and display results."""
cmd = auto.get("command")
name = auto.get("name", auto.get("id", "unnamed"))
if not cmd:
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
return
console.print(f"[cyan]▶ Running: {name}[/cyan]")
if verbose:
console.print(f"[dim] $ {cmd}[/dim]")
try:
result = subprocess.run( # noqa: S602
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout.strip():
console.print(result.stdout.strip())
if result.returncode != 0:
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
if result.stderr.strip():
console.print(f"[red]{result.stderr.strip()}[/red]")
else:
console.print(f"[green] ✓ {name} completed successfully[/green]")
except subprocess.TimeoutExpired:
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
except Exception as exc:
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
def _execute_all_automations(verbose: bool) -> None:
"""Execute all enabled automations."""
console.print("[green]Executing daily run automations...[/green]")
auto_config_path = _get_config_dir() / "automations.json"
auto_config = _load_json_config(auto_config_path)
all_automations = auto_config.get("automations", [])
enabled = [a for a in all_automations if a.get("enabled", False)]
if not enabled:
console.print("[yellow]No enabled automations found.[/yellow]")
return
for auto in enabled:
_execute_automation(auto, verbose)
@app.command()
def daily_run(
dry_run: bool = typer.Option(
@@ -213,22 +113,93 @@ def daily_run(
console.print("[bold green]Timmy Daily Run[/bold green]")
console.print()
config = _load_daily_run_config()
config_path = _get_config_dir() / "daily_run.json"
config = _load_json_config(config_path)
if not config:
console.print("[yellow]No daily run configuration found.[/yellow]")
raise typer.Exit(1)
schedules = config.get("schedules", {})
triggers = config.get("triggers", {})
if verbose:
config_path = _get_config_dir() / "daily_run.json"
console.print(f"[dim]Config loaded from: {config_path}[/dim]")
console.print()
_display_schedules_table(schedules)
_display_triggers_table(triggers)
# Show the daily run schedule
table = Table(title="Daily Run Schedules")
table.add_column("Schedule", style="cyan")
table.add_column("Description", style="green")
table.add_column("Automations", style="yellow")
for schedule_name, schedule_data in schedules.items():
automations = schedule_data.get("automations", [])
table.add_row(
schedule_name,
schedule_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(table)
console.print()
# Show triggers
trigger_table = Table(title="Triggers")
trigger_table.add_column("Trigger", style="cyan")
trigger_table.add_column("Description", style="green")
trigger_table.add_column("Automations", style="yellow")
for trigger_name, trigger_data in triggers.items():
automations = trigger_data.get("automations", [])
trigger_table.add_row(
trigger_name,
trigger_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(trigger_table)
console.print()
if dry_run:
console.print("[yellow]Dry run mode — no actions executed.[/yellow]")
else:
_execute_all_automations(verbose)
console.print("[green]Executing daily run automations...[/green]")
auto_config_path = _get_config_dir() / "automations.json"
auto_config = _load_json_config(auto_config_path)
all_automations = auto_config.get("automations", [])
enabled = [a for a in all_automations if a.get("enabled", False)]
if not enabled:
console.print("[yellow]No enabled automations found.[/yellow]")
for auto in enabled:
cmd = auto.get("command")
name = auto.get("name", auto.get("id", "unnamed"))
if not cmd:
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
continue
console.print(f"[cyan]▶ Running: {name}[/cyan]")
if verbose:
console.print(f"[dim] $ {cmd}[/dim]")
try:
result = subprocess.run( # noqa: S602
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout.strip():
console.print(result.stdout.strip())
if result.returncode != 0:
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
if result.stderr.strip():
console.print(f"[red]{result.stderr.strip()}[/red]")
else:
console.print(f"[green] ✓ {name} completed successfully[/green]")
except subprocess.TimeoutExpired:
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
except Exception as exc:
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
@app.command()

View File

@@ -0,0 +1,530 @@
"""Unit tests for dashboard/routes/daily_run.py."""
from __future__ import annotations
import json
import os
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from urllib.error import HTTPError, URLError
import pytest
from dashboard.routes.daily_run import (
DEFAULT_CONFIG,
LAYER_LABELS,
DailyRunMetrics,
GiteaClient,
LayerMetrics,
_extract_layer,
_fetch_layer_metrics,
_get_metrics,
_get_token,
_load_config,
_load_cycle_data,
)
# ---------------------------------------------------------------------------
# _load_config
# ---------------------------------------------------------------------------
def test_load_config_returns_defaults():
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
mock_path.exists.return_value = False
config = _load_config()
assert config["gitea_api"] == DEFAULT_CONFIG["gitea_api"]
assert config["repo_slug"] == DEFAULT_CONFIG["repo_slug"]
def test_load_config_merges_file_orchestrator_section(tmp_path):
config_file = tmp_path / "daily_run.json"
config_file.write_text(
json.dumps({"orchestrator": {"repo_slug": "custom/repo", "gitea_api": "http://custom:3000/api/v1"}})
)
with patch("dashboard.routes.daily_run.CONFIG_PATH", config_file):
config = _load_config()
assert config["repo_slug"] == "custom/repo"
assert config["gitea_api"] == "http://custom:3000/api/v1"
def test_load_config_ignores_invalid_json(tmp_path):
config_file = tmp_path / "daily_run.json"
config_file.write_text("not valid json{{")
with patch("dashboard.routes.daily_run.CONFIG_PATH", config_file):
config = _load_config()
assert config["repo_slug"] == DEFAULT_CONFIG["repo_slug"]
def test_load_config_env_overrides(monkeypatch):
monkeypatch.setenv("TIMMY_GITEA_API", "http://envapi:3000/api/v1")
monkeypatch.setenv("TIMMY_REPO_SLUG", "env/repo")
monkeypatch.setenv("TIMMY_GITEA_TOKEN", "env-token-123")
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
mock_path.exists.return_value = False
config = _load_config()
assert config["gitea_api"] == "http://envapi:3000/api/v1"
assert config["repo_slug"] == "env/repo"
assert config["token"] == "env-token-123"
def test_load_config_no_env_overrides_without_vars(monkeypatch):
monkeypatch.delenv("TIMMY_GITEA_API", raising=False)
monkeypatch.delenv("TIMMY_REPO_SLUG", raising=False)
monkeypatch.delenv("TIMMY_GITEA_TOKEN", raising=False)
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
mock_path.exists.return_value = False
config = _load_config()
assert "token" not in config
# ---------------------------------------------------------------------------
# _get_token
# ---------------------------------------------------------------------------
def test_get_token_from_config_dict():
config = {"token": "direct-token", "token_file": "~/.hermes/gitea_token"}
assert _get_token(config) == "direct-token"
def test_get_token_from_file(tmp_path):
token_file = tmp_path / "token.txt"
token_file.write_text(" file-token \n")
config = {"token_file": str(token_file)}
assert _get_token(config) == "file-token"
def test_get_token_returns_none_when_file_missing(tmp_path):
config = {"token_file": str(tmp_path / "nonexistent_token")}
assert _get_token(config) is None
# ---------------------------------------------------------------------------
# GiteaClient
# ---------------------------------------------------------------------------
def _make_client(**kwargs) -> GiteaClient:
config = {**DEFAULT_CONFIG, **kwargs}
return GiteaClient(config, token="test-token")
def test_gitea_client_headers_include_auth():
client = _make_client()
headers = client._headers()
assert headers["Authorization"] == "token test-token"
assert headers["Accept"] == "application/json"
def test_gitea_client_headers_no_token():
config = {**DEFAULT_CONFIG}
client = GiteaClient(config, token=None)
headers = client._headers()
assert "Authorization" not in headers
def test_gitea_client_api_url():
client = _make_client()
url = client._api_url("issues")
assert url == f"{DEFAULT_CONFIG['gitea_api']}/repos/{DEFAULT_CONFIG['repo_slug']}/issues"
def test_gitea_client_api_url_strips_trailing_slash():
config = {**DEFAULT_CONFIG, "gitea_api": "http://localhost:3000/api/v1/"}
client = GiteaClient(config, token=None)
url = client._api_url("issues")
assert "//" not in url.replace("http://", "")
def test_gitea_client_is_available_true():
client = _make_client()
mock_resp = MagicMock()
mock_resp.status = 200
mock_resp.__enter__ = lambda s: mock_resp
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
assert client.is_available() is True
def test_gitea_client_is_available_cached():
client = _make_client()
client._available = True
# Should not call urlopen at all
with patch("dashboard.routes.daily_run.urlopen") as mock_urlopen:
assert client.is_available() is True
mock_urlopen.assert_not_called()
def test_gitea_client_is_available_false_on_url_error():
client = _make_client()
with patch("dashboard.routes.daily_run.urlopen", side_effect=URLError("refused")):
assert client.is_available() is False
def test_gitea_client_is_available_false_on_timeout():
client = _make_client()
with patch("dashboard.routes.daily_run.urlopen", side_effect=TimeoutError()):
assert client.is_available() is False
def test_gitea_client_get_paginated_single_page():
client = _make_client()
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps([{"id": 1}, {"id": 2}]).encode()
mock_resp.__enter__ = lambda s: mock_resp
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
result = client.get_paginated("issues")
assert len(result) == 2
assert result[0]["id"] == 1
def test_gitea_client_get_paginated_empty():
client = _make_client()
mock_resp = MagicMock()
mock_resp.read.return_value = b"[]"
mock_resp.__enter__ = lambda s: mock_resp
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
result = client.get_paginated("issues")
assert result == []
# ---------------------------------------------------------------------------
# LayerMetrics.trend
# ---------------------------------------------------------------------------
def test_layer_metrics_trend_no_previous_no_current():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=0, previous_count=0)
assert lm.trend == ""
def test_layer_metrics_trend_no_previous_with_current():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=5, previous_count=0)
assert lm.trend == ""
def test_layer_metrics_trend_big_increase():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=130, previous_count=100)
assert lm.trend == "↑↑"
def test_layer_metrics_trend_small_increase():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=108, previous_count=100)
assert lm.trend == ""
def test_layer_metrics_trend_stable():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=100, previous_count=100)
assert lm.trend == ""
def test_layer_metrics_trend_small_decrease():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=92, previous_count=100)
assert lm.trend == ""
def test_layer_metrics_trend_big_decrease():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=70, previous_count=100)
assert lm.trend == "↓↓"
def test_layer_metrics_trend_color_up():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=200, previous_count=100)
assert lm.trend_color == "var(--green)"
def test_layer_metrics_trend_color_down():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=50, previous_count=100)
assert lm.trend_color == "var(--amber)"
def test_layer_metrics_trend_color_stable():
lm = LayerMetrics(name="triage", label="layer:triage", current_count=100, previous_count=100)
assert lm.trend_color == "var(--text-dim)"
# ---------------------------------------------------------------------------
# DailyRunMetrics.sessions_trend
# ---------------------------------------------------------------------------
def _make_daily_metrics(**kwargs) -> DailyRunMetrics:
defaults = dict(
sessions_completed=10,
sessions_previous=8,
layers=[],
total_touched_current=20,
total_touched_previous=15,
lookback_days=7,
generated_at=datetime.now(UTC).isoformat(),
)
defaults.update(kwargs)
return DailyRunMetrics(**defaults)
def test_daily_metrics_sessions_trend_big_increase():
m = _make_daily_metrics(sessions_completed=130, sessions_previous=100)
assert m.sessions_trend == "↑↑"
def test_daily_metrics_sessions_trend_stable():
m = _make_daily_metrics(sessions_completed=100, sessions_previous=100)
assert m.sessions_trend == ""
def test_daily_metrics_sessions_trend_no_previous_zero_completed():
m = _make_daily_metrics(sessions_completed=0, sessions_previous=0)
assert m.sessions_trend == ""
def test_daily_metrics_sessions_trend_no_previous_with_completed():
m = _make_daily_metrics(sessions_completed=5, sessions_previous=0)
assert m.sessions_trend == ""
def test_daily_metrics_sessions_trend_color_green():
m = _make_daily_metrics(sessions_completed=200, sessions_previous=100)
assert m.sessions_trend_color == "var(--green)"
def test_daily_metrics_sessions_trend_color_amber():
m = _make_daily_metrics(sessions_completed=50, sessions_previous=100)
assert m.sessions_trend_color == "var(--amber)"
# ---------------------------------------------------------------------------
# _extract_layer
# ---------------------------------------------------------------------------
def test_extract_layer_finds_layer_label():
labels = [{"name": "bug"}, {"name": "layer:triage"}, {"name": "urgent"}]
assert _extract_layer(labels) == "triage"
def test_extract_layer_returns_none_when_no_layer():
labels = [{"name": "bug"}, {"name": "feature"}]
assert _extract_layer(labels) is None
def test_extract_layer_empty_labels():
assert _extract_layer([]) is None
def test_extract_layer_first_match_wins():
labels = [{"name": "layer:micro-fix"}, {"name": "layer:tests"}]
assert _extract_layer(labels) == "micro-fix"
# ---------------------------------------------------------------------------
# _load_cycle_data
# ---------------------------------------------------------------------------
def test_load_cycle_data_missing_file(tmp_path):
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=14)
assert result == {"current": 0, "previous": 0}
def test_load_cycle_data_counts_successful_sessions(tmp_path):
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
retro_file = retro_dir / "cycles.jsonl"
now = datetime.now(UTC)
recent_ts = (now - timedelta(days=3)).isoformat()
older_ts = (now - timedelta(days=10)).isoformat()
old_ts = (now - timedelta(days=20)).isoformat()
lines = [
json.dumps({"timestamp": recent_ts, "success": True}),
json.dumps({"timestamp": recent_ts, "success": False}), # not counted
json.dumps({"timestamp": older_ts, "success": True}),
json.dumps({"timestamp": old_ts, "success": True}), # outside window
]
retro_file.write_text("\n".join(lines))
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=7)
assert result["current"] == 1
assert result["previous"] == 1
def test_load_cycle_data_skips_invalid_json_lines(tmp_path):
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
retro_file = retro_dir / "cycles.jsonl"
now = datetime.now(UTC)
recent_ts = (now - timedelta(days=1)).isoformat()
retro_file.write_text(
f'not valid json\n{json.dumps({"timestamp": recent_ts, "success": True})}\n'
)
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=7)
assert result["current"] == 1
def test_load_cycle_data_skips_entries_with_no_timestamp(tmp_path):
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
retro_file = retro_dir / "cycles.jsonl"
retro_file.write_text(json.dumps({"success": True}))
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
result = _load_cycle_data(days=7)
assert result == {"current": 0, "previous": 0}
# ---------------------------------------------------------------------------
# _fetch_layer_metrics
# ---------------------------------------------------------------------------
def _make_issue(updated_offset_days: int) -> dict:
ts = (datetime.now(UTC) - timedelta(days=updated_offset_days)).isoformat()
return {"updated_at": ts, "labels": [{"name": "layer:triage"}]}
def test_fetch_layer_metrics_counts_current_and_previous():
client = _make_client()
client._available = True
recent_issue = _make_issue(updated_offset_days=3)
older_issue = _make_issue(updated_offset_days=10)
with patch.object(client, "get_paginated", return_value=[recent_issue, older_issue]):
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days=7)
# Should have one entry per LAYER_LABELS
assert len(layers) == len(LAYER_LABELS)
triage = next(lm for lm in layers if lm.name == "triage")
assert triage.current_count == 1
assert triage.previous_count == 1
def test_fetch_layer_metrics_degrades_on_http_error():
client = _make_client()
client._available = True
with patch.object(client, "get_paginated", side_effect=URLError("network")):
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days=7)
assert len(layers) == len(LAYER_LABELS)
for lm in layers:
assert lm.current_count == 0
assert lm.previous_count == 0
assert total_current == 0
assert total_previous == 0
# ---------------------------------------------------------------------------
# _get_metrics
# ---------------------------------------------------------------------------
def test_get_metrics_returns_none_when_gitea_unavailable():
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
with patch("dashboard.routes.daily_run._get_token", return_value=None):
with patch.object(GiteaClient, "is_available", return_value=False):
result = _get_metrics()
assert result is None
def test_get_metrics_returns_daily_run_metrics():
mock_layers = [
LayerMetrics(name="triage", label="layer:triage", current_count=5, previous_count=3)
]
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
with patch("dashboard.routes.daily_run._get_token", return_value="tok"):
with patch.object(GiteaClient, "is_available", return_value=True):
with patch(
"dashboard.routes.daily_run._fetch_layer_metrics",
return_value=(mock_layers, 5, 3),
):
with patch(
"dashboard.routes.daily_run._load_cycle_data",
return_value={"current": 10, "previous": 8},
):
result = _get_metrics(lookback_days=7)
assert result is not None
assert result.sessions_completed == 10
assert result.sessions_previous == 8
assert result.lookback_days == 7
assert result.layers == mock_layers
def test_get_metrics_returns_none_on_exception():
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
with patch("dashboard.routes.daily_run._get_token", return_value="tok"):
with patch.object(GiteaClient, "is_available", return_value=True):
with patch(
"dashboard.routes.daily_run._fetch_layer_metrics",
side_effect=Exception("unexpected"),
):
result = _get_metrics()
assert result is None
# ---------------------------------------------------------------------------
# Route handlers (FastAPI)
# ---------------------------------------------------------------------------
def test_daily_run_metrics_api_unavailable(client):
with patch("dashboard.routes.daily_run._get_metrics", return_value=None):
resp = client.get("/daily-run/metrics")
assert resp.status_code == 503
data = resp.json()
assert data["status"] == "unavailable"
def test_daily_run_metrics_api_returns_json(client):
mock_metrics = _make_daily_metrics(
layers=[
LayerMetrics(name="triage", label="layer:triage", current_count=3, previous_count=2)
]
)
with patch("dashboard.routes.daily_run._get_metrics", return_value=mock_metrics):
with patch(
"dashboard.routes.quests.check_daily_run_quests",
return_value=[],
create=True,
):
resp = client.get("/daily-run/metrics?lookback_days=7")
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["lookback_days"] == 7
assert "sessions" in data
assert "layers" in data
assert "totals" in data
assert len(data["layers"]) == 1
assert data["layers"][0]["name"] == "triage"
def test_daily_run_panel_returns_html(client):
mock_metrics = _make_daily_metrics()
with patch("dashboard.routes.daily_run._get_metrics", return_value=mock_metrics):
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
resp = client.get("/daily-run/panel")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
def test_daily_run_panel_when_unavailable(client):
with patch("dashboard.routes.daily_run._get_metrics", return_value=None):
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
resp = client.get("/daily-run/panel")
assert resp.status_code == 200

View File

@@ -1,247 +0,0 @@
"""Unit tests for src/infrastructure/chat_store.py."""
import sqlite3
import threading
from pathlib import Path
from unittest.mock import patch
import pytest
from src.infrastructure.chat_store import MAX_MESSAGES, Message, MessageLog, _get_conn
pytestmark = pytest.mark.unit
@pytest.fixture()
def tmp_db(tmp_path: Path) -> Path:
"""Return a temporary database path."""
return tmp_path / "test_chat.db"
@pytest.fixture()
def log(tmp_db: Path) -> MessageLog:
"""Return a MessageLog backed by a temp database."""
ml = MessageLog(db_path=tmp_db)
yield ml
ml.close()
# ── Message dataclass ──────────────────────────────────────────────────
class TestMessage:
def test_default_source(self):
m = Message(role="user", content="hi", timestamp="2026-01-01T00:00:00")
assert m.source == "browser"
def test_custom_source(self):
m = Message(role="agent", content="ok", timestamp="t1", source="telegram")
assert m.source == "telegram"
def test_fields(self):
m = Message(role="error", content="boom", timestamp="t2", source="api")
assert m.role == "error"
assert m.content == "boom"
assert m.timestamp == "t2"
# ── _get_conn context manager ──────────────────────────────────────────
class TestGetConn:
def test_creates_db_and_table(self, tmp_db: Path):
with _get_conn(tmp_db) as conn:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
names = [t["name"] for t in tables]
assert "chat_messages" in names
def test_creates_parent_dirs(self, tmp_path: Path):
deep = tmp_path / "a" / "b" / "c" / "chat.db"
with _get_conn(deep) as conn:
assert deep.parent.exists()
def test_connection_closed_after_context(self, tmp_db: Path):
with _get_conn(tmp_db) as conn:
conn.execute("SELECT 1")
# Connection should be closed — operations should fail
with pytest.raises(Exception):
conn.execute("SELECT 1")
# ── MessageLog core operations ─────────────────────────────────────────
class TestMessageLogAppendAndAll:
def test_append_and_all(self, log: MessageLog):
log.append("user", "hello", "t1")
log.append("agent", "hi back", "t2", source="api")
msgs = log.all()
assert len(msgs) == 2
assert msgs[0].role == "user"
assert msgs[0].content == "hello"
assert msgs[0].source == "browser"
assert msgs[1].role == "agent"
assert msgs[1].source == "api"
def test_all_returns_ordered_by_id(self, log: MessageLog):
for i in range(5):
log.append("user", f"msg{i}", f"t{i}")
msgs = log.all()
assert [m.content for m in msgs] == [f"msg{i}" for i in range(5)]
def test_all_empty_store(self, log: MessageLog):
assert log.all() == []
class TestMessageLogRecent:
def test_recent_returns_newest(self, log: MessageLog):
for i in range(10):
log.append("user", f"msg{i}", f"t{i}")
recent = log.recent(limit=3)
assert len(recent) == 3
assert recent[0].content == "msg7"
assert recent[2].content == "msg9"
def test_recent_oldest_first(self, log: MessageLog):
for i in range(5):
log.append("user", f"msg{i}", f"t{i}")
recent = log.recent(limit=3)
# Should be oldest-first within the window
assert recent[0].content == "msg2"
assert recent[1].content == "msg3"
assert recent[2].content == "msg4"
def test_recent_more_than_exists(self, log: MessageLog):
log.append("user", "only", "t0")
recent = log.recent(limit=100)
assert len(recent) == 1
def test_recent_empty_store(self, log: MessageLog):
assert log.recent() == []
class TestMessageLogClear:
def test_clear_removes_all(self, log: MessageLog):
for i in range(5):
log.append("user", f"msg{i}", f"t{i}")
assert len(log) == 5
log.clear()
assert len(log) == 0
assert log.all() == []
def test_clear_empty_store(self, log: MessageLog):
log.clear() # Should not raise
assert len(log) == 0
class TestMessageLogLen:
def test_len_empty(self, log: MessageLog):
assert len(log) == 0
def test_len_after_appends(self, log: MessageLog):
for i in range(7):
log.append("user", f"msg{i}", f"t{i}")
assert len(log) == 7
class TestMessageLogClose:
def test_close_sets_conn_none(self, tmp_db: Path):
ml = MessageLog(db_path=tmp_db)
ml.append("user", "x", "t0")
ml.close()
assert ml._conn is None
def test_close_idempotent(self, tmp_db: Path):
ml = MessageLog(db_path=tmp_db)
ml.close()
ml.close() # Should not raise
def test_reopen_after_close(self, tmp_db: Path):
ml = MessageLog(db_path=tmp_db)
ml.append("user", "before", "t0")
ml.close()
# Should reconnect on next use
ml.append("user", "after", "t1")
assert len(ml) == 2
ml.close()
# ── Pruning ────────────────────────────────────────────────────────────
class TestPrune:
def test_prune_keeps_max_messages(self, tmp_db: Path):
with patch("src.infrastructure.chat_store.MAX_MESSAGES", 5):
ml = MessageLog(db_path=tmp_db)
for i in range(10):
ml.append("user", f"msg{i}", f"t{i}")
# Should have pruned to 5
assert len(ml) == 5
msgs = ml.all()
# Oldest should be pruned, newest kept
assert msgs[0].content == "msg5"
assert msgs[-1].content == "msg9"
ml.close()
def test_no_prune_under_limit(self, tmp_db: Path):
with patch("src.infrastructure.chat_store.MAX_MESSAGES", 100):
ml = MessageLog(db_path=tmp_db)
for i in range(10):
ml.append("user", f"msg{i}", f"t{i}")
assert len(ml) == 10
ml.close()
# ── Thread safety ──────────────────────────────────────────────────────
class TestThreadSafety:
def test_concurrent_appends(self, tmp_db: Path):
ml = MessageLog(db_path=tmp_db)
errors = []
def writer(start: int):
try:
for i in range(20):
ml.append("user", f"msg{start + i}", f"t{start + i}")
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=writer, args=(i * 20,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors, f"Thread errors: {errors}"
assert len(ml) == 100
ml.close()
# ── Edge cases ─────────────────────────────────────────────────────────
class TestEdgeCases:
def test_empty_content(self, log: MessageLog):
log.append("user", "", "t0")
msgs = log.all()
assert len(msgs) == 1
assert msgs[0].content == ""
def test_unicode_content(self, log: MessageLog):
log.append("user", "こんにちは 🎉 مرحبا", "t0")
msgs = log.all()
assert msgs[0].content == "こんにちは 🎉 مرحبا"
def test_multiline_content(self, log: MessageLog):
content = "line1\nline2\nline3"
log.append("user", content, "t0")
assert log.all()[0].content == content
def test_special_sql_characters(self, log: MessageLog):
log.append("user", "Robert'; DROP TABLE chat_messages;--", "t0")
msgs = log.all()
assert len(msgs) == 1
assert "DROP TABLE" in msgs[0].content

View File

@@ -1376,141 +1376,3 @@ class TestIsProviderAvailable:
result = router._is_provider_available(provider)
assert result is True
assert provider.circuit_state == CircuitState.HALF_OPEN
@pytest.mark.unit
class TestFilterProviders:
"""Test _filter_providers helper extracted from complete()."""
def _router(self) -> CascadeRouter:
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [
Provider(
name="anthropic-p",
type="anthropic",
enabled=True,
priority=1,
api_key="key",
tier="frontier",
),
Provider(
name="ollama-p",
type="ollama",
enabled=True,
priority=2,
tier="local",
),
]
return router
def test_no_tier_returns_all_providers(self):
router = self._router()
result = router._filter_providers(None)
assert result is router.providers
def test_frontier_required_returns_only_anthropic(self):
router = self._router()
result = router._filter_providers("frontier_required")
assert len(result) == 1
assert result[0].type == "anthropic"
def test_frontier_required_no_anthropic_raises(self):
router = CascadeRouter(config_path=Path("/nonexistent"))
router.providers = [
Provider(name="ollama-p", type="ollama", enabled=True, priority=1)
]
with pytest.raises(RuntimeError, match="No Anthropic provider configured"):
router._filter_providers("frontier_required")
def test_named_tier_filters_by_tier(self):
router = self._router()
result = router._filter_providers("local")
assert len(result) == 1
assert result[0].name == "ollama-p"
def test_named_tier_not_found_raises(self):
router = self._router()
with pytest.raises(RuntimeError, match="No providers found for tier"):
router._filter_providers("nonexistent")
@pytest.mark.unit
@pytest.mark.asyncio
class TestTrySingleProvider:
"""Test _try_single_provider helper extracted from complete()."""
def _router(self) -> CascadeRouter:
return CascadeRouter(config_path=Path("/nonexistent"))
def _provider(self, name: str = "test", ptype: str = "ollama") -> Provider:
return Provider(
name=name,
type=ptype,
enabled=True,
priority=1,
models=[{"name": "llama3.2", "default": True}],
)
async def test_unavailable_provider_returns_none(self):
router = self._router()
provider = self._provider()
provider.enabled = False
errors: list[str] = []
result = await router._try_single_provider(
provider, [], None, 0.7, None, ContentType.TEXT, errors
)
assert result is None
assert errors == []
async def test_quota_blocked_cloud_provider_returns_none(self):
router = self._router()
provider = self._provider(ptype="anthropic")
errors: list[str] = []
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
mock_qm.select_model.return_value = "qwen3:14b" # non-cloud → ACTIVE tier
mock_qm.check.return_value = None
result = await router._try_single_provider(
provider, [], None, 0.7, None, ContentType.TEXT, errors
)
assert result is None
assert errors == []
async def test_success_returns_result_dict(self):
router = self._router()
provider = self._provider()
errors: list[str] = []
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "hi", "model": "llama3.2"}
result = await router._try_single_provider(
provider,
[{"role": "user", "content": "hi"}],
None,
0.7,
None,
ContentType.TEXT,
errors,
)
assert result is not None
assert result["content"] == "hi"
assert result["provider"] == "test"
assert errors == []
async def test_failure_appends_error_and_returns_none(self):
router = self._router()
provider = self._provider()
errors: list[str] = []
with patch.object(router, "_call_ollama") as mock_call:
mock_call.side_effect = RuntimeError("boom")
result = await router._try_single_provider(
provider,
[{"role": "user", "content": "hi"}],
None,
0.7,
None,
ContentType.TEXT,
errors,
)
assert result is None
assert len(errors) == 1
assert "boom" in errors[0]
assert provider.metrics.failed_requests == 1

View File

@@ -6,12 +6,7 @@ import pytest
from infrastructure.presence import (
DEFAULT_PIP_STATE,
_get_agents_online,
_get_familiar_state,
_get_memory_count,
_get_thinking_active,
_get_uptime_seconds,
_get_visitors,
produce_agent_state,
produce_bark,
produce_system_status,
@@ -505,36 +500,3 @@ class TestProduceSystemStatus:
"""produce_system_status always returns a plain dict."""
result = produce_system_status()
assert isinstance(result, dict)
class TestSystemStatusHelpers:
"""Tests for the helper functions extracted from produce_system_status()."""
def test_get_agents_online_returns_int(self):
"""_get_agents_online returns a non-negative int."""
result = _get_agents_online()
assert isinstance(result, int)
assert result >= 0
def test_get_visitors_returns_int(self):
"""_get_visitors returns a non-negative int."""
result = _get_visitors()
assert isinstance(result, int)
assert result >= 0
def test_get_uptime_seconds_returns_int(self):
"""_get_uptime_seconds returns a non-negative int."""
result = _get_uptime_seconds()
assert isinstance(result, int)
assert result >= 0
def test_get_thinking_active_returns_bool(self):
"""_get_thinking_active returns a bool."""
result = _get_thinking_active()
assert isinstance(result, bool)
def test_get_memory_count_returns_int(self):
"""_get_memory_count returns a non-negative int."""
result = _get_memory_count()
assert isinstance(result, int)
assert result >= 0