Compare commits

..

6 Commits

Author SHA1 Message Date
kimi
9578330c87 fix: replace wildcard CORS default with explicit localhost origins
The cors_origins setting defaulted to ["*"], which passed through
unchanged in production (non-debug) mode. Now defaults to explicit
localhost origins, and _get_cors_origins() strips any wildcards in
production with a warning.

Fixes #462

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 14:58:10 -04:00
92594ea588 [loop-cycle] feat: implement source distinction in system prompts (#463) (#464) 2026-03-19 14:49:31 -04:00
12582ab593 fix: stabilize flaky test_uses_model_when_available (#456)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:39:33 -04:00
72c3a0a989 fix: integration tests for agentic loop WS broadcasts (#452)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:30:00 -04:00
de089cec7f [loop-cycle-524] fix: remove numpy test dependency in test_memory_embeddings (#451) 2026-03-19 14:22:13 -04:00
3590c1689e fix: make _get_loop_agent singleton thread-safe (#449)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:18:27 -04:00
8 changed files with 335 additions and 73 deletions

View File

@@ -138,7 +138,12 @@ class Settings(BaseSettings):
# CORS allowed origins for the web chat interface (Gitea Pages, etc.)
# Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com"
cors_origins: list[str] = ["*"]
cors_origins: list[str] = [
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000",
]
# Trusted hosts for the Host header check (TrustedHostMiddleware).
# Set TRUSTED_HOSTS as a comma-separated list. Wildcards supported (e.g. "*.ts.net").

View File

@@ -484,15 +484,19 @@ app = FastAPI(
def _get_cors_origins() -> list[str]:
"""Get CORS origins from settings, with sensible defaults."""
"""Get CORS origins from settings, rejecting wildcards in production."""
origins = settings.cors_origins
if settings.debug and origins == ["*"]:
return [
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000",
]
if not settings.debug and "*" in origins:
logger.warning(
"Wildcard '*' in CORS_ORIGINS ignored in production — "
"set explicit origins via CORS_ORIGINS env var"
)
origins = [o for o in origins if o != "*"]
if not origins:
origins = [
"http://localhost:3000",
"http://localhost:8000",
]
return origins

View File

@@ -300,7 +300,11 @@ def create_timmy(
max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]"
full_prompt = f"{base_prompt}\n\n## Memory Context\n\n{memory_context}"
full_prompt = (
f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}"
)
else:
full_prompt = base_prompt
except Exception as exc:

View File

@@ -68,18 +68,14 @@ def _get_loop_agent():
Returns the same type of agent as `create_timmy()` but with a
dedicated session so it doesn't pollute the main chat history.
Thread-safe: uses a lock to prevent duplicate agent creation
when multiple loops start concurrently.
"""
global _loop_agent
if _loop_agent is not None:
return _loop_agent
with _loop_agent_lock:
if _loop_agent is None:
from timmy.agent import create_timmy
if _loop_agent is None:
with _loop_agent_lock:
if _loop_agent is None:
from timmy.agent import create_timmy
_loop_agent = create_timmy()
_loop_agent = create_timmy()
return _loop_agent

View File

@@ -23,6 +23,9 @@ Rules:
- Remember what the user tells you during the conversation.
- If you don't know something, say so honestly — never fabricate facts.
- If a request is ambiguous, ask a brief clarifying question before guessing.
- SOURCE DISTINCTION: When answering from memory or retrieved context, cite it.
When answering from your own training, use hedging: "I think", "I believe".
The user must be able to tell grounded claims from pattern-matching.
- Use the user's name if you know it.
- When you state a fact, commit to it.
- NEVER attempt arithmetic in your head. If asked to compute anything, respond:
@@ -78,6 +81,18 @@ HONESTY:
- Never fabricate tool output. Call the tool and wait.
- If a tool errors, report the exact error.
SOURCE DISTINCTION (SOUL requirement — non-negotiable):
- Every claim you make comes from one of two places: a verified source you
can point to, or your own pattern-matching. The user must be able to tell
which is which.
- When your response uses information from GROUNDED CONTEXT (memory, retrieved
documents, tool output), cite it: "From memory:", "According to [source]:".
- When you are generating from your training data alone, signal it naturally:
"I think", "My understanding is", "I believe" — never false certainty.
- If the user asks a factual question and you have no grounded source, say so:
"I don't have a verified source for this — from my training I think..."
- Prefer "I don't know" over a confident-sounding guess. Refusal over fabrication.
MEMORY (three tiers):
- Tier 1: MEMORY.md (hot, always loaded)
- Tier 2: memory/ vault (structured, append-only, date-stamped)

View File

@@ -0,0 +1,285 @@
"""Integration tests for agentic loop WebSocket broadcasts.
Verifies that ``run_agentic_loop`` pushes the correct sequence of events
through the real ``ws_manager`` and that connected (mock) WebSocket clients
receive every broadcast with the expected payloads.
"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.ws_manager.handler import WebSocketManager
from timmy.agentic_loop import run_agentic_loop
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_run(content: str):
m = MagicMock()
m.content = content
return m
def _ws_client() -> AsyncMock:
"""Return a fake WebSocket that records sent messages."""
return AsyncMock()
def _collected_events(ws: AsyncMock) -> list[dict]:
"""Extract parsed JSON events from a mock WebSocket's send_text calls."""
return [json.loads(call.args[0]) for call in ws.send_text.call_args_list]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestAgenticLoopBroadcastSequence:
"""Events arrive at WS clients in the correct order with expected data."""
@pytest.mark.asyncio
async def test_successful_run_broadcasts_plan_steps_complete(self):
"""A successful 2-step loop emits plan_ready → 2× step_complete → task_complete."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Gather data\n2. Summarise"),
_mock_run("Gathered 10 records"),
_mock_run("Summary written"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Gather and summarise", max_steps=2)
assert result.status == "completed"
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert event_names == [
"agentic.plan_ready",
"agentic.step_complete",
"agentic.step_complete",
"agentic.task_complete",
]
@pytest.mark.asyncio
async def test_plan_ready_payload(self):
"""plan_ready contains task_id, task, steps list, and total count."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Alpha\n2. Beta"),
_mock_run("Alpha done"),
_mock_run("Beta done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Two steps")
plan_event = _collected_events(ws)[0]
assert plan_event["event"] == "agentic.plan_ready"
data = plan_event["data"]
assert data["task_id"] == result.task_id
assert data["task"] == "Two steps"
assert data["steps"] == ["Alpha", "Beta"]
assert data["total"] == 2
@pytest.mark.asyncio
async def test_step_complete_payload(self):
"""step_complete carries step number, total, description, and result."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only step"),
_mock_run("Step result text"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Single step", max_steps=1)
step_event = _collected_events(ws)[1]
assert step_event["event"] == "agentic.step_complete"
data = step_event["data"]
assert data["step"] == 1
assert data["total"] == 1
assert data["description"] == "Only step"
assert "Step result text" in data["result"]
@pytest.mark.asyncio
async def test_task_complete_payload(self):
"""task_complete has status, steps_completed, summary, and duration_ms."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Quick", max_steps=1)
complete_event = _collected_events(ws)[-1]
assert complete_event["event"] == "agentic.task_complete"
data = complete_event["data"]
assert data["status"] == "completed"
assert data["steps_completed"] == 1
assert isinstance(data["duration_ms"], int)
assert data["duration_ms"] >= 0
assert data["summary"]
class TestAdaptationBroadcast:
"""Adapted steps emit step_adapted events."""
@pytest.mark.asyncio
async def test_adapted_step_broadcasts_step_adapted(self):
"""A failed-then-adapted step emits agentic.step_adapted."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Risky step"),
Exception("disk full"),
_mock_run("Used /tmp instead"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Adapt test", max_steps=1)
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert "agentic.step_adapted" in event_names
adapted = next(e for e in events if e["event"] == "agentic.step_adapted")
assert adapted["data"]["error"] == "disk full"
assert adapted["data"]["adaptation"]
assert result.steps[0].status == "adapted"
class TestMultipleClients:
"""All connected clients receive every broadcast."""
@pytest.mark.asyncio
async def test_two_clients_receive_all_events(self):
mgr = WebSocketManager()
ws1 = _ws_client()
ws2 = _ws_client()
mgr._connections = [ws1, ws2]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Step A"),
_mock_run("A done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Multi-client", max_steps=1)
events1 = _collected_events(ws1)
events2 = _collected_events(ws2)
assert len(events1) == len(events2) == 3 # plan + step + complete
assert [e["event"] for e in events1] == [e["event"] for e in events2]
class TestEventHistory:
"""Broadcasts are recorded in ws_manager event history."""
@pytest.mark.asyncio
async def test_events_appear_in_history(self):
mgr = WebSocketManager()
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("History test", max_steps=1)
history_events = [e.event for e in mgr.event_history]
assert "agentic.plan_ready" in history_events
assert "agentic.step_complete" in history_events
assert "agentic.task_complete" in history_events
class TestBroadcastGracefulDegradation:
"""Loop completes even when ws_manager is unavailable."""
@pytest.mark.asyncio
async def test_loop_succeeds_when_broadcast_fails(self):
"""ImportError from ws_manager doesn't crash the loop."""
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch(
"infrastructure.ws_manager.handler.ws_manager",
new_callable=lambda: MagicMock,
) as broken_mgr,
):
broken_mgr.broadcast = AsyncMock(side_effect=RuntimeError("ws down"))
result = await run_agentic_loop("Resilient task", max_steps=1)
assert result.status == "completed"
assert len(result.steps) == 1

View File

@@ -104,46 +104,6 @@ class TestGetLoopAgent:
finally:
al._loop_agent = saved
def test_thread_safe_creation(self):
"""Concurrent calls must only create one agent (thread-safety)."""
import threading
import timmy.agentic_loop as al
saved = al._loop_agent
try:
al._loop_agent = None
mock_agent = MagicMock()
call_count = 0
barrier = threading.Barrier(4)
original_create = MagicMock(return_value=mock_agent)
def slow_create():
nonlocal call_count
call_count += 1
return original_create()
results = [None] * 4
def worker(idx):
barrier.wait()
results[idx] = al._get_loop_agent()
with patch("timmy.agent.create_timmy", side_effect=slow_create):
threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
# All threads got the same agent
assert all(r is mock_agent for r in results)
# create_timmy called exactly once
assert call_count == 1
finally:
al._loop_agent = saved
# ── _broadcast_progress ──────────────────────────────────────────────────────

View File

@@ -99,26 +99,19 @@ class TestKeywordOverlap:
class TestEmbedText:
def setup_method(self):
self._saved_model = emb.EMBEDDING_MODEL
emb.EMBEDDING_MODEL = None
def teardown_method(self):
emb.EMBEDDING_MODEL = self._saved_model
def test_uses_fallback_when_model_disabled(self):
emb.EMBEDDING_MODEL = False
vec = embed_text("test")
with patch.object(emb, "_get_embedding_model", return_value=False):
vec = embed_text("test")
assert len(vec) == 128 # hash fallback dimension
def test_uses_model_when_available(self):
import numpy as np
mock_encoding = MagicMock()
mock_encoding.tolist.return_value = [0.1, 0.2, 0.3]
mock_model = MagicMock()
mock_model.encode.return_value = np.array([0.1, 0.2, 0.3])
emb.EMBEDDING_MODEL = mock_model
mock_model.encode.return_value = mock_encoding
result = embed_text("test")
with patch.object(emb, "_get_embedding_model", return_value=mock_model):
result = embed_text("test")
assert result == pytest.approx([0.1, 0.2, 0.3])
mock_model.encode.assert_called_once_with("test")