forked from Rockachopa/Timmy-time-dashboard
Compare commits
1 Commits
test/chat-
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
617ef43f99 |
@@ -242,64 +242,6 @@ def produce_agent_state(agent_id: str, presence: dict) -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _get_agents_online() -> int:
|
||||
"""Return the count of agents with a non-offline status."""
|
||||
try:
|
||||
from timmy.agents.loader import list_agents
|
||||
|
||||
agents = list_agents()
|
||||
return sum(1 for a in agents if a.get("status", "") not in ("offline", ""))
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count agents: %s", exc)
|
||||
return 0
|
||||
|
||||
|
||||
def _get_visitors() -> int:
|
||||
"""Return the count of active WebSocket visitor clients."""
|
||||
try:
|
||||
from dashboard.routes.world import _ws_clients
|
||||
|
||||
return len(_ws_clients)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count visitors: %s", exc)
|
||||
return 0
|
||||
|
||||
|
||||
def _get_uptime_seconds() -> int:
|
||||
"""Return seconds elapsed since application start."""
|
||||
try:
|
||||
from config import APP_START_TIME
|
||||
|
||||
return int((datetime.now(UTC) - APP_START_TIME).total_seconds())
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to calculate uptime: %s", exc)
|
||||
return 0
|
||||
|
||||
|
||||
def _get_thinking_active() -> bool:
|
||||
"""Return True if the thinking engine is enabled and running."""
|
||||
try:
|
||||
from config import settings
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
return settings.thinking_enabled and thinking_engine is not None
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to check thinking status: %s", exc)
|
||||
return False
|
||||
|
||||
|
||||
def _get_memory_count() -> int:
|
||||
"""Return total entries in the vector memory store."""
|
||||
try:
|
||||
from timmy.memory_system import get_memory_stats
|
||||
|
||||
stats = get_memory_stats()
|
||||
return stats.get("total_entries", 0)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count memories: %s", exc)
|
||||
return 0
|
||||
|
||||
|
||||
def produce_system_status() -> dict:
|
||||
"""Generate a system_status message for the Matrix.
|
||||
|
||||
@@ -328,14 +270,64 @@ def produce_system_status() -> dict:
|
||||
"ts": 1742529600,
|
||||
}
|
||||
"""
|
||||
# Count agents with status != offline
|
||||
agents_online = 0
|
||||
try:
|
||||
from timmy.agents.loader import list_agents
|
||||
|
||||
agents = list_agents()
|
||||
agents_online = sum(1 for a in agents if a.get("status", "") not in ("offline", ""))
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count agents: %s", exc)
|
||||
|
||||
# Count visitors from WebSocket clients
|
||||
visitors = 0
|
||||
try:
|
||||
from dashboard.routes.world import _ws_clients
|
||||
|
||||
visitors = len(_ws_clients)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count visitors: %s", exc)
|
||||
|
||||
# Calculate uptime
|
||||
uptime_seconds = 0
|
||||
try:
|
||||
from datetime import UTC
|
||||
|
||||
from config import APP_START_TIME
|
||||
|
||||
uptime_seconds = int((datetime.now(UTC) - APP_START_TIME).total_seconds())
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to calculate uptime: %s", exc)
|
||||
|
||||
# Check thinking engine status
|
||||
thinking_active = False
|
||||
try:
|
||||
from config import settings
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
thinking_active = settings.thinking_enabled and thinking_engine is not None
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to check thinking status: %s", exc)
|
||||
|
||||
# Count memories in vector store
|
||||
memory_count = 0
|
||||
try:
|
||||
from timmy.memory_system import get_memory_stats
|
||||
|
||||
stats = get_memory_stats()
|
||||
memory_count = stats.get("total_entries", 0)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count memories: %s", exc)
|
||||
|
||||
return {
|
||||
"type": "system_status",
|
||||
"data": {
|
||||
"agents_online": _get_agents_online(),
|
||||
"visitors": _get_visitors(),
|
||||
"uptime_seconds": _get_uptime_seconds(),
|
||||
"thinking_active": _get_thinking_active(),
|
||||
"memory_count": _get_memory_count(),
|
||||
"agents_online": agents_online,
|
||||
"visitors": visitors,
|
||||
"uptime_seconds": uptime_seconds,
|
||||
"thinking_active": thinking_active,
|
||||
"memory_count": memory_count,
|
||||
},
|
||||
"ts": int(time.time()),
|
||||
}
|
||||
|
||||
@@ -528,71 +528,6 @@ class CascadeRouter:
|
||||
|
||||
return True
|
||||
|
||||
def _filter_providers(self, cascade_tier: str | None) -> list["Provider"]:
|
||||
"""Return the provider list filtered by tier.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If a tier is specified but no matching providers exist.
|
||||
"""
|
||||
if cascade_tier == "frontier_required":
|
||||
providers = [p for p in self.providers if p.type == "anthropic"]
|
||||
if not providers:
|
||||
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
|
||||
return providers
|
||||
if cascade_tier:
|
||||
providers = [p for p in self.providers if p.tier == cascade_tier]
|
||||
if not providers:
|
||||
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
|
||||
return providers
|
||||
return self.providers
|
||||
|
||||
async def _try_single_provider(
|
||||
self,
|
||||
provider: "Provider",
|
||||
messages: list[dict],
|
||||
model: str | None,
|
||||
temperature: float,
|
||||
max_tokens: int | None,
|
||||
content_type: ContentType,
|
||||
errors: list[str],
|
||||
) -> dict | None:
|
||||
"""Attempt one provider, returning a result dict on success or None on failure.
|
||||
|
||||
On failure the error string is appended to *errors* and the provider's
|
||||
failure metrics are updated so the caller can move on to the next provider.
|
||||
"""
|
||||
if not self._is_provider_available(provider):
|
||||
return None
|
||||
|
||||
# Metabolic protocol: skip cloud providers when quota is low
|
||||
if provider.type in ("anthropic", "openai", "grok"):
|
||||
if not self._quota_allows_cloud(provider):
|
||||
logger.info(
|
||||
"Metabolic protocol: skipping cloud provider %s (quota too low)",
|
||||
provider.name,
|
||||
)
|
||||
return None
|
||||
|
||||
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
|
||||
|
||||
try:
|
||||
result = await self._attempt_with_retry(
|
||||
provider, messages, selected_model, temperature, max_tokens, content_type
|
||||
)
|
||||
except RuntimeError as exc:
|
||||
errors.append(str(exc))
|
||||
self._record_failure(provider)
|
||||
return None
|
||||
|
||||
self._record_success(provider, result.get("latency_ms", 0))
|
||||
return {
|
||||
"content": result["content"],
|
||||
"provider": provider.name,
|
||||
"model": result.get("model", selected_model or provider.get_default_model()),
|
||||
"latency_ms": result.get("latency_ms", 0),
|
||||
"is_fallback_model": is_fallback_model,
|
||||
}
|
||||
|
||||
async def complete(
|
||||
self,
|
||||
messages: list[dict],
|
||||
@@ -626,15 +561,55 @@ class CascadeRouter:
|
||||
if content_type != ContentType.TEXT:
|
||||
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
|
||||
|
||||
errors: list[str] = []
|
||||
providers = self._filter_providers(cascade_tier)
|
||||
errors = []
|
||||
|
||||
providers = self.providers
|
||||
if cascade_tier == "frontier_required":
|
||||
providers = [p for p in self.providers if p.type == "anthropic"]
|
||||
if not providers:
|
||||
raise RuntimeError("No Anthropic provider configured for 'frontier_required' tier.")
|
||||
elif cascade_tier:
|
||||
providers = [p for p in self.providers if p.tier == cascade_tier]
|
||||
if not providers:
|
||||
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
|
||||
|
||||
for provider in providers:
|
||||
result = await self._try_single_provider(
|
||||
provider, messages, model, temperature, max_tokens, content_type, errors
|
||||
)
|
||||
if result is not None:
|
||||
return result
|
||||
if not self._is_provider_available(provider):
|
||||
continue
|
||||
|
||||
# Metabolic protocol: skip cloud providers when quota is low
|
||||
if provider.type in ("anthropic", "openai", "grok"):
|
||||
if not self._quota_allows_cloud(provider):
|
||||
logger.info(
|
||||
"Metabolic protocol: skipping cloud provider %s (quota too low)",
|
||||
provider.name,
|
||||
)
|
||||
continue
|
||||
|
||||
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
|
||||
|
||||
try:
|
||||
result = await self._attempt_with_retry(
|
||||
provider,
|
||||
messages,
|
||||
selected_model,
|
||||
temperature,
|
||||
max_tokens,
|
||||
content_type,
|
||||
)
|
||||
except RuntimeError as exc:
|
||||
errors.append(str(exc))
|
||||
self._record_failure(provider)
|
||||
continue
|
||||
|
||||
self._record_success(provider, result.get("latency_ms", 0))
|
||||
return {
|
||||
"content": result["content"],
|
||||
"provider": provider.name,
|
||||
"model": result.get("model", selected_model or provider.get_default_model()),
|
||||
"latency_ms": result.get("latency_ms", 0),
|
||||
"is_fallback_model": is_fallback_model,
|
||||
}
|
||||
|
||||
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
|
||||
|
||||
|
||||
@@ -110,92 +110,6 @@ async def _get_or_create_label(
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch action helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _apply_label_to_issue(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
headers: dict,
|
||||
repo: str,
|
||||
issue_number: int,
|
||||
label_name: str,
|
||||
) -> bool:
|
||||
"""Get-or-create the label then apply it to the issue. Returns True on success."""
|
||||
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
|
||||
if label_id is None:
|
||||
return False
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
|
||||
headers=headers,
|
||||
json={"labels": [label_id]},
|
||||
)
|
||||
return resp.status_code in (200, 201)
|
||||
|
||||
|
||||
async def _post_dispatch_comment(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
headers: dict,
|
||||
repo: str,
|
||||
issue: TriagedIssue,
|
||||
label_name: str,
|
||||
) -> bool:
|
||||
"""Post the vassal routing comment. Returns True on success."""
|
||||
agent_name = issue.agent_target.value.capitalize()
|
||||
comment_body = (
|
||||
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
|
||||
f"Priority score: {issue.priority_score} \n"
|
||||
f"Rationale: {issue.rationale} \n"
|
||||
f"Label: `{label_name}`"
|
||||
)
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
|
||||
headers=headers,
|
||||
json={"body": comment_body},
|
||||
)
|
||||
return resp.status_code in (200, 201)
|
||||
|
||||
|
||||
async def _perform_gitea_dispatch(
|
||||
issue: TriagedIssue,
|
||||
record: DispatchRecord,
|
||||
) -> None:
|
||||
"""Apply label and post comment via Gitea. Mutates *record* in-place."""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
logger.warning("dispatch_issue: missing dependency — %s", exc)
|
||||
return
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
|
||||
return
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
label_name = _LABEL_MAP[issue.agent_target]
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
record.label_applied = await _apply_label_to_issue(
|
||||
client, base_url, headers, repo, issue.number, label_name
|
||||
)
|
||||
record.comment_posted = await _post_dispatch_comment(
|
||||
client, base_url, headers, repo, issue, label_name
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch action
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -230,7 +144,58 @@ async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
|
||||
_registry[issue.number] = record
|
||||
return record
|
||||
|
||||
await _perform_gitea_dispatch(issue, record)
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
logger.warning("dispatch_issue: missing dependency — %s", exc)
|
||||
_registry[issue.number] = record
|
||||
return record
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
|
||||
_registry[issue.number] = record
|
||||
return record
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
label_name = _LABEL_MAP[issue.agent_target]
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
|
||||
|
||||
# Apply label
|
||||
if label_id is not None:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
|
||||
headers=headers,
|
||||
json={"labels": [label_id]},
|
||||
)
|
||||
record.label_applied = resp.status_code in (200, 201)
|
||||
|
||||
# Post routing comment
|
||||
agent_name = issue.agent_target.value.capitalize()
|
||||
comment_body = (
|
||||
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
|
||||
f"Priority score: {issue.priority_score} \n"
|
||||
f"Rationale: {issue.rationale} \n"
|
||||
f"Label: `{label_name}`"
|
||||
)
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
|
||||
headers=headers,
|
||||
json={"body": comment_body},
|
||||
)
|
||||
record.comment_posted = resp.status_code in (200, 201)
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
|
||||
|
||||
_registry[issue.number] = record
|
||||
logger.info(
|
||||
|
||||
@@ -95,106 +95,6 @@ def _get_config_dir() -> Path:
|
||||
return DEFAULT_CONFIG_DIR
|
||||
|
||||
|
||||
def _load_daily_run_config() -> dict[str, Any]:
|
||||
"""Load and validate the daily run configuration."""
|
||||
config_path = _get_config_dir() / "daily_run.json"
|
||||
config = _load_json_config(config_path)
|
||||
|
||||
if not config:
|
||||
console.print("[yellow]No daily run configuration found.[/yellow]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def _display_schedules_table(schedules: dict[str, Any]) -> None:
|
||||
"""Display the daily run schedules in a table."""
|
||||
table = Table(title="Daily Run Schedules")
|
||||
table.add_column("Schedule", style="cyan")
|
||||
table.add_column("Description", style="green")
|
||||
table.add_column("Automations", style="yellow")
|
||||
|
||||
for schedule_name, schedule_data in schedules.items():
|
||||
automations = schedule_data.get("automations", [])
|
||||
table.add_row(
|
||||
schedule_name,
|
||||
schedule_data.get("description", ""),
|
||||
", ".join(automations) if automations else "—",
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
console.print()
|
||||
|
||||
|
||||
def _display_triggers_table(triggers: dict[str, Any]) -> None:
|
||||
"""Display the triggers in a table."""
|
||||
trigger_table = Table(title="Triggers")
|
||||
trigger_table.add_column("Trigger", style="cyan")
|
||||
trigger_table.add_column("Description", style="green")
|
||||
trigger_table.add_column("Automations", style="yellow")
|
||||
|
||||
for trigger_name, trigger_data in triggers.items():
|
||||
automations = trigger_data.get("automations", [])
|
||||
trigger_table.add_row(
|
||||
trigger_name,
|
||||
trigger_data.get("description", ""),
|
||||
", ".join(automations) if automations else "—",
|
||||
)
|
||||
|
||||
console.print(trigger_table)
|
||||
console.print()
|
||||
|
||||
|
||||
def _execute_automation(auto: dict[str, Any], verbose: bool) -> None:
|
||||
"""Execute a single automation and display results."""
|
||||
cmd = auto.get("command")
|
||||
name = auto.get("name", auto.get("id", "unnamed"))
|
||||
if not cmd:
|
||||
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
|
||||
return
|
||||
|
||||
console.print(f"[cyan]▶ Running: {name}[/cyan]")
|
||||
if verbose:
|
||||
console.print(f"[dim] $ {cmd}[/dim]")
|
||||
|
||||
try:
|
||||
result = subprocess.run( # noqa: S602
|
||||
cmd,
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
if result.stdout.strip():
|
||||
console.print(result.stdout.strip())
|
||||
if result.returncode != 0:
|
||||
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
|
||||
if result.stderr.strip():
|
||||
console.print(f"[red]{result.stderr.strip()}[/red]")
|
||||
else:
|
||||
console.print(f"[green] ✓ {name} completed successfully[/green]")
|
||||
except subprocess.TimeoutExpired:
|
||||
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
|
||||
except Exception as exc:
|
||||
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
|
||||
|
||||
|
||||
def _execute_all_automations(verbose: bool) -> None:
|
||||
"""Execute all enabled automations."""
|
||||
console.print("[green]Executing daily run automations...[/green]")
|
||||
auto_config_path = _get_config_dir() / "automations.json"
|
||||
auto_config = _load_json_config(auto_config_path)
|
||||
all_automations = auto_config.get("automations", [])
|
||||
enabled = [a for a in all_automations if a.get("enabled", False)]
|
||||
|
||||
if not enabled:
|
||||
console.print("[yellow]No enabled automations found.[/yellow]")
|
||||
return
|
||||
|
||||
for auto in enabled:
|
||||
_execute_automation(auto, verbose)
|
||||
|
||||
|
||||
@app.command()
|
||||
def daily_run(
|
||||
dry_run: bool = typer.Option(
|
||||
@@ -213,22 +113,93 @@ def daily_run(
|
||||
console.print("[bold green]Timmy Daily Run[/bold green]")
|
||||
console.print()
|
||||
|
||||
config = _load_daily_run_config()
|
||||
config_path = _get_config_dir() / "daily_run.json"
|
||||
config = _load_json_config(config_path)
|
||||
|
||||
if not config:
|
||||
console.print("[yellow]No daily run configuration found.[/yellow]")
|
||||
raise typer.Exit(1)
|
||||
|
||||
schedules = config.get("schedules", {})
|
||||
triggers = config.get("triggers", {})
|
||||
|
||||
if verbose:
|
||||
config_path = _get_config_dir() / "daily_run.json"
|
||||
console.print(f"[dim]Config loaded from: {config_path}[/dim]")
|
||||
console.print()
|
||||
|
||||
_display_schedules_table(schedules)
|
||||
_display_triggers_table(triggers)
|
||||
# Show the daily run schedule
|
||||
table = Table(title="Daily Run Schedules")
|
||||
table.add_column("Schedule", style="cyan")
|
||||
table.add_column("Description", style="green")
|
||||
table.add_column("Automations", style="yellow")
|
||||
|
||||
for schedule_name, schedule_data in schedules.items():
|
||||
automations = schedule_data.get("automations", [])
|
||||
table.add_row(
|
||||
schedule_name,
|
||||
schedule_data.get("description", ""),
|
||||
", ".join(automations) if automations else "—",
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
console.print()
|
||||
|
||||
# Show triggers
|
||||
trigger_table = Table(title="Triggers")
|
||||
trigger_table.add_column("Trigger", style="cyan")
|
||||
trigger_table.add_column("Description", style="green")
|
||||
trigger_table.add_column("Automations", style="yellow")
|
||||
|
||||
for trigger_name, trigger_data in triggers.items():
|
||||
automations = trigger_data.get("automations", [])
|
||||
trigger_table.add_row(
|
||||
trigger_name,
|
||||
trigger_data.get("description", ""),
|
||||
", ".join(automations) if automations else "—",
|
||||
)
|
||||
|
||||
console.print(trigger_table)
|
||||
console.print()
|
||||
|
||||
if dry_run:
|
||||
console.print("[yellow]Dry run mode — no actions executed.[/yellow]")
|
||||
else:
|
||||
_execute_all_automations(verbose)
|
||||
console.print("[green]Executing daily run automations...[/green]")
|
||||
auto_config_path = _get_config_dir() / "automations.json"
|
||||
auto_config = _load_json_config(auto_config_path)
|
||||
all_automations = auto_config.get("automations", [])
|
||||
enabled = [a for a in all_automations if a.get("enabled", False)]
|
||||
if not enabled:
|
||||
console.print("[yellow]No enabled automations found.[/yellow]")
|
||||
for auto in enabled:
|
||||
cmd = auto.get("command")
|
||||
name = auto.get("name", auto.get("id", "unnamed"))
|
||||
if not cmd:
|
||||
console.print(f"[yellow]Skipping {name} — no command defined.[/yellow]")
|
||||
continue
|
||||
console.print(f"[cyan]▶ Running: {name}[/cyan]")
|
||||
if verbose:
|
||||
console.print(f"[dim] $ {cmd}[/dim]")
|
||||
try:
|
||||
result = subprocess.run( # noqa: S602
|
||||
cmd,
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
if result.stdout.strip():
|
||||
console.print(result.stdout.strip())
|
||||
if result.returncode != 0:
|
||||
console.print(f"[red] ✗ {name} exited with code {result.returncode}[/red]")
|
||||
if result.stderr.strip():
|
||||
console.print(f"[red]{result.stderr.strip()}[/red]")
|
||||
else:
|
||||
console.print(f"[green] ✓ {name} completed successfully[/green]")
|
||||
except subprocess.TimeoutExpired:
|
||||
console.print(f"[red] ✗ {name} timed out after 120s[/red]")
|
||||
except Exception as exc:
|
||||
console.print(f"[red] ✗ {name} failed: {exc}[/red]")
|
||||
|
||||
|
||||
@app.command()
|
||||
|
||||
530
tests/dashboard/test_daily_run.py
Normal file
530
tests/dashboard/test_daily_run.py
Normal file
@@ -0,0 +1,530 @@
|
||||
"""Unit tests for dashboard/routes/daily_run.py."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
import pytest
|
||||
|
||||
from dashboard.routes.daily_run import (
|
||||
DEFAULT_CONFIG,
|
||||
LAYER_LABELS,
|
||||
DailyRunMetrics,
|
||||
GiteaClient,
|
||||
LayerMetrics,
|
||||
_extract_layer,
|
||||
_fetch_layer_metrics,
|
||||
_get_metrics,
|
||||
_get_token,
|
||||
_load_config,
|
||||
_load_cycle_data,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _load_config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_load_config_returns_defaults():
|
||||
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
|
||||
mock_path.exists.return_value = False
|
||||
config = _load_config()
|
||||
assert config["gitea_api"] == DEFAULT_CONFIG["gitea_api"]
|
||||
assert config["repo_slug"] == DEFAULT_CONFIG["repo_slug"]
|
||||
|
||||
|
||||
def test_load_config_merges_file_orchestrator_section(tmp_path):
|
||||
config_file = tmp_path / "daily_run.json"
|
||||
config_file.write_text(
|
||||
json.dumps({"orchestrator": {"repo_slug": "custom/repo", "gitea_api": "http://custom:3000/api/v1"}})
|
||||
)
|
||||
with patch("dashboard.routes.daily_run.CONFIG_PATH", config_file):
|
||||
config = _load_config()
|
||||
assert config["repo_slug"] == "custom/repo"
|
||||
assert config["gitea_api"] == "http://custom:3000/api/v1"
|
||||
|
||||
|
||||
def test_load_config_ignores_invalid_json(tmp_path):
|
||||
config_file = tmp_path / "daily_run.json"
|
||||
config_file.write_text("not valid json{{")
|
||||
with patch("dashboard.routes.daily_run.CONFIG_PATH", config_file):
|
||||
config = _load_config()
|
||||
assert config["repo_slug"] == DEFAULT_CONFIG["repo_slug"]
|
||||
|
||||
|
||||
def test_load_config_env_overrides(monkeypatch):
|
||||
monkeypatch.setenv("TIMMY_GITEA_API", "http://envapi:3000/api/v1")
|
||||
monkeypatch.setenv("TIMMY_REPO_SLUG", "env/repo")
|
||||
monkeypatch.setenv("TIMMY_GITEA_TOKEN", "env-token-123")
|
||||
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
|
||||
mock_path.exists.return_value = False
|
||||
config = _load_config()
|
||||
assert config["gitea_api"] == "http://envapi:3000/api/v1"
|
||||
assert config["repo_slug"] == "env/repo"
|
||||
assert config["token"] == "env-token-123"
|
||||
|
||||
|
||||
def test_load_config_no_env_overrides_without_vars(monkeypatch):
|
||||
monkeypatch.delenv("TIMMY_GITEA_API", raising=False)
|
||||
monkeypatch.delenv("TIMMY_REPO_SLUG", raising=False)
|
||||
monkeypatch.delenv("TIMMY_GITEA_TOKEN", raising=False)
|
||||
with patch("dashboard.routes.daily_run.CONFIG_PATH") as mock_path:
|
||||
mock_path.exists.return_value = False
|
||||
config = _load_config()
|
||||
assert "token" not in config
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _get_token
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_token_from_config_dict():
|
||||
config = {"token": "direct-token", "token_file": "~/.hermes/gitea_token"}
|
||||
assert _get_token(config) == "direct-token"
|
||||
|
||||
|
||||
def test_get_token_from_file(tmp_path):
|
||||
token_file = tmp_path / "token.txt"
|
||||
token_file.write_text(" file-token \n")
|
||||
config = {"token_file": str(token_file)}
|
||||
assert _get_token(config) == "file-token"
|
||||
|
||||
|
||||
def test_get_token_returns_none_when_file_missing(tmp_path):
|
||||
config = {"token_file": str(tmp_path / "nonexistent_token")}
|
||||
assert _get_token(config) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GiteaClient
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_client(**kwargs) -> GiteaClient:
|
||||
config = {**DEFAULT_CONFIG, **kwargs}
|
||||
return GiteaClient(config, token="test-token")
|
||||
|
||||
|
||||
def test_gitea_client_headers_include_auth():
|
||||
client = _make_client()
|
||||
headers = client._headers()
|
||||
assert headers["Authorization"] == "token test-token"
|
||||
assert headers["Accept"] == "application/json"
|
||||
|
||||
|
||||
def test_gitea_client_headers_no_token():
|
||||
config = {**DEFAULT_CONFIG}
|
||||
client = GiteaClient(config, token=None)
|
||||
headers = client._headers()
|
||||
assert "Authorization" not in headers
|
||||
|
||||
|
||||
def test_gitea_client_api_url():
|
||||
client = _make_client()
|
||||
url = client._api_url("issues")
|
||||
assert url == f"{DEFAULT_CONFIG['gitea_api']}/repos/{DEFAULT_CONFIG['repo_slug']}/issues"
|
||||
|
||||
|
||||
def test_gitea_client_api_url_strips_trailing_slash():
|
||||
config = {**DEFAULT_CONFIG, "gitea_api": "http://localhost:3000/api/v1/"}
|
||||
client = GiteaClient(config, token=None)
|
||||
url = client._api_url("issues")
|
||||
assert "//" not in url.replace("http://", "")
|
||||
|
||||
|
||||
def test_gitea_client_is_available_true():
|
||||
client = _make_client()
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status = 200
|
||||
mock_resp.__enter__ = lambda s: mock_resp
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
|
||||
assert client.is_available() is True
|
||||
|
||||
|
||||
def test_gitea_client_is_available_cached():
|
||||
client = _make_client()
|
||||
client._available = True
|
||||
# Should not call urlopen at all
|
||||
with patch("dashboard.routes.daily_run.urlopen") as mock_urlopen:
|
||||
assert client.is_available() is True
|
||||
mock_urlopen.assert_not_called()
|
||||
|
||||
|
||||
def test_gitea_client_is_available_false_on_url_error():
|
||||
client = _make_client()
|
||||
with patch("dashboard.routes.daily_run.urlopen", side_effect=URLError("refused")):
|
||||
assert client.is_available() is False
|
||||
|
||||
|
||||
def test_gitea_client_is_available_false_on_timeout():
|
||||
client = _make_client()
|
||||
with patch("dashboard.routes.daily_run.urlopen", side_effect=TimeoutError()):
|
||||
assert client.is_available() is False
|
||||
|
||||
|
||||
def test_gitea_client_get_paginated_single_page():
|
||||
client = _make_client()
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = json.dumps([{"id": 1}, {"id": 2}]).encode()
|
||||
mock_resp.__enter__ = lambda s: mock_resp
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
|
||||
result = client.get_paginated("issues")
|
||||
assert len(result) == 2
|
||||
assert result[0]["id"] == 1
|
||||
|
||||
|
||||
def test_gitea_client_get_paginated_empty():
|
||||
client = _make_client()
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = b"[]"
|
||||
mock_resp.__enter__ = lambda s: mock_resp
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
with patch("dashboard.routes.daily_run.urlopen", return_value=mock_resp):
|
||||
result = client.get_paginated("issues")
|
||||
assert result == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LayerMetrics.trend
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_layer_metrics_trend_no_previous_no_current():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=0, previous_count=0)
|
||||
assert lm.trend == "→"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_no_previous_with_current():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=5, previous_count=0)
|
||||
assert lm.trend == "↑"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_big_increase():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=130, previous_count=100)
|
||||
assert lm.trend == "↑↑"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_small_increase():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=108, previous_count=100)
|
||||
assert lm.trend == "↑"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_stable():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=100, previous_count=100)
|
||||
assert lm.trend == "→"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_small_decrease():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=92, previous_count=100)
|
||||
assert lm.trend == "↓"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_big_decrease():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=70, previous_count=100)
|
||||
assert lm.trend == "↓↓"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_color_up():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=200, previous_count=100)
|
||||
assert lm.trend_color == "var(--green)"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_color_down():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=50, previous_count=100)
|
||||
assert lm.trend_color == "var(--amber)"
|
||||
|
||||
|
||||
def test_layer_metrics_trend_color_stable():
|
||||
lm = LayerMetrics(name="triage", label="layer:triage", current_count=100, previous_count=100)
|
||||
assert lm.trend_color == "var(--text-dim)"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DailyRunMetrics.sessions_trend
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_daily_metrics(**kwargs) -> DailyRunMetrics:
|
||||
defaults = dict(
|
||||
sessions_completed=10,
|
||||
sessions_previous=8,
|
||||
layers=[],
|
||||
total_touched_current=20,
|
||||
total_touched_previous=15,
|
||||
lookback_days=7,
|
||||
generated_at=datetime.now(UTC).isoformat(),
|
||||
)
|
||||
defaults.update(kwargs)
|
||||
return DailyRunMetrics(**defaults)
|
||||
|
||||
|
||||
def test_daily_metrics_sessions_trend_big_increase():
|
||||
m = _make_daily_metrics(sessions_completed=130, sessions_previous=100)
|
||||
assert m.sessions_trend == "↑↑"
|
||||
|
||||
|
||||
def test_daily_metrics_sessions_trend_stable():
|
||||
m = _make_daily_metrics(sessions_completed=100, sessions_previous=100)
|
||||
assert m.sessions_trend == "→"
|
||||
|
||||
|
||||
def test_daily_metrics_sessions_trend_no_previous_zero_completed():
|
||||
m = _make_daily_metrics(sessions_completed=0, sessions_previous=0)
|
||||
assert m.sessions_trend == "→"
|
||||
|
||||
|
||||
def test_daily_metrics_sessions_trend_no_previous_with_completed():
|
||||
m = _make_daily_metrics(sessions_completed=5, sessions_previous=0)
|
||||
assert m.sessions_trend == "↑"
|
||||
|
||||
|
||||
def test_daily_metrics_sessions_trend_color_green():
|
||||
m = _make_daily_metrics(sessions_completed=200, sessions_previous=100)
|
||||
assert m.sessions_trend_color == "var(--green)"
|
||||
|
||||
|
||||
def test_daily_metrics_sessions_trend_color_amber():
|
||||
m = _make_daily_metrics(sessions_completed=50, sessions_previous=100)
|
||||
assert m.sessions_trend_color == "var(--amber)"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _extract_layer
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_extract_layer_finds_layer_label():
|
||||
labels = [{"name": "bug"}, {"name": "layer:triage"}, {"name": "urgent"}]
|
||||
assert _extract_layer(labels) == "triage"
|
||||
|
||||
|
||||
def test_extract_layer_returns_none_when_no_layer():
|
||||
labels = [{"name": "bug"}, {"name": "feature"}]
|
||||
assert _extract_layer(labels) is None
|
||||
|
||||
|
||||
def test_extract_layer_empty_labels():
|
||||
assert _extract_layer([]) is None
|
||||
|
||||
|
||||
def test_extract_layer_first_match_wins():
|
||||
labels = [{"name": "layer:micro-fix"}, {"name": "layer:tests"}]
|
||||
assert _extract_layer(labels) == "micro-fix"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _load_cycle_data
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_load_cycle_data_missing_file(tmp_path):
|
||||
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
|
||||
result = _load_cycle_data(days=14)
|
||||
assert result == {"current": 0, "previous": 0}
|
||||
|
||||
|
||||
def test_load_cycle_data_counts_successful_sessions(tmp_path):
|
||||
retro_dir = tmp_path / ".loop" / "retro"
|
||||
retro_dir.mkdir(parents=True)
|
||||
retro_file = retro_dir / "cycles.jsonl"
|
||||
|
||||
now = datetime.now(UTC)
|
||||
recent_ts = (now - timedelta(days=3)).isoformat()
|
||||
older_ts = (now - timedelta(days=10)).isoformat()
|
||||
old_ts = (now - timedelta(days=20)).isoformat()
|
||||
|
||||
lines = [
|
||||
json.dumps({"timestamp": recent_ts, "success": True}),
|
||||
json.dumps({"timestamp": recent_ts, "success": False}), # not counted
|
||||
json.dumps({"timestamp": older_ts, "success": True}),
|
||||
json.dumps({"timestamp": old_ts, "success": True}), # outside window
|
||||
]
|
||||
retro_file.write_text("\n".join(lines))
|
||||
|
||||
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
|
||||
result = _load_cycle_data(days=7)
|
||||
|
||||
assert result["current"] == 1
|
||||
assert result["previous"] == 1
|
||||
|
||||
|
||||
def test_load_cycle_data_skips_invalid_json_lines(tmp_path):
|
||||
retro_dir = tmp_path / ".loop" / "retro"
|
||||
retro_dir.mkdir(parents=True)
|
||||
retro_file = retro_dir / "cycles.jsonl"
|
||||
|
||||
now = datetime.now(UTC)
|
||||
recent_ts = (now - timedelta(days=1)).isoformat()
|
||||
retro_file.write_text(
|
||||
f'not valid json\n{json.dumps({"timestamp": recent_ts, "success": True})}\n'
|
||||
)
|
||||
|
||||
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
|
||||
result = _load_cycle_data(days=7)
|
||||
|
||||
assert result["current"] == 1
|
||||
|
||||
|
||||
def test_load_cycle_data_skips_entries_with_no_timestamp(tmp_path):
|
||||
retro_dir = tmp_path / ".loop" / "retro"
|
||||
retro_dir.mkdir(parents=True)
|
||||
retro_file = retro_dir / "cycles.jsonl"
|
||||
retro_file.write_text(json.dumps({"success": True}))
|
||||
|
||||
with patch("dashboard.routes.daily_run.REPO_ROOT", tmp_path):
|
||||
result = _load_cycle_data(days=7)
|
||||
|
||||
assert result == {"current": 0, "previous": 0}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _fetch_layer_metrics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_issue(updated_offset_days: int) -> dict:
|
||||
ts = (datetime.now(UTC) - timedelta(days=updated_offset_days)).isoformat()
|
||||
return {"updated_at": ts, "labels": [{"name": "layer:triage"}]}
|
||||
|
||||
|
||||
def test_fetch_layer_metrics_counts_current_and_previous():
|
||||
client = _make_client()
|
||||
client._available = True
|
||||
|
||||
recent_issue = _make_issue(updated_offset_days=3)
|
||||
older_issue = _make_issue(updated_offset_days=10)
|
||||
|
||||
with patch.object(client, "get_paginated", return_value=[recent_issue, older_issue]):
|
||||
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days=7)
|
||||
|
||||
# Should have one entry per LAYER_LABELS
|
||||
assert len(layers) == len(LAYER_LABELS)
|
||||
triage = next(lm for lm in layers if lm.name == "triage")
|
||||
assert triage.current_count == 1
|
||||
assert triage.previous_count == 1
|
||||
|
||||
|
||||
def test_fetch_layer_metrics_degrades_on_http_error():
|
||||
client = _make_client()
|
||||
client._available = True
|
||||
|
||||
with patch.object(client, "get_paginated", side_effect=URLError("network")):
|
||||
layers, total_current, total_previous = _fetch_layer_metrics(client, lookback_days=7)
|
||||
|
||||
assert len(layers) == len(LAYER_LABELS)
|
||||
for lm in layers:
|
||||
assert lm.current_count == 0
|
||||
assert lm.previous_count == 0
|
||||
assert total_current == 0
|
||||
assert total_previous == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _get_metrics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_metrics_returns_none_when_gitea_unavailable():
|
||||
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
|
||||
with patch("dashboard.routes.daily_run._get_token", return_value=None):
|
||||
with patch.object(GiteaClient, "is_available", return_value=False):
|
||||
result = _get_metrics()
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_get_metrics_returns_daily_run_metrics():
|
||||
mock_layers = [
|
||||
LayerMetrics(name="triage", label="layer:triage", current_count=5, previous_count=3)
|
||||
]
|
||||
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
|
||||
with patch("dashboard.routes.daily_run._get_token", return_value="tok"):
|
||||
with patch.object(GiteaClient, "is_available", return_value=True):
|
||||
with patch(
|
||||
"dashboard.routes.daily_run._fetch_layer_metrics",
|
||||
return_value=(mock_layers, 5, 3),
|
||||
):
|
||||
with patch(
|
||||
"dashboard.routes.daily_run._load_cycle_data",
|
||||
return_value={"current": 10, "previous": 8},
|
||||
):
|
||||
result = _get_metrics(lookback_days=7)
|
||||
|
||||
assert result is not None
|
||||
assert result.sessions_completed == 10
|
||||
assert result.sessions_previous == 8
|
||||
assert result.lookback_days == 7
|
||||
assert result.layers == mock_layers
|
||||
|
||||
|
||||
def test_get_metrics_returns_none_on_exception():
|
||||
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
|
||||
with patch("dashboard.routes.daily_run._get_token", return_value="tok"):
|
||||
with patch.object(GiteaClient, "is_available", return_value=True):
|
||||
with patch(
|
||||
"dashboard.routes.daily_run._fetch_layer_metrics",
|
||||
side_effect=Exception("unexpected"),
|
||||
):
|
||||
result = _get_metrics()
|
||||
assert result is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Route handlers (FastAPI)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_daily_run_metrics_api_unavailable(client):
|
||||
with patch("dashboard.routes.daily_run._get_metrics", return_value=None):
|
||||
resp = client.get("/daily-run/metrics")
|
||||
assert resp.status_code == 503
|
||||
data = resp.json()
|
||||
assert data["status"] == "unavailable"
|
||||
|
||||
|
||||
def test_daily_run_metrics_api_returns_json(client):
|
||||
mock_metrics = _make_daily_metrics(
|
||||
layers=[
|
||||
LayerMetrics(name="triage", label="layer:triage", current_count=3, previous_count=2)
|
||||
]
|
||||
)
|
||||
with patch("dashboard.routes.daily_run._get_metrics", return_value=mock_metrics):
|
||||
with patch(
|
||||
"dashboard.routes.quests.check_daily_run_quests",
|
||||
return_value=[],
|
||||
create=True,
|
||||
):
|
||||
resp = client.get("/daily-run/metrics?lookback_days=7")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "ok"
|
||||
assert data["lookback_days"] == 7
|
||||
assert "sessions" in data
|
||||
assert "layers" in data
|
||||
assert "totals" in data
|
||||
assert len(data["layers"]) == 1
|
||||
assert data["layers"][0]["name"] == "triage"
|
||||
|
||||
|
||||
def test_daily_run_panel_returns_html(client):
|
||||
mock_metrics = _make_daily_metrics()
|
||||
with patch("dashboard.routes.daily_run._get_metrics", return_value=mock_metrics):
|
||||
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
|
||||
resp = client.get("/daily-run/panel")
|
||||
assert resp.status_code == 200
|
||||
assert "text/html" in resp.headers["content-type"]
|
||||
|
||||
|
||||
def test_daily_run_panel_when_unavailable(client):
|
||||
with patch("dashboard.routes.daily_run._get_metrics", return_value=None):
|
||||
with patch("dashboard.routes.daily_run._load_config", return_value=DEFAULT_CONFIG):
|
||||
resp = client.get("/daily-run/panel")
|
||||
assert resp.status_code == 200
|
||||
@@ -1,247 +0,0 @@
|
||||
"""Unit tests for src/infrastructure/chat_store.py."""
|
||||
|
||||
import sqlite3
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from src.infrastructure.chat_store import MAX_MESSAGES, Message, MessageLog, _get_conn
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def tmp_db(tmp_path: Path) -> Path:
|
||||
"""Return a temporary database path."""
|
||||
return tmp_path / "test_chat.db"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def log(tmp_db: Path) -> MessageLog:
|
||||
"""Return a MessageLog backed by a temp database."""
|
||||
ml = MessageLog(db_path=tmp_db)
|
||||
yield ml
|
||||
ml.close()
|
||||
|
||||
|
||||
# ── Message dataclass ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMessage:
|
||||
def test_default_source(self):
|
||||
m = Message(role="user", content="hi", timestamp="2026-01-01T00:00:00")
|
||||
assert m.source == "browser"
|
||||
|
||||
def test_custom_source(self):
|
||||
m = Message(role="agent", content="ok", timestamp="t1", source="telegram")
|
||||
assert m.source == "telegram"
|
||||
|
||||
def test_fields(self):
|
||||
m = Message(role="error", content="boom", timestamp="t2", source="api")
|
||||
assert m.role == "error"
|
||||
assert m.content == "boom"
|
||||
assert m.timestamp == "t2"
|
||||
|
||||
|
||||
# ── _get_conn context manager ──────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGetConn:
|
||||
def test_creates_db_and_table(self, tmp_db: Path):
|
||||
with _get_conn(tmp_db) as conn:
|
||||
tables = conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table'"
|
||||
).fetchall()
|
||||
names = [t["name"] for t in tables]
|
||||
assert "chat_messages" in names
|
||||
|
||||
def test_creates_parent_dirs(self, tmp_path: Path):
|
||||
deep = tmp_path / "a" / "b" / "c" / "chat.db"
|
||||
with _get_conn(deep) as conn:
|
||||
assert deep.parent.exists()
|
||||
|
||||
def test_connection_closed_after_context(self, tmp_db: Path):
|
||||
with _get_conn(tmp_db) as conn:
|
||||
conn.execute("SELECT 1")
|
||||
# Connection should be closed — operations should fail
|
||||
with pytest.raises(Exception):
|
||||
conn.execute("SELECT 1")
|
||||
|
||||
|
||||
# ── MessageLog core operations ─────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMessageLogAppendAndAll:
|
||||
def test_append_and_all(self, log: MessageLog):
|
||||
log.append("user", "hello", "t1")
|
||||
log.append("agent", "hi back", "t2", source="api")
|
||||
msgs = log.all()
|
||||
assert len(msgs) == 2
|
||||
assert msgs[0].role == "user"
|
||||
assert msgs[0].content == "hello"
|
||||
assert msgs[0].source == "browser"
|
||||
assert msgs[1].role == "agent"
|
||||
assert msgs[1].source == "api"
|
||||
|
||||
def test_all_returns_ordered_by_id(self, log: MessageLog):
|
||||
for i in range(5):
|
||||
log.append("user", f"msg{i}", f"t{i}")
|
||||
msgs = log.all()
|
||||
assert [m.content for m in msgs] == [f"msg{i}" for i in range(5)]
|
||||
|
||||
def test_all_empty_store(self, log: MessageLog):
|
||||
assert log.all() == []
|
||||
|
||||
|
||||
class TestMessageLogRecent:
|
||||
def test_recent_returns_newest(self, log: MessageLog):
|
||||
for i in range(10):
|
||||
log.append("user", f"msg{i}", f"t{i}")
|
||||
recent = log.recent(limit=3)
|
||||
assert len(recent) == 3
|
||||
assert recent[0].content == "msg7"
|
||||
assert recent[2].content == "msg9"
|
||||
|
||||
def test_recent_oldest_first(self, log: MessageLog):
|
||||
for i in range(5):
|
||||
log.append("user", f"msg{i}", f"t{i}")
|
||||
recent = log.recent(limit=3)
|
||||
# Should be oldest-first within the window
|
||||
assert recent[0].content == "msg2"
|
||||
assert recent[1].content == "msg3"
|
||||
assert recent[2].content == "msg4"
|
||||
|
||||
def test_recent_more_than_exists(self, log: MessageLog):
|
||||
log.append("user", "only", "t0")
|
||||
recent = log.recent(limit=100)
|
||||
assert len(recent) == 1
|
||||
|
||||
def test_recent_empty_store(self, log: MessageLog):
|
||||
assert log.recent() == []
|
||||
|
||||
|
||||
class TestMessageLogClear:
|
||||
def test_clear_removes_all(self, log: MessageLog):
|
||||
for i in range(5):
|
||||
log.append("user", f"msg{i}", f"t{i}")
|
||||
assert len(log) == 5
|
||||
log.clear()
|
||||
assert len(log) == 0
|
||||
assert log.all() == []
|
||||
|
||||
def test_clear_empty_store(self, log: MessageLog):
|
||||
log.clear() # Should not raise
|
||||
assert len(log) == 0
|
||||
|
||||
|
||||
class TestMessageLogLen:
|
||||
def test_len_empty(self, log: MessageLog):
|
||||
assert len(log) == 0
|
||||
|
||||
def test_len_after_appends(self, log: MessageLog):
|
||||
for i in range(7):
|
||||
log.append("user", f"msg{i}", f"t{i}")
|
||||
assert len(log) == 7
|
||||
|
||||
|
||||
class TestMessageLogClose:
|
||||
def test_close_sets_conn_none(self, tmp_db: Path):
|
||||
ml = MessageLog(db_path=tmp_db)
|
||||
ml.append("user", "x", "t0")
|
||||
ml.close()
|
||||
assert ml._conn is None
|
||||
|
||||
def test_close_idempotent(self, tmp_db: Path):
|
||||
ml = MessageLog(db_path=tmp_db)
|
||||
ml.close()
|
||||
ml.close() # Should not raise
|
||||
|
||||
def test_reopen_after_close(self, tmp_db: Path):
|
||||
ml = MessageLog(db_path=tmp_db)
|
||||
ml.append("user", "before", "t0")
|
||||
ml.close()
|
||||
# Should reconnect on next use
|
||||
ml.append("user", "after", "t1")
|
||||
assert len(ml) == 2
|
||||
ml.close()
|
||||
|
||||
|
||||
# ── Pruning ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestPrune:
|
||||
def test_prune_keeps_max_messages(self, tmp_db: Path):
|
||||
with patch("src.infrastructure.chat_store.MAX_MESSAGES", 5):
|
||||
ml = MessageLog(db_path=tmp_db)
|
||||
for i in range(10):
|
||||
ml.append("user", f"msg{i}", f"t{i}")
|
||||
# Should have pruned to 5
|
||||
assert len(ml) == 5
|
||||
msgs = ml.all()
|
||||
# Oldest should be pruned, newest kept
|
||||
assert msgs[0].content == "msg5"
|
||||
assert msgs[-1].content == "msg9"
|
||||
ml.close()
|
||||
|
||||
def test_no_prune_under_limit(self, tmp_db: Path):
|
||||
with patch("src.infrastructure.chat_store.MAX_MESSAGES", 100):
|
||||
ml = MessageLog(db_path=tmp_db)
|
||||
for i in range(10):
|
||||
ml.append("user", f"msg{i}", f"t{i}")
|
||||
assert len(ml) == 10
|
||||
ml.close()
|
||||
|
||||
|
||||
# ── Thread safety ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestThreadSafety:
|
||||
def test_concurrent_appends(self, tmp_db: Path):
|
||||
ml = MessageLog(db_path=tmp_db)
|
||||
errors = []
|
||||
|
||||
def writer(start: int):
|
||||
try:
|
||||
for i in range(20):
|
||||
ml.append("user", f"msg{start + i}", f"t{start + i}")
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [threading.Thread(target=writer, args=(i * 20,)) for i in range(5)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert not errors, f"Thread errors: {errors}"
|
||||
assert len(ml) == 100
|
||||
ml.close()
|
||||
|
||||
|
||||
# ── Edge cases ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
def test_empty_content(self, log: MessageLog):
|
||||
log.append("user", "", "t0")
|
||||
msgs = log.all()
|
||||
assert len(msgs) == 1
|
||||
assert msgs[0].content == ""
|
||||
|
||||
def test_unicode_content(self, log: MessageLog):
|
||||
log.append("user", "こんにちは 🎉 مرحبا", "t0")
|
||||
msgs = log.all()
|
||||
assert msgs[0].content == "こんにちは 🎉 مرحبا"
|
||||
|
||||
def test_multiline_content(self, log: MessageLog):
|
||||
content = "line1\nline2\nline3"
|
||||
log.append("user", content, "t0")
|
||||
assert log.all()[0].content == content
|
||||
|
||||
def test_special_sql_characters(self, log: MessageLog):
|
||||
log.append("user", "Robert'; DROP TABLE chat_messages;--", "t0")
|
||||
msgs = log.all()
|
||||
assert len(msgs) == 1
|
||||
assert "DROP TABLE" in msgs[0].content
|
||||
@@ -1376,141 +1376,3 @@ class TestIsProviderAvailable:
|
||||
result = router._is_provider_available(provider)
|
||||
assert result is True
|
||||
assert provider.circuit_state == CircuitState.HALF_OPEN
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestFilterProviders:
|
||||
"""Test _filter_providers helper extracted from complete()."""
|
||||
|
||||
def _router(self) -> CascadeRouter:
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.providers = [
|
||||
Provider(
|
||||
name="anthropic-p",
|
||||
type="anthropic",
|
||||
enabled=True,
|
||||
priority=1,
|
||||
api_key="key",
|
||||
tier="frontier",
|
||||
),
|
||||
Provider(
|
||||
name="ollama-p",
|
||||
type="ollama",
|
||||
enabled=True,
|
||||
priority=2,
|
||||
tier="local",
|
||||
),
|
||||
]
|
||||
return router
|
||||
|
||||
def test_no_tier_returns_all_providers(self):
|
||||
router = self._router()
|
||||
result = router._filter_providers(None)
|
||||
assert result is router.providers
|
||||
|
||||
def test_frontier_required_returns_only_anthropic(self):
|
||||
router = self._router()
|
||||
result = router._filter_providers("frontier_required")
|
||||
assert len(result) == 1
|
||||
assert result[0].type == "anthropic"
|
||||
|
||||
def test_frontier_required_no_anthropic_raises(self):
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.providers = [
|
||||
Provider(name="ollama-p", type="ollama", enabled=True, priority=1)
|
||||
]
|
||||
with pytest.raises(RuntimeError, match="No Anthropic provider configured"):
|
||||
router._filter_providers("frontier_required")
|
||||
|
||||
def test_named_tier_filters_by_tier(self):
|
||||
router = self._router()
|
||||
result = router._filter_providers("local")
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "ollama-p"
|
||||
|
||||
def test_named_tier_not_found_raises(self):
|
||||
router = self._router()
|
||||
with pytest.raises(RuntimeError, match="No providers found for tier"):
|
||||
router._filter_providers("nonexistent")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
class TestTrySingleProvider:
|
||||
"""Test _try_single_provider helper extracted from complete()."""
|
||||
|
||||
def _router(self) -> CascadeRouter:
|
||||
return CascadeRouter(config_path=Path("/nonexistent"))
|
||||
|
||||
def _provider(self, name: str = "test", ptype: str = "ollama") -> Provider:
|
||||
return Provider(
|
||||
name=name,
|
||||
type=ptype,
|
||||
enabled=True,
|
||||
priority=1,
|
||||
models=[{"name": "llama3.2", "default": True}],
|
||||
)
|
||||
|
||||
async def test_unavailable_provider_returns_none(self):
|
||||
router = self._router()
|
||||
provider = self._provider()
|
||||
provider.enabled = False
|
||||
errors: list[str] = []
|
||||
result = await router._try_single_provider(
|
||||
provider, [], None, 0.7, None, ContentType.TEXT, errors
|
||||
)
|
||||
assert result is None
|
||||
assert errors == []
|
||||
|
||||
async def test_quota_blocked_cloud_provider_returns_none(self):
|
||||
router = self._router()
|
||||
provider = self._provider(ptype="anthropic")
|
||||
errors: list[str] = []
|
||||
with patch("infrastructure.router.cascade._quota_monitor") as mock_qm:
|
||||
mock_qm.select_model.return_value = "qwen3:14b" # non-cloud → ACTIVE tier
|
||||
mock_qm.check.return_value = None
|
||||
result = await router._try_single_provider(
|
||||
provider, [], None, 0.7, None, ContentType.TEXT, errors
|
||||
)
|
||||
assert result is None
|
||||
assert errors == []
|
||||
|
||||
async def test_success_returns_result_dict(self):
|
||||
router = self._router()
|
||||
provider = self._provider()
|
||||
errors: list[str] = []
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.return_value = {"content": "hi", "model": "llama3.2"}
|
||||
result = await router._try_single_provider(
|
||||
provider,
|
||||
[{"role": "user", "content": "hi"}],
|
||||
None,
|
||||
0.7,
|
||||
None,
|
||||
ContentType.TEXT,
|
||||
errors,
|
||||
)
|
||||
assert result is not None
|
||||
assert result["content"] == "hi"
|
||||
assert result["provider"] == "test"
|
||||
assert errors == []
|
||||
|
||||
async def test_failure_appends_error_and_returns_none(self):
|
||||
router = self._router()
|
||||
provider = self._provider()
|
||||
errors: list[str] = []
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.side_effect = RuntimeError("boom")
|
||||
result = await router._try_single_provider(
|
||||
provider,
|
||||
[{"role": "user", "content": "hi"}],
|
||||
None,
|
||||
0.7,
|
||||
None,
|
||||
ContentType.TEXT,
|
||||
errors,
|
||||
)
|
||||
assert result is None
|
||||
assert len(errors) == 1
|
||||
assert "boom" in errors[0]
|
||||
assert provider.metrics.failed_requests == 1
|
||||
|
||||
@@ -6,12 +6,7 @@ import pytest
|
||||
|
||||
from infrastructure.presence import (
|
||||
DEFAULT_PIP_STATE,
|
||||
_get_agents_online,
|
||||
_get_familiar_state,
|
||||
_get_memory_count,
|
||||
_get_thinking_active,
|
||||
_get_uptime_seconds,
|
||||
_get_visitors,
|
||||
produce_agent_state,
|
||||
produce_bark,
|
||||
produce_system_status,
|
||||
@@ -505,36 +500,3 @@ class TestProduceSystemStatus:
|
||||
"""produce_system_status always returns a plain dict."""
|
||||
result = produce_system_status()
|
||||
assert isinstance(result, dict)
|
||||
|
||||
|
||||
class TestSystemStatusHelpers:
|
||||
"""Tests for the helper functions extracted from produce_system_status()."""
|
||||
|
||||
def test_get_agents_online_returns_int(self):
|
||||
"""_get_agents_online returns a non-negative int."""
|
||||
result = _get_agents_online()
|
||||
assert isinstance(result, int)
|
||||
assert result >= 0
|
||||
|
||||
def test_get_visitors_returns_int(self):
|
||||
"""_get_visitors returns a non-negative int."""
|
||||
result = _get_visitors()
|
||||
assert isinstance(result, int)
|
||||
assert result >= 0
|
||||
|
||||
def test_get_uptime_seconds_returns_int(self):
|
||||
"""_get_uptime_seconds returns a non-negative int."""
|
||||
result = _get_uptime_seconds()
|
||||
assert isinstance(result, int)
|
||||
assert result >= 0
|
||||
|
||||
def test_get_thinking_active_returns_bool(self):
|
||||
"""_get_thinking_active returns a bool."""
|
||||
result = _get_thinking_active()
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_get_memory_count_returns_int(self):
|
||||
"""_get_memory_count returns a non-negative int."""
|
||||
result = _get_memory_count()
|
||||
assert isinstance(result, int)
|
||||
assert result >= 0
|
||||
|
||||
Reference in New Issue
Block a user