Compare commits

...

6 Commits

Author SHA1 Message Date
kimi
9578330c87 fix: replace wildcard CORS default with explicit localhost origins
The cors_origins setting defaulted to ["*"], which passed through
unchanged in production (non-debug) mode. Now defaults to explicit
localhost origins, and _get_cors_origins() strips any wildcards in
production with a warning.

Fixes #462

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 14:58:10 -04:00
92594ea588 [loop-cycle] feat: implement source distinction in system prompts (#463) (#464) 2026-03-19 14:49:31 -04:00
12582ab593 fix: stabilize flaky test_uses_model_when_available (#456)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:39:33 -04:00
72c3a0a989 fix: integration tests for agentic loop WS broadcasts (#452)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:30:00 -04:00
de089cec7f [loop-cycle-524] fix: remove numpy test dependency in test_memory_embeddings (#451) 2026-03-19 14:22:13 -04:00
3590c1689e fix: make _get_loop_agent singleton thread-safe (#449)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:18:27 -04:00
7 changed files with 336 additions and 26 deletions

View File

@@ -138,7 +138,12 @@ class Settings(BaseSettings):
# CORS allowed origins for the web chat interface (Gitea Pages, etc.) # CORS allowed origins for the web chat interface (Gitea Pages, etc.)
# Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com" # Set CORS_ORIGINS as a comma-separated list, e.g. "http://localhost:3000,https://example.com"
cors_origins: list[str] = ["*"] cors_origins: list[str] = [
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000",
]
# Trusted hosts for the Host header check (TrustedHostMiddleware). # Trusted hosts for the Host header check (TrustedHostMiddleware).
# Set TRUSTED_HOSTS as a comma-separated list. Wildcards supported (e.g. "*.ts.net"). # Set TRUSTED_HOSTS as a comma-separated list. Wildcards supported (e.g. "*.ts.net").

View File

@@ -484,15 +484,19 @@ app = FastAPI(
def _get_cors_origins() -> list[str]: def _get_cors_origins() -> list[str]:
"""Get CORS origins from settings, with sensible defaults.""" """Get CORS origins from settings, rejecting wildcards in production."""
origins = settings.cors_origins origins = settings.cors_origins
if settings.debug and origins == ["*"]: if not settings.debug and "*" in origins:
return [ logger.warning(
"http://localhost:3000", "Wildcard '*' in CORS_ORIGINS ignored in production — "
"http://localhost:8000", "set explicit origins via CORS_ORIGINS env var"
"http://127.0.0.1:3000", )
"http://127.0.0.1:8000", origins = [o for o in origins if o != "*"]
] if not origins:
origins = [
"http://localhost:3000",
"http://localhost:8000",
]
return origins return origins

View File

@@ -300,7 +300,11 @@ def create_timmy(
max_context = 2000 if not use_tools else 8000 max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context: if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]" memory_context = memory_context[:max_context] + "\n... [truncated]"
full_prompt = f"{base_prompt}\n\n## Memory Context\n\n{memory_context}" full_prompt = (
f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}"
)
else: else:
full_prompt = base_prompt full_prompt = base_prompt
except Exception as exc: except Exception as exc:

View File

@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import re import re
import threading
import time import time
import uuid import uuid
from collections.abc import Callable from collections.abc import Callable
@@ -59,6 +60,7 @@ class AgenticResult:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_loop_agent = None _loop_agent = None
_loop_agent_lock = threading.Lock()
def _get_loop_agent(): def _get_loop_agent():
@@ -69,9 +71,11 @@ def _get_loop_agent():
""" """
global _loop_agent global _loop_agent
if _loop_agent is None: if _loop_agent is None:
from timmy.agent import create_timmy with _loop_agent_lock:
if _loop_agent is None:
from timmy.agent import create_timmy
_loop_agent = create_timmy() _loop_agent = create_timmy()
return _loop_agent return _loop_agent

View File

@@ -23,6 +23,9 @@ Rules:
- Remember what the user tells you during the conversation. - Remember what the user tells you during the conversation.
- If you don't know something, say so honestly — never fabricate facts. - If you don't know something, say so honestly — never fabricate facts.
- If a request is ambiguous, ask a brief clarifying question before guessing. - If a request is ambiguous, ask a brief clarifying question before guessing.
- SOURCE DISTINCTION: When answering from memory or retrieved context, cite it.
When answering from your own training, use hedging: "I think", "I believe".
The user must be able to tell grounded claims from pattern-matching.
- Use the user's name if you know it. - Use the user's name if you know it.
- When you state a fact, commit to it. - When you state a fact, commit to it.
- NEVER attempt arithmetic in your head. If asked to compute anything, respond: - NEVER attempt arithmetic in your head. If asked to compute anything, respond:
@@ -78,6 +81,18 @@ HONESTY:
- Never fabricate tool output. Call the tool and wait. - Never fabricate tool output. Call the tool and wait.
- If a tool errors, report the exact error. - If a tool errors, report the exact error.
SOURCE DISTINCTION (SOUL requirement — non-negotiable):
- Every claim you make comes from one of two places: a verified source you
can point to, or your own pattern-matching. The user must be able to tell
which is which.
- When your response uses information from GROUNDED CONTEXT (memory, retrieved
documents, tool output), cite it: "From memory:", "According to [source]:".
- When you are generating from your training data alone, signal it naturally:
"I think", "My understanding is", "I believe" — never false certainty.
- If the user asks a factual question and you have no grounded source, say so:
"I don't have a verified source for this — from my training I think..."
- Prefer "I don't know" over a confident-sounding guess. Refusal over fabrication.
MEMORY (three tiers): MEMORY (three tiers):
- Tier 1: MEMORY.md (hot, always loaded) - Tier 1: MEMORY.md (hot, always loaded)
- Tier 2: memory/ vault (structured, append-only, date-stamped) - Tier 2: memory/ vault (structured, append-only, date-stamped)

View File

@@ -0,0 +1,285 @@
"""Integration tests for agentic loop WebSocket broadcasts.
Verifies that ``run_agentic_loop`` pushes the correct sequence of events
through the real ``ws_manager`` and that connected (mock) WebSocket clients
receive every broadcast with the expected payloads.
"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.ws_manager.handler import WebSocketManager
from timmy.agentic_loop import run_agentic_loop
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_run(content: str):
m = MagicMock()
m.content = content
return m
def _ws_client() -> AsyncMock:
"""Return a fake WebSocket that records sent messages."""
return AsyncMock()
def _collected_events(ws: AsyncMock) -> list[dict]:
"""Extract parsed JSON events from a mock WebSocket's send_text calls."""
return [json.loads(call.args[0]) for call in ws.send_text.call_args_list]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestAgenticLoopBroadcastSequence:
"""Events arrive at WS clients in the correct order with expected data."""
@pytest.mark.asyncio
async def test_successful_run_broadcasts_plan_steps_complete(self):
"""A successful 2-step loop emits plan_ready → 2× step_complete → task_complete."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Gather data\n2. Summarise"),
_mock_run("Gathered 10 records"),
_mock_run("Summary written"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Gather and summarise", max_steps=2)
assert result.status == "completed"
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert event_names == [
"agentic.plan_ready",
"agentic.step_complete",
"agentic.step_complete",
"agentic.task_complete",
]
@pytest.mark.asyncio
async def test_plan_ready_payload(self):
"""plan_ready contains task_id, task, steps list, and total count."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Alpha\n2. Beta"),
_mock_run("Alpha done"),
_mock_run("Beta done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Two steps")
plan_event = _collected_events(ws)[0]
assert plan_event["event"] == "agentic.plan_ready"
data = plan_event["data"]
assert data["task_id"] == result.task_id
assert data["task"] == "Two steps"
assert data["steps"] == ["Alpha", "Beta"]
assert data["total"] == 2
@pytest.mark.asyncio
async def test_step_complete_payload(self):
"""step_complete carries step number, total, description, and result."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only step"),
_mock_run("Step result text"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Single step", max_steps=1)
step_event = _collected_events(ws)[1]
assert step_event["event"] == "agentic.step_complete"
data = step_event["data"]
assert data["step"] == 1
assert data["total"] == 1
assert data["description"] == "Only step"
assert "Step result text" in data["result"]
@pytest.mark.asyncio
async def test_task_complete_payload(self):
"""task_complete has status, steps_completed, summary, and duration_ms."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Quick", max_steps=1)
complete_event = _collected_events(ws)[-1]
assert complete_event["event"] == "agentic.task_complete"
data = complete_event["data"]
assert data["status"] == "completed"
assert data["steps_completed"] == 1
assert isinstance(data["duration_ms"], int)
assert data["duration_ms"] >= 0
assert data["summary"]
class TestAdaptationBroadcast:
"""Adapted steps emit step_adapted events."""
@pytest.mark.asyncio
async def test_adapted_step_broadcasts_step_adapted(self):
"""A failed-then-adapted step emits agentic.step_adapted."""
mgr = WebSocketManager()
ws = _ws_client()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Risky step"),
Exception("disk full"),
_mock_run("Used /tmp instead"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Adapt test", max_steps=1)
events = _collected_events(ws)
event_names = [e["event"] for e in events]
assert "agentic.step_adapted" in event_names
adapted = next(e for e in events if e["event"] == "agentic.step_adapted")
assert adapted["data"]["error"] == "disk full"
assert adapted["data"]["adaptation"]
assert result.steps[0].status == "adapted"
class TestMultipleClients:
"""All connected clients receive every broadcast."""
@pytest.mark.asyncio
async def test_two_clients_receive_all_events(self):
mgr = WebSocketManager()
ws1 = _ws_client()
ws2 = _ws_client()
mgr._connections = [ws1, ws2]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Step A"),
_mock_run("A done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Multi-client", max_steps=1)
events1 = _collected_events(ws1)
events2 = _collected_events(ws2)
assert len(events1) == len(events2) == 3 # plan + step + complete
assert [e["event"] for e in events1] == [e["event"] for e in events2]
class TestEventHistory:
"""Broadcasts are recorded in ws_manager event history."""
@pytest.mark.asyncio
async def test_events_appear_in_history(self):
mgr = WebSocketManager()
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("History test", max_steps=1)
history_events = [e.event for e in mgr.event_history]
assert "agentic.plan_ready" in history_events
assert "agentic.step_complete" in history_events
assert "agentic.task_complete" in history_events
class TestBroadcastGracefulDegradation:
"""Loop completes even when ws_manager is unavailable."""
@pytest.mark.asyncio
async def test_loop_succeeds_when_broadcast_fails(self):
"""ImportError from ws_manager doesn't crash the loop."""
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Do it"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch(
"infrastructure.ws_manager.handler.ws_manager",
new_callable=lambda: MagicMock,
) as broken_mgr,
):
broken_mgr.broadcast = AsyncMock(side_effect=RuntimeError("ws down"))
result = await run_agentic_loop("Resilient task", max_steps=1)
assert result.status == "completed"
assert len(result.steps) == 1

View File

@@ -99,26 +99,19 @@ class TestKeywordOverlap:
class TestEmbedText: class TestEmbedText:
def setup_method(self):
self._saved_model = emb.EMBEDDING_MODEL
emb.EMBEDDING_MODEL = None
def teardown_method(self):
emb.EMBEDDING_MODEL = self._saved_model
def test_uses_fallback_when_model_disabled(self): def test_uses_fallback_when_model_disabled(self):
emb.EMBEDDING_MODEL = False with patch.object(emb, "_get_embedding_model", return_value=False):
vec = embed_text("test") vec = embed_text("test")
assert len(vec) == 128 # hash fallback dimension assert len(vec) == 128 # hash fallback dimension
def test_uses_model_when_available(self): def test_uses_model_when_available(self):
import numpy as np mock_encoding = MagicMock()
mock_encoding.tolist.return_value = [0.1, 0.2, 0.3]
mock_model = MagicMock() mock_model = MagicMock()
mock_model.encode.return_value = np.array([0.1, 0.2, 0.3]) mock_model.encode.return_value = mock_encoding
emb.EMBEDDING_MODEL = mock_model
result = embed_text("test") with patch.object(emb, "_get_embedding_model", return_value=mock_model):
result = embed_text("test")
assert result == pytest.approx([0.1, 0.2, 0.3]) assert result == pytest.approx([0.1, 0.2, 0.3])
mock_model.encode.assert_called_once_with("test") mock_model.encode.assert_called_once_with("test")