diff --git a/src/timmy/mcp_tools.py b/src/timmy/mcp_tools.py index 62b8b9a..3971406 100644 --- a/src/timmy/mcp_tools.py +++ b/src/timmy/mcp_tools.py @@ -21,6 +21,10 @@ Usage:: from __future__ import annotations import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from PIL import ImageDraw import os import shutil import sqlite3 @@ -270,7 +274,7 @@ async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = " return f"Failed to create issue via MCP: {exc}" -def _draw_background(draw: "ImageDraw.ImageDraw", size: int) -> None: +def _draw_background(draw: ImageDraw.ImageDraw, size: int) -> None: """Draw radial gradient background with concentric circles.""" for i in range(size // 2, 0, -4): g = int(25 + (i / (size // 2)) * 30) @@ -280,7 +284,7 @@ def _draw_background(draw: "ImageDraw.ImageDraw", size: int) -> None: ) -def _draw_wizard(draw: "ImageDraw.ImageDraw") -> None: +def _draw_wizard(draw: ImageDraw.ImageDraw) -> None: """Draw wizard hat, face, eyes, smile, monogram, and robe.""" hat_color = (100, 50, 160) # purple hat_outline = (180, 130, 255) @@ -314,7 +318,7 @@ def _draw_wizard(draw: "ImageDraw.ImageDraw") -> None: ) -def _draw_stars(draw: "ImageDraw.ImageDraw") -> None: +def _draw_stars(draw: ImageDraw.ImageDraw) -> None: """Draw decorative gold stars around the wizard hat.""" gold = (220, 190, 50) for sx, sy in [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]: diff --git a/src/timmy/thinking.py b/src/timmy/thinking.py index 72866ab..6bab20c 100644 --- a/src/timmy/thinking.py +++ b/src/timmy/thinking.py @@ -1277,6 +1277,53 @@ class ThinkingEngine: logger.debug("Failed to broadcast thought: %s", exc) +def _query_thoughts( + db_path: Path, query: str, seed_type: str | None, limit: int +) -> list[sqlite3.Row]: + """Run the thought-search SQL and return matching rows.""" + pattern = f"%{query}%" + with _get_conn(db_path) as conn: + if seed_type: + return conn.execute( + """ + SELECT id, content, seed_type, created_at + FROM thoughts + WHERE content LIKE ? AND seed_type = ? + ORDER BY created_at DESC + LIMIT ? + """, + (pattern, seed_type, limit), + ).fetchall() + return conn.execute( + """ + SELECT id, content, seed_type, created_at + FROM thoughts + WHERE content LIKE ? + ORDER BY created_at DESC + LIMIT ? + """, + (pattern, limit), + ).fetchall() + + +def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str: + """Format thought rows into a human-readable string.""" + lines = [f'Found {len(rows)} thought(s) matching "{query}":'] + if seed_type: + lines[0] += f' [seed_type="{seed_type}"]' + lines.append("") + + for row in rows: + ts = datetime.fromisoformat(row["created_at"]) + local_ts = ts.astimezone() + time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0") + seed = row["seed_type"] + content = row["content"].replace("\n", " ") # Flatten newlines for display + lines.append(f"[{time_str}] ({seed}) {content[:150]}") + + return "\n".join(lines) + + def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str: """Search Timmy's thought history for reflections matching a query. @@ -1294,58 +1341,17 @@ def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) - Formatted string with matching thoughts, newest first, including timestamps and seed types. Returns a helpful message if no matches found. """ - # Clamp limit to reasonable bounds limit = max(1, min(limit, 50)) try: - engine = thinking_engine - db_path = engine._db_path - - # Build query with optional seed_type filter - with _get_conn(db_path) as conn: - if seed_type: - rows = conn.execute( - """ - SELECT id, content, seed_type, created_at - FROM thoughts - WHERE content LIKE ? AND seed_type = ? - ORDER BY created_at DESC - LIMIT ? - """, - (f"%{query}%", seed_type, limit), - ).fetchall() - else: - rows = conn.execute( - """ - SELECT id, content, seed_type, created_at - FROM thoughts - WHERE content LIKE ? - ORDER BY created_at DESC - LIMIT ? - """, - (f"%{query}%", limit), - ).fetchall() + rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit) if not rows: if seed_type: return f'No thoughts found matching "{query}" with seed_type="{seed_type}".' return f'No thoughts found matching "{query}".' - # Format results - lines = [f'Found {len(rows)} thought(s) matching "{query}":'] - if seed_type: - lines[0] += f' [seed_type="{seed_type}"]' - lines.append("") - - for row in rows: - ts = datetime.fromisoformat(row["created_at"]) - local_ts = ts.astimezone() - time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0") - seed = row["seed_type"] - content = row["content"].replace("\n", " ") # Flatten newlines for display - lines.append(f"[{time_str}] ({seed}) {content[:150]}") - - return "\n".join(lines) + return _format_thought_rows(rows, query, seed_type) except Exception as exc: logger.warning("Thought search failed: %s", exc)