[kimi] Add /api/matrix/memory/search endpoint (#678) (#740)

This commit is contained in:
2026-03-21 14:52:31 +00:00
parent 8fc8e0fc3d
commit ddadc95e55
2 changed files with 382 additions and 1 deletions

View File

@@ -26,12 +26,13 @@ from pathlib import Path
from typing import Any
import yaml
from fastapi import APIRouter, WebSocket
from fastapi import APIRouter, Request, WebSocket
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from config import settings
from infrastructure.presence import produce_bark, serialize_presence
from timmy.memory_system import search_memories
from timmy.workshop_state import PRESENCE_FILE
logger = logging.getLogger(__name__)
@@ -767,3 +768,120 @@ async def get_matrix_thoughts(limit: int = _DEFAULT_THOUGHT_LIMIT) -> JSONRespon
content=thoughts,
headers={"Cache-Control": "no-cache, no-store"},
)
# ---------------------------------------------------------------------------
# Matrix Memory Search Endpoint — visitors query Timmy's memory
# ---------------------------------------------------------------------------
# Rate limiting: 1 search per 5 seconds per IP
# Minimum number of seconds that must pass between two accepted searches from
# the same client.
_MEMORY_SEARCH_RATE_LIMIT_SECONDS = 5
# Maps a client identifier (as returned by _get_client_ip) to the timestamp, in
# seconds, of that client's most recent accepted search.
_memory_search_last_request: dict[str, float] = {}
# Upper bound on returned memories: passed to search_memories() as ``limit``
# and applied again when the response list is built.
_MAX_MEMORY_RESULTS = 5
# Memory text longer than this many characters is cut to this length and
# suffixed with "..." in the response.
_MAX_MEMORY_TEXT_LENGTH = 200
def _get_client_ip(request) -> str:
    """Return the client identifier used as the rate-limit key for *request*.

    Resolution order:

    1. The first entry of the ``X-Forwarded-For`` header, when that entry is
       non-blank after stripping whitespace.
    2. ``request.client.host`` (the directly connected peer).
    3. The literal ``"unknown"`` when neither is available.

    Args:
        request: Object exposing ``headers`` (a mapping with ``.get``) and
            ``client`` (``None`` or an object with a ``host`` attribute).

    Returns:
        The chosen identifier string.
    """
    # NOTE(review): X-Forwarded-For is supplied by the caller unless a proxy in
    # front of the app overwrites it, so a direct client can pick its own
    # rate-limit key. Confirm the deployment only accepts traffic through a
    # proxy that sets this header.
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # The header is a comma-separated chain; only the first hop is used.
        first_hop = forwarded.split(",", 1)[0].strip()
        # A malformed value such as ", 10.0.0.1" yields an empty first hop.
        # Fall through instead of keying every such request on "".
        if first_hop:
            return first_hop
    # Fall back to direct client IP
    if request.client:
        return request.client.host
    return "unknown"
def _build_matrix_memory_response(
    memories: list,
) -> list[dict[str, Any]]:
    """Convert memory search hits into the JSON-ready payload for the Matrix.

    At most ``_MAX_MEMORY_RESULTS`` entries are taken from the front of
    *memories*, in their original order. Each one becomes a dict with:

    - ``text``: the entry's ``content``; when longer than
      ``_MAX_MEMORY_TEXT_LENGTH`` characters it is cut to that length and
      followed by ``"..."``
    - ``relevance``: ``relevance_score`` rounded to 4 decimal places, with a
      missing (falsy) score reported as ``0.0``
    - ``created_at``: the entry's ``timestamp``, passed through unchanged
    - ``context_type``: the entry's ``context_type``, passed through unchanged
    """

    def _clip(content):
        # Only over-long content is shortened and marked with an ellipsis.
        if len(content) > _MAX_MEMORY_TEXT_LENGTH:
            return content[:_MAX_MEMORY_TEXT_LENGTH] + "..."
        return content

    return [
        {
            "text": _clip(entry.content),
            "relevance": round(entry.relevance_score or 0.0, 4),
            "created_at": entry.timestamp,
            "context_type": entry.context_type,
        }
        for entry in memories[:_MAX_MEMORY_RESULTS]
    ]
@matrix_router.get("/memory/search")
async def get_matrix_memory_search(request: Request, q: str | None = None) -> JSONResponse:
    """Search Timmy's memory for relevant snippets.

    Allows Matrix visitors to query Timmy's memory ("what do you remember
    about sovereignty?"). Results appear as floating crystal-ball text
    in the Workshop room.

    Query params:
        - q: Search query text (required; surrounding whitespace is stripped)

    Response: JSON array of memory objects:
        - text: Memory content (cut to 200 chars plus "..." when longer)
        - relevance: Score rounded to 4 decimal places
        - created_at: The memory's timestamp
        - context_type: Memory type (conversation, fact, etc.)

    Rate limited to 1 search per 5 seconds per client identifier (see
    ``_get_client_ip``). Requests rejected with 400 do not count against
    the limit.

    Returns:
        - 200: JSON array of memory results (max 5); an empty array when the
          search itself raises
        - 400: Missing or empty query parameter
        - 429: Rate limit exceeded (includes a ``Retry-After`` header)
    """
    # Validate query parameter
    query = q.strip() if q else ""
    if not query:
        return JSONResponse(
            status_code=400,
            content={"error": "Query parameter 'q' is required"},
        )

    # Rate limiting check by IP. A monotonic clock is used so that wall-clock
    # adjustments cannot shorten the window or lock a client out.
    client_ip = _get_client_ip(request)
    now = time.monotonic()
    last_request = _memory_search_last_request.get(client_ip)
    # ``None`` means "never seen"; a monotonic reading has no meaningful zero,
    # so a numeric default must not be used here.
    if last_request is not None:
        retry_after = _MEMORY_SEARCH_RATE_LIMIT_SECONDS - (now - last_request)
        if retry_after > 0:
            return JSONResponse(
                status_code=429,
                content={"error": "Rate limit exceeded. Try again later."},
                # Round up to whole seconds so the client never retries early.
                headers={"Retry-After": str(int(retry_after) + 1)},
            )

    # Record this request, first evicting expired entries so the table cannot
    # grow without bound as new client identifiers appear.
    _prune_memory_search_rate_limits(now)
    _memory_search_last_request[client_ip] = now

    # Search memories. Failures degrade to an empty result list on purpose so
    # the Matrix UI keeps working when the memory backend is unavailable.
    # NOTE(review): search_memories is called synchronously inside this
    # coroutine; if it performs blocking I/O it will stall the event loop -
    # confirm whether it should be moved to a worker thread.
    try:
        memories = search_memories(query, limit=_MAX_MEMORY_RESULTS)
        results = _build_matrix_memory_response(memories)
    except Exception as exc:
        logger.warning("Memory search failed: %s", exc)
        results = []

    return JSONResponse(
        content=results,
        headers={"Cache-Control": "no-cache, no-store"},
    )


# Once the rate-limit table holds more than this many client identifiers,
# entries whose window has already elapsed are evicted before a new one is
# recorded.
_MEMORY_SEARCH_RATE_LIMIT_PRUNE_THRESHOLD = 1024


def _prune_memory_search_rate_limits(now: float) -> None:
    """Evict rate-limit entries that can no longer trigger a 429.

    Does nothing while the table is at or below
    ``_MEMORY_SEARCH_RATE_LIMIT_PRUNE_THRESHOLD`` entries, so the scan is only
    paid for when the table has actually grown.

    Args:
        now: Current reading of the same clock used to populate the table.
    """
    if len(_memory_search_last_request) <= _MEMORY_SEARCH_RATE_LIMIT_PRUNE_THRESHOLD:
        return
    cutoff = now - _MEMORY_SEARCH_RATE_LIMIT_SECONDS
    # Collect keys first: a dict must not be mutated while it is iterated.
    expired = [ip for ip, seen in _memory_search_last_request.items() if seen <= cutoff]
    for ip in expired:
        del _memory_search_last_request[ip]

View File

@@ -1411,3 +1411,266 @@ class TestMatrixBarkEndpoint:
assert resp.status_code == 200
data = resp.json()
assert len(data["data"]["text"]) == 280
# ---------------------------------------------------------------------------
# Matrix Memory Search Endpoint (/api/matrix/memory/search)
# ---------------------------------------------------------------------------
class TestMatrixMemorySearchEndpoint:
    """Tests for the Matrix memory search endpoint (/api/matrix/memory/search).

    Every test takes the ``matrix_client`` fixture, which is defined outside
    this class. Tests that reach the search step replace
    ``dashboard.routes.world.search_memories`` with a mock, so the assertions
    cover only the endpoint's validation, rate limiting and formatting.
    """

    def setup_method(self):
        """Reset rate limiting state before each test."""
        from dashboard.routes.world import _memory_search_last_request

        _memory_search_last_request.clear()

    def teardown_method(self):
        """Clean up rate limiting state after each test."""
        from dashboard.routes.world import _memory_search_last_request

        _memory_search_last_request.clear()

    def test_memory_search_requires_query(self, matrix_client):
        """GET /api/matrix/memory/search returns 400 if q is missing."""
        resp = matrix_client.get("/api/matrix/memory/search")

        assert resp.status_code == 400
        data = resp.json()
        assert "error" in data
        assert "'q' is required" in data["error"]

    def test_memory_search_rejects_empty_query(self, matrix_client):
        """GET /api/matrix/memory/search returns 400 if q is empty."""
        resp = matrix_client.get("/api/matrix/memory/search?q=")

        assert resp.status_code == 400
        data = resp.json()
        assert "error" in data

    def test_memory_search_rejects_whitespace_query(self, matrix_client):
        """GET /api/matrix/memory/search returns 400 if q is whitespace."""
        resp = matrix_client.get("/api/matrix/memory/search?q= ")

        assert resp.status_code == 400
        data = resp.json()
        assert "error" in data

    def test_memory_search_returns_json_array(self, matrix_client):
        """GET /api/matrix/memory/search returns JSON array of results."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = [
                MagicMock(
                    content="Bitcoin is a decentralized digital currency",
                    relevance_score=0.92,
                    timestamp="2026-03-21T10:00:00Z",
                    context_type="conversation",
                ),
            ]

            resp = matrix_client.get("/api/matrix/memory/search?q=bitcoin")

        assert resp.status_code == 200
        data = resp.json()
        assert isinstance(data, list)
        assert len(data) == 1
        assert resp.headers["cache-control"] == "no-cache, no-store"

    def test_memory_search_result_structure(self, matrix_client):
        """Each result has required fields with correct types."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = [
                MagicMock(
                    content="Bitcoin sovereignty content here",
                    relevance_score=0.85,
                    timestamp="2026-03-21T10:00:00Z",
                    context_type="fact",
                ),
            ]

            resp = matrix_client.get("/api/matrix/memory/search?q=sovereignty")

        # Check the status first so a non-200 fails here, not on the payload.
        assert resp.status_code == 200
        data = resp.json()
        assert len(data) == 1
        result = data[0]
        assert "text" in result
        assert "relevance" in result
        assert "created_at" in result
        assert "context_type" in result
        assert isinstance(result["text"], str)
        assert isinstance(result["relevance"], (int, float))
        assert isinstance(result["created_at"], str)
        assert isinstance(result["context_type"], str)

    def test_memory_search_text_truncation(self, matrix_client):
        """Text is truncated to 200 characters with ellipsis."""
        long_content = "A" * 300
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = [
                MagicMock(
                    content=long_content,
                    relevance_score=0.75,
                    timestamp="2026-03-21T10:00:00Z",
                    context_type="conversation",
                ),
            ]

            resp = matrix_client.get("/api/matrix/memory/search?q=test")

        data = resp.json()
        assert len(data[0]["text"]) == 203  # 200 chars + "..."
        assert data[0]["text"].endswith("...")
        # Pin the exact text so an off-by-one in the cut point is caught.
        assert data[0]["text"] == "A" * 200 + "..."

    def test_memory_search_relevance_rounding(self, matrix_client):
        """Relevance score is rounded to 4 decimal places."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = [
                MagicMock(
                    content="Test content",
                    relevance_score=0.123456789,
                    timestamp="2026-03-21T10:00:00Z",
                    context_type="conversation",
                ),
            ]

            resp = matrix_client.get("/api/matrix/memory/search?q=test")

        data = resp.json()
        # Should be rounded to 4 decimal places
        assert data[0]["relevance"] == 0.1235

    def test_memory_search_max_results(self, matrix_client):
        """Endpoint returns max 5 results."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            # Return more than 5 results
            mock_search.return_value = [
                MagicMock(
                    content=f"Memory {i}",
                    relevance_score=0.9 - (i * 0.05),
                    timestamp="2026-03-21T10:00:00Z",
                    context_type="conversation",
                )
                for i in range(10)
            ]

            resp = matrix_client.get("/api/matrix/memory/search?q=test")

        data = resp.json()
        # Ten memories were offered, so exactly five must come back. A "<= 5"
        # check would also pass if the endpoint returned nothing at all.
        assert len(data) == 5
        # The cap must keep the leading entries, in their original order.
        assert [item["text"] for item in data] == [f"Memory {i}" for i in range(5)]

    def test_memory_search_passes_limit_to_search(self, matrix_client):
        """Endpoint passes correct limit to search_memories."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = []

            matrix_client.get("/api/matrix/memory/search?q=bitcoin")

            mock_search.assert_called_once_with("bitcoin", limit=5)

    def test_memory_search_empty_results(self, matrix_client):
        """Endpoint returns empty array when no memories found."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = []

            resp = matrix_client.get("/api/matrix/memory/search?q=nonexistent")

        assert resp.status_code == 200
        assert resp.json() == []

    def test_memory_search_graceful_degradation(self, matrix_client):
        """Returns empty array when search fails."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.side_effect = RuntimeError("Database error")

            resp = matrix_client.get("/api/matrix/memory/search?q=test")

        assert resp.status_code == 200
        assert resp.json() == []

    def test_memory_search_rate_limit_blocks_second_request(self, matrix_client):
        """Second request within 5 seconds returns 429."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = []

            # First request should succeed
            resp1 = matrix_client.get("/api/matrix/memory/search?q=first")
            assert resp1.status_code == 200

            # Second request within 5 seconds should be rate limited
            resp2 = matrix_client.get("/api/matrix/memory/search?q=second")
            assert resp2.status_code == 429
            data = resp2.json()
            assert "error" in data
            assert "Rate limit" in data["error"]

    def test_memory_search_rate_limit_per_ip(self, matrix_client):
        """Rate limiting is per-IP address."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = []

            # First request from one IP
            resp1 = matrix_client.get(
                "/api/matrix/memory/search?q=test",
                headers={"X-Forwarded-For": "1.2.3.4"},
            )
            assert resp1.status_code == 200

            # Same request from different IP should succeed
            resp2 = matrix_client.get(
                "/api/matrix/memory/search?q=test",
                headers={"X-Forwarded-For": "5.6.7.8"},
            )
            assert resp2.status_code == 200

            # The first IP must still be limited. Without this check the test
            # would also pass if rate limiting were switched off entirely.
            resp3 = matrix_client.get(
                "/api/matrix/memory/search?q=test",
                headers={"X-Forwarded-For": "1.2.3.4"},
            )
            assert resp3.status_code == 429

    def test_memory_search_rate_limit_uses_client_host(self, matrix_client):
        """Rate limiting falls back to client.host when no X-Forwarded-For."""
        from dashboard.routes.world import _memory_search_last_request

        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = []

            # First request
            resp1 = matrix_client.get("/api/matrix/memory/search?q=first")
            assert resp1.status_code == 200

            # Second request should be rate limited (same client)
            resp2 = matrix_client.get("/api/matrix/memory/search?q=second")
            assert resp2.status_code == 429

        # Both header-less requests must have resolved to one and the same key.
        assert len(_memory_search_last_request) == 1

    def test_memory_search_rate_limit_retry_after_header(self, matrix_client):
        """429 response includes Retry-After header."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = []

            # First request
            matrix_client.get("/api/matrix/memory/search?q=first")

            # Second request (rate limited)
            resp = matrix_client.get("/api/matrix/memory/search?q=second")

        assert resp.status_code == 429
        assert "Retry-After" in resp.headers
        retry_after = int(resp.headers["Retry-After"])
        assert 1 <= retry_after <= 6  # Should be around 5 seconds

    def test_memory_search_multiple_results_ordering(self, matrix_client):
        """Results maintain order from search_memories."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = [
                MagicMock(
                    content="First memory",
                    relevance_score=0.95,
                    timestamp="2026-03-21T10:00:00Z",
                    context_type="fact",
                ),
                MagicMock(
                    content="Second memory",
                    relevance_score=0.85,
                    timestamp="2026-03-21T10:01:00Z",
                    context_type="conversation",
                ),
            ]

            resp = matrix_client.get("/api/matrix/memory/search?q=test")

        data = resp.json()
        assert len(data) == 2
        assert data[0]["text"] == "First memory"
        assert data[1]["text"] == "Second memory"

    def test_memory_search_url_encoding(self, matrix_client):
        """Query parameter can be URL encoded."""
        with patch("dashboard.routes.world.search_memories") as mock_search:
            mock_search.return_value = []

            resp = matrix_client.get("/api/matrix/memory/search?q=bitcoin%20sovereignty")

            assert resp.status_code == 200
            mock_search.assert_called_once_with("bitcoin sovereignty", limit=5)