1
0

Compare commits

...

6 Commits

Author SHA1 Message Date
kimi
e22b572b1d refactor: break up search_thoughts() into focused helpers
Extract _query_thoughts() and _format_thought_results() from the 73-line
search_thoughts() function, keeping each piece focused on a single
responsibility. Also fix pre-existing F821 lint errors in mcp_tools.py.

Fixes #594

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 12:27:44 -04:00
2577b71207 fix: capture thought timestamp at cycle start, not after LLM call (#590)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:13:48 -04:00
1a8b8ecaed [loop-cycle-1235] refactor: break up _migrate_schema() into focused helpers (#591) (#595) 2026-03-20 12:07:15 -04:00
d821e76589 [loop-cycle-1234] refactor: break up _generate_avatar_image (#563) (#589) 2026-03-20 11:57:53 -04:00
bc010ecfba [loop-cycle-1233] refactor: add docstrings to calm.py route handlers (#569) (#585) 2026-03-20 11:44:06 -04:00
faf6c1a5f1 [loop-cycle-1233] refactor: break up BaseAgent.run() (#561) (#584) 2026-03-20 11:24:36 -04:00
6 changed files with 327 additions and 219 deletions

View File

@@ -19,14 +19,17 @@ router = APIRouter(tags=["calm"])
# Helper functions for state machine logic
def get_now_task(db: Session) -> Task | None:
"""Return the single active NOW task, or None."""
return db.query(Task).filter(Task.state == TaskState.NOW).first()
def get_next_task(db: Session) -> Task | None:
"""Return the single queued NEXT task, or None."""
return db.query(Task).filter(Task.state == TaskState.NEXT).first()
def get_later_tasks(db: Session) -> list[Task]:
"""Return all LATER tasks ordered by MIT flag then sort_order."""
return (
db.query(Task)
.filter(Task.state == TaskState.LATER)
@@ -36,6 +39,12 @@ def get_later_tasks(db: Session) -> list[Task]:
def promote_tasks(db: Session):
"""Enforce the NOW/NEXT/LATER state machine invariants.
- At most one NOW task (extras demoted to NEXT).
- If no NOW, promote NEXT -> NOW.
- If no NEXT, promote highest-priority LATER -> NEXT.
"""
# Ensure only one NOW task exists. If multiple, demote extras to NEXT.
now_tasks = db.query(Task).filter(Task.state == TaskState.NOW).all()
if len(now_tasks) > 1:
@@ -74,6 +83,7 @@ def promote_tasks(db: Session):
# Endpoints
@router.get("/calm", response_class=HTMLResponse)
async def get_calm_view(request: Request, db: Session = Depends(get_db)):
"""Render the main CALM dashboard with NOW/NEXT/LATER counts."""
now_task = get_now_task(db)
next_task = get_next_task(db)
later_tasks_count = len(get_later_tasks(db))
@@ -90,6 +100,7 @@ async def get_calm_view(request: Request, db: Session = Depends(get_db)):
@router.get("/calm/ritual/morning", response_class=HTMLResponse)
async def get_morning_ritual_form(request: Request):
"""Render the morning ritual intake form."""
return templates.TemplateResponse(request, "calm/morning_ritual_form.html", {})
@@ -102,6 +113,7 @@ async def post_morning_ritual(
mit3_title: str = Form(None),
other_tasks: str = Form(""),
):
"""Process morning ritual: create MITs, other tasks, and set initial states."""
# Create Journal Entry
mit_task_ids = []
journal_entry = JournalEntry(entry_date=date.today())
@@ -173,6 +185,7 @@ async def post_morning_ritual(
@router.get("/calm/ritual/evening", response_class=HTMLResponse)
async def get_evening_ritual_form(request: Request, db: Session = Depends(get_db)):
"""Render the evening ritual form for today's journal entry."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -189,6 +202,7 @@ async def post_evening_ritual(
gratitude: str = Form(None),
energy_level: int = Form(None),
):
"""Process evening ritual: save reflection/gratitude, archive active tasks."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -223,6 +237,7 @@ async def create_new_task(
is_mit: bool = Form(False),
certainty: TaskCertainty = Form(TaskCertainty.SOFT),
):
"""Create a new task in LATER state and return updated count."""
task = Task(
title=title,
description=description,
@@ -247,6 +262,7 @@ async def start_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Move a task to NOW state, demoting the current NOW to NEXT."""
current_now_task = get_now_task(db)
if current_now_task and current_now_task.id != task_id:
current_now_task.state = TaskState.NEXT # Demote current NOW to NEXT
@@ -281,6 +297,7 @@ async def complete_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Mark a task as DONE and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
@@ -309,6 +326,7 @@ async def defer_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Defer a task and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
@@ -333,6 +351,7 @@ async def defer_task(
@router.get("/calm/partials/later_tasks_list", response_class=HTMLResponse)
async def get_later_tasks_list(request: Request, db: Session = Depends(get_db)):
"""Render the expandable list of LATER tasks."""
later_tasks = get_later_tasks(db)
return templates.TemplateResponse(
"calm/partials/later_tasks_list.html",
@@ -348,6 +367,7 @@ async def reorder_tasks(
later_task_ids: str = Form(""),
next_task_id: int | None = Form(None),
):
"""Reorder LATER tasks and optionally promote one to NEXT."""
# Reorder LATER tasks
if later_task_ids:
ids_in_order = [int(x.strip()) for x in later_task_ids.split(",") if x.strip()]

View File

@@ -119,75 +119,84 @@ class BaseAgent(ABC):
"""
pass
async def run(self, message: str) -> str:
"""Run the agent with a message.
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_TRANSIENT = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
Retries on transient failures (connection errors, timeouts) with
exponential backoff. GPU contention from concurrent Ollama
requests causes ReadError / ReadTimeout — these are transient
and should be retried, not raised immediately (#70).
async def run(self, message: str, *, max_retries: int = 3) -> str:
"""Run the agent with a message, retrying on transient failures.
Returns:
Agent response
GPU contention from concurrent Ollama requests causes ReadError /
ReadTimeout — these are transient and retried with exponential
backoff (#70).
"""
max_retries = 3
last_exception = None
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_transient = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
response = await self._run_with_retries(message, max_retries)
await self._emit_response_event(message, response)
return response
async def _run_with_retries(self, message: str, max_retries: int) -> str:
"""Execute agent.run() with retry logic for transient errors."""
for attempt in range(1, max_retries + 1):
try:
result = self.agent.run(message, stream=False)
response = result.content if hasattr(result, "content") else str(result)
break # Success, exit the retry loop
except _transient as exc:
last_exception = exc
if attempt < max_retries:
# Contention backoff — longer waits because the GPU
# needs time to finish the other request.
wait = min(2**attempt, 16)
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting %ds before retry...",
attempt,
max_retries,
type(exc).__name__,
wait,
)
await asyncio.sleep(wait)
else:
logger.error(
"Ollama unreachable after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
return result.content if hasattr(result, "content") else str(result)
except self._TRANSIENT as exc:
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=True,
)
await asyncio.sleep(min(2**attempt, 16))
except Exception as exc:
last_exception = exc
if attempt < max_retries:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
else:
logger.error(
"Agent run failed after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=False,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
# Unreachable — _handle_retry_or_raise raises on last attempt.
raise RuntimeError("retry loop exited unexpectedly") # pragma: no cover
# Emit completion event
@staticmethod
def _handle_retry_or_raise(
exc: Exception,
attempt: int,
max_retries: int,
*,
transient: bool,
) -> None:
"""Log a retry warning or raise after exhausting attempts."""
if attempt < max_retries:
if transient:
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting before retry...",
attempt,
max_retries,
type(exc).__name__,
)
else:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
else:
label = "Ollama unreachable" if transient else "Agent run failed"
logger.error("%s after %d attempts: %s", label, max_retries, exc)
raise exc
async def _emit_response_event(self, message: str, response: str) -> None:
"""Publish a completion event to the event bus if connected."""
if self.event_bus:
await self.event_bus.publish(
Event(
@@ -197,8 +206,6 @@ class BaseAgent(ABC):
)
)
return response
def get_capabilities(self) -> list[str]:
"""Get list of capabilities this agent provides."""
return self.tools

View File

@@ -270,20 +270,8 @@ async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "
return f"Failed to create issue via MCP: {exc}"
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
# Background gradient effect — concentric circles
def _draw_background(draw: ImageDraw.ImageDraw, size: int) -> None: # noqa: F821
"""Draw radial gradient background with concentric circles."""
for i in range(size // 2, 0, -4):
g = int(25 + (i / (size // 2)) * 30)
draw.ellipse(
@@ -291,33 +279,45 @@ def _generate_avatar_image() -> bytes:
fill=(10, g, 20),
)
# Wizard hat (triangle)
def _draw_wizard(draw: ImageDraw.ImageDraw) -> None: # noqa: F821
"""Draw wizard hat, face, eyes, smile, monogram, and robe."""
hat_color = (100, 50, 160) # purple
draw.polygon(
[(256, 40), (160, 220), (352, 220)],
fill=hat_color,
outline=(180, 130, 255),
)
hat_outline = (180, 130, 255)
gold = (220, 190, 50)
pupil = (30, 30, 60)
# Hat brim
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=(180, 130, 255))
# Hat + brim
draw.polygon([(256, 40), (160, 220), (352, 220)], fill=hat_color, outline=hat_outline)
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=hat_outline)
# Face circle
# Face
draw.ellipse([190, 220, 322, 370], fill=(60, 180, 100), outline=(80, 220, 120))
# Eyes
# Eyes (whites + pupils)
draw.ellipse([220, 275, 248, 310], fill=(255, 255, 255))
draw.ellipse([264, 275, 292, 310], fill=(255, 255, 255))
draw.ellipse([228, 285, 242, 300], fill=(30, 30, 60))
draw.ellipse([272, 285, 286, 300], fill=(30, 30, 60))
draw.ellipse([228, 285, 242, 300], fill=pupil)
draw.ellipse([272, 285, 286, 300], fill=pupil)
# Smile
draw.arc([225, 300, 287, 355], start=10, end=170, fill=(30, 30, 60), width=3)
draw.arc([225, 300, 287, 355], start=10, end=170, fill=pupil, width=3)
# Stars around the hat
# "T" monogram on hat
draw.text((243, 100), "T", fill=gold)
# Robe
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
)
def _draw_stars(draw: ImageDraw.ImageDraw) -> None: # noqa: F821
"""Draw decorative gold stars around the wizard hat."""
gold = (220, 190, 50)
star_positions = [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]
for sx, sy in star_positions:
for sx, sy in [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]:
r = 8
draw.polygon(
[
@@ -333,18 +333,26 @@ def _generate_avatar_image() -> bytes:
fill=gold,
)
# "T" monogram on the hat
draw.text((243, 100), "T", fill=gold)
# Robe / body
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
)
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
import io
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
_draw_background(draw, size)
_draw_wizard(draw)
_draw_stars(draw)
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()

View File

@@ -78,83 +78,88 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
# Check if we need to migrate (old schema exists but new one doesn't fully)
if not has_memories:
if "memories" not in tables:
logger.info("Migration: Creating unified memories table")
# Schema will be created above
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old facts table
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
# Schema will be created by _ensure_schema above
conn.commit()
return
_migrate_episodes(conn, tables)
_migrate_chunks(conn, tables)
_drop_legacy_tables(conn, tables)
conn.commit()
def _migrate_episodes(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate episodes table rows into the unified memories table."""
if "episodes" not in tables:
return
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate chunks table rows into the unified memories table as vault_chunk."""
if "chunks" not in tables:
return
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_tables(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Drop old facts table if it exists."""
if "facts" not in tables:
return
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
"""Get the column names for a table."""
cursor = conn.execute(f"PRAGMA table_info({table_name})")

View File

@@ -341,6 +341,11 @@ class ThinkingEngine:
)
return None
# Capture arrival time *before* the LLM call so the thought
# timestamp reflects when the cycle started, not when the
# (potentially slow) generation finished. Fixes #582.
arrived_at = datetime.now(UTC).isoformat()
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
@@ -352,7 +357,7 @@ class ThinkingEngine:
if not content:
return None
thought = self._store_thought(content, seed_type)
thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
@@ -1173,14 +1178,25 @@ class ThinkingEngine:
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
def _store_thought(self, content: str, seed_type: str) -> Thought:
"""Persist a thought to SQLite."""
def _store_thought(
self,
content: str,
seed_type: str,
*,
arrived_at: str | None = None,
) -> Thought:
"""Persist a thought to SQLite.
Args:
arrived_at: ISO-8601 timestamp captured when the thinking cycle
started. Falls back to now() for callers that don't supply it.
"""
thought = Thought(
id=str(uuid.uuid4()),
content=content,
seed_type=seed_type,
parent_id=self._last_thought_id,
created_at=datetime.now(UTC).isoformat(),
created_at=arrived_at or datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
@@ -1261,6 +1277,52 @@ class ThinkingEngine:
logger.debug("Failed to broadcast thought: %s", exc)
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Fetch thought rows matching *query* with optional *seed_type* filter."""
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", limit),
).fetchall()
def _format_thought_results(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable summary string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
@@ -1278,58 +1340,17 @@ def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
# Clamp limit to reasonable bounds
limit = max(1, min(limit, 50))
try:
engine = thinking_engine
db_path = engine._db_path
# Build query with optional seed_type filter
with _get_conn(db_path) as conn:
if seed_type:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", seed_type, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", limit),
).fetchall()
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
# Format results
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
return _format_thought_results(rows, query, seed_type)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)

View File

@@ -361,6 +361,53 @@ class TestRun:
assert response == "ok"
# ── _handle_retry_or_raise ────────────────────────────────────────────────
class TestHandleRetryOrRaise:
def test_raises_on_last_attempt(self):
BaseAgent = _make_base_class()
with pytest.raises(ValueError, match="boom"):
BaseAgent._handle_retry_or_raise(
ValueError("boom"),
attempt=3,
max_retries=3,
transient=False,
)
def test_raises_on_last_attempt_transient(self):
BaseAgent = _make_base_class()
exc = httpx.ConnectError("down")
with pytest.raises(httpx.ConnectError):
BaseAgent._handle_retry_or_raise(
exc,
attempt=3,
max_retries=3,
transient=True,
)
def test_no_raise_on_early_attempt(self):
BaseAgent = _make_base_class()
# Should return None (no raise) on non-final attempt
result = BaseAgent._handle_retry_or_raise(
ValueError("retry me"),
attempt=1,
max_retries=3,
transient=False,
)
assert result is None
def test_no_raise_on_early_transient(self):
BaseAgent = _make_base_class()
result = BaseAgent._handle_retry_or_raise(
httpx.ReadTimeout("busy"),
attempt=2,
max_retries=3,
transient=True,
)
assert result is None
# ── get_capabilities / get_status ────────────────────────────────────────────