Compare commits

...

4 Commits

Author SHA1 Message Date
kimi
f32b5a9e4d fix: remove stale model_size kwarg from CLI create_timmy() calls
The model_size parameter was removed from create_timmy() but three CLI
commands (think, chat, status) still passed it, causing TypeError crashes.

Fixes #604

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 12:48:00 -04:00
6214ad3225 refactor: extract helpers from run_self_tests() (#601)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:40:44 -04:00
5f5da2163f [loop-cycle] refactor: extract helpers from _handle_tool_confirmation (#592) (#600) 2026-03-20 12:32:24 -04:00
0029c34bb1 refactor: break up search_thoughts() into focused helpers (#597)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:26:51 -04:00
5 changed files with 137 additions and 118 deletions

View File

@@ -37,6 +37,35 @@ def _is_interactive() -> bool:
return hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
def _prompt_interactive(req, tool_name: str, tool_args: dict) -> None:
"""Display tool details and prompt the human for approval."""
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
if typer.confirm("Allow this action?", default=False):
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
def _decide_autonomous(req, tool_name: str, tool_args: dict) -> None:
"""Auto-approve allowlisted tools; reject everything else."""
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info("AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100])
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
"""Prompt user to approve/reject dangerous tool calls.
@@ -51,6 +80,7 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
Returns the final RunOutput after all confirmations are resolved.
"""
interactive = _is_interactive() and not autonomous
decide = _prompt_interactive if interactive else _decide_autonomous
max_rounds = 10 # safety limit
for _ in range(max_rounds):
@@ -66,39 +96,10 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
for req in reqs:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
if interactive:
# Human present — prompt for approval
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
else:
# Autonomous mode — check allowlist
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
decide(req, tool_name, tool_args)
# Resume the run so the agent sees the confirmation result
try:
@@ -138,7 +139,7 @@ def think(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Ask Timmy to think carefully about a topic."""
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID)
@@ -201,7 +202,7 @@ def chat(
session_id = str(uuid.uuid4())
else:
session_id = _CLI_SESSION_ID
timmy = create_timmy(backend=backend, model_size=model_size, session_id=session_id)
timmy = create_timmy(backend=backend, session_id=session_id)
# Use agent.run() so we can intercept paused runs for tool confirmation.
run_output = timmy.run(message_str, stream=False, session_id=session_id)
@@ -278,7 +279,7 @@ def status(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Print Timmy's operational status."""
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(STATUS_PROMPT, stream=False, session_id=_CLI_SESSION_ID)

View File

@@ -21,6 +21,10 @@ Usage::
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from PIL import ImageDraw
import os
import shutil
import sqlite3
@@ -270,7 +274,7 @@ async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "
return f"Failed to create issue via MCP: {exc}"
def _draw_background(draw: "ImageDraw.ImageDraw", size: int) -> None:
def _draw_background(draw: ImageDraw.ImageDraw, size: int) -> None:
"""Draw radial gradient background with concentric circles."""
for i in range(size // 2, 0, -4):
g = int(25 + (i / (size // 2)) * 30)
@@ -280,7 +284,7 @@ def _draw_background(draw: "ImageDraw.ImageDraw", size: int) -> None:
)
def _draw_wizard(draw: "ImageDraw.ImageDraw") -> None:
def _draw_wizard(draw: ImageDraw.ImageDraw) -> None:
"""Draw wizard hat, face, eyes, smile, monogram, and robe."""
hat_color = (100, 50, 160) # purple
hat_outline = (180, 130, 255)
@@ -314,7 +318,7 @@ def _draw_wizard(draw: "ImageDraw.ImageDraw") -> None:
)
def _draw_stars(draw: "ImageDraw.ImageDraw") -> None:
def _draw_stars(draw: ImageDraw.ImageDraw) -> None:
"""Draw decorative gold stars around the wizard hat."""
gold = (220, 190, 50)
for sx, sy in [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]:

View File

@@ -1277,6 +1277,53 @@ class ThinkingEngine:
logger.debug("Failed to broadcast thought: %s", exc)
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Run the thought-search SQL and return matching rows."""
pattern = f"%{query}%"
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, limit),
).fetchall()
def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
@@ -1294,58 +1341,17 @@ def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
# Clamp limit to reasonable bounds
limit = max(1, min(limit, 50))
try:
engine = thinking_engine
db_path = engine._db_path
# Build query with optional seed_type filter
with _get_conn(db_path) as conn:
if seed_type:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", seed_type, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", limit),
).fetchall()
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
# Format results
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
return _format_thought_rows(rows, query, seed_type)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)

View File

@@ -326,6 +326,46 @@ def get_live_system_status() -> dict[str, Any]:
return result
def _build_pytest_cmd(venv_python: Path, scope: str) -> list[str]:
"""Build the pytest command list for the given scope."""
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
cmd.append(scope)
return cmd
def _parse_pytest_output(output: str) -> dict[str, int]:
"""Extract passed/failed/error counts from pytest output."""
import re
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
return {"passed": passed, "failed": failed, "errors": errors}
def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[str, Any]:
"""Run Timmy's own test suite and report results.
@@ -349,49 +389,17 @@ def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[s
if not venv_python.exists():
return {"success": False, "error": f"No venv found at {venv_python}"}
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
# Unit tests only — skip functional/e2e/integration
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
# Specific path
cmd.append(scope)
cmd = _build_pytest_cmd(venv_python, scope)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120, cwd=repo)
output = result.stdout + result.stderr
# Parse pytest output for counts
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
import re
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
counts = _parse_pytest_output(output)
return {
"success": result.returncode == 0,
"passed": passed,
"failed": failed,
"errors": errors,
"total": passed + failed + errors,
**counts,
"total": counts["passed"] + counts["failed"] + counts["errors"],
"return_code": result.returncode,
"summary": output[-2000:] if len(output) > 2000 else output,
}

View File

@@ -55,14 +55,14 @@ def test_think_sends_topic_to_agent():
)
def test_think_passes_model_size_option():
"""think --model-size 70b must forward the model size to create_timmy."""
def test_think_ignores_model_size_option():
"""think --model-size is accepted but not forwarded to create_timmy."""
mock_timmy = MagicMock()
with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
runner.invoke(app, ["think", "topic", "--model-size", "70b"])
mock_create.assert_called_once_with(backend=None, model_size="70b", session_id="cli")
mock_create.assert_called_once_with(backend=None, session_id="cli")
# ---------------------------------------------------------------------------