1
0

Compare commits

...

5 Commits

7 changed files with 750 additions and 230 deletions

View File

@@ -242,6 +242,64 @@ def produce_agent_state(agent_id: str, presence: dict) -> dict:
}
def _get_agents_online() -> int:
    """Return the count of agents with a non-offline status (0 on any error)."""
    try:
        from timmy.agents.loader import list_agents

        online = [a for a in list_agents() if a.get("status", "") not in ("offline", "")]
        return len(online)
    except Exception as exc:  # best-effort: the dashboard must not crash on agent errors
        logger.debug("Failed to count agents: %s", exc)
        return 0
def _get_visitors() -> int:
    """Return the number of active WebSocket visitor clients (0 on any error)."""
    try:
        from dashboard.routes.world import _ws_clients

        count = len(_ws_clients)
    except Exception as exc:  # module missing or client registry unavailable
        logger.debug("Failed to count visitors: %s", exc)
        return 0
    return count
def _get_uptime_seconds() -> int:
    """Return whole seconds elapsed since application start (0 on any error)."""
    try:
        from config import APP_START_TIME

        # assumes datetime/UTC are imported at module top — TODO confirm
        elapsed = datetime.now(UTC) - APP_START_TIME
        return int(elapsed.total_seconds())
    except Exception as exc:
        logger.debug("Failed to calculate uptime: %s", exc)
        return 0
def _get_thinking_active() -> bool:
    """Return True when the thinking engine is enabled and instantiated."""
    try:
        from config import settings
        from timmy.thinking import thinking_engine

        active = settings.thinking_enabled and thinking_engine is not None
    except Exception as exc:  # config or engine module unavailable
        logger.debug("Failed to check thinking status: %s", exc)
        return False
    return active
def _get_memory_count() -> int:
    """Return the total number of entries in the vector memory store (0 on error)."""
    try:
        from timmy.memory_system import get_memory_stats

        return get_memory_stats().get("total_entries", 0)
    except Exception as exc:
        logger.debug("Failed to count memories: %s", exc)
        return 0
def produce_system_status() -> dict:
    """Generate a system_status message for the Matrix.

    Returns:
        A dict of the form::

            {
                "type": "system_status",
                "data": {
                    "agents_online": 2,
                    "visitors": 1,
                    "uptime_seconds": 1234,
                    "thinking_active": True,
                    "memory_count": 42,
                },
                "ts": 1742529600,
            }
    """
    # Fix: the previous version carried both the old inline computation and
    # the new helper calls (duplicate dict keys, dead pre-return code left by
    # a bad merge). Each helper is best-effort and returns a safe default
    # (0 / False) when its subsystem is unavailable.
    return {
        "type": "system_status",
        "data": {
            "agents_online": _get_agents_online(),
            "visitors": _get_visitors(),
            "uptime_seconds": _get_uptime_seconds(),
            "thinking_active": _get_thinking_active(),
            "memory_count": _get_memory_count(),
        },
        "ts": int(time.time()),
    }

View File

@@ -528,6 +528,71 @@ class CascadeRouter:
return True
def _filter_providers(self, cascade_tier: str | None) -> list["Provider"]:
"""Return the provider list filtered by tier.
Raises:
RuntimeError: If a tier is specified but no matching providers exist.
"""
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
return providers
if cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
return providers
return self.providers
async def _try_single_provider(
    self,
    provider: "Provider",
    messages: list[dict],
    model: str | None,
    temperature: float,
    max_tokens: int | None,
    content_type: ContentType,
    errors: list[str],
) -> dict | None:
    """Attempt one provider; return a result dict on success, None on failure.

    On failure the error string is appended to *errors* and the provider's
    failure metrics are updated so the caller can try the next provider.
    """
    if not self._is_provider_available(provider):
        return None
    # Metabolic protocol: cloud providers are skipped when quota is too low.
    is_cloud = provider.type in ("anthropic", "openai", "grok")
    if is_cloud and not self._quota_allows_cloud(provider):
        logger.info(
            "Metabolic protocol: skipping cloud provider %s (quota too low)",
            provider.name,
        )
        return None
    selected_model, is_fallback_model = self._select_model(provider, model, content_type)
    try:
        outcome = await self._attempt_with_retry(
            provider, messages, selected_model, temperature, max_tokens, content_type
        )
    except RuntimeError as exc:
        errors.append(str(exc))
        self._record_failure(provider)
        return None
    self._record_success(provider, outcome.get("latency_ms", 0))
    return {
        "content": outcome["content"],
        "provider": provider.name,
        "model": outcome.get("model", selected_model or provider.get_default_model()),
        "latency_ms": outcome.get("latency_ms", 0),
        "is_fallback_model": is_fallback_model,
    }
async def complete(
self,
messages: list[dict],
@@ -561,55 +626,15 @@ class CascadeRouter:
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
errors = []
providers = self.providers
if cascade_tier == "frontier_required":
providers = [p for p in self.providers if p.type == "anthropic"]
if not providers:
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
elif cascade_tier:
providers = [p for p in self.providers if p.tier == cascade_tier]
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
errors: list[str] = []
providers = self._filter_providers(cascade_tier)
for provider in providers:
if not self._is_provider_available(provider):
continue
# Metabolic protocol: skip cloud providers when quota is low
if provider.type in ("anthropic", "openai", "grok"):
if not self._quota_allows_cloud(provider):
logger.info(
"Metabolic protocol: skipping cloud provider %s (quota too low)",
provider.name,
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
result = await self._try_single_provider(
provider, messages, model, temperature, max_tokens, content_type, errors
)
if result is not None:
return result
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

View File

@@ -110,6 +110,92 @@ async def _get_or_create_label(
return None
# ---------------------------------------------------------------------------
# Dispatch action helpers
# ---------------------------------------------------------------------------
async def _apply_label_to_issue(
    client: Any,
    base_url: str,
    headers: dict,
    repo: str,
    issue_number: int,
    label_name: str,
) -> bool:
    """Get-or-create *label_name* and attach it to the issue.

    Returns:
        True when the label was applied (HTTP 200/201), False otherwise.
    """
    label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
    if label_id is None:
        return False  # label lookup/creation failed; nothing to apply
    response = await client.post(
        f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
        headers=headers,
        json={"labels": [label_id]},
    )
    return response.status_code in (200, 201)
async def _post_dispatch_comment(
    client: Any,
    base_url: str,
    headers: dict,
    repo: str,
    issue: TriagedIssue,
    label_name: str,
) -> bool:
    """Post the vassal routing comment on *issue*.

    Returns:
        True when the comment was created (HTTP 200/201), False otherwise.
    """
    agent_name = issue.agent_target.value.capitalize()
    body = (
        f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
        f"Priority score: {issue.priority_score} \n"
        f"Rationale: {issue.rationale} \n"
        f"Label: `{label_name}`"
    )
    response = await client.post(
        f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
        headers=headers,
        json={"body": body},
    )
    return response.status_code in (200, 201)
async def _perform_gitea_dispatch(
    issue: TriagedIssue,
    record: DispatchRecord,
) -> None:
    """Apply the routing label and post the dispatch comment via Gitea.

    Mutates *record* in-place (``label_applied`` / ``comment_posted``) and
    silently degrades to a no-op when dependencies or config are missing.
    """
    try:
        import httpx
        from config import settings
    except ImportError as exc:
        logger.warning("dispatch_issue: missing dependency — %s", exc)
        return
    if not settings.gitea_enabled or not settings.gitea_token:
        logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
        return
    api_root = f"{settings.gitea_url}/api/v1"
    repo = settings.gitea_repo
    headers = {
        "Authorization": f"token {settings.gitea_token}",
        "Content-Type": "application/json",
    }
    label_name = _LABEL_MAP[issue.agent_target]
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            record.label_applied = await _apply_label_to_issue(
                client, api_root, headers, repo, issue.number, label_name
            )
            record.comment_posted = await _post_dispatch_comment(
                client, api_root, headers, repo, issue, label_name
            )
    except Exception as exc:  # network/API failures must not break dispatch
        logger.warning("dispatch_issue: Gitea action failed — %s", exc)
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
@@ -144,58 +230,7 @@ async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
_registry[issue.number] = record
return record
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
await _perform_gitea_dispatch(issue, record)
_registry[issue.number] = record
logger.info(

View File

@@ -95,6 +95,106 @@ def _get_config_dir() -> Path:
return DEFAULT_CONFIG_DIR
def _load_daily_run_config() -> dict[str, Any]:
    """Load the daily run configuration, exiting with code 1 when absent/empty."""
    path = _get_config_dir() / "daily_run.json"
    loaded = _load_json_config(path)
    if loaded:
        return loaded
    console.print("[yellow]No daily run configuration found.[/yellow]")
    raise typer.Exit(1)
def _display_schedules_table(schedules: dict[str, Any]) -> None:
    """Render the daily run schedules as a Rich table followed by a blank line."""
    table = Table(title="Daily Run Schedules")
    for heading, colour in (("Schedule", "cyan"), ("Description", "green"), ("Automations", "yellow")):
        table.add_column(heading, style=colour)
    for name, data in schedules.items():
        automation_ids = data.get("automations", [])
        table.add_row(
            name,
            data.get("description", ""),
            ", ".join(automation_ids) if automation_ids else "",
        )
    console.print(table)
    console.print()
def _display_triggers_table(triggers: dict[str, Any]) -> None:
    """Render the triggers as a Rich table followed by a blank line."""
    table = Table(title="Triggers")
    for heading, colour in (("Trigger", "cyan"), ("Description", "green"), ("Automations", "yellow")):
        table.add_column(heading, style=colour)
    for name, data in triggers.items():
        automation_ids = data.get("automations", [])
        table.add_row(
            name,
            data.get("description", ""),
            ", ".join(automation_ids) if automation_ids else "",
        )
    console.print(table)
    console.print()
def _execute_automation(auto: dict[str, Any], verbose: bool) -> None:
    """Run one automation's shell command and print a per-automation verdict."""
    command = auto.get("command")
    name = auto.get("name", auto.get("id", "unnamed"))
    if not command:
        console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
        return
    console.print(f"[cyan]▶ Running: {name}[/cyan]")
    if verbose:
        console.print(f"[dim] $ {command}[/dim]")
    try:
        # shell=True is intentional: commands come from trusted local config.
        completed = subprocess.run(  # noqa: S602
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=120,
        )
        if completed.stdout.strip():
            console.print(completed.stdout.strip())
        if completed.returncode == 0:
            console.print(f"[green] ✓ {name} completed successfully[/green]")
        else:
            console.print(f"[red] ✗ {name} exited with code {completed.returncode}[/red]")
            if completed.stderr.strip():
                console.print(f"[red]{completed.stderr.strip()}[/red]")
    except subprocess.TimeoutExpired:
        console.print(f"[red] ✗ {name} timed out after 120s[/red]")
    except Exception as exc:
        console.print(f"[red] ✗ {name} failed: {exc}[/red]")
def _execute_all_automations(verbose: bool) -> None:
    """Run every enabled automation listed in automations.json."""
    console.print("[green]Executing daily run automations...[/green]")
    config = _load_json_config(_get_config_dir() / "automations.json")
    enabled = [a for a in config.get("automations", []) if a.get("enabled", False)]
    if not enabled:
        console.print("[yellow]No enabled automations found.[/yellow]")
        return
    for automation in enabled:
        _execute_automation(automation, verbose)
@app.command()
def daily_run(
dry_run: bool = typer.Option(
@@ -113,93 +213,22 @@ def daily_run(
console.print("[bold green]Timmy Daily Run[/bold green]")
console.print()
config_path = _get_config_dir() / "daily_run.json"
config = _load_json_config(config_path)
if not config:
console.print("[yellow]No daily run configuration found.[/yellow]")
raise typer.Exit(1)
config = _load_daily_run_config()
schedules = config.get("schedules", {})
triggers = config.get("triggers", {})
if verbose:
config_path = _get_config_dir() / "daily_run.json"
console.print(f"[dim]Config loaded from: {config_path}[/dim]")
console.print()
# Show the daily run schedule
table = Table(title="Daily Run Schedules")
table.add_column("Schedule", style="cyan")
table.add_column("Description", style="green")
table.add_column("Automations", style="yellow")
for schedule_name, schedule_data in schedules.items():
automations = schedule_data.get("automations", [])
table.add_row(
schedule_name,
schedule_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(table)
console.print()
# Show triggers
trigger_table = Table(title="Triggers")
trigger_table.add_column("Trigger", style="cyan")
trigger_table.add_column("Description", style="green")
trigger_table.add_column("Automations", style="yellow")
for trigger_name, trigger_data in triggers.items():
automations = trigger_data.get("automations", [])
trigger_table.add_row(
trigger_name,
trigger_data.get("description", ""),
", ".join(automations) if automations else "",
)
console.print(trigger_table)
console.print()
_display_schedules_table(schedules)
_display_triggers_table(triggers)
if dry_run:
console.print("[yellow]Dry run mode — no actions executed.[/yellow]")
else:
console.print("[green]Executing daily run automations...[/green]")
auto_config_path = _get_config_dir() / "automations.json"
auto_config = _load_json_config(auto_config_path)
all_automations = auto_config.get("automations", [])
enabled = [a for a in all_automations if a.get("enabled", False)]
if not enabled:
console.print("[yellow]No enabled automations found.[/yellow]")
for auto in enabled:
cmd = auto.get("command")
name = auto.get("name", auto.get("id", "unnamed"))
if not cmd:
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
continue
console.print(f"[cyan]▶ Running: {name}[/cyan]")
if verbose:
console.print(f"[dim] $ {cmd}[/dim]")
try:
result = subprocess.run( # noqa: S602
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout.strip():
console.print(result.stdout.strip())
if result.returncode != 0:
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
if result.stderr.strip():
console.print(f"[red]{result.stderr.strip()}[/red]")
else:
console.print(f"[green] ✓ {name} completed successfully[/green]")
except subprocess.TimeoutExpired:
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
except Exception as exc:
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
_execute_all_automations(verbose)
@app.command()

View File

@@ -0,0 +1,247 @@
"""Unit tests for src/infrastructure/chat_store.py."""
import sqlite3
import threading
from pathlib import Path
from unittest.mock import patch
import pytest
from src.infrastructure.chat_store import MAX_MESSAGES, Message, MessageLog, _get_conn
pytestmark = pytest.mark.unit
@pytest.fixture()
def tmp_db(tmp_path: Path) -> Path:
    """Path for a throwaway chat database inside pytest's tmp dir."""
    return tmp_path / "test_chat.db"


@pytest.fixture()
def log(tmp_db: Path) -> MessageLog:
    """Yield a MessageLog backed by a temp database, closing it after the test."""
    message_log = MessageLog(db_path=tmp_db)
    yield message_log
    message_log.close()
# ── Message dataclass ──────────────────────────────────────────────────
class TestMessage:
    """Behaviour of the Message dataclass."""

    def test_default_source(self):
        msg = Message(role="user", content="hi", timestamp="2026-01-01T00:00:00")
        assert msg.source == "browser"

    def test_custom_source(self):
        msg = Message(role="agent", content="ok", timestamp="t1", source="telegram")
        assert msg.source == "telegram"

    def test_fields(self):
        msg = Message(role="error", content="boom", timestamp="t2", source="api")
        assert (msg.role, msg.content, msg.timestamp) == ("error", "boom", "t2")
# ── _get_conn context manager ──────────────────────────────────────────
class TestGetConn:
    """The _get_conn context manager: schema creation and lifecycle."""

    def test_creates_db_and_table(self, tmp_db: Path):
        """Opening a connection creates the chat_messages table."""
        with _get_conn(tmp_db) as conn:
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
            names = [t["name"] for t in tables]
            assert "chat_messages" in names

    def test_creates_parent_dirs(self, tmp_path: Path):
        """Missing parent directories are created on first use."""
        deep = tmp_path / "a" / "b" / "c" / "chat.db"
        # Fix: the connection handle was bound (`as conn`) but never used.
        with _get_conn(deep):
            assert deep.parent.exists()

    def test_connection_closed_after_context(self, tmp_db: Path):
        """The connection is unusable once the context exits."""
        with _get_conn(tmp_db) as conn:
            conn.execute("SELECT 1")
        # Connection should be closed — operations should fail
        with pytest.raises(Exception):
            conn.execute("SELECT 1")
# ── MessageLog core operations ─────────────────────────────────────────
class TestMessageLogAppendAndAll:
    """append() and all() round-tripping."""

    def test_append_and_all(self, log: MessageLog):
        log.append("user", "hello", "t1")
        log.append("agent", "hi back", "t2", source="api")
        msgs = log.all()
        assert len(msgs) == 2
        assert (msgs[0].role, msgs[0].content, msgs[0].source) == ("user", "hello", "browser")
        assert (msgs[1].role, msgs[1].source) == ("agent", "api")

    def test_all_returns_ordered_by_id(self, log: MessageLog):
        expected = [f"msg{i}" for i in range(5)]
        for i, text in enumerate(expected):
            log.append("user", text, f"t{i}")
        assert [m.content for m in log.all()] == expected

    def test_all_empty_store(self, log: MessageLog):
        assert log.all() == []
class TestMessageLogRecent:
    """recent() windowing semantics: newest N, oldest-first within the window."""

    def test_recent_returns_newest(self, log: MessageLog):
        for i in range(10):
            log.append("user", f"msg{i}", f"t{i}")
        window = log.recent(limit=3)
        assert len(window) == 3
        assert window[0].content == "msg7"
        assert window[2].content == "msg9"

    def test_recent_oldest_first(self, log: MessageLog):
        for i in range(5):
            log.append("user", f"msg{i}", f"t{i}")
        window = log.recent(limit=3)
        # Newest three messages, ordered oldest-first within the window.
        assert [m.content for m in window] == ["msg2", "msg3", "msg4"]

    def test_recent_more_than_exists(self, log: MessageLog):
        log.append("user", "only", "t0")
        assert len(log.recent(limit=100)) == 1

    def test_recent_empty_store(self, log: MessageLog):
        assert log.recent() == []
class TestMessageLogClear:
    """clear() removes every stored message."""

    def test_clear_removes_all(self, log: MessageLog):
        for i in range(5):
            log.append("user", f"msg{i}", f"t{i}")
        assert len(log) == 5
        log.clear()
        assert len(log) == 0
        assert log.all() == []

    def test_clear_empty_store(self, log: MessageLog):
        log.clear()  # must be a no-op, not an error
        assert len(log) == 0
class TestMessageLogLen:
    """__len__ mirrors the number of stored messages."""

    def test_len_empty(self, log: MessageLog):
        assert len(log) == 0

    def test_len_after_appends(self, log: MessageLog):
        for i in range(7):
            log.append("user", f"msg{i}", f"t{i}")
        assert len(log) == 7
class TestMessageLogClose:
    """close() semantics: idempotent, clears the handle, and allows reconnect."""

    def test_close_sets_conn_none(self, tmp_db: Path):
        store = MessageLog(db_path=tmp_db)
        store.append("user", "x", "t0")
        store.close()
        assert store._conn is None

    def test_close_idempotent(self, tmp_db: Path):
        store = MessageLog(db_path=tmp_db)
        store.close()
        store.close()  # second close must not raise

    def test_reopen_after_close(self, tmp_db: Path):
        store = MessageLog(db_path=tmp_db)
        store.append("user", "before", "t0")
        store.close()
        store.append("user", "after", "t1")  # implicit reconnect on next use
        assert len(store) == 2
        store.close()
# ── Pruning ────────────────────────────────────────────────────────────
class TestPrune:
    """Automatic pruning against the MAX_MESSAGES cap."""

    def test_prune_keeps_max_messages(self, tmp_db: Path):
        with patch("src.infrastructure.chat_store.MAX_MESSAGES", 5):
            store = MessageLog(db_path=tmp_db)
            for i in range(10):
                store.append("user", f"msg{i}", f"t{i}")
            assert len(store) == 5
            contents = [m.content for m in store.all()]
            # Oldest entries dropped, newest retained.
            assert contents[0] == "msg5"
            assert contents[-1] == "msg9"
            store.close()

    def test_no_prune_under_limit(self, tmp_db: Path):
        with patch("src.infrastructure.chat_store.MAX_MESSAGES", 100):
            store = MessageLog(db_path=tmp_db)
            for i in range(10):
                store.append("user", f"msg{i}", f"t{i}")
            assert len(store) == 10
            store.close()
# ── Thread safety ──────────────────────────────────────────────────────
class TestThreadSafety:
    """Concurrent writers must not raise or lose messages."""

    def test_concurrent_appends(self, tmp_db: Path):
        store = MessageLog(db_path=tmp_db)
        errors: list[Exception] = []

        def writer(offset: int) -> None:
            # Exceptions are captured so they can be asserted on the main thread.
            try:
                for i in range(20):
                    store.append("user", f"msg{offset + i}", f"t{offset + i}")
            except Exception as exc:
                errors.append(exc)

        workers = [threading.Thread(target=writer, args=(n * 20,)) for n in range(5)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        assert not errors, f"Thread errors: {errors}"
        assert len(store) == 100
        store.close()
# ── Edge cases ─────────────────────────────────────────────────────────
class TestEdgeCases:
    """Unusual message payloads survive a round trip."""

    def test_empty_content(self, log: MessageLog):
        log.append("user", "", "t0")
        stored = log.all()
        assert len(stored) == 1
        assert stored[0].content == ""

    def test_unicode_content(self, log: MessageLog):
        log.append("user", "こんにちは 🎉 مرحبا", "t0")
        assert log.all()[0].content == "こんにちは 🎉 مرحبا"

    def test_multiline_content(self, log: MessageLog):
        text = "line1\nline2\nline3"
        log.append("user", text, "t0")
        assert log.all()[0].content == text

    def test_special_sql_characters(self, log: MessageLog):
        # Parameterized queries should store this verbatim, not execute it.
        log.append("user", "Robert'; DROP TABLE chat_messages;--", "t0")
        stored = log.all()
        assert len(stored) == 1
        assert "DROP TABLE" in stored[0].content

View File

@@ -1376,3 +1376,141 @@ class TestIsProviderAvailable:
result = router._is_provider_available(provider)
assert result is True
assert provider.circuit_state == CircuitState.HALF_OPEN
@pytest.mark.unit
class TestFilterProviders:
    """Test _filter_providers helper extracted from complete()."""

    def _router(self) -> CascadeRouter:
        """Build a router holding one anthropic and one ollama provider."""
        router = CascadeRouter(config_path=Path("/nonexistent"))
        router.providers = [
            Provider(
                name="anthropic-p",
                type="anthropic",
                enabled=True,
                priority=1,
                api_key="key",
                tier="frontier",
            ),
            Provider(
                name="ollama-p",
                type="ollama",
                enabled=True,
                priority=2,
                tier="local",
            ),
        ]
        return router

    def test_no_tier_returns_all_providers(self):
        router = self._router()
        assert router._filter_providers(None) is router.providers

    def test_frontier_required_returns_only_anthropic(self):
        filtered = self._router()._filter_providers("frontier_required")
        assert [p.type for p in filtered] == ["anthropic"]

    def test_frontier_required_no_anthropic_raises(self):
        router = CascadeRouter(config_path=Path("/nonexistent"))
        router.providers = [
            Provider(name="ollama-p", type="ollama", enabled=True, priority=1)
        ]
        with pytest.raises(RuntimeError, match="No Anthropic provider configured"):
            router._filter_providers("frontier_required")

    def test_named_tier_filters_by_tier(self):
        filtered = self._router()._filter_providers("local")
        assert [p.name for p in filtered] == ["ollama-p"]

    def test_named_tier_not_found_raises(self):
        with pytest.raises(RuntimeError, match="No providers found for tier"):
            self._router()._filter_providers("nonexistent")
@pytest.mark.unit
@pytest.mark.asyncio
class TestTrySingleProvider:
    """Test _try_single_provider helper extracted from complete()."""

    def _router(self) -> CascadeRouter:
        return CascadeRouter(config_path=Path("/nonexistent"))

    def _provider(self, name: str = "test", ptype: str = "ollama") -> Provider:
        return Provider(
            name=name,
            type=ptype,
            enabled=True,
            priority=1,
            models=[{"name": "llama3.2", "default": True}],
        )

    async def test_unavailable_provider_returns_none(self):
        """A disabled provider is skipped without recording an error."""
        provider = self._provider()
        provider.enabled = False
        errors: list[str] = []
        outcome = await self._router()._try_single_provider(
            provider, [], None, 0.7, None, ContentType.TEXT, errors
        )
        assert outcome is None
        assert errors == []

    async def test_quota_blocked_cloud_provider_returns_none(self):
        """Cloud providers are skipped when the quota monitor disallows them."""
        provider = self._provider(ptype="anthropic")
        errors: list[str] = []
        with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
            mock_qm.select_model.return_value = "qwen3:14b"  # non-cloud → ACTIVE tier
            mock_qm.check.return_value = None
            outcome = await self._router()._try_single_provider(
                provider, [], None, 0.7, None, ContentType.TEXT, errors
            )
        assert outcome is None
        assert errors == []

    async def test_success_returns_result_dict(self):
        """A successful call yields the result payload and records no errors."""
        router = self._router()
        provider = self._provider()
        errors: list[str] = []
        with patch.object(router, "_call_ollama") as mock_call:
            mock_call.return_value = {"content": "hi", "model": "llama3.2"}
            outcome = await router._try_single_provider(
                provider,
                [{"role": "user", "content": "hi"}],
                None,
                0.7,
                None,
                ContentType.TEXT,
                errors,
            )
        assert outcome is not None
        assert outcome["content"] == "hi"
        assert outcome["provider"] == "test"
        assert errors == []

    async def test_failure_appends_error_and_returns_none(self):
        """A RuntimeError from the call is recorded and failure metrics bumped."""
        router = self._router()
        provider = self._provider()
        errors: list[str] = []
        with patch.object(router, "_call_ollama") as mock_call:
            mock_call.side_effect = RuntimeError("boom")
            outcome = await router._try_single_provider(
                provider,
                [{"role": "user", "content": "hi"}],
                None,
                0.7,
                None,
                ContentType.TEXT,
                errors,
            )
        assert outcome is None
        assert len(errors) == 1
        assert "boom" in errors[0]
        assert provider.metrics.failed_requests == 1

View File

@@ -6,7 +6,12 @@ import pytest
from infrastructure.presence import (
DEFAULT_PIP_STATE,
_get_agents_online,
_get_familiar_state,
_get_memory_count,
_get_thinking_active,
_get_uptime_seconds,
_get_visitors,
produce_agent_state,
produce_bark,
produce_system_status,
@@ -500,3 +505,36 @@ class TestProduceSystemStatus:
"""produce_system_status always returns a plain dict."""
result = produce_system_status()
assert isinstance(result, dict)
class TestSystemStatusHelpers:
    """Tests for the helper functions extracted from produce_system_status()."""

    def test_get_agents_online_returns_int(self):
        """_get_agents_online returns a non-negative int."""
        count = _get_agents_online()
        assert isinstance(count, int)
        assert count >= 0

    def test_get_visitors_returns_int(self):
        """_get_visitors returns a non-negative int."""
        count = _get_visitors()
        assert isinstance(count, int)
        assert count >= 0

    def test_get_uptime_seconds_returns_int(self):
        """_get_uptime_seconds returns a non-negative int."""
        seconds = _get_uptime_seconds()
        assert isinstance(seconds, int)
        assert seconds >= 0

    def test_get_thinking_active_returns_bool(self):
        """_get_thinking_active returns a bool."""
        assert isinstance(_get_thinking_active(), bool)

    def test_get_memory_count_returns_int(self):
        """_get_memory_count returns a non-negative int."""
        count = _get_memory_count()
        assert isinstance(count, int)
        assert count >= 0