Compare commits

...

12 Commits

Author SHA1 Message Date
kimi
f3fc86de07 fix: remove dead airllm provider type from cascade router
Remove the airllm provider block from providers.yaml, the airllm
availability check from cascade.py, and associated tests. Renumber
remaining provider priorities (OpenAI → 2, Anthropic → 3).

Fixes #459

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 15:46:32 -04:00
dbc2fd5b0f [loop-cycle-536] fix: validate_startup checks CORS wildcard in production (#472) (#478) 2026-03-19 15:29:26 -04:00
3c3aca57f1 [loop-cycle-535] perf: cache Timmy agent at startup (#471) (#476)
## What
Cache the Timmy agent instance at app startup (in lifespan) instead of creating a new one per `/serve/chat` request.

## Changes
- `src/timmy_serve/app.py`: Create agent in lifespan, store in `app.state.timmy`
- `tests/timmy/test_timmy_serve_app.py`: Updated tests for lifespan-based caching, added `test_agent_cached_at_startup`

2085 unit tests pass. 2102 pre-push tests pass. 78.5% coverage.

Closes #471

Co-authored-by: Timmy <timmy@timmytime.ai>
Reviewed-on: http://localhost:3000/rockachopa/Timmy-time-dashboard/pulls/476
Co-authored-by: Timmy Time <timmy@Alexanderwhitestone.ai>
Co-committed-by: Timmy Time <timmy@Alexanderwhitestone.ai>
2026-03-19 15:28:57 -04:00
0ae00af3f8 fix: remove AirLLM config settings from config.py (#475)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 15:24:43 -04:00
3df526f6ef [loop-cycle-2] feat: hot-reload providers.yaml without restart (#458) (#470) 2026-03-19 15:11:40 -04:00
50aaf60db2 [loop-cycle-2] fix: strip CORS wildcards in production (#462) (#469) 2026-03-19 15:05:27 -04:00
a751be3038 fix: default CORS origins to localhost instead of wildcard (#467)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:57:36 -04:00
92594ea588 [loop-cycle] feat: implement source distinction in system prompts (#463) (#464) 2026-03-19 14:49:31 -04:00
12582ab593 fix: stabilize flaky test_uses_model_when_available (#456)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:39:33 -04:00
72c3a0a989 fix: integration tests for agentic loop WS broadcasts (#452)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:30:00 -04:00
de089cec7f [loop-cycle-524] fix: remove numpy test dependency in test_memory_embeddings (#451) 2026-03-19 14:22:13 -04:00
3590c1689e fix: make _get_loop_agent singleton thread-safe (#449)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:18:27 -04:00
14 changed files with 666 additions and 97 deletions

View File

@@ -54,25 +54,11 @@ providers:
context_window: 2048
capabilities: [text, vision, streaming]
# Secondary: Local AirLLM (if installed)
- name: airllm-local
type: airllm
enabled: false # Enable if pip install airllm
priority: 2
models:
- name: 70b
default: true
capabilities: [text, tools, json, streaming]
- name: 8b
capabilities: [text, tools, json, streaming]
- name: 405b
capabilities: [text, tools, json, streaming]
# Tertiary: OpenAI (if API key available)
# Secondary: OpenAI (if API key available)
- name: openai-backup
type: openai
enabled: false # Enable by setting OPENAI_API_KEY
priority: 3
priority: 2
api_key: "${OPENAI_API_KEY}" # Loaded from environment
base_url: null # Use default OpenAI endpoint
models:
@@ -84,11 +70,11 @@ providers:
context_window: 128000
capabilities: [text, vision, tools, json, streaming]
# Quaternary: Anthropic (if API key available)
# Tertiary: Anthropic (if API key available)
- name: anthropic-backup
type: anthropic
enabled: false # Enable by setting ANTHROPIC_API_KEY
priority: 4
priority: 3
api_key: "${ANTHROPIC_API_KEY}"
models:
- name: claude-3-haiku-20240307

View File

@@ -64,17 +64,10 @@ class Settings(BaseSettings):
# Seconds to wait for user confirmation before auto-rejecting.
discord_confirm_timeout: int = 120
# ── AirLLM / backend selection ───────────────────────────────────────────
# ── Backend selection ────────────────────────────────────────────────────
# "ollama" — always use Ollama (default, safe everywhere)
# "airllm" — always use AirLLM (requires pip install ".[bigbrain]")
# "auto" — use AirLLM on Apple Silicon if airllm is installed,
# fall back to Ollama otherwise
timmy_model_backend: Literal["ollama", "airllm", "grok", "claude", "auto"] = "ollama"
# AirLLM model size when backend is airllm or auto.
# Larger = smarter, but needs more RAM / disk.
# 8b ~16 GB | 70b ~140 GB | 405b ~810 GB
airllm_model_size: Literal["8b", "70b", "405b"] = "70b"
# "auto" — pick best available local backend, fall back to Ollama
timmy_model_backend: Literal["ollama", "grok", "claude", "auto"] = "ollama"
# ── Grok (xAI) — opt-in premium cloud backend ────────────────────────
# Grok is a premium augmentation layer — local-first ethos preserved.
@@ -138,7 +131,12 @@ class Settings(BaseSettings):
# CORS allowed origins for the web chat interface (Gitea Pages, etc.)
# Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com"
cors_origins: list[str] = ["*"]
cors_origins: list[str] = [
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000",
]
# Trusted hosts for the Host header check (TrustedHostMiddleware).
# Set TRUSTED_HOSTS as a comma-separated list. Wildcards supported (e.g. "*.ts.net").
@@ -471,8 +469,19 @@ def validate_startup(*, force: bool = False) -> None:
", ".join(_missing),
)
sys.exit(1)
if "*" in settings.cors_origins:
_startup_logger.error(
"PRODUCTION SECURITY ERROR: CORS wildcard '*' is not allowed "
"in production. Set CORS_ORIGINS to explicit origins."
)
sys.exit(1)
_startup_logger.info("Production mode: security secrets validated ✓")
else:
if "*" in settings.cors_origins:
_startup_logger.warning(
"SEC: CORS_ORIGINS contains wildcard '*'"
"restrict to explicit origins before deploying to production."
)
if not settings.l402_hmac_secret:
_startup_logger.warning(
"SEC: L402_HMAC_SECRET is not set — "

View File

@@ -484,15 +484,14 @@ app = FastAPI(
def _get_cors_origins() -> list[str]:
"""Get CORS origins from settings, with sensible defaults."""
"""Get CORS origins from settings, rejecting wildcards in production."""
origins = settings.cors_origins
if settings.debug and origins == ["*"]:
return [
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000",
]
if "*" in origins and not settings.debug:
logger.warning(
"Wildcard '*' in CORS_ORIGINS stripped in production — "
"set explicit origins via CORS_ORIGINS env var"
)
origins = [o for o in origins if o != "*"]
return origins

View File

@@ -183,6 +183,22 @@ async def run_health_check(
}
@router.post("/reload")
async def reload_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],
) -> dict[str, Any]:
"""Hot-reload providers.yaml without restart.
Preserves circuit breaker state and metrics for existing providers.
"""
try:
result = cascade.reload_config()
return {"status": "ok", **result}
except Exception as exc:
logger.error("Config reload failed: %s", exc)
raise HTTPException(status_code=500, detail=f"Reload failed: {exc}") from exc
@router.get("/config")
async def get_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],

View File

@@ -100,7 +100,7 @@ class Provider:
"""LLM provider configuration and state."""
name: str
type: str # ollama, openai, anthropic, airllm
type: str # ollama, openai, anthropic
enabled: bool
priority: int
url: str | None = None
@@ -308,15 +308,6 @@ class CascadeRouter:
logger.debug("Ollama provider check error: %s", exc)
return False
elif provider.type == "airllm":
# Check if airllm is installed
try:
import importlib.util
return importlib.util.find_spec("airllm") is not None
except (ImportError, ModuleNotFoundError):
return False
elif provider.type in ("openai", "anthropic", "grok"):
# Check if API key is set
return provider.api_key is not None and provider.api_key != ""
@@ -815,6 +806,66 @@ class CascadeRouter:
provider.status = ProviderStatus.HEALTHY
logger.info("Circuit breaker CLOSED for %s", provider.name)
def reload_config(self) -> dict:
"""Hot-reload providers.yaml, preserving runtime state.
Re-reads the config file, rebuilds the provider list, and
preserves circuit breaker state and metrics for providers
that still exist after reload.
Returns:
Summary dict with added/removed/preserved counts.
"""
# Snapshot current runtime state keyed by provider name
old_state: dict[
str, tuple[ProviderMetrics, CircuitState, float | None, int, ProviderStatus]
] = {}
for p in self.providers:
old_state[p.name] = (
p.metrics,
p.circuit_state,
p.circuit_opened_at,
p.half_open_calls,
p.status,
)
old_names = set(old_state.keys())
# Reload from disk
self.providers = []
self._load_config()
# Restore preserved state
new_names = {p.name for p in self.providers}
preserved = 0
for p in self.providers:
if p.name in old_state:
metrics, circuit, opened_at, half_open, status = old_state[p.name]
p.metrics = metrics
p.circuit_state = circuit
p.circuit_opened_at = opened_at
p.half_open_calls = half_open
p.status = status
preserved += 1
added = new_names - old_names
removed = old_names - new_names
logger.info(
"Config reloaded: %d providers (%d preserved, %d added, %d removed)",
len(self.providers),
preserved,
len(added),
len(removed),
)
return {
"total_providers": len(self.providers),
"preserved": preserved,
"added": sorted(added),
"removed": sorted(removed),
}
def get_metrics(self) -> dict:
"""Get metrics for all providers."""
return {

View File

@@ -220,7 +220,7 @@ def create_timmy(
print_response(message, stream).
"""
resolved = _resolve_backend(backend)
size = model_size or settings.airllm_model_size
size = model_size or "70b"
if resolved == "claude":
from timmy.backends import ClaudeBackend
@@ -300,7 +300,11 @@ def create_timmy(
max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]"
full_prompt = f"{base_prompt}\n\n## Memory Context\n\n{memory_context}"
full_prompt = (
f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}"
)
else:
full_prompt = base_prompt
except Exception as exc:

View File

@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
import logging
import re
import threading
import time
import uuid
from collections.abc import Callable
@@ -59,6 +60,7 @@ class AgenticResult:
# ---------------------------------------------------------------------------
_loop_agent = None
_loop_agent_lock = threading.Lock()
def _get_loop_agent():
@@ -69,9 +71,11 @@ def _get_loop_agent():
"""
global _loop_agent
if _loop_agent is None:
from timmy.agent import create_timmy
with _loop_agent_lock:
if _loop_agent is None:
from timmy.agent import create_timmy
_loop_agent = create_timmy()
_loop_agent = create_timmy()
return _loop_agent

View File

@@ -23,6 +23,9 @@ Rules:
- Remember what the user tells you during the conversation.
- If you don't know something, say so honestly — never fabricate facts.
- If a request is ambiguous, ask a brief clarifying question before guessing.
- SOURCE DISTINCTION: When answering from memory or retrieved context, cite it.
When answering from your own training, use hedging: "I think", "I believe".
The user must be able to tell grounded claims from pattern-matching.
- Use the user's name if you know it.
- When you state a fact, commit to it.
- NEVER attempt arithmetic in your head. If asked to compute anything, respond:
@@ -78,6 +81,18 @@ HONESTY:
- Never fabricate tool output. Call the tool and wait.
- If a tool errors, report the exact error.
SOURCE DISTINCTION (SOUL requirement — non-negotiable):
- Every claim you make comes from one of two places: a verified source you
can point to, or your own pattern-matching. The user must be able to tell
which is which.
- When your response uses information from GROUNDED CONTEXT (memory, retrieved
documents, tool output), cite it: "From memory:", "According to [source]:".
- When you are generating from your training data alone, signal it naturally:
"I think", "My understanding is", "I believe" — never false certainty.
- If the user asks a factual question and you have no grounded source, say so:
"I don't have a verified source for this — from my training I think..."
- Prefer "I don't know" over a confident-sounding guess. Refusal over fabrication.
MEMORY (three tiers):
- Tier 1: MEMORY.md (hot, always loaded)
- Tier 2: memory/ vault (structured, append-only, date-stamped)

View File

@@ -75,6 +75,8 @@ def create_timmy_serve_app() -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Timmy Serve starting")
app.state.timmy = create_timmy()
logger.info("Timmy agent cached in app state")
yield
logger.info("Timmy Serve shutting down")
@@ -101,7 +103,7 @@ def create_timmy_serve_app() -> FastAPI:
async def serve_chat(request: Request, body: ChatRequest):
"""Process a chat request."""
try:
timmy = create_timmy()
timmy = request.app.state.timmy
result = timmy.run(body.message, stream=False)
response_text = result.content if hasattr(result, "content") else str(result)

View File

@@ -2,7 +2,7 @@
import time
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
import pytest
import yaml
@@ -489,30 +489,182 @@ class TestProviderAvailabilityCheck:
assert router._check_provider_available(provider) is False
def test_check_airllm_installed(self):
"""Test AirLLM when installed."""
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="airllm",
type="airllm",
enabled=True,
priority=1,
class TestCascadeRouterReload:
"""Test hot-reload of providers.yaml."""
def test_reload_preserves_metrics(self, tmp_path):
"""Test that reload preserves metrics for existing providers."""
config = {
"providers": [
{
"name": "test-openai",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
}
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
assert len(router.providers) == 1
# Simulate some traffic
router._record_success(router.providers[0], 150.0)
router._record_success(router.providers[0], 250.0)
assert router.providers[0].metrics.total_requests == 2
# Reload
result = router.reload_config()
assert result["total_providers"] == 1
assert result["preserved"] == 1
assert result["added"] == []
assert result["removed"] == []
# Metrics survived
assert router.providers[0].metrics.total_requests == 2
assert router.providers[0].metrics.total_latency_ms == 400.0
def test_reload_preserves_circuit_breaker(self, tmp_path):
"""Test that reload preserves circuit breaker state."""
config = {
"cascade": {"circuit_breaker": {"failure_threshold": 2}},
"providers": [
{
"name": "test-openai",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
}
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
# Open circuit breaker
for _ in range(2):
router._record_failure(router.providers[0])
assert router.providers[0].circuit_state == CircuitState.OPEN
# Reload
router.reload_config()
# Circuit breaker state preserved
assert router.providers[0].circuit_state == CircuitState.OPEN
assert router.providers[0].status == ProviderStatus.UNHEALTHY
def test_reload_detects_added_provider(self, tmp_path):
"""Test that reload detects newly added providers."""
config = {
"providers": [
{
"name": "openai-1",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
}
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
assert len(router.providers) == 1
# Add a second provider to config
config["providers"].append(
{
"name": "anthropic-1",
"type": "anthropic",
"enabled": True,
"priority": 2,
"api_key": "sk-ant-test",
}
)
config_path.write_text(yaml.dump(config))
with patch("importlib.util.find_spec", return_value=MagicMock()):
assert router._check_provider_available(provider) is True
result = router.reload_config()
def test_check_airllm_not_installed(self):
"""Test AirLLM when not installed."""
router = CascadeRouter(config_path=Path("/nonexistent"))
assert result["total_providers"] == 2
assert result["preserved"] == 1
assert result["added"] == ["anthropic-1"]
assert result["removed"] == []
provider = Provider(
name="airllm",
type="airllm",
enabled=True,
priority=1,
)
def test_reload_detects_removed_provider(self, tmp_path):
"""Test that reload detects removed providers."""
config = {
"providers": [
{
"name": "openai-1",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test",
},
{
"name": "anthropic-1",
"type": "anthropic",
"enabled": True,
"priority": 2,
"api_key": "sk-ant-test",
},
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
with patch("importlib.util.find_spec", return_value=None):
assert router._check_provider_available(provider) is False
router = CascadeRouter(config_path=config_path)
assert len(router.providers) == 2
# Remove anthropic
config["providers"] = [config["providers"][0]]
config_path.write_text(yaml.dump(config))
result = router.reload_config()
assert result["total_providers"] == 1
assert result["preserved"] == 1
assert result["removed"] == ["anthropic-1"]
def test_reload_re_sorts_by_priority(self, tmp_path):
"""Test that providers are re-sorted by priority after reload."""
config = {
"providers": [
{
"name": "low-priority",
"type": "openai",
"enabled": True,
"priority": 10,
"api_key": "sk-test",
},
{
"name": "high-priority",
"type": "openai",
"enabled": True,
"priority": 1,
"api_key": "sk-test2",
},
],
}
config_path = tmp_path / "providers.yaml"
config_path.write_text(yaml.dump(config))
router = CascadeRouter(config_path=config_path)
assert router.providers[0].name == "high-priority"
# Swap priorities
config["providers"][0]["priority"] = 1
config["providers"][1]["priority"] = 10
config_path.write_text(yaml.dump(config))
router.reload_config()
assert router.providers[0].name == "low-priority"
assert router.providers[1].name == "high-priority"

View File

@@ -0,0 +1,285 @@
"""Integration tests for agentic loop WebSocket broadcasts.
Verifies that ``run_agentic_loop`` pushes the correct sequence of events
through the real ``ws_manager`` and that connected (mock) WebSocket clients
receive every broadcast with the expected payloads.
"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.ws_manager.handler import WebSocketManager
from timmy.agentic_loop import run_agentic_loop
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_run(content: str):
m = MagicMock()
m.content = content
return m
def _ws_client() -> AsyncMock:
"""Return a fake WebSocket that records sent messages."""
return AsyncMock()
def _collected_events(ws: AsyncMock) -> list[dict]:
"""Extract parsed JSON events from a mock WebSocket's send_text calls."""
return [json.loads(call.args[0]) for call in ws.send_text.call_args_list]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestAgenticLoopBroadcastSequence:
"""Events arrive at WS clients in the correct order with expected data."""
@pytest.mark.asyncio
async def test_successful_run_broadcasts_plan_steps_complete(self):
"""A successful 2-step loop emits plan_ready → 2× step_complete → task_complete."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Gather data\n2. Summarise"),
_mock_run("Gathered 10 records"),
_mock_run("Summary written"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Gather and summarise", max_steps=2)
assert result.status == "completed"
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert event_names == [
"agentic.plan_ready",
"agentic.step_complete",
"agentic.step_complete",
"agentic.task_complete",
]
@pytest.mark.asyncio
async def test_plan_ready_payload(self):
"""plan_ready contains task_id, task, steps list, and total count."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Alpha\n2. Beta"),
_mock_run("Alpha done"),
_mock_run("Beta done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Two steps")
plan_event = _collected_events(ws)[0]
assert plan_event["event"] == "agentic.plan_ready"
data = plan_event["data"]
assert data["task_id"] == result.task_id
assert data["task"] == "Two steps"
assert data["steps"] == ["Alpha", "Beta"]
assert data["total"] == 2
@pytest.mark.asyncio
async def test_step_complete_payload(self):
"""step_complete carries step number, total, description, and result."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only step"),
_mock_run("Step result text"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Single step", max_steps=1)
step_event = _collected_events(ws)[1]
assert step_event["event"] == "agentic.step_complete"
data = step_event["data"]
assert data["step"] == 1
assert data["total"] == 1
assert data["description"] == "Only step"
assert "Step result text" in data["result"]
@pytest.mark.asyncio
async def test_task_complete_payload(self):
"""task_complete has status, steps_completed, summary, and duration_ms."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Quick", max_steps=1)
complete_event = _collected_events(ws)[-1]
assert complete_event["event"] == "agentic.task_complete"
data = complete_event["data"]
assert data["status"] == "completed"
assert data["steps_completed"] == 1
assert isinstance(data["duration_ms"], int)
assert data["duration_ms"] >= 0
assert data["summary"]
class TestAdaptationBroadcast:
"""Adapted steps emit step_adapted events."""
@pytest.mark.asyncio
async def test_adapted_step_broadcasts_step_adapted(self):
"""A failed-then-adapted step emits agentic.step_adapted."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Risky step"),
Exception("disk full"),
_mock_run("Used /tmp instead"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Adapt test", max_steps=1)
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert "agentic.step_adapted" in event_names
adapted = next(e for e in events if e["event"] == "agentic.step_adapted")
assert adapted["data"]["error"] == "disk full"
assert adapted["data"]["adaptation"]
assert result.steps[0].status == "adapted"
class TestMultipleClients:
"""All connected clients receive every broadcast."""
@pytest.mark.asyncio
async def test_two_clients_receive_all_events(self):
mgr = WebSocketManager()
ws1 = _ws_client()
ws2 = _ws_client()
mgr._connections = [ws1, ws2]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Step A"),
_mock_run("A done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Multi-client", max_steps=1)
events1 = _collected_events(ws1)
events2 = _collected_events(ws2)
assert len(events1) == len(events2) == 3 # plan + step + complete
assert [e["event"] for e in events1] == [e["event"] for e in events2]
class TestEventHistory:
"""Broadcasts are recorded in ws_manager event history."""
@pytest.mark.asyncio
async def test_events_appear_in_history(self):
mgr = WebSocketManager()
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("History test", max_steps=1)
history_events = [e.event for e in mgr.event_history]
assert "agentic.plan_ready" in history_events
assert "agentic.step_complete" in history_events
assert "agentic.task_complete" in history_events
class TestBroadcastGracefulDegradation:
"""Loop completes even when ws_manager is unavailable."""
@pytest.mark.asyncio
async def test_loop_succeeds_when_broadcast_fails(self):
"""ImportError from ws_manager doesn't crash the loop."""
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch(
"infrastructure.ws_manager.handler.ws_manager",
new_callable=lambda: MagicMock,
) as broken_mgr,
):
broken_mgr.broadcast = AsyncMock(side_effect=RuntimeError("ws down"))
result = await run_agentic_loop("Resilient task", max_steps=1)
assert result.status == "completed"
assert len(result.steps) == 1

View File

@@ -49,6 +49,34 @@ class TestConfigLazyValidation:
# Should not raise
validate_startup(force=True)
def test_validate_startup_exits_on_cors_wildcard_in_production(self):
"""validate_startup() should exit in production when CORS has wildcard."""
from config import settings, validate_startup
with (
patch.object(settings, "timmy_env", "production"),
patch.object(settings, "l402_hmac_secret", "test-secret-hex-value-32"),
patch.object(settings, "l402_macaroon_secret", "test-macaroon-hex-value-32"),
patch.object(settings, "cors_origins", ["*"]),
pytest.raises(SystemExit),
):
validate_startup(force=True)
def test_validate_startup_warns_cors_wildcard_in_dev(self):
"""validate_startup() should warn in dev when CORS has wildcard."""
from config import settings, validate_startup
with (
patch.object(settings, "timmy_env", "development"),
patch.object(settings, "cors_origins", ["*"]),
patch("config._startup_logger") as mock_logger,
):
validate_startup(force=True)
mock_logger.warning.assert_any_call(
"SEC: CORS_ORIGINS contains wildcard '*'"
"restrict to explicit origins before deploying to production."
)
def test_validate_startup_skips_in_test_mode(self):
"""validate_startup() should be a no-op in test mode."""
from config import validate_startup

View File

@@ -8,11 +8,14 @@ from fastapi.testclient import TestClient
@pytest.fixture
def serve_client():
"""Create a TestClient for the timmy-serve app."""
from timmy_serve.app import create_timmy_serve_app
"""Create a TestClient for the timmy-serve app with mocked Timmy agent."""
with patch("timmy_serve.app.create_timmy") as mock_create:
mock_create.return_value = MagicMock()
from timmy_serve.app import create_timmy_serve_app
app = create_timmy_serve_app()
return TestClient(app)
app = create_timmy_serve_app()
with TestClient(app) as client:
yield client
class TestHealthEndpoint:
@@ -34,18 +37,40 @@ class TestServeStatus:
class TestServeChatEndpoint:
@patch("timmy_serve.app.create_timmy")
def test_chat_returns_response(self, mock_create, serve_client):
def test_chat_returns_response(self, mock_create):
mock_agent = MagicMock()
mock_result = MagicMock()
mock_result.content = "I am Timmy."
mock_agent.run.return_value = mock_result
mock_create.return_value = mock_agent
resp = serve_client.post(
"/serve/chat",
json={"message": "Who are you?"},
)
from timmy_serve.app import create_timmy_serve_app
app = create_timmy_serve_app()
with TestClient(app) as client:
resp = client.post(
"/serve/chat",
json={"message": "Who are you?"},
)
assert resp.status_code == 200
data = resp.json()
assert data["response"] == "I am Timmy."
mock_agent.run.assert_called_once_with("Who are you?", stream=False)
@patch("timmy_serve.app.create_timmy")
def test_agent_cached_at_startup(self, mock_create):
"""Verify create_timmy is called once at startup, not per request."""
mock_agent = MagicMock()
mock_result = MagicMock()
mock_result.content = "reply"
mock_agent.run.return_value = mock_result
mock_create.return_value = mock_agent
from timmy_serve.app import create_timmy_serve_app
app = create_timmy_serve_app()
with TestClient(app) as client:
# Two requests — create_timmy should only be called once (at startup)
client.post("/serve/chat", json={"message": "hello"})
client.post("/serve/chat", json={"message": "world"})
mock_create.assert_called_once()

View File

@@ -99,26 +99,19 @@ class TestKeywordOverlap:
class TestEmbedText:
def setup_method(self):
self._saved_model = emb.EMBEDDING_MODEL
emb.EMBEDDING_MODEL = None
def teardown_method(self):
emb.EMBEDDING_MODEL = self._saved_model
def test_uses_fallback_when_model_disabled(self):
emb.EMBEDDING_MODEL = False
vec = embed_text("test")
with patch.object(emb, "_get_embedding_model", return_value=False):
vec = embed_text("test")
assert len(vec) == 128 # hash fallback dimension
def test_uses_model_when_available(self):
import numpy as np
mock_encoding = MagicMock()
mock_encoding.tolist.return_value = [0.1, 0.2, 0.3]
mock_model = MagicMock()
mock_model.encode.return_value = np.array([0.1, 0.2, 0.3])
emb.EMBEDDING_MODEL = mock_model
mock_model.encode.return_value = mock_encoding
result = embed_text("test")
with patch.object(emb, "_get_embedding_model", return_value=mock_model):
result = embed_text("test")
assert result == pytest.approx([0.1, 0.2, 0.3])
mock_model.encode.assert_called_once_with("test")