Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
3de7db770f refactor: break up think_once() into _generate_thought() and _finalize_thought()
All checks were successful
Tests / lint (pull_request) Successful in 4s
Tests / test (pull_request) Successful in 1m11s
Extracts the dedup retry loop and post-hook/broadcast pipeline into
focused helpers, reducing think_once() from 118 lines to ~20.

Fixes #513

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 19:41:28 -04:00
22 changed files with 1281 additions and 1209 deletions

View File

@@ -18,19 +18,13 @@ Exit codes:
from __future__ import annotations from __future__ import annotations
import json import json
import os
import sys import sys
import time import time
import urllib.request
from pathlib import Path from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json" QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json" IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Backoff sequence: 60s, 120s, 240s, 600s max # Backoff sequence: 60s, 120s, 240s, 600s max
BACKOFF_BASE = 60 BACKOFF_BASE = 60
@@ -38,81 +32,19 @@ BACKOFF_MAX = 600
BACKOFF_MULTIPLIER = 2 BACKOFF_MULTIPLIER = 2
def _get_token() -> str:
"""Read Gitea token from env or file."""
token = os.environ.get("GITEA_TOKEN", "").strip()
if not token and TOKEN_FILE.exists():
token = TOKEN_FILE.read_text().strip()
return token
def _fetch_open_issue_numbers() -> set[int] | None:
"""Fetch open issue numbers from Gitea. Returns None on failure."""
token = _get_token()
if not token:
return None
try:
numbers: set[int] = set()
page = 1
while True:
url = (
f"{GITEA_API}/repos/{REPO_SLUG}/issues"
f"?state=open&type=issues&limit=50&page={page}"
)
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
if not data:
break
for issue in data:
numbers.add(issue["number"])
if len(data) < 50:
break
page += 1
return numbers
except Exception:
return None
def load_queue() -> list[dict]: def load_queue() -> list[dict]:
"""Load queue.json and return ready items, filtering out closed issues.""" """Load queue.json and return ready items."""
if not QUEUE_FILE.exists(): if not QUEUE_FILE.exists():
return [] return []
try: try:
data = json.loads(QUEUE_FILE.read_text()) data = json.loads(QUEUE_FILE.read_text())
if not isinstance(data, list): if isinstance(data, list):
return [] return [item for item in data if item.get("ready")]
ready = [item for item in data if item.get("ready")] return []
if not ready:
return []
# Filter out issues that are no longer open (auto-hygiene)
open_numbers = _fetch_open_issue_numbers()
if open_numbers is not None:
before = len(ready)
ready = [item for item in ready if item.get("issue") in open_numbers]
removed = before - len(ready)
if removed > 0:
print(f"[loop-guard] Filtered {removed} closed issue(s) from queue")
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except (json.JSONDecodeError, OSError): except (json.JSONDecodeError, OSError):
return [] return []
def _save_cleaned_queue(full_queue: list[dict], open_numbers: set[int]) -> None:
"""Rewrite queue.json without closed issues."""
cleaned = [item for item in full_queue if item.get("issue") in open_numbers]
try:
QUEUE_FILE.write_text(json.dumps(cleaned, indent=2) + "\n")
except OSError:
pass
def load_idle_state() -> dict: def load_idle_state() -> dict:
"""Load persistent idle state.""" """Load persistent idle state."""
if not IDLE_STATE_FILE.exists(): if not IDLE_STATE_FILE.exists():

View File

@@ -254,7 +254,6 @@ class Settings(BaseSettings):
# When enabled, the agent starts an internal thought loop on server start. # When enabled, the agent starts an internal thought loop on server start.
thinking_enabled: bool = True thinking_enabled: bool = True
thinking_interval_seconds: int = 300 # 5 minutes between thoughts thinking_interval_seconds: int = 300 # 5 minutes between thoughts
thinking_timeout_seconds: int = 120 # max wall-clock time per thinking cycle
thinking_distill_every: int = 10 # distill facts from thoughts every Nth thought thinking_distill_every: int = 10 # distill facts from thoughts every Nth thought
thinking_issue_every: int = 20 # file Gitea issues from thoughts every Nth thought thinking_issue_every: int = 20 # file Gitea issues from thoughts every Nth thought
thinking_memory_check_every: int = 50 # check memory status every Nth thought thinking_memory_check_every: int = 50 # check memory status every Nth thought

View File

@@ -155,17 +155,7 @@ async def _thinking_scheduler() -> None:
while True: while True:
try: try:
if settings.thinking_enabled: if settings.thinking_enabled:
await asyncio.wait_for( await thinking_engine.think_once()
thinking_engine.think_once(),
timeout=settings.thinking_timeout_seconds,
)
except TimeoutError:
logger.warning(
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc: except Exception as exc:
logger.error("Thinking scheduler error: %s", exc) logger.error("Thinking scheduler error: %s", exc)
@@ -185,10 +175,7 @@ async def _loop_qa_scheduler() -> None:
while True: while True:
try: try:
if settings.loop_qa_enabled: if settings.loop_qa_enabled:
result = await asyncio.wait_for( result = await loop_qa_orchestrator.run_next_test()
loop_qa_orchestrator.run_next_test(),
timeout=settings.thinking_timeout_seconds,
)
if result: if result:
status = "PASS" if result["success"] else "FAIL" status = "PASS" if result["success"] else "FAIL"
logger.info( logger.info(
@@ -197,13 +184,6 @@ async def _loop_qa_scheduler() -> None:
status, status,
result.get("details", "")[:80], result.get("details", "")[:80],
) )
except TimeoutError:
logger.warning(
"Loop QA test timed out after %ds",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc: except Exception as exc:
logger.error("Loop QA scheduler error: %s", exc) logger.error("Loop QA scheduler error: %s", exc)

View File

@@ -71,20 +71,21 @@ async def clear_history(request: Request):
) )
def _validate_message(message: str) -> str: @router.post("/default/chat", response_class=HTMLResponse)
"""Strip and validate chat input; raise HTTPException on bad input.""" async def chat_agent(request: Request, message: str = Form(...)):
from fastapi import HTTPException """Chat — synchronous response with native Agno tool confirmation."""
message = message.strip() message = message.strip()
if not message: if not message:
from fastapi import HTTPException
raise HTTPException(status_code=400, detail="Message cannot be empty") raise HTTPException(status_code=400, detail="Message cannot be empty")
if len(message) > MAX_MESSAGE_LENGTH: if len(message) > MAX_MESSAGE_LENGTH:
from fastapi import HTTPException
raise HTTPException(status_code=422, detail="Message too long") raise HTTPException(status_code=422, detail="Message too long")
return message
# Record user activity so the thinking engine knows we're not idle
def _record_user_activity() -> None:
"""Notify the thinking engine that the user is active."""
try: try:
from timmy.thinking import thinking_engine from timmy.thinking import thinking_engine
@@ -92,67 +93,6 @@ def _record_user_activity() -> None:
except Exception: except Exception:
logger.debug("Failed to record user input for thinking engine") logger.debug("Failed to record user input for thinking engine")
def _extract_tool_actions(run_output) -> list[dict]:
"""If Agno paused the run for tool confirmation, build approval items."""
from timmy.approvals import create_item
tool_actions: list[dict] = []
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if not (is_paused and getattr(run_output, "active_requirements", None)):
return tool_actions
for req in run_output.active_requirements:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
item = create_item(
title=f"Dashboard: {tool_name}",
description=format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=get_impact_level(tool_name),
)
_pending_runs[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
}
tool_actions.append(
{
"approval_id": item.id,
"tool_name": tool_name,
"description": format_action_description(tool_name, tool_args),
"impact": get_impact_level(tool_name),
}
)
return tool_actions
def _log_exchange(
message: str, response_text: str | None, error_text: str | None, timestamp: str
) -> None:
"""Append user message and agent/error reply to the in-memory log."""
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if response_text:
message_log.append(
role="agent", content=response_text, timestamp=timestamp, source="browser"
)
elif error_text:
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
@router.post("/default/chat", response_class=HTMLResponse)
async def chat_agent(request: Request, message: str = Form(...)):
"""Chat — synchronous response with native Agno tool confirmation."""
message = _validate_message(message)
_record_user_activity()
timestamp = datetime.now().strftime("%H:%M:%S") timestamp = datetime.now().strftime("%H:%M:%S")
response_text = None response_text = None
error_text = None error_text = None
@@ -164,15 +104,54 @@ async def chat_agent(request: Request, message: str = Form(...)):
error_text = f"Chat error: {exc}" error_text = f"Chat error: {exc}"
run_output = None run_output = None
tool_actions: list[dict] = [] # Check if Agno paused the run for tool confirmation
tool_actions = []
if run_output is not None: if run_output is not None:
tool_actions = _extract_tool_actions(run_output) status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if is_paused and getattr(run_output, "active_requirements", None):
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
from timmy.approvals import create_item
item = create_item(
title=f"Dashboard: {tool_name}",
description=format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=get_impact_level(tool_name),
)
_pending_runs[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
}
tool_actions.append(
{
"approval_id": item.id,
"tool_name": tool_name,
"description": format_action_description(tool_name, tool_args),
"impact": get_impact_level(tool_name),
}
)
raw_content = run_output.content if hasattr(run_output, "content") else "" raw_content = run_output.content if hasattr(run_output, "content") else ""
response_text = _clean_response(raw_content or "") response_text = _clean_response(raw_content or "")
if not response_text and not tool_actions: if not response_text and not tool_actions:
response_text = None response_text = None # let error template show if needed
_log_exchange(message, response_text, error_text, timestamp) message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if response_text:
message_log.append(
role="agent", content=response_text, timestamp=timestamp, source="browser"
)
elif error_text:
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
return templates.TemplateResponse( return templates.TemplateResponse(
request, request,

View File

@@ -31,93 +31,6 @@ _UPLOAD_DIR = str(Path(settings.repo_root) / "data" / "chat-uploads")
_MAX_UPLOAD_SIZE = 50 * 1024 * 1024 # 50 MB _MAX_UPLOAD_SIZE = 50 * 1024 * 1024 # 50 MB
# ── POST /api/chat — helpers ─────────────────────────────────────────────────
async def _parse_chat_body(request: Request) -> tuple[dict | None, JSONResponse | None]:
"""Parse and validate the JSON request body.
Returns (body, None) on success or (None, error_response) on failure.
"""
content_length = request.headers.get("content-length")
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
return None, JSONResponse(status_code=413, content={"error": "Request body too large"})
try:
body = await request.json()
except Exception as exc:
logger.warning("Chat API JSON parse error: %s", exc)
return None, JSONResponse(status_code=400, content={"error": "Invalid JSON"})
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return None, JSONResponse(status_code=400, content={"error": "messages array is required"})
return body, None
def _extract_user_message(messages: list[dict]) -> str | None:
"""Return the text of the last user message, or *None* if absent."""
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, list):
text_parts = [
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
]
return " ".join(text_parts).strip() or None
text = str(content).strip()
return text or None
return None
def _build_context_prefix() -> str:
"""Build the system-context preamble injected before the user message."""
now = datetime.now()
return (
f"[System: Current date/time is "
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
def _notify_thinking_engine() -> None:
"""Record user activity so the thinking engine knows we're not idle."""
try:
from timmy.thinking import thinking_engine
thinking_engine.record_user_input()
except Exception:
logger.debug("Failed to record user input for thinking engine")
async def _process_chat(user_msg: str) -> dict | JSONResponse:
"""Send *user_msg* to the agent, log the exchange, and return a response."""
_notify_thinking_engine()
timestamp = datetime.now().strftime("%H:%M:%S")
try:
response_text = await agent_chat(
_build_context_prefix() + user_msg,
session_id="mobile",
)
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
return {"reply": response_text, "timestamp": timestamp}
except Exception as exc:
error_msg = f"Agent is offline: {exc}"
logger.error("api_chat error: %s", exc)
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
return JSONResponse(
status_code=503,
content={"error": error_msg, "timestamp": timestamp},
)
# ── POST /api/chat ──────────────────────────────────────────────────────────── # ── POST /api/chat ────────────────────────────────────────────────────────────
@@ -131,15 +44,78 @@ async def api_chat(request: Request):
Response: Response:
{"reply": "...", "timestamp": "HH:MM:SS"} {"reply": "...", "timestamp": "HH:MM:SS"}
""" """
body, err = await _parse_chat_body(request) # Enforce request body size limit
if err: content_length = request.headers.get("content-length")
return err if content_length and int(content_length) > settings.chat_api_max_body_bytes:
return JSONResponse(status_code=413, content={"error": "Request body too large"})
user_msg = _extract_user_message(body["messages"]) try:
if not user_msg: body = await request.json()
except Exception as exc:
logger.warning("Chat API JSON parse error: %s", exc)
return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return JSONResponse(status_code=400, content={"error": "messages array is required"})
# Extract the latest user message text
last_user_msg = None
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
# Handle multimodal content arrays — extract text parts
if isinstance(content, list):
text_parts = [
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
]
last_user_msg = " ".join(text_parts).strip()
else:
last_user_msg = str(content).strip()
break
if not last_user_msg:
return JSONResponse(status_code=400, content={"error": "No user message found"}) return JSONResponse(status_code=400, content={"error": "No user message found"})
return await _process_chat(user_msg) # Record user activity so the thinking engine knows we're not idle
try:
from timmy.thinking import thinking_engine
thinking_engine.record_user_input()
except Exception:
logger.debug("Failed to record user input for thinking engine")
timestamp = datetime.now().strftime("%H:%M:%S")
try:
# Inject context (same pattern as the HTMX chat handler in agents.py)
now = datetime.now()
context_prefix = (
f"[System: Current date/time is "
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
response_text = await agent_chat(
context_prefix + last_user_msg,
session_id="mobile",
)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
return {"reply": response_text, "timestamp": timestamp}
except Exception as exc:
error_msg = f"Agent is offline: {exc}"
logger.error("api_chat error: %s", exc)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
return JSONResponse(
status_code=503,
content={"error": error_msg, "timestamp": timestamp},
)
# ── POST /api/upload ────────────────────────────────────────────────────────── # ── POST /api/upload ──────────────────────────────────────────────────────────

View File

@@ -100,14 +100,36 @@ def _get_git_context() -> dict:
return {"branch": "unknown", "commit": "unknown"} return {"branch": "unknown", "commit": "unknown"}
def _extract_traceback_info(exc: Exception) -> tuple[str, str, int]: def capture_error(
"""Extract formatted traceback, affected file, and line number. exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
Returns: Returns:
Tuple of (traceback_string, affected_file, affected_line). Task ID of the created bug report, or None if deduplicated/disabled
""" """
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
# Format the stack trace
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
# Extract file/line from traceback
tb_obj = exc.__traceback__ tb_obj = exc.__traceback__
affected_file = "unknown" affected_file = "unknown"
affected_line = 0 affected_line = 0
@@ -117,18 +139,9 @@ def _extract_traceback_info(exc: Exception) -> tuple[str, str, int]:
affected_file = tb_obj.tb_frame.f_code.co_filename affected_file = tb_obj.tb_frame.f_code.co_filename
affected_line = tb_obj.tb_lineno affected_line = tb_obj.tb_lineno
return tb_str, affected_file, affected_line git_ctx = _get_git_context()
# 1. Log to event_log
def _log_error_event(
exc: Exception,
source: str,
error_hash: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> None:
"""Log the captured error to the event log."""
try: try:
from swarm.event_log import EventType, log_event from swarm.event_log import EventType, log_event
@@ -148,18 +161,8 @@ def _log_error_event(
except Exception as log_exc: except Exception as log_exc:
logger.debug("Failed to log error event: %s", log_exc) logger.debug("Failed to log error event: %s", log_exc)
# 2. Create bug report task
def _create_bug_report( task_id = None
exc: Exception,
source: str,
context: dict | None,
error_hash: str,
tb_str: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> str | None:
"""Create a bug report task and return the task ID (or None on failure)."""
try: try:
from swarm.task_queue.models import create_task from swarm.task_queue.models import create_task
@@ -192,6 +195,7 @@ def _create_bug_report(
) )
task_id = task.id task_id = task.id
# Log the creation event
try: try:
from swarm.event_log import EventType, log_event from swarm.event_log import EventType, log_event
@@ -206,16 +210,12 @@ def _create_bug_report(
) )
except Exception as exc: except Exception as exc:
logger.warning("Bug report screenshot error: %s", exc) logger.warning("Bug report screenshot error: %s", exc)
pass
return task_id
except Exception as task_exc: except Exception as task_exc:
logger.debug("Failed to create bug report task: %s", task_exc) logger.debug("Failed to create bug report task: %s", task_exc)
return None
# 3. Send notification
def _notify_bug_report(exc: Exception, source: str) -> None:
"""Send a push notification about the captured error."""
try: try:
from infrastructure.notifications.push import notifier from infrastructure.notifications.push import notifier
@@ -224,12 +224,11 @@ def _notify_bug_report(exc: Exception, source: str) -> None:
message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}", message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
category="system", category="system",
) )
except Exception as notify_exc: except Exception as exc:
logger.warning("Bug report notification error: %s", notify_exc) logger.warning("Bug report notification error: %s", exc)
pass
# 4. Record in session logger (via registered callback)
def _record_to_session(exc: Exception, source: str) -> None:
"""Record the error via the registered session callback."""
if _error_recorder is not None: if _error_recorder is not None:
try: try:
_error_recorder( _error_recorder(
@@ -239,50 +238,4 @@ def _record_to_session(exc: Exception, source: str) -> None:
except Exception as log_exc: except Exception as log_exc:
logger.warning("Bug report session logging error: %s", log_exc) logger.warning("Bug report session logging error: %s", log_exc)
def capture_error(
exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
Returns:
Task ID of the created bug report, or None if deduplicated/disabled
"""
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
tb_str, affected_file, affected_line = _extract_traceback_info(exc)
git_ctx = _get_git_context()
_log_error_event(exc, source, error_hash, affected_file, affected_line, git_ctx)
task_id = _create_bug_report(
exc,
source,
context,
error_hash,
tb_str,
affected_file,
affected_line,
git_ctx,
)
_notify_bug_report(exc, source)
_record_to_session(exc, source)
return task_id return task_id

View File

@@ -144,65 +144,6 @@ class ShellHand:
return None return None
@staticmethod
def _build_run_env(env: dict | None) -> dict:
"""Merge *env* overrides into a copy of the current environment."""
import os
run_env = os.environ.copy()
if env:
run_env.update(env)
return run_env
async def _execute_subprocess(
self,
command: str,
effective_timeout: int,
cwd: str | None,
run_env: dict,
start: float,
) -> ShellResult:
"""Run *command* as a subprocess with timeout enforcement."""
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
)
async def run( async def run(
self, self,
command: str, command: str,
@@ -223,6 +164,7 @@ class ShellHand:
""" """
start = time.time() start = time.time()
# Validate
validation_error = self._validate_command(command) validation_error = self._validate_command(command)
if validation_error: if validation_error:
return ShellResult( return ShellResult(
@@ -236,8 +178,52 @@ class ShellHand:
cwd = working_dir or self._working_dir cwd = working_dir or self._working_dir
try: try:
run_env = self._build_run_env(env) import os
return await self._execute_subprocess(command, effective_timeout, cwd, run_env, start)
run_env = os.environ.copy()
if env:
run_env.update(env)
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
)
except Exception as exc: except Exception as exc:
latency = (time.time() - start) * 1000 latency = (time.time() - start) * 1000
logger.warning("Shell command failed: %s%s", command, exc) logger.warning("Shell command failed: %s%s", command, exc)

View File

@@ -1 +1 @@
"""Timmy — Core AI agent (Ollama/Grok/Claude backends, CLI, prompts).""" """Timmy — Core AI agent (Ollama/AirLLM backends, CLI, prompts)."""

View File

@@ -26,12 +26,12 @@ from timmy.prompts import get_system_prompt
from timmy.tools import create_full_toolkit from timmy.tools import create_full_toolkit
if TYPE_CHECKING: if TYPE_CHECKING:
from timmy.backends import ClaudeBackend, GrokBackend from timmy.backends import ClaudeBackend, GrokBackend, TimmyAirLLMAgent
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Union type for callers that want to hint the return type. # Union type for callers that want to hint the return type.
TimmyAgent = Union[Agent, "GrokBackend", "ClaudeBackend"] TimmyAgent = Union[Agent, "TimmyAirLLMAgent", "GrokBackend", "ClaudeBackend"]
# Models known to be too small for reliable tool calling. # Models known to be too small for reliable tool calling.
# These hallucinate tool calls as text, invoke tools randomly, # These hallucinate tool calls as text, invoke tools randomly,
@@ -172,34 +172,107 @@ def _warmup_model(model_name: str) -> bool:
def _resolve_backend(requested: str | None) -> str: def _resolve_backend(requested: str | None) -> str:
"""Return the backend name to use. """Return the backend name to use, resolving 'auto' and explicit overrides.
Priority (highest -> lowest): Priority (highest lowest):
1. CLI flag passed directly to create_timmy() 1. CLI flag passed directly to create_timmy()
2. TIMMY_MODEL_BACKEND env var / .env setting 2. TIMMY_MODEL_BACKEND env var / .env setting
3. 'ollama' (safe default -- no surprises) 3. 'ollama' (safe default no surprises)
'auto' triggers Apple Silicon detection: uses AirLLM if both
is_apple_silicon() and airllm_available() return True.
""" """
if requested is not None: if requested is not None:
return requested return requested
return settings.timmy_model_backend # "ollama" | "grok" | "claude" configured = settings.timmy_model_backend # "ollama" | "airllm" | "grok" | "claude" | "auto"
if configured != "auto":
return configured
# "auto" path — lazy import to keep startup fast and tests clean.
from timmy.backends import airllm_available, is_apple_silicon
if is_apple_silicon() and airllm_available():
return "airllm"
return "ollama"
def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list: def create_timmy(
"""Assemble the tools list based on model capability and MCP flags. db_file: str = "timmy.db",
backend: str | None = None,
model_size: str | None = None,
*,
skip_mcp: bool = False,
session_id: str = "unknown",
) -> TimmyAgent:
"""Instantiate the agent — Ollama or AirLLM, same public interface.
Returns a list of Toolkit / MCPTools objects, or an empty list. Args:
db_file: SQLite file for Agno conversation memory (Ollama path only).
backend: "ollama" | "airllm" | "auto" | None (reads config/env).
model_size: AirLLM size — "8b" | "70b" | "405b" | None (reads config).
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
Use for background tasks (thinking, QA) where MCP's
stdio cancel-scope lifecycle conflicts with asyncio
task cancellation.
Returns an Agno Agent or backend-specific agent — all expose
print_response(message, stream).
""" """
resolved = _resolve_backend(backend)
size = model_size or "70b"
if resolved == "claude":
from timmy.backends import ClaudeBackend
return ClaudeBackend()
if resolved == "grok":
from timmy.backends import GrokBackend
return GrokBackend()
if resolved == "airllm":
from timmy.backends import TimmyAirLLMAgent
return TimmyAirLLMAgent(model_size=size)
# Default: Ollama via Agno.
# Resolve model with automatic pulling and fallback
model_name, is_fallback = _resolve_model_with_fallback(
requested_model=None,
require_vision=False,
auto_pull=True,
)
# If Ollama is completely unreachable, fail loudly.
# Sovereignty: never silently send data to a cloud API.
# Use --backend claude explicitly if you want cloud inference.
if not _check_model_available(model_name):
logger.error(
"Ollama unreachable and no local models available. "
"Start Ollama with 'ollama serve' or use --backend claude explicitly."
)
if is_fallback:
logger.info("Using fallback model %s (requested was unavailable)", model_name)
use_tools = _model_supports_tools(model_name)
# Conditionally include tools — small models get none
toolkit = create_full_toolkit() if use_tools else None
if not use_tools: if not use_tools:
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name) logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
return []
tools_list: list = [create_full_toolkit()] # Build the tools list — Agno accepts a list of Toolkit / MCPTools
tools_list: list = []
if toolkit:
tools_list.append(toolkit)
# Add MCP tool servers (lazy-connected on first arun()). # Add MCP tool servers (lazy-connected on first arun()).
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel # Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
# scopes that conflict with asyncio background task cancellation (#72). # scopes that conflict with asyncio background task cancellation (#72).
if not skip_mcp: if use_tools and not skip_mcp:
try: try:
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
@@ -213,46 +286,34 @@ def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
except Exception as exc: except Exception as exc:
logger.debug("MCP tools unavailable: %s", exc) logger.debug("MCP tools unavailable: %s", exc)
return tools_list # Select prompt tier based on tool capability
def _build_prompt(use_tools: bool, session_id: str) -> str:
"""Build the full system prompt with optional memory context."""
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id) base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
# Try to load memory context
try: try:
from timmy.memory_system import memory_system from timmy.memory_system import memory_system
memory_context = memory_system.get_system_context() memory_context = memory_system.get_system_context()
if memory_context: if memory_context:
# Smaller budget for small models — expanded prompt uses more tokens # Truncate if too long — smaller budget for small models
# since the expanded prompt (roster, guardrails) uses more tokens
max_context = 2000 if not use_tools else 8000 max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context: if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]" memory_context = memory_context[:max_context] + "\n... [truncated]"
return ( full_prompt = (
f"{base_prompt}\n\n" f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n" f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}" f"{memory_context}"
) )
else:
full_prompt = base_prompt
except Exception as exc: except Exception as exc:
logger.warning("Failed to load memory context: %s", exc) logger.warning("Failed to load memory context: %s", exc)
full_prompt = base_prompt
return base_prompt
def _create_ollama_agent(
*,
db_file: str,
model_name: str,
tools_list: list,
full_prompt: str,
use_tools: bool,
) -> Agent:
"""Construct the Agno Agent with Ollama backend and warm up the model."""
model_kwargs = {} model_kwargs = {}
if settings.ollama_num_ctx > 0: if settings.ollama_num_ctx > 0:
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx} model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
agent = Agent( agent = Agent(
name="Agent", name="Agent",
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs), model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
@@ -269,67 +330,6 @@ def _create_ollama_agent(
return agent return agent
def create_timmy(
db_file: str = "timmy.db",
backend: str | None = None,
*,
skip_mcp: bool = False,
session_id: str = "unknown",
) -> TimmyAgent:
"""Instantiate the agent — Ollama, Grok, or Claude.
Args:
db_file: SQLite file for Agno conversation memory (Ollama path only).
backend: "ollama" | "grok" | "claude" | None (reads config/env).
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
Use for background tasks (thinking, QA) where MCP's
stdio cancel-scope lifecycle conflicts with asyncio
task cancellation.
Returns an Agno Agent or backend-specific agent — all expose
print_response(message, stream).
"""
resolved = _resolve_backend(backend)
if resolved == "claude":
from timmy.backends import ClaudeBackend
return ClaudeBackend()
if resolved == "grok":
from timmy.backends import GrokBackend
return GrokBackend()
# Default: Ollama via Agno.
model_name, is_fallback = _resolve_model_with_fallback(
requested_model=None,
require_vision=False,
auto_pull=True,
)
if not _check_model_available(model_name):
logger.error(
"Ollama unreachable and no local models available. "
"Start Ollama with 'ollama serve' or use --backend claude explicitly."
)
if is_fallback:
logger.info("Using fallback model %s (requested was unavailable)", model_name)
use_tools = _model_supports_tools(model_name)
tools_list = _build_tools_list(use_tools, skip_mcp, model_name)
full_prompt = _build_prompt(use_tools, session_id)
return _create_ollama_agent(
db_file=db_file,
model_name=model_name,
tools_list=tools_list,
full_prompt=full_prompt,
use_tools=use_tools,
)
class TimmyWithMemory: class TimmyWithMemory:
"""Agent wrapper with explicit three-tier memory management.""" """Agent wrapper with explicit three-tier memory management."""

View File

@@ -1,10 +1,11 @@
"""LLM backends — Grok (xAI) and Claude (Anthropic). """LLM backends — AirLLM (local big models), Grok (xAI), and Claude (Anthropic).
Provides drop-in replacements for the Agno Agent that expose the same Provides drop-in replacements for the Agno Agent that expose the same
run(message, stream) → RunResult interface used by the dashboard and the run(message, stream) → RunResult interface used by the dashboard and the
print_response(message, stream) interface used by the CLI. print_response(message, stream) interface used by the CLI.
Backends: Backends:
- TimmyAirLLMAgent: Local 8B/70B/405B via AirLLM (Apple Silicon or PyTorch)
- GrokBackend: xAI Grok API via OpenAI-compatible SDK (opt-in premium) - GrokBackend: xAI Grok API via OpenAI-compatible SDK (opt-in premium)
- ClaudeBackend: Anthropic Claude API — lightweight cloud fallback - ClaudeBackend: Anthropic Claude API — lightweight cloud fallback
@@ -15,11 +16,21 @@ import logging
import platform import platform
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Literal
from timmy.prompts import get_system_prompt from timmy.prompts import get_system_prompt
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# HuggingFace model IDs for each supported size.
_AIRLLM_MODELS: dict[str, str] = {
"8b": "meta-llama/Meta-Llama-3.1-8B-Instruct",
"70b": "meta-llama/Meta-Llama-3.1-70B-Instruct",
"405b": "meta-llama/Meta-Llama-3.1-405B-Instruct",
}
ModelSize = Literal["8b", "70b", "405b"]
@dataclass @dataclass
class RunResult: class RunResult:
@@ -34,6 +45,108 @@ def is_apple_silicon() -> bool:
return platform.system() == "Darwin" and platform.machine() == "arm64" return platform.system() == "Darwin" and platform.machine() == "arm64"
def airllm_available() -> bool:
"""Return True when the airllm package is importable."""
try:
import airllm # noqa: F401
return True
except ImportError:
return False
class TimmyAirLLMAgent:
"""Thin AirLLM wrapper compatible with both dashboard and CLI call sites.
Exposes:
run(message, stream) → RunResult(content=...) [dashboard]
print_response(message, stream) → None [CLI]
Maintains a rolling 10-turn in-memory history so Timmy remembers the
conversation within a session — no SQLite needed at this layer.
"""
def __init__(self, model_size: str = "70b") -> None:
model_id = _AIRLLM_MODELS.get(model_size)
if model_id is None:
raise ValueError(
f"Unknown model size {model_size!r}. Choose from: {list(_AIRLLM_MODELS)}"
)
if is_apple_silicon():
from airllm import AirLLMMLX # type: ignore[import]
self._model = AirLLMMLX(model_id)
else:
from airllm import AutoModel # type: ignore[import]
self._model = AutoModel.from_pretrained(model_id)
self._history: list[str] = []
self._model_size = model_size
# ── public interface (mirrors Agno Agent) ────────────────────────────────
def run(self, message: str, *, stream: bool = False) -> RunResult:
"""Run inference and return a structured result (matches Agno Agent.run()).
`stream` is accepted for API compatibility; AirLLM always generates
the full output in one pass.
"""
prompt = self._build_prompt(message)
input_tokens = self._model.tokenizer(
[prompt],
return_tensors="pt",
padding=True,
truncation=True,
max_length=2048,
)
output = self._model.generate(
**input_tokens,
max_new_tokens=512,
use_cache=True,
do_sample=True,
temperature=0.7,
)
# Decode only the newly generated tokens, not the prompt.
input_len = input_tokens["input_ids"].shape[1]
response = self._model.tokenizer.decode(
output[0][input_len:], skip_special_tokens=True
).strip()
self._history.append(f"User: {message}")
self._history.append(f"Timmy: {response}")
return RunResult(content=response)
def print_response(self, message: str, *, stream: bool = True) -> None:
"""Run inference and render the response to stdout (CLI interface)."""
result = self.run(message, stream=stream)
self._render(result.content)
# ── private helpers ──────────────────────────────────────────────────────
def _build_prompt(self, message: str) -> str:
context = get_system_prompt(tools_enabled=False, session_id="airllm") + "\n\n"
# Include the last 10 turns (5 exchanges) for continuity.
if self._history:
context += "\n".join(self._history[-10:]) + "\n\n"
return context + f"User: {message}\nTimmy:"
@staticmethod
def _render(text: str) -> None:
"""Print response with rich markdown when available, plain text otherwise."""
try:
from rich.console import Console
from rich.markdown import Markdown
Console().print(Markdown(text))
except ImportError:
print(text)
# ── Grok (xAI) Backend ───────────────────────────────────────────────────── # ── Grok (xAI) Backend ─────────────────────────────────────────────────────
# Premium cloud augmentation — opt-in only, never the default path. # Premium cloud augmentation — opt-in only, never the default path.
@@ -74,7 +187,7 @@ class GrokBackend:
Uses the OpenAI-compatible SDK to connect to xAI's API. Uses the OpenAI-compatible SDK to connect to xAI's API.
Only activated when GROK_ENABLED=true and XAI_API_KEY is set. Only activated when GROK_ENABLED=true and XAI_API_KEY is set.
Exposes the same interface as Agno Agent: Exposes the same interface as TimmyAirLLMAgent and Agno Agent:
run(message, stream) → RunResult [dashboard] run(message, stream) → RunResult [dashboard]
print_response(message, stream) → None [CLI] print_response(message, stream) → None [CLI]
health_check() → dict [monitoring] health_check() → dict [monitoring]
@@ -324,7 +437,8 @@ CLAUDE_MODELS: dict[str, str] = {
class ClaudeBackend: class ClaudeBackend:
"""Anthropic Claude backend — cloud fallback when local models are offline. """Anthropic Claude backend — cloud fallback when local models are offline.
Uses the official Anthropic SDK. Same interface as GrokBackend: Uses the official Anthropic SDK. Same interface as GrokBackend and
TimmyAirLLMAgent:
run(message, stream) → RunResult [dashboard] run(message, stream) → RunResult [dashboard]
print_response(message, stream) → None [CLI] print_response(message, stream) → None [CLI]
health_check() → dict [monitoring] health_check() → dict [monitoring]

View File

@@ -22,13 +22,13 @@ _BACKEND_OPTION = typer.Option(
None, None,
"--backend", "--backend",
"-b", "-b",
help="Inference backend: 'ollama' (default) | 'grok' | 'claude'", help="Inference backend: 'ollama' (default) | 'airllm' | 'auto'",
) )
_MODEL_SIZE_OPTION = typer.Option( _MODEL_SIZE_OPTION = typer.Option(
None, None,
"--model-size", "--model-size",
"-s", "-s",
help="Model size (reserved for future use).", help="AirLLM model size when --backend airllm: '8b' | '70b' | '405b'",
) )

View File

@@ -98,73 +98,6 @@ def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
return {row[1] for row in cursor.fetchall()} return {row[1] for row in cursor.fetchall()}
def _migrate_episodes(conn: sqlite3.Connection) -> None:
"""Migrate episodes table rows into the unified memories table."""
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection) -> None:
"""Migrate chunks table rows into the unified memories table."""
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
"""Drop a legacy table if it exists."""
try:
conn.execute(f"DROP TABLE {table}") # noqa: S608
logger.info("Migration: Dropped old %s table", table)
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop %s: %s", table, exc)
def _migrate_schema(conn: sqlite3.Connection) -> None: def _migrate_schema(conn: sqlite3.Connection) -> None:
"""Migrate from old three-table schema to unified memories table. """Migrate from old three-table schema to unified memories table.
@@ -177,16 +110,78 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
tables = {row[0] for row in cursor.fetchall()} tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
if not has_memories and (tables & {"episodes", "chunks", "facts"}): # Check if we need to migrate (old schema exists)
if not has_memories and (has_episodes or has_chunks or has_facts):
logger.info("Migration: Creating unified memories table") logger.info("Migration: Creating unified memories table")
# Schema will be created by _ensure_schema above
if "episodes" in tables and has_memories: # Migrate episodes -> memories
_migrate_episodes(conn) if has_episodes and has_memories:
if "chunks" in tables and has_memories: logger.info("Migration: Converting episodes table to memories")
_migrate_chunks(conn) try:
if "facts" in tables: cols = _get_table_columns(conn, "episodes")
_drop_legacy_table(conn, "facts") context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old tables
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
conn.commit() conn.commit()
@@ -303,86 +298,6 @@ def store_memory(
return entry return entry
def _build_memory_filter(
context_type: str | None,
agent_id: str | None,
session_id: str | None,
) -> tuple[str, list]:
"""Build WHERE clause and params for memory queries."""
conditions: list[str] = []
params: list = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
return where_clause, params
def _fetch_memory_candidates(
where_clause: str, params: list, candidate_limit: int
) -> list[sqlite3.Row]:
"""Fetch candidate memory rows from the database."""
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(candidate_limit)
with get_connection() as conn:
return conn.execute(query_sql, params).fetchall()
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
"""Convert a database row to a MemoryEntry."""
return MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
def _score_and_rank(
rows: list[sqlite3.Row],
query: str,
query_embedding: list[float],
min_relevance: float,
limit: int,
) -> list[MemoryEntry]:
"""Score candidates by similarity and return top results."""
results = []
for row in rows:
entry = _row_to_entry(row)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
else:
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results[:limit]
def search_memories( def search_memories(
query: str, query: str,
limit: int = 10, limit: int = 10,
@@ -405,9 +320,66 @@ def search_memories(
List of MemoryEntry objects sorted by relevance List of MemoryEntry objects sorted by relevance
""" """
query_embedding = embed_text(query) query_embedding = embed_text(query)
where_clause, params = _build_memory_filter(context_type, agent_id, session_id)
rows = _fetch_memory_candidates(where_clause, params, limit * 3) # Build query with filters
return _score_and_rank(rows, query, query_embedding, min_relevance, limit) conditions = []
params = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
# Fetch candidates (we'll do in-memory similarity for now)
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(limit * 3) # Get more candidates for ranking
with get_connection() as conn:
rows = conn.execute(query_sql, params).fetchall()
# Compute similarity scores
results = []
for row in rows:
entry = MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
else:
# Fallback: check for keyword overlap
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
# Sort by relevance and return top results
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results[:limit]
def delete_memory(memory_id: str) -> bool: def delete_memory(memory_id: str) -> bool:

View File

@@ -323,75 +323,6 @@ def session_history(query: str, role: str = "", limit: int = 10) -> str:
_LOW_CONFIDENCE_THRESHOLD = 0.5 _LOW_CONFIDENCE_THRESHOLD = 0.5
def _categorize_entries(
entries: list[dict],
) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
"""Split session entries into messages, errors, timmy msgs, user msgs."""
messages = [e for e in entries if e.get("type") == "message"]
errors = [e for e in entries if e.get("type") == "error"]
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
user_msgs = [e for e in messages if e.get("role") == "user"]
return messages, errors, timmy_msgs, user_msgs
def _find_low_confidence(timmy_msgs: list[dict]) -> list[dict]:
"""Return Timmy responses below the confidence threshold."""
return [
m
for m in timmy_msgs
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
]
def _find_repeated_topics(user_msgs: list[dict], top_n: int = 5) -> list[tuple[str, int]]:
"""Identify frequently mentioned words in user messages."""
topic_counts: dict[str, int] = {}
for m in user_msgs:
for word in (m.get("content") or "").lower().split():
cleaned = word.strip(".,!?\"'()[]")
if len(cleaned) > 3:
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
return sorted(
((w, c) for w, c in topic_counts.items() if c >= 3),
key=lambda x: x[1],
reverse=True,
)[:top_n]
def _format_reflection_section(
title: str,
items: list[dict],
formatter: object,
empty_msg: str,
) -> list[str]:
"""Format a titled section with items, or an empty-state message."""
if items:
lines = [f"### {title} ({len(items)})"]
for item in items[:5]:
lines.append(formatter(item)) # type: ignore[operator]
lines.append("")
return lines
return [f"### {title}\n{empty_msg}\n"]
def _build_insights(
low_conf: list[dict],
errors: list[dict],
repeated: list[tuple[str, int]],
) -> list[str]:
"""Generate actionable insight bullets from analysis results."""
insights: list[str] = []
if low_conf:
insights.append("Consider studying topics where confidence was low.")
if errors:
insights.append("Review error patterns for recurring infrastructure issues.")
if repeated:
insights.append(
f'User frequently asks about "{repeated[0][0]}" — consider deepening knowledge here.'
)
return insights or ["Conversations look healthy. Keep up the good work."]
def self_reflect(limit: int = 30) -> str: def self_reflect(limit: int = 30) -> str:
"""Review recent conversations and reflect on Timmy's own behavior. """Review recent conversations and reflect on Timmy's own behavior.
@@ -412,12 +343,35 @@ def self_reflect(limit: int = 30) -> str:
if not entries: if not entries:
return "No conversation history to reflect on yet." return "No conversation history to reflect on yet."
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries) # Categorize entries
low_conf = _find_low_confidence(timmy_msgs) messages = [e for e in entries if e.get("type") == "message"]
repeated = _find_repeated_topics(user_msgs) errors = [e for e in entries if e.get("type") == "error"]
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
user_msgs = [e for e in messages if e.get("role") == "user"]
# 1. Low-confidence responses
low_conf = [
m
for m in timmy_msgs
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
]
# 2. Identify repeated user topics (simple word frequency)
topic_counts: dict[str, int] = {}
for m in user_msgs:
for word in (m.get("content") or "").lower().split():
cleaned = word.strip(".,!?\"'()[]")
if len(cleaned) > 3:
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
repeated = sorted(
((w, c) for w, c in topic_counts.items() if c >= 3),
key=lambda x: x[1],
reverse=True,
)[:5]
# Build reflection report # Build reflection report
sections: list[str] = ["## Self-Reflection Report\n"] sections: list[str] = ["## Self-Reflection Report\n"]
sections.append( sections.append(
f"Reviewed {len(entries)} recent entries: " f"Reviewed {len(entries)} recent entries: "
f"{len(user_msgs)} user messages, " f"{len(user_msgs)} user messages, "
@@ -425,27 +379,32 @@ def self_reflect(limit: int = 30) -> str:
f"{len(errors)} errors.\n" f"{len(errors)} errors.\n"
) )
sections.extend( # Low confidence
_format_reflection_section( if low_conf:
"Low-Confidence Responses", sections.append(f"### Low-Confidence Responses ({len(low_conf)})")
low_conf, for m in low_conf[:5]:
lambda m: ( ts = (m.get("timestamp") or "?")[:19]
f"- [{(m.get('timestamp') or '?')[:19]}] " conf = m.get("confidence", 0)
f"confidence={m.get('confidence', 0):.0%}: " text = (m.get("content") or "")[:120]
f"{(m.get('content') or '')[:120]}" sections.append(f"- [{ts}] confidence={conf:.0%}: {text}")
), sections.append("")
"None found — all responses above threshold.", else:
sections.append(
"### Low-Confidence Responses\nNone found — all responses above threshold.\n"
) )
)
sections.extend(
_format_reflection_section(
"Errors",
errors,
lambda e: f"- [{(e.get('timestamp') or '?')[:19]}] {(e.get('error') or '')[:120]}",
"No errors recorded.",
)
)
# Errors
if errors:
sections.append(f"### Errors ({len(errors)})")
for e in errors[:5]:
ts = (e.get("timestamp") or "?")[:19]
err = (e.get("error") or "")[:120]
sections.append(f"- [{ts}] {err}")
sections.append("")
else:
sections.append("### Errors\nNo errors recorded.\n")
# Repeated topics
if repeated: if repeated:
sections.append("### Recurring Topics") sections.append("### Recurring Topics")
for word, count in repeated: for word, count in repeated:
@@ -454,8 +413,22 @@ def self_reflect(limit: int = 30) -> str:
else: else:
sections.append("### Recurring Topics\nNo strong patterns detected.\n") sections.append("### Recurring Topics\nNo strong patterns detected.\n")
# Actionable summary
insights: list[str] = []
if low_conf:
insights.append("Consider studying topics where confidence was low.")
if errors:
insights.append("Review error patterns for recurring infrastructure issues.")
if repeated:
top_topic = repeated[0][0]
insights.append(
f'User frequently asks about "{top_topic}" — consider deepening knowledge here.'
)
if not insights:
insights.append("Conversations look healthy. Keep up the good work.")
sections.append("### Insights") sections.append("### Insights")
for insight in _build_insights(low_conf, errors, repeated): for insight in insights:
sections.append(f"- {insight}") sections.append(f"- {insight}")
return "\n".join(sections) return "\n".join(sections)

View File

@@ -232,29 +232,58 @@ class ThinkingEngine:
return False # Disabled — never idle return False # Disabled — never idle
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout) return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]: async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Assemble the context needed for a thinking cycle. """Execute one thinking cycle.
Args:
prompt: Optional custom seed prompt. When provided, overrides
the random seed selection and uses "prompted" as the
seed type — useful for journal prompts from the CLI.
1. Gather a seed context (or use the custom prompt)
2. Build a prompt with continuity from recent thoughts
3. Call the agent
4. Store the thought
5. Log the event and broadcast via WebSocket
"""
if not settings.thinking_enabled:
return None
# Skip idle periods — don't count internal processing as thoughts
if not prompt and self._is_idle():
logger.debug(
"Thinking paused — no user input for %d minutes",
settings.thinking_idle_timeout_minutes,
)
return None
content, seed_type = await self._generate_thought(prompt)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._finalize_thought(thought)
return thought
async def _generate_thought(self, prompt: str | None = None) -> tuple[str | None, str]:
"""Generate novel thought content via the dedup retry loop.
Gathers context, builds the LLM prompt, calls the agent, and
retries with a fresh seed if the result is too similar to recent
thoughts.
Returns: Returns:
(memory_context, system_context, recent_thoughts) A (content, seed_type) tuple. *content* is ``None`` when the
cycle should be skipped (agent failure, empty response, or
all retries exhausted).
""" """
memory_context = self._load_memory_context() memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot() system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5) recent_thoughts = self.get_recent_thoughts(limit=5)
return memory_context, system_context, recent_thoughts
async def _generate_novel_thought( content: str | None = None
self,
prompt: str | None,
memory_context: str,
system_context: str,
recent_thoughts: list["Thought"],
) -> tuple[str | None, str]:
"""Run the dedup-retry loop to produce a novel thought.
Returns:
(content, seed_type) — content is None if no novel thought produced.
"""
seed_type: str = "freeform" seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1): for attempt in range(self._MAX_DEDUP_RETRIES + 1):
@@ -287,7 +316,7 @@ class ThinkingEngine:
# Dedup: reject thoughts too similar to recent ones # Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts): if not self._is_too_similar(content, recent_thoughts):
return content, seed_type # Good — novel thought break # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES: if attempt < self._MAX_DEDUP_RETRIES:
logger.info( logger.info(
@@ -295,6 +324,7 @@ class ThinkingEngine:
attempt + 1, attempt + 1,
self._MAX_DEDUP_RETRIES + 1, self._MAX_DEDUP_RETRIES + 1,
) )
content = None # Will retry
else: else:
logger.warning( logger.warning(
"Thought still repetitive after %d retries, discarding", "Thought still repetitive after %d retries, discarding",
@@ -302,10 +332,10 @@ class ThinkingEngine:
) )
return None, seed_type return None, seed_type
return None, seed_type return content, seed_type
async def _process_thinking_result(self, thought: "Thought") -> None: async def _finalize_thought(self, thought: Thought) -> None:
"""Run all post-hooks after a thought is stored.""" """Run post-hooks, log, journal, and broadcast a stored thought."""
self._maybe_check_memory() self._maybe_check_memory()
await self._maybe_distill() await self._maybe_distill()
await self._maybe_file_issues() await self._maybe_file_issues()
@@ -316,54 +346,12 @@ class ThinkingEngine:
self._write_journal(thought) self._write_journal(thought)
await self._broadcast(thought) await self._broadcast(thought)
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
Args:
prompt: Optional custom seed prompt. When provided, overrides
the random seed selection and uses "prompted" as the
seed type — useful for journal prompts from the CLI.
1. Gather a seed context (or use the custom prompt)
2. Build a prompt with continuity from recent thoughts
3. Call the agent
4. Store the thought
5. Log the event and broadcast via WebSocket
"""
if not settings.thinking_enabled:
return None
# Skip idle periods — don't count internal processing as thoughts
if not prompt and self._is_idle():
logger.debug(
"Thinking paused — no user input for %d minutes",
settings.thinking_idle_timeout_minutes,
)
return None
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
prompt,
memory_context,
system_context,
recent_thoughts,
)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
logger.info( logger.info(
"Thought [%s] (%s): %s", "Thought [%s] (%s): %s",
thought.id[:8], thought.id[:8],
seed_type, thought.seed_type,
thought.content[:80], thought.content[:80],
) )
return thought
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]: def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts.""" """Retrieve the most recent thoughts."""
@@ -772,80 +760,6 @@ class ThinkingEngine:
except Exception as exc: except Exception as exc:
logger.debug("Thought issue filing skipped: %s", exc) logger.debug("Thought issue filing skipped: %s", exc)
# ── System snapshot helpers ────────────────────────────────────────────
def _snap_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count, or *None* on failure."""
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
count = conn.execute(
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
return f"Thoughts today: {count}"
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
return None
def _snap_chat_activity(self) -> list[str]:
"""Return chat-activity lines (in-memory, no I/O)."""
try:
from infrastructure.chat_store import message_log
messages = message_log.all()
if messages:
last = messages[-1]
return [
f"Chat messages this session: {len(messages)}",
f'Last chat ({last.role}): "{last.content[:80]}"',
]
return ["No chat messages this session"]
except Exception as exc:
logger.debug("Chat activity query failed: %s", exc)
return []
def _snap_task_queue(self) -> str | None:
"""Return a one-line task queue summary, or *None*."""
try:
from swarm.task_queue.models import get_task_summary_for_briefing
s = get_task_summary_for_briefing()
running, pending = s.get("running", 0), s.get("pending_approval", 0)
done, failed = s.get("completed", 0), s.get("failed", 0)
if running or pending or done or failed:
return (
f"Tasks: {running} running, {pending} pending, "
f"{done} completed, {failed} failed"
)
except Exception as exc:
logger.debug("Task queue query failed: %s", exc)
return None
def _snap_workspace(self) -> list[str]:
"""Return workspace-update lines (file-based Hermes comms)."""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
lines: list[str] = []
new_corr = updates.get("new_correspondence")
if new_corr:
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
lines.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
new_inbox = updates.get("new_inbox_files", [])
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
files_str += f", ... (+{len(new_inbox) - 5} more)"
lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
return lines
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)
return []
def _gather_system_snapshot(self) -> str: def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality. """Gather lightweight real system state for grounding thoughts in reality.
@@ -853,24 +767,83 @@ class ThinkingEngine:
recent chat activity, and task queue status. Never crashes — every recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted. section is independently try/excepted.
""" """
parts: list[str] = []
# Current local time
now = datetime.now().astimezone() now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC" tz = now.strftime("%Z") or "UTC"
parts.append(
parts: list[str] = [
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}" f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
] )
thought_line = self._snap_thought_count(now) # Thought count today (cheap DB query)
if thought_line: try:
parts.append(thought_line) today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
count = conn.execute(
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
parts.append(f"Thoughts today: {count}")
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
pass
parts.extend(self._snap_chat_activity()) # Recent chat activity (in-memory, no I/O)
try:
from infrastructure.chat_store import message_log
task_line = self._snap_task_queue() messages = message_log.all()
if task_line: if messages:
parts.append(task_line) parts.append(f"Chat messages this session: {len(messages)}")
last = messages[-1]
parts.append(f'Last chat ({last.role}): "{last.content[:80]}"')
else:
parts.append("No chat messages this session")
except Exception as exc:
logger.debug("Chat activity query failed: %s", exc)
pass
parts.extend(self._snap_workspace()) # Task queue (lightweight DB query)
try:
from swarm.task_queue.models import get_task_summary_for_briefing
summary = get_task_summary_for_briefing()
running = summary.get("running", 0)
pending = summary.get("pending_approval", 0)
done = summary.get("completed", 0)
failed = summary.get("failed", 0)
if running or pending or done or failed:
parts.append(
f"Tasks: {running} running, {pending} pending, "
f"{done} completed, {failed} failed"
)
except Exception as exc:
logger.debug("Task queue query failed: %s", exc)
pass
# Workspace updates (file-based communication with Hermes)
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr:
# Count entries (assuming each entry starts with a timestamp or header)
line_count = len([line for line in new_corr.splitlines() if line.strip()])
parts.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
files_str += f", ... (+{len(new_inbox) - 5} more)"
parts.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)
pass
return "\n".join(parts) if parts else "" return "\n".join(parts) if parts else ""
@@ -1139,37 +1112,21 @@ class ThinkingEngine:
lines.append(f"- [{thought.seed_type}] {snippet}") lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines) return "\n".join(lines)
_thinking_agent = None # cached agent — avoids per-call resource leaks (#525)
async def _call_agent(self, prompt: str) -> str: async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought. """Call Timmy's agent to generate a thought.
Reuses a cached agent with skip_mcp=True to avoid the cancel-scope Creates a lightweight agent with skip_mcp=True to avoid the cancel-scope
errors that occur when MCP stdio transports are spawned inside asyncio errors that occur when MCP stdio transports are spawned inside asyncio
background tasks (#72) and to prevent per-call resource leaks (httpx background tasks (#72). The thinking engine doesn't need Gitea or
clients, SQLite connections, model warmups) that caused the thinking filesystem tools — it only needs the LLM.
loop to die every ~10 min (#525).
Individual calls are capped at 120 s so a hung Ollama never blocks
the scheduler indefinitely.
Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
downstream parsers (fact distillation, issue filing) receive clean text. downstream parsers (fact distillation, issue filing) receive clean text.
""" """
import asyncio from timmy.agent import create_timmy
if self._thinking_agent is None:
from timmy.agent import create_timmy
self._thinking_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(120):
run = await self._thinking_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Thinking LLM call timed out after 120 s")
return ""
agent = create_timmy(skip_mcp=True)
run = await agent.arun(prompt, stream=False)
raw = run.content if hasattr(run, "content") else str(run) raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw return _THINK_TAG_RE.sub("", raw) if raw else raw

View File

@@ -26,7 +26,7 @@ def get_system_info() -> dict[str, Any]:
- python_version: Python version - python_version: Python version
- platform: OS platform - platform: OS platform
- model: Current Ollama model (queried from API) - model: Current Ollama model (queried from API)
- model_backend: Configured backend (ollama/grok/claude) - model_backend: Configured backend (ollama/airllm/grok)
- ollama_url: Ollama host URL - ollama_url: Ollama host URL
- repo_root: Repository root path - repo_root: Repository root path
- grok_enabled: Whether GROK is enabled - grok_enabled: Whether GROK is enabled
@@ -127,48 +127,54 @@ def check_ollama_health() -> dict[str, Any]:
return result return result
def _hot_memory_info(repo_root: Path) -> dict[str, Any]: def get_memory_status() -> dict[str, Any]:
"""Tier 1: Hot memory (MEMORY.md) status.""" """Get the status of Timmy's memory system.
Returns:
Dict with memory tier information
"""
from config import settings
repo_root = Path(settings.repo_root)
# Check tier 1: Hot memory
memory_md = repo_root / "MEMORY.md" memory_md = repo_root / "MEMORY.md"
tier1_exists = memory_md.exists() tier1_exists = memory_md.exists()
tier1_content = "" tier1_content = ""
if tier1_exists: if tier1_exists:
tier1_content = memory_md.read_text()[:500] tier1_content = memory_md.read_text()[:500] # First 500 chars
info: dict[str, Any] = { # Check tier 2: Vault
vault_path = repo_root / "memory" / "self"
tier2_exists = vault_path.exists()
tier2_files = []
if tier2_exists:
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()]
tier1_info: dict[str, Any] = {
"exists": tier1_exists, "exists": tier1_exists,
"path": str(memory_md), "path": str(memory_md),
"preview": " ".join(tier1_content[:200].split()) if tier1_content else None, "preview": " ".join(tier1_content[:200].split()) if tier1_content else None,
} }
if tier1_exists: if tier1_exists:
lines = memory_md.read_text().splitlines() lines = memory_md.read_text().splitlines()
info["line_count"] = len(lines) tier1_info["line_count"] = len(lines)
info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")] tier1_info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
return info
def _vault_info(repo_root: Path) -> dict[str, Any]:
"""Tier 2: Vault (memory/ directory tree) status."""
vault_path = repo_root / "memory" / "self"
tier2_exists = vault_path.exists()
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()] if tier2_exists else []
# Vault — scan all subdirs under memory/
vault_root = repo_root / "memory" vault_root = repo_root / "memory"
info: dict[str, Any] = { vault_info: dict[str, Any] = {
"exists": tier2_exists, "exists": tier2_exists,
"path": str(vault_path), "path": str(vault_path),
"file_count": len(tier2_files), "file_count": len(tier2_files),
"files": tier2_files[:10], "files": tier2_files[:10],
} }
if vault_root.exists(): if vault_root.exists():
info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()] vault_info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md")) vault_info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
return info
# Tier 3: Semantic memory row count
def _semantic_memory_info(repo_root: Path) -> dict[str, Any]: tier3_info: dict[str, Any] = {"available": False}
"""Tier 3: Semantic memory (vector DB) status."""
info: dict[str, Any] = {"available": False}
try: try:
sem_db = repo_root / "data" / "memory.db" sem_db = repo_root / "data" / "memory.db"
if sem_db.exists(): if sem_db.exists():
@@ -178,16 +184,14 @@ def _semantic_memory_info(repo_root: Path) -> dict[str, Any]:
).fetchone() ).fetchone()
if row and row[0]: if row and row[0]:
count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone() count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
info["available"] = True tier3_info["available"] = True
info["vector_count"] = count[0] if count else 0 tier3_info["vector_count"] = count[0] if count else 0
except Exception as exc: except Exception as exc:
logger.debug("Memory status query failed: %s", exc) logger.debug("Memory status query failed: %s", exc)
return info pass
# Self-coding journal stats
def _journal_info(repo_root: Path) -> dict[str, Any]: journal_info: dict[str, Any] = {"available": False}
"""Self-coding journal statistics."""
info: dict[str, Any] = {"available": False}
try: try:
journal_db = repo_root / "data" / "self_coding.db" journal_db = repo_root / "data" / "self_coding.db"
if journal_db.exists(): if journal_db.exists():
@@ -199,7 +203,7 @@ def _journal_info(repo_root: Path) -> dict[str, Any]:
if rows: if rows:
counts = {r["outcome"]: r["cnt"] for r in rows} counts = {r["outcome"]: r["cnt"] for r in rows}
total = sum(counts.values()) total = sum(counts.values())
info = { journal_info = {
"available": True, "available": True,
"total_attempts": total, "total_attempts": total,
"successes": counts.get("success", 0), "successes": counts.get("success", 0),
@@ -208,24 +212,13 @@ def _journal_info(repo_root: Path) -> dict[str, Any]:
} }
except Exception as exc: except Exception as exc:
logger.debug("Journal stats query failed: %s", exc) logger.debug("Journal stats query failed: %s", exc)
return info pass
def get_memory_status() -> dict[str, Any]:
"""Get the status of Timmy's memory system.
Returns:
Dict with memory tier information
"""
from config import settings
repo_root = Path(settings.repo_root)
return { return {
"tier1_hot_memory": _hot_memory_info(repo_root), "tier1_hot_memory": tier1_info,
"tier2_vault": _vault_info(repo_root), "tier2_vault": vault_info,
"tier3_semantic": _semantic_memory_info(repo_root), "tier3_semantic": tier3_info,
"self_coding_journal": _journal_info(repo_root), "self_coding_journal": journal_info,
} }

View File

@@ -18,6 +18,7 @@ except ImportError:
# agno is a core dependency (always installed) — do NOT stub it, or its # agno is a core dependency (always installed) — do NOT stub it, or its
# internal import chains break under xdist parallel workers. # internal import chains break under xdist parallel workers.
for _mod in [ for _mod in [
"airllm",
"mcp", "mcp",
"mcp.client", "mcp.client",
"mcp.client.stdio", "mcp.client.stdio",

View File

@@ -10,10 +10,12 @@ Categories:
M3xx iOS keyboard & zoom prevention M3xx iOS keyboard & zoom prevention
M4xx HTMX robustness (double-submit, sync) M4xx HTMX robustness (double-submit, sync)
M5xx Safe-area / notch support M5xx Safe-area / notch support
M6xx AirLLM backend interface contract
""" """
import re import re
from pathlib import Path from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# ── helpers ─────────────────────────────────────────────────────────────────── # ── helpers ───────────────────────────────────────────────────────────────────
@@ -204,3 +206,147 @@ def test_M505_dvh_units_used():
"""Dynamic viewport height (dvh) accounts for collapsing browser chrome.""" """Dynamic viewport height (dvh) accounts for collapsing browser chrome."""
css = _css() css = _css()
assert "dvh" in css assert "dvh" in css
# ── M6xx — AirLLM backend interface contract ──────────────────────────────────
def test_M601_airllm_agent_has_run_method():
"""TimmyAirLLMAgent must expose run() so the dashboard route can call it."""
from timmy.backends import TimmyAirLLMAgent
assert hasattr(TimmyAirLLMAgent, "run"), (
"TimmyAirLLMAgent is missing run() — dashboard will fail with AirLLM backend"
)
def test_M602_airllm_run_returns_content_attribute():
"""run() must return an object with a .content attribute (Agno RunResponse compat)."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
mock_model = MagicMock()
mock_tokenizer = MagicMock()
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 5]
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
mock_tokenizer.decode.return_value = "Sir, affirmative."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(10))]
agent._model = mock_model
result = agent.run("test")
assert hasattr(result, "content"), "run() result must have a .content attribute"
assert isinstance(result.content, str)
def test_M603_airllm_run_updates_history():
"""run() must update _history so multi-turn context is preserved."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
mock_model = MagicMock()
mock_tokenizer = MagicMock()
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 5]
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
mock_tokenizer.decode.return_value = "Acknowledged."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(10))]
agent._model = mock_model
assert len(agent._history) == 0
agent.run("hello")
assert len(agent._history) == 2
assert any("hello" in h for h in agent._history)
def test_M604_airllm_print_response_delegates_to_run():
"""print_response must use run() so both interfaces share one inference path."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import RunResult, TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
with (
patch.object(agent, "run", return_value=RunResult(content="ok")) as mock_run,
patch.object(agent, "_render"),
):
agent.print_response("hello", stream=True)
mock_run.assert_called_once_with("hello", stream=True)
def test_M605_health_status_passes_model_to_template(client):
"""Health status partial must receive the configured model name, not a hardcoded string."""
from config import settings
with patch(
"dashboard.routes.health.check_ollama",
new_callable=AsyncMock,
return_value=True,
):
response = client.get("/health/status")
# Model name should come from settings, not be hardcoded
assert response.status_code == 200
model_short = settings.ollama_model.split(":")[0]
assert model_short in response.text
# ── M7xx — XSS prevention ─────────────────────────────────────────────────────
def _mobile_html() -> str:
"""Read the mobile template source."""
path = Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "mobile.html"
return path.read_text()
def _swarm_live_html() -> str:
"""Read the swarm live template source."""
path = (
Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "swarm_live.html"
)
return path.read_text()
def test_M701_mobile_chat_no_raw_message_interpolation():
"""mobile.html must not interpolate ${message} directly into innerHTML — XSS risk."""
html = _mobile_html()
# The vulnerable pattern is `${message}` inside a template literal assigned to innerHTML
# After the fix, message must only appear via textContent assignment
assert "textContent = message" in html or "textContent=message" in html, (
"mobile.html still uses innerHTML + ${message} interpolation — XSS vulnerability"
)
def test_M702_mobile_chat_user_input_not_in_innerhtml_template_literal():
"""${message} must not appear inside a backtick string that is assigned to innerHTML."""
html = _mobile_html()
# Find all innerHTML += `...` blocks and verify none contain ${message}
blocks = re.findall(r"innerHTML\s*\+=?\s*`([^`]*)`", html, re.DOTALL)
for block in blocks:
assert "${message}" not in block, (
"innerHTML template literal still contains ${message} — XSS vulnerability"
)
def test_M703_swarm_live_agent_name_not_interpolated_in_innerhtml():
"""swarm_live.html must not put ${agent.name} inside innerHTML template literals."""
html = _swarm_live_html()
blocks = re.findall(r"innerHTML\s*=\s*agents\.map\([^;]+\)\.join\([^)]*\)", html, re.DOTALL)
assert len(blocks) == 0, (
"swarm_live.html still uses innerHTML=agents.map(…) with interpolated agent data — XSS vulnerability"
)
def test_M704_swarm_live_uses_textcontent_for_agent_data():
"""swarm_live.html must use textContent (not innerHTML) to set agent name/description."""
html = _swarm_live_html()
assert "textContent" in html, (
"swarm_live.html does not use textContent — agent data may be raw-interpolated into DOM"
)

View File

@@ -5,14 +5,9 @@ from datetime import UTC, datetime, timedelta
from unittest.mock import patch from unittest.mock import patch
from infrastructure.error_capture import ( from infrastructure.error_capture import (
_create_bug_report,
_dedup_cache, _dedup_cache,
_extract_traceback_info,
_get_git_context, _get_git_context,
_is_duplicate, _is_duplicate,
_log_error_event,
_notify_bug_report,
_record_to_session,
_stack_hash, _stack_hash,
capture_error, capture_error,
) )
@@ -198,91 +193,3 @@ class TestCaptureError:
def teardown_method(self): def teardown_method(self):
_dedup_cache.clear() _dedup_cache.clear()
class TestExtractTracebackInfo:
"""Test _extract_traceback_info helper."""
def test_returns_three_tuple(self):
try:
raise ValueError("extract test")
except ValueError as e:
tb_str, affected_file, affected_line = _extract_traceback_info(e)
assert "ValueError" in tb_str
assert "extract test" in tb_str
assert affected_file.endswith(".py")
assert affected_line > 0
def test_file_points_to_raise_site(self):
try:
_make_exception()
except ValueError as e:
_, affected_file, _ = _extract_traceback_info(e)
assert "test_error_capture" in affected_file
class TestLogErrorEvent:
"""Test _log_error_event helper."""
def test_does_not_crash_on_missing_deps(self):
try:
raise RuntimeError("log test")
except RuntimeError as e:
_log_error_event(e, "test", "abc123", "file.py", 42, {"branch": "main"})
class TestCreateBugReport:
"""Test _create_bug_report helper."""
def test_does_not_crash_on_missing_deps(self):
try:
raise RuntimeError("report test")
except RuntimeError as e:
result = _create_bug_report(
e, "test", None, "abc123", "traceback...", "file.py", 42, {}
)
# May return None if swarm deps unavailable — that's fine
assert result is None or isinstance(result, str)
def test_with_context(self):
try:
raise RuntimeError("ctx test")
except RuntimeError as e:
result = _create_bug_report(e, "test", {"path": "/api"}, "abc", "tb", "f.py", 1, {})
assert result is None or isinstance(result, str)
class TestNotifyBugReport:
"""Test _notify_bug_report helper."""
def test_does_not_crash(self):
try:
raise RuntimeError("notify test")
except RuntimeError as e:
_notify_bug_report(e, "test")
class TestRecordToSession:
"""Test _record_to_session helper."""
def test_does_not_crash_without_recorder(self):
try:
raise RuntimeError("session test")
except RuntimeError as e:
_record_to_session(e, "test")
def test_calls_registered_recorder(self):
from infrastructure.error_capture import register_error_recorder
calls = []
register_error_recorder(lambda **kwargs: calls.append(kwargs))
try:
try:
raise RuntimeError("callback test")
except RuntimeError as e:
_record_to_session(e, "test_source")
assert len(calls) == 1
assert "RuntimeError" in calls[0]["error"]
assert calls[0]["context"] == "test_source"
finally:
register_error_recorder(None)

View File

@@ -81,6 +81,7 @@ def test_create_timmy_respects_custom_ollama_url():
mock_settings.ollama_url = custom_url mock_settings.ollama_url = custom_url
mock_settings.ollama_num_ctx = 4096 mock_settings.ollama_num_ctx = 4096
mock_settings.timmy_model_backend = "ollama" mock_settings.timmy_model_backend = "ollama"
mock_settings.airllm_model_size = "70b"
from timmy.agent import create_timmy from timmy.agent import create_timmy
@@ -90,6 +91,33 @@ def test_create_timmy_respects_custom_ollama_url():
assert kwargs["host"] == custom_url assert kwargs["host"] == custom_url
# ── AirLLM path ──────────────────────────────────────────────────────────────
def test_create_timmy_airllm_returns_airllm_agent():
"""backend='airllm' must return a TimmyAirLLMAgent, not an Agno Agent."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.agent import create_timmy
from timmy.backends import TimmyAirLLMAgent
result = create_timmy(backend="airllm", model_size="8b")
assert isinstance(result, TimmyAirLLMAgent)
def test_create_timmy_airllm_does_not_call_agno_agent():
"""When using the airllm backend, Agno Agent should never be instantiated."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.backends.is_apple_silicon", return_value=False),
):
from timmy.agent import create_timmy
create_timmy(backend="airllm", model_size="8b")
MockAgent.assert_not_called()
def test_create_timmy_explicit_ollama_ignores_autodetect(): def test_create_timmy_explicit_ollama_ignores_autodetect():
"""backend='ollama' must always use Ollama, even on Apple Silicon.""" """backend='ollama' must always use Ollama, even on Apple Silicon."""
with ( with (
@@ -113,6 +141,7 @@ def test_create_timmy_explicit_ollama_ignores_autodetect():
def test_resolve_backend_explicit_takes_priority(): def test_resolve_backend_explicit_takes_priority():
from timmy.agent import _resolve_backend from timmy.agent import _resolve_backend
assert _resolve_backend("airllm") == "airllm"
assert _resolve_backend("ollama") == "ollama" assert _resolve_backend("ollama") == "ollama"
@@ -123,6 +152,39 @@ def test_resolve_backend_defaults_to_ollama_without_config():
assert _resolve_backend(None) == "ollama" assert _resolve_backend(None) == "ollama"
def test_resolve_backend_auto_uses_airllm_on_apple_silicon():
"""'auto' on Apple Silicon with airllm stubbed → 'airllm'."""
with (
patch("timmy.backends.is_apple_silicon", return_value=True),
patch("timmy.agent.settings") as mock_settings,
):
mock_settings.timmy_model_backend = "auto"
mock_settings.airllm_model_size = "70b"
mock_settings.ollama_model = "llama3.2"
from timmy.agent import _resolve_backend
assert _resolve_backend(None) == "airllm"
def test_resolve_backend_auto_falls_back_on_non_apple():
"""'auto' on non-Apple Silicon → 'ollama'."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("timmy.agent.settings") as mock_settings,
):
mock_settings.timmy_model_backend = "auto"
mock_settings.airllm_model_size = "70b"
mock_settings.ollama_model = "llama3.2"
from timmy.agent import _resolve_backend
assert _resolve_backend(None) == "ollama"
# ── _model_supports_tools ────────────────────────────────────────────────────
def test_model_supports_tools_llama32_returns_false(): def test_model_supports_tools_llama32_returns_false():
"""llama3.2 (3B) is too small for reliable tool calling.""" """llama3.2 (3B) is too small for reliable tool calling."""
from timmy.agent import _model_supports_tools from timmy.agent import _model_supports_tools
@@ -197,6 +259,7 @@ def test_create_timmy_includes_tools_for_large_model():
mock_settings.ollama_url = "http://localhost:11434" mock_settings.ollama_url = "http://localhost:11434"
mock_settings.ollama_num_ctx = 4096 mock_settings.ollama_num_ctx = 4096
mock_settings.timmy_model_backend = "ollama" mock_settings.timmy_model_backend = "ollama"
mock_settings.airllm_model_size = "70b"
mock_settings.telemetry_enabled = False mock_settings.telemetry_enabled = False
from timmy.agent import create_timmy from timmy.agent import create_timmy
@@ -381,150 +444,6 @@ def test_get_effective_ollama_model_walks_fallback_chain():
assert result == "fb-2" assert result == "fb-2"
# ── _build_tools_list ─────────────────────────────────────────────────────
def test_build_tools_list_empty_when_tools_disabled():
"""Small models get an empty tools list."""
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=False, skip_mcp=False, model_name="llama3.2")
assert result == []
def test_build_tools_list_includes_toolkit_when_enabled():
"""Tool-capable models get the full toolkit."""
mock_toolkit = MagicMock()
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
assert mock_toolkit in result
def test_build_tools_list_skips_mcp_when_flagged():
"""skip_mcp=True must not call MCP factories."""
mock_toolkit = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea,
patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs,
):
from timmy.agent import _build_tools_list
_build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
mock_gitea.assert_not_called()
mock_fs.assert_not_called()
def test_build_tools_list_includes_mcp_when_not_skipped():
"""skip_mcp=False should attempt MCP tool creation."""
mock_toolkit = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea,
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs,
):
from timmy.agent import _build_tools_list
_build_tools_list(use_tools=True, skip_mcp=False, model_name="llama3.1")
mock_gitea.assert_called_once()
mock_fs.assert_called_once()
# ── _build_prompt ─────────────────────────────────────────────────────────
def test_build_prompt_includes_base_prompt():
"""Prompt should always contain the base system prompt."""
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
assert "Timmy" in result
def test_build_prompt_appends_memory_context():
"""Memory context should be appended when available."""
mock_memory = MagicMock()
mock_memory.get_system_context.return_value = "User prefers dark mode."
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "GROUNDED CONTEXT" in result
assert "dark mode" in result
def test_build_prompt_truncates_long_memory():
"""Long memory context should be truncated."""
mock_memory = MagicMock()
mock_memory.get_system_context.return_value = "x" * 10000
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
assert "[truncated]" in result
def test_build_prompt_survives_memory_failure():
"""Prompt should fall back to base when memory fails."""
mock_memory = MagicMock()
mock_memory.get_system_context.side_effect = RuntimeError("db locked")
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "Timmy" in result
# Memory context should NOT be appended (the db locked error was caught)
assert "db locked" not in result
# ── _create_ollama_agent ──────────────────────────────────────────────────
def test_create_ollama_agent_passes_correct_kwargs():
"""_create_ollama_agent must pass the expected kwargs to Agent."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.agent._warmup_model", return_value=True),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
db_file="test.db",
model_name="llama3.1",
tools_list=[MagicMock()],
full_prompt="test prompt",
use_tools=True,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["description"] == "test prompt"
assert kwargs["markdown"] is False
def test_create_ollama_agent_none_tools_when_empty():
"""Empty tools_list should pass tools=None to Agent."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.agent._warmup_model", return_value=True),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
db_file="test.db",
model_name="llama3.2",
tools_list=[],
full_prompt="test prompt",
use_tools=False,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["tools"] is None
def test_no_hardcoded_fallback_constants_in_agent(): def test_no_hardcoded_fallback_constants_in_agent():
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS.""" """agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
import timmy.agent as agent_mod import timmy.agent as agent_mod

View File

@@ -1,7 +1,10 @@
"""Tests for src/timmy/backends.py — backend helpers and classes.""" """Tests for src/timmy/backends.py — AirLLM wrapper and helpers."""
import sys
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import pytest
# ── is_apple_silicon ────────────────────────────────────────────────────────── # ── is_apple_silicon ──────────────────────────────────────────────────────────
@@ -35,6 +38,183 @@ def test_is_apple_silicon_false_on_intel_mac():
assert is_apple_silicon() is False assert is_apple_silicon() is False
# ── airllm_available ─────────────────────────────────────────────────────────
def test_airllm_available_true_when_stub_in_sys_modules():
# conftest already stubs 'airllm' — importable → True.
from timmy.backends import airllm_available
assert airllm_available() is True
def test_airllm_available_false_when_not_importable():
# Temporarily remove the stub to simulate airllm not installed.
saved = sys.modules.pop("airllm", None)
try:
from timmy.backends import airllm_available
assert airllm_available() is False
finally:
if saved is not None:
sys.modules["airllm"] = saved
# ── TimmyAirLLMAgent construction ────────────────────────────────────────────
def test_airllm_agent_raises_on_unknown_size():
from timmy.backends import TimmyAirLLMAgent
with pytest.raises(ValueError, match="Unknown model size"):
TimmyAirLLMAgent(model_size="3b")
def test_airllm_agent_uses_automodel_on_non_apple():
"""Non-Apple-Silicon path uses AutoModel.from_pretrained."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
TimmyAirLLMAgent(model_size="8b")
# sys.modules["airllm"] is a MagicMock; AutoModel.from_pretrained was called.
assert sys.modules["airllm"].AutoModel.from_pretrained.called
def test_airllm_agent_uses_mlx_on_apple_silicon():
"""Apple Silicon path uses AirLLMMLX, not AutoModel."""
with patch("timmy.backends.is_apple_silicon", return_value=True):
from timmy.backends import TimmyAirLLMAgent
TimmyAirLLMAgent(model_size="8b")
assert sys.modules["airllm"].AirLLMMLX.called
def test_airllm_agent_resolves_correct_model_id_for_70b():
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import _AIRLLM_MODELS, TimmyAirLLMAgent
TimmyAirLLMAgent(model_size="70b")
sys.modules["airllm"].AutoModel.from_pretrained.assert_called_with(_AIRLLM_MODELS["70b"])
# ── TimmyAirLLMAgent.print_response ──────────────────────────────────────────
def _make_agent(model_size: str = "8b") -> "TimmyAirLLMAgent": # noqa: F821
"""Helper: create an agent with a fully mocked underlying model."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size=model_size)
# Replace the underlying model with a clean mock that returns predictable output.
mock_model = MagicMock()
mock_tokenizer = MagicMock()
# tokenizer() returns a dict-like object with an "input_ids" tensor mock.
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 10] # shape[1] = prompt token count = 10
token_dict = {"input_ids": input_ids_mock}
mock_tokenizer.return_value = token_dict
# generate() returns a list of token sequences.
mock_tokenizer.decode.return_value = "Sir, affirmative."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(15))] # 15 tokens total
agent._model = mock_model
return agent
def test_print_response_calls_generate():
agent = _make_agent()
agent.print_response("What is sovereignty?", stream=True)
agent._model.generate.assert_called_once()
def test_print_response_decodes_only_generated_tokens():
agent = _make_agent()
agent.print_response("Hello", stream=False)
# decode should be called with tokens starting at index 10 (prompt length).
decode_call = agent._model.tokenizer.decode.call_args
token_slice = decode_call[0][0]
assert list(token_slice) == list(range(10, 15))
def test_print_response_updates_history():
agent = _make_agent()
agent.print_response("First message")
assert any("First message" in turn for turn in agent._history)
assert any("Timmy:" in turn for turn in agent._history)
def test_print_response_history_included_in_second_prompt():
agent = _make_agent()
agent.print_response("First")
# Build the prompt for the second call — history should appear.
prompt = agent._build_prompt("Second")
assert "First" in prompt
assert "Second" in prompt
def test_print_response_stream_flag_accepted():
"""stream=False should not raise — it's accepted for API compatibility."""
agent = _make_agent()
agent.print_response("hello", stream=False) # no error
# ── Prompt formatting tests ────────────────────────────────────────────────
def test_airllm_prompt_contains_formatted_model_name():
"""AirLLM prompt should have actual model name, not literal {model_name}."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("config.settings") as mock_settings,
):
mock_settings.ollama_model = "llama3.2:3b"
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
prompt = agent._build_prompt("test message")
# Should contain the actual model name, not the placeholder
assert "{model_name}" not in prompt
assert "llama3.2:3b" in prompt
def test_airllm_prompt_gets_lite_tier():
"""AirLLM should get LITE tier prompt (tools_enabled=False)."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("config.settings") as mock_settings,
):
mock_settings.ollama_model = "test-model"
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
prompt = agent._build_prompt("test message")
# LITE tier should NOT have TOOL USAGE section
assert "TOOL USAGE" not in prompt
# LITE tier should have the basic rules
assert "Be brief by default" in prompt
def test_airllm_prompt_contains_session_id():
"""AirLLM prompt should have session_id formatted, not placeholder."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("config.settings") as mock_settings,
):
mock_settings.ollama_model = "test-model"
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
prompt = agent._build_prompt("test message")
# Should contain the session_id, not the placeholder
assert '{session_id}"' not in prompt
assert 'session "airllm"' in prompt
# ── ClaudeBackend ───────────────────────────────────────────────────────── # ── ClaudeBackend ─────────────────────────────────────────────────────────

View File

@@ -107,7 +107,19 @@ def test_chat_new_session_uses_unique_id():
def test_chat_passes_backend_option(): def test_chat_passes_backend_option():
pass """chat --backend airllm must forward the backend to create_timmy."""
mock_run_output = MagicMock()
mock_run_output.content = "OK"
mock_run_output.status = "COMPLETED"
mock_run_output.active_requirements = []
mock_timmy = MagicMock()
mock_timmy.run.return_value = mock_run_output
with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
runner.invoke(app, ["chat", "test", "--backend", "airllm"])
mock_create.assert_called_once_with(backend="airllm", model_size=None, session_id="cli")
def test_chat_cleans_response(): def test_chat_cleans_response():

View File

@@ -250,6 +250,99 @@ def test_continuity_includes_recent(tmp_path):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# _generate_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_generate_thought_returns_content_and_seed_type(tmp_path):
"""_generate_thought should return (content, seed_type) on success."""
from timmy.thinking import SEED_TYPES
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A novel idea."):
content, seed_type = await engine._generate_thought()
assert content == "A novel idea."
assert seed_type in SEED_TYPES
@pytest.mark.asyncio
async def test_generate_thought_with_prompt(tmp_path):
"""_generate_thought(prompt=...) should use 'prompted' seed type."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A prompted idea."):
content, seed_type = await engine._generate_thought(prompt="Reflect on joy")
assert content == "A prompted idea."
assert seed_type == "prompted"
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_agent_failure(tmp_path):
"""_generate_thought should return (None, ...) when the agent fails."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", side_effect=Exception("Ollama down")):
content, seed_type = await engine._generate_thought()
assert content is None
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_empty(tmp_path):
"""_generate_thought should return (None, ...) when agent returns empty."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value=" "):
content, seed_type = await engine._generate_thought()
assert content is None
# ---------------------------------------------------------------------------
# _finalize_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_finalize_thought_calls_all_hooks(tmp_path):
"""_finalize_thought should call all post-hooks, log, journal, and broadcast."""
engine = _make_engine(tmp_path)
thought = engine._store_thought("Test finalize.", "freeform")
with (
patch.object(engine, "_maybe_check_memory") as m_mem,
patch.object(engine, "_maybe_distill", new_callable=AsyncMock) as m_distill,
patch.object(engine, "_maybe_file_issues", new_callable=AsyncMock) as m_issues,
patch.object(engine, "_check_workspace", new_callable=AsyncMock) as m_ws,
patch.object(engine, "_maybe_check_memory_status") as m_status,
patch.object(engine, "_update_memory") as m_update,
patch.object(engine, "_log_event") as m_log,
patch.object(engine, "_write_journal") as m_journal,
patch.object(engine, "_broadcast", new_callable=AsyncMock) as m_broadcast,
):
await engine._finalize_thought(thought)
m_mem.assert_called_once()
m_distill.assert_awaited_once()
m_issues.assert_awaited_once()
m_ws.assert_awaited_once()
m_status.assert_called_once()
m_update.assert_called_once_with(thought)
m_log.assert_called_once_with(thought)
m_journal.assert_called_once_with(thought)
m_broadcast.assert_awaited_once_with(thought)
# ---------------------------------------------------------------------------
# think_once (async)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_think_once_stores_thought(tmp_path): async def test_think_once_stores_thought(tmp_path):
"""think_once should store a thought in the DB.""" """think_once should store a thought in the DB."""