Compare commits


23 Commits

Author SHA1 Message Date
kimi
ddb9c7d8ca refactor: break up search_memories() into focused helpers
Extract _build_memory_filter(), _fetch_memory_candidates(),
_row_to_entry(), and _score_and_rank() from the 82-line
search_memories() function for better readability and testability.

Fixes #554

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 21:17:49 -04:00
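Not part of the diff: the memory module is not included in this compare, so the new helpers are not visible below. A minimal sketch of the decomposition the commit describes, assuming an in-memory row store, hypothetical signatures, and a toy keyword score (only the helper names come from the commit message; everything else is illustrative):

# Sketch only; names from the commit message above, bodies are assumptions.
def _build_memory_filter(category):
    return lambda row: category is None or row["category"] == category

def _fetch_memory_candidates(rows, keep):
    return [row for row in rows if keep(row)]

def _row_to_entry(row):
    return {"text": row["text"], "category": row["category"], "score": 0.0}

def _score_and_rank(entries, query):
    for entry in entries:
        entry["score"] = sum(word in entry["text"].lower() for word in query.lower().split())
    return sorted(entries, key=lambda e: e["score"], reverse=True)

def search_memories(rows, query, limit=10, category=None):
    """Thin orchestrator left behind after the extraction (sketch)."""
    candidates = _fetch_memory_candidates(rows, _build_memory_filter(category))
    entries = [_row_to_entry(row) for row in candidates]
    return _score_and_rank(entries, query)[:limit]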
f361893fdd [loop-cycle-951] refactor: break up _migrate_schema() (#552) (#558) 2026-03-19 21:11:02 -04:00
7ad0ee17b6 refactor: break up shell.py::run() into helpers (#551)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:04:10 -04:00
29220b6bdd refactor: break up api_chat() into helpers (#547)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:02:04 -04:00
2849dba756 [loop-cycle-948] refactor: break up _gather_system_snapshot() into helpers (#540) (#549) 2026-03-19 20:52:13 -04:00
e11e07f117 [loop-cycle-947] refactor: break up self_reflect() into focused helpers (#505) (#546) 2026-03-19 20:49:18 -04:00
50c8a5428e refactor: break up api_chat() into helpers (#544)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:49:04 -04:00
7da434c85b [loop-cycle-946] refactor: complete airllm removal (#486) (#545) 2026-03-19 20:46:20 -04:00
88e59f7c17 refactor: break up chat_agent() into helpers (#542)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:38:46 -04:00
aa5e9c3176 refactor: break up get_memory_status() into helpers (#537)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:30:29 -04:00
1b4fe65650 fix: cache thinking agent and add timeouts to prevent loop pane death (#535)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:27:25 -04:00
2d69f73d9d fix: add timeout to thinking/loop-QA schedulers (#530)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:18:31 -04:00
ff1e43c235 [loop-cycle-545] fix: queue auto-hygiene — filter closed issues on read (#524) (#529) 2026-03-19 20:10:05 -04:00
b331aa6139 refactor: break up capture_error() into testable helpers (#523)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:03:28 -04:00
b45b543f2d refactor: break up create_timmy() into testable helpers (#520)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:51:59 -04:00
7c823ab59c refactor: break up think_once() into testable helpers (#518)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:43:26 -04:00
9f2728f529 refactor: break up lifespan() into testable helpers (#515)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:30:32 -04:00
cd3dc5d989 refactor: break up CascadeRouter.complete() into focused helpers (#510)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:24:36 -04:00
e4de539bf3 fix: extract ollama_url normalization into shared utility (#508)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:18:22 -04:00
b2057f72e1 [loop-cycle] refactor: break up run_agentic_loop into testable helpers (#504) (#509) 2026-03-19 19:15:38 -04:00
5f52dd54c0 [loop-cycle-932] fix: add logging to bare except Exception blocks (#484) (#501) 2026-03-19 19:05:02 -04:00
9ceffd61d1 [loop-cycle-544] fix: use settings.ollama_url fallback in _call_ollama (#490) (#498) 2026-03-19 16:18:39 -04:00
015d858be5 fix: auto-detect issue number in cycle retro from git branch (#495)
## Summary
- `cycle_retro.py` now auto-detects issue number from the git branch name (e.g. `kimi/issue-492` → `492`) when `--issue` is not provided
- `backfill_retro.py` now skips the PR number suffix Gitea appends to titles so it does not confuse PR numbers with issue numbers
- Added tests for both fixes

Fixes #492

Co-authored-by: kimi <kimi@localhost>
Reviewed-on: http://localhost:3000/rockachopa/Timmy-time-dashboard/pulls/495
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 16:13:35 -04:00
30 changed files with 1680 additions and 1462 deletions

View File

@@ -94,12 +94,17 @@ def extract_cycle_number(title: str) -> int | None:
return int(m.group(1)) if m else None
def extract_issue_number(title: str, body: str) -> int | None:
# Try body first (usually has "closes #N")
def extract_issue_number(title: str, body: str, pr_number: int | None = None) -> int | None:
"""Extract the issue number from PR body/title, ignoring the PR number itself.
Gitea appends "(#N)" to PR titles where N is the PR number — skip that
so we don't confuse it with the linked issue.
"""
for text in [body or "", title]:
m = ISSUE_RE.search(text)
if m:
return int(m.group(1))
for m in ISSUE_RE.finditer(text):
num = int(m.group(1))
if num != pr_number:
return num
return None
@@ -140,7 +145,7 @@ def main():
else:
cycle_counter = max(cycle_counter, cycle)
issue = extract_issue_number(title, body)
issue = extract_issue_number(title, body, pr_number=pr_num)
issue_type = classify_pr(title, body)
duration = estimate_duration(pr)
diff = get_pr_diff_stats(token, pr_num)
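Not part of the diff: the effect of the new pr_number guard, assuming ISSUE_RE matches plain "#N" references (the pattern itself is outside this hunk); titles are taken from this commit list.

# Body has no "closes #N", title only carries the PR's own number:
extract_issue_number("fix: queue auto-hygiene (#529)", "", pr_number=529)
# old code: returned 529, the PR number, as if it were the linked issue
# new code: skips the match equal to pr_number and returns None

# Title carries both the linked issue and the appended PR number:
extract_issue_number("refactor: break up _migrate_schema() (#552) (#558)", "", pr_number=558)
# -> 552, the linked issue, because 558 is skipped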

View File

@@ -44,6 +44,8 @@ from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
@@ -56,6 +58,23 @@ EPOCH_COUNTER_FILE = REPO_ROOT / ".loop" / "retro" / ".epoch_counter"
# How many recent entries to include in rolling summary
SUMMARY_WINDOW = 50
# Branch patterns that encode an issue number, e.g. kimi/issue-492
BRANCH_ISSUE_RE = re.compile(r"issue[/-](\d+)", re.IGNORECASE)
def detect_issue_from_branch() -> int | None:
"""Try to extract an issue number from the current git branch name."""
try:
branch = subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL,
text=True,
).strip()
except (subprocess.CalledProcessError, FileNotFoundError):
return None
m = BRANCH_ISSUE_RE.search(branch)
return int(m.group(1)) if m else None
# ── Epoch turnover ────────────────────────────────────────────────────────
@@ -230,6 +249,10 @@ def update_summary() -> None:
def main() -> None:
args = parse_args()
# Auto-detect issue from branch when not explicitly provided
if args.issue is None:
args.issue = detect_issue_from_branch()
# Reject idle cycles — no issue and no duration means nothing happened
if not args.issue and args.duration == 0:
print(f"[retro] Cycle {args.cycle} skipped — idle (no issue, no duration)")

View File

@@ -18,13 +18,19 @@ Exit codes:
from __future__ import annotations
import json
import os
import sys
import time
import urllib.request
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Backoff sequence: 60s, 120s, 240s, 600s max
BACKOFF_BASE = 60
@@ -32,19 +38,81 @@ BACKOFF_MAX = 600
BACKOFF_MULTIPLIER = 2
def _get_token() -> str:
"""Read Gitea token from env or file."""
token = os.environ.get("GITEA_TOKEN", "").strip()
if not token and TOKEN_FILE.exists():
token = TOKEN_FILE.read_text().strip()
return token
def _fetch_open_issue_numbers() -> set[int] | None:
"""Fetch open issue numbers from Gitea. Returns None on failure."""
token = _get_token()
if not token:
return None
try:
numbers: set[int] = set()
page = 1
while True:
url = (
f"{GITEA_API}/repos/{REPO_SLUG}/issues"
f"?state=open&type=issues&limit=50&page={page}"
)
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
if not data:
break
for issue in data:
numbers.add(issue["number"])
if len(data) < 50:
break
page += 1
return numbers
except Exception:
return None
def load_queue() -> list[dict]:
"""Load queue.json and return ready items."""
"""Load queue.json and return ready items, filtering out closed issues."""
if not QUEUE_FILE.exists():
return []
try:
data = json.loads(QUEUE_FILE.read_text())
if isinstance(data, list):
return [item for item in data if item.get("ready")]
return []
if not isinstance(data, list):
return []
ready = [item for item in data if item.get("ready")]
if not ready:
return []
# Filter out issues that are no longer open (auto-hygiene)
open_numbers = _fetch_open_issue_numbers()
if open_numbers is not None:
before = len(ready)
ready = [item for item in ready if item.get("issue") in open_numbers]
removed = before - len(ready)
if removed > 0:
print(f"[loop-guard] Filtered {removed} closed issue(s) from queue")
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except (json.JSONDecodeError, OSError):
return []
def _save_cleaned_queue(full_queue: list[dict], open_numbers: set[int]) -> None:
"""Rewrite queue.json without closed issues."""
cleaned = [item for item in full_queue if item.get("issue") in open_numbers]
try:
QUEUE_FILE.write_text(json.dumps(cleaned, indent=2) + "\n")
except OSError:
pass
def load_idle_state() -> dict:
"""Load persistent idle state."""
if not IDLE_STATE_FILE.exists():
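Not part of the diff: the filtering step from load_queue() in isolation, with illustrative issue numbers.

queue = [
    {"issue": 524, "ready": True},   # closed on Gitea
    {"issue": 530, "ready": True},   # still open
    {"issue": 535, "ready": False},  # not ready, so never considered
]
open_numbers = {530, 535}            # what _fetch_open_issue_numbers() returned (illustrative)

ready = [item for item in queue if item.get("ready")]
ready = [item for item in ready if item.get("issue") in open_numbers]
print(ready)   # [{'issue': 530, 'ready': True}]; #524 is dropped and queue.json is rewritten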

View File

@@ -10,6 +10,11 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
APP_START_TIME: _datetime = _datetime.now(UTC)
def normalize_ollama_url(url: str) -> str:
"""Replace localhost with 127.0.0.1 to avoid IPv6 resolution delays."""
return url.replace("localhost", "127.0.0.1")
class Settings(BaseSettings):
"""Central configuration — all env-var access goes through this class."""
@@ -19,6 +24,11 @@ class Settings(BaseSettings):
# Ollama host — override with OLLAMA_URL env var or .env file
ollama_url: str = "http://localhost:11434"
@property
def normalized_ollama_url(self) -> str:
"""Return ollama_url with localhost replaced by 127.0.0.1."""
return normalize_ollama_url(self.ollama_url)
# LLM model passed to Agno/Ollama — override with OLLAMA_MODEL
# qwen3:30b is the primary model — better reasoning and tool calling
# than llama3.1:8b-instruct while still running locally on modest hardware.
@@ -244,6 +254,7 @@ class Settings(BaseSettings):
# When enabled, the agent starts an internal thought loop on server start.
thinking_enabled: bool = True
thinking_interval_seconds: int = 300 # 5 minutes between thoughts
thinking_timeout_seconds: int = 120 # max wall-clock time per thinking cycle
thinking_distill_every: int = 10 # distill facts from thoughts every Nth thought
thinking_issue_every: int = 20 # file Gitea issues from thoughts every Nth thought
thinking_memory_check_every: int = 50 # check memory status every Nth thought
@@ -392,7 +403,7 @@ def check_ollama_model_available(model_name: str) -> bool:
import json
import urllib.request
url = settings.ollama_url.replace("localhost", "127.0.0.1")
url = settings.normalized_ollama_url
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",

View File

@@ -155,7 +155,17 @@ async def _thinking_scheduler() -> None:
while True:
try:
if settings.thinking_enabled:
await thinking_engine.think_once()
await asyncio.wait_for(
thinking_engine.think_once(),
timeout=settings.thinking_timeout_seconds,
)
except TimeoutError:
logger.warning(
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Thinking scheduler error: %s", exc)
@@ -175,7 +185,10 @@ async def _loop_qa_scheduler() -> None:
while True:
try:
if settings.loop_qa_enabled:
result = await loop_qa_orchestrator.run_next_test()
result = await asyncio.wait_for(
loop_qa_orchestrator.run_next_test(),
timeout=settings.thinking_timeout_seconds,
)
if result:
status = "PASS" if result["success"] else "FAIL"
logger.info(
@@ -184,6 +197,13 @@ async def _loop_qa_scheduler() -> None:
status,
result.get("details", "")[:80],
)
except TimeoutError:
logger.warning(
"Loop QA test timed out after %ds",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Loop QA scheduler error: %s", exc)
@@ -329,33 +349,35 @@ async def _discord_token_watcher() -> None:
logger.warning("Discord auto-start failed: %s", exc)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
# Validate security config (no-op in test mode)
def _startup_init() -> None:
"""Validate config and enable event persistence."""
from config import validate_startup
validate_startup()
# Enable event persistence (unified EventBus + swarm event_log)
from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence()
# Create all background tasks without waiting for them
briefing_task = asyncio.create_task(_briefing_scheduler())
thinking_task = asyncio.create_task(_thinking_scheduler())
loop_qa_task = asyncio.create_task(_loop_qa_scheduler())
presence_task = asyncio.create_task(_presence_watcher())
# Initialize Spark Intelligence engine
from spark.engine import get_spark_engine
if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled")
# Auto-prune old vector store memories on startup
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
return [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
]
def _startup_pruning() -> None:
"""Auto-prune old memories, thoughts, and events on startup."""
if settings.memory_prune_days > 0:
try:
from timmy.memory_system import prune_memories
@@ -373,7 +395,6 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Memory auto-prune skipped: %s", exc)
# Auto-prune old thoughts on startup
if settings.thoughts_prune_days > 0:
try:
from timmy.thinking import thinking_engine
@@ -391,7 +412,6 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Thought auto-prune skipped: %s", exc)
# Auto-prune old system events on startup
if settings.events_prune_days > 0:
try:
from swarm.event_log import prune_old_events
@@ -409,7 +429,6 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Event auto-prune skipped: %s", exc)
# Warn if memory vault exceeds size limit
if settings.memory_vault_max_mb > 0:
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
@@ -425,37 +444,18 @@ async def lifespan(app: FastAPI):
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Start chat integrations in background
chat_task = asyncio.create_task(_start_chat_integrations_background())
# Register session logger with error capture (breaks infrastructure → timmy circular dep)
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
pass
logger.info("✓ Dashboard ready for requests")
yield
# Cleanup on shutdown
async def _shutdown_cleanup(
bg_tasks: list[asyncio.Task],
workshop_heartbeat,
) -> None:
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
await discord_bot.stop()
await telegram_bot.stop()
# Close MCP tool server sessions
try:
from timmy.mcp_tools import close_mcp_sessions
@@ -465,13 +465,42 @@ async def lifespan(app: FastAPI):
await workshop_heartbeat.stop()
for task in [briefing_task, thinking_task, chat_task, loop_qa_task, presence_task]:
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
for task in bg_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
_startup_init()
bg_tasks = _startup_background_tasks()
_startup_pruning()
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Register session logger with error capture
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
logger.debug("Failed to register error recorder")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
app = FastAPI(
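Not part of the diff: the asyncio.wait_for pattern the schedulers now use, reduced to a runnable sketch. The stub coroutine and the 1-second timeout are stand-ins for think_once() and settings.thinking_timeout_seconds.

import asyncio
import logging

logger = logging.getLogger("sketch")

async def think_once():
    await asyncio.sleep(999)   # simulate a hung Ollama call

async def one_scheduler_tick(timeout_s: float) -> None:
    try:
        await asyncio.wait_for(think_once(), timeout=timeout_s)
    except TimeoutError:       # on Python 3.11+ asyncio.wait_for raises the builtin TimeoutError
        logger.warning("Thinking cycle timed out after %ds, loop keeps running", timeout_s)

asyncio.run(one_scheduler_tick(timeout_s=1))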

View File

@@ -71,27 +71,87 @@ async def clear_history(request: Request):
)
@router.post("/default/chat", response_class=HTMLResponse)
async def chat_agent(request: Request, message: str = Form(...)):
"""Chat — synchronous response with native Agno tool confirmation."""
def _validate_message(message: str) -> str:
"""Strip and validate chat input; raise HTTPException on bad input."""
from fastapi import HTTPException
message = message.strip()
if not message:
from fastapi import HTTPException
raise HTTPException(status_code=400, detail="Message cannot be empty")
if len(message) > MAX_MESSAGE_LENGTH:
from fastapi import HTTPException
raise HTTPException(status_code=422, detail="Message too long")
return message
# Record user activity so the thinking engine knows we're not idle
def _record_user_activity() -> None:
"""Notify the thinking engine that the user is active."""
try:
from timmy.thinking import thinking_engine
thinking_engine.record_user_input()
except Exception:
pass
logger.debug("Failed to record user input for thinking engine")
def _extract_tool_actions(run_output) -> list[dict]:
"""If Agno paused the run for tool confirmation, build approval items."""
from timmy.approvals import create_item
tool_actions: list[dict] = []
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if not (is_paused and getattr(run_output, "active_requirements", None)):
return tool_actions
for req in run_output.active_requirements:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
item = create_item(
title=f"Dashboard: {tool_name}",
description=format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=get_impact_level(tool_name),
)
_pending_runs[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
}
tool_actions.append(
{
"approval_id": item.id,
"tool_name": tool_name,
"description": format_action_description(tool_name, tool_args),
"impact": get_impact_level(tool_name),
}
)
return tool_actions
def _log_exchange(
message: str, response_text: str | None, error_text: str | None, timestamp: str
) -> None:
"""Append user message and agent/error reply to the in-memory log."""
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if response_text:
message_log.append(
role="agent", content=response_text, timestamp=timestamp, source="browser"
)
elif error_text:
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
@router.post("/default/chat", response_class=HTMLResponse)
async def chat_agent(request: Request, message: str = Form(...)):
"""Chat — synchronous response with native Agno tool confirmation."""
message = _validate_message(message)
_record_user_activity()
timestamp = datetime.now().strftime("%H:%M:%S")
response_text = None
@@ -104,54 +164,15 @@ async def chat_agent(request: Request, message: str = Form(...)):
error_text = f"Chat error: {exc}"
run_output = None
# Check if Agno paused the run for tool confirmation
tool_actions = []
tool_actions: list[dict] = []
if run_output is not None:
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if is_paused and getattr(run_output, "active_requirements", None):
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
from timmy.approvals import create_item
item = create_item(
title=f"Dashboard: {tool_name}",
description=format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=get_impact_level(tool_name),
)
_pending_runs[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
}
tool_actions.append(
{
"approval_id": item.id,
"tool_name": tool_name,
"description": format_action_description(tool_name, tool_args),
"impact": get_impact_level(tool_name),
}
)
tool_actions = _extract_tool_actions(run_output)
raw_content = run_output.content if hasattr(run_output, "content") else ""
response_text = _clean_response(raw_content or "")
if not response_text and not tool_actions:
response_text = None # let error template show if needed
response_text = None
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if response_text:
message_log.append(
role="agent", content=response_text, timestamp=timestamp, source="browser"
)
elif error_text:
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
_log_exchange(message, response_text, error_text, timestamp)
return templates.TemplateResponse(
request,

View File

@@ -31,6 +31,93 @@ _UPLOAD_DIR = str(Path(settings.repo_root) / "data" / "chat-uploads")
_MAX_UPLOAD_SIZE = 50 * 1024 * 1024 # 50 MB
# ── POST /api/chat — helpers ─────────────────────────────────────────────────
async def _parse_chat_body(request: Request) -> tuple[dict | None, JSONResponse | None]:
"""Parse and validate the JSON request body.
Returns (body, None) on success or (None, error_response) on failure.
"""
content_length = request.headers.get("content-length")
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
return None, JSONResponse(status_code=413, content={"error": "Request body too large"})
try:
body = await request.json()
except Exception as exc:
logger.warning("Chat API JSON parse error: %s", exc)
return None, JSONResponse(status_code=400, content={"error": "Invalid JSON"})
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return None, JSONResponse(status_code=400, content={"error": "messages array is required"})
return body, None
def _extract_user_message(messages: list[dict]) -> str | None:
"""Return the text of the last user message, or *None* if absent."""
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, list):
text_parts = [
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
]
return " ".join(text_parts).strip() or None
text = str(content).strip()
return text or None
return None
def _build_context_prefix() -> str:
"""Build the system-context preamble injected before the user message."""
now = datetime.now()
return (
f"[System: Current date/time is "
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
def _notify_thinking_engine() -> None:
"""Record user activity so the thinking engine knows we're not idle."""
try:
from timmy.thinking import thinking_engine
thinking_engine.record_user_input()
except Exception:
logger.debug("Failed to record user input for thinking engine")
async def _process_chat(user_msg: str) -> dict | JSONResponse:
"""Send *user_msg* to the agent, log the exchange, and return a response."""
_notify_thinking_engine()
timestamp = datetime.now().strftime("%H:%M:%S")
try:
response_text = await agent_chat(
_build_context_prefix() + user_msg,
session_id="mobile",
)
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
return {"reply": response_text, "timestamp": timestamp}
except Exception as exc:
error_msg = f"Agent is offline: {exc}"
logger.error("api_chat error: %s", exc)
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
return JSONResponse(
status_code=503,
content={"error": error_msg, "timestamp": timestamp},
)
# ── POST /api/chat ────────────────────────────────────────────────────────────
@@ -44,78 +131,15 @@ async def api_chat(request: Request):
Response:
{"reply": "...", "timestamp": "HH:MM:SS"}
"""
# Enforce request body size limit
content_length = request.headers.get("content-length")
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
return JSONResponse(status_code=413, content={"error": "Request body too large"})
body, err = await _parse_chat_body(request)
if err:
return err
try:
body = await request.json()
except Exception as exc:
logger.warning("Chat API JSON parse error: %s", exc)
return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return JSONResponse(status_code=400, content={"error": "messages array is required"})
# Extract the latest user message text
last_user_msg = None
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
# Handle multimodal content arrays — extract text parts
if isinstance(content, list):
text_parts = [
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
]
last_user_msg = " ".join(text_parts).strip()
else:
last_user_msg = str(content).strip()
break
if not last_user_msg:
user_msg = _extract_user_message(body["messages"])
if not user_msg:
return JSONResponse(status_code=400, content={"error": "No user message found"})
# Record user activity so the thinking engine knows we're not idle
try:
from timmy.thinking import thinking_engine
thinking_engine.record_user_input()
except Exception:
pass
timestamp = datetime.now().strftime("%H:%M:%S")
try:
# Inject context (same pattern as the HTMX chat handler in agents.py)
now = datetime.now()
context_prefix = (
f"[System: Current date/time is "
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
response_text = await agent_chat(
context_prefix + last_user_msg,
session_id="mobile",
)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
return {"reply": response_text, "timestamp": timestamp}
except Exception as exc:
error_msg = f"Agent is offline: {exc}"
logger.error("api_chat error: %s", exc)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
return JSONResponse(
status_code=503,
content={"error": error_msg, "timestamp": timestamp},
)
return await _process_chat(user_msg)
# ── POST /api/upload ──────────────────────────────────────────────────────────
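Not part of the diff: how the extracted helper handles a multimodal content array (the message payload is illustrative).

messages = [
    {"role": "assistant", "content": "hi"},
    {"role": "user", "content": [
        {"type": "text", "text": "what's"},
        {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}},
        {"type": "text", "text": "in this photo?"},
    ]},
]
# _extract_user_message(messages) walks the list in reverse, takes the last user message,
# keeps only the "text" parts, joins them with spaces, and returns "what's in this photo?".
# If the last user message carries no text at all, it returns None and api_chat() answers 400.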

View File

@@ -65,7 +65,7 @@ def _check_ollama_sync() -> DependencyStatus:
try:
import urllib.request
url = settings.ollama_url.replace("localhost", "127.0.0.1")
url = settings.normalized_ollama_url
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",

View File

@@ -166,7 +166,7 @@ async def api_briefing_status():
if cached:
last_generated = cached.generated_at.isoformat()
except Exception:
pass
logger.debug("Failed to read briefing cache")
return JSONResponse(
{
@@ -190,6 +190,7 @@ async def api_memory_status():
stats = get_memory_stats()
indexed_files = stats.get("total_entries", 0)
except Exception:
logger.debug("Failed to get memory stats")
indexed_files = 0
return JSONResponse(
@@ -215,7 +216,7 @@ async def api_swarm_status():
).fetchone()
pending_tasks = row["cnt"] if row else 0
except Exception:
pass
logger.debug("Failed to count pending tasks")
return JSONResponse(
{

View File

@@ -221,7 +221,7 @@ async def _heartbeat(websocket: WebSocket) -> None:
await asyncio.sleep(_HEARTBEAT_INTERVAL)
await websocket.send_text(json.dumps({"type": "ping"}))
except Exception:
pass # connection gone — receive loop will clean up
logger.debug("Heartbeat stopped — connection gone")
@router.websocket("/ws")
@@ -250,7 +250,7 @@ async def world_ws(websocket: WebSocket) -> None:
raw = await websocket.receive_text()
await _handle_client_message(raw)
except Exception:
pass
logger.debug("WebSocket receive loop ended")
finally:
ping_task.cancel()
if websocket in _ws_clients:
@@ -265,6 +265,7 @@ async def _broadcast(message: str) -> None:
try:
await ws.send_text(message)
except Exception:
logger.debug("Pruning dead WebSocket client")
dead.append(ws)
for ws in dead:
if ws in _ws_clients:
@@ -340,7 +341,7 @@ async def _bark_and_broadcast(visitor_text: str) -> None:
pip_familiar.on_event("visitor_spoke")
except Exception:
pass # Pip is optional
logger.debug("Pip familiar notification failed (optional)")
_refresh_ground(visitor_text)
_tick_commitments()

View File

@@ -100,36 +100,14 @@ def _get_git_context() -> dict:
return {"branch": "unknown", "commit": "unknown"}
def capture_error(
exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
def _extract_traceback_info(exc: Exception) -> tuple[str, str, int]:
"""Extract formatted traceback, affected file, and line number.
Returns:
Task ID of the created bug report, or None if deduplicated/disabled
Tuple of (traceback_string, affected_file, affected_line).
"""
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
# Format the stack trace
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
# Extract file/line from traceback
tb_obj = exc.__traceback__
affected_file = "unknown"
affected_line = 0
@@ -139,9 +117,18 @@ def capture_error(
affected_file = tb_obj.tb_frame.f_code.co_filename
affected_line = tb_obj.tb_lineno
git_ctx = _get_git_context()
return tb_str, affected_file, affected_line
# 1. Log to event_log
def _log_error_event(
exc: Exception,
source: str,
error_hash: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> None:
"""Log the captured error to the event log."""
try:
from swarm.event_log import EventType, log_event
@@ -161,8 +148,18 @@ def capture_error(
except Exception as log_exc:
logger.debug("Failed to log error event: %s", log_exc)
# 2. Create bug report task
task_id = None
def _create_bug_report(
exc: Exception,
source: str,
context: dict | None,
error_hash: str,
tb_str: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> str | None:
"""Create a bug report task and return the task ID (or None on failure)."""
try:
from swarm.task_queue.models import create_task
@@ -195,7 +192,6 @@ def capture_error(
)
task_id = task.id
# Log the creation event
try:
from swarm.event_log import EventType, log_event
@@ -210,12 +206,16 @@ def capture_error(
)
except Exception as exc:
logger.warning("Bug report screenshot error: %s", exc)
pass
return task_id
except Exception as task_exc:
logger.debug("Failed to create bug report task: %s", task_exc)
return None
# 3. Send notification
def _notify_bug_report(exc: Exception, source: str) -> None:
"""Send a push notification about the captured error."""
try:
from infrastructure.notifications.push import notifier
@@ -224,11 +224,12 @@ def capture_error(
message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
category="system",
)
except Exception as exc:
logger.warning("Bug report notification error: %s", exc)
pass
except Exception as notify_exc:
logger.warning("Bug report notification error: %s", notify_exc)
# 4. Record in session logger (via registered callback)
def _record_to_session(exc: Exception, source: str) -> None:
"""Record the error via the registered session callback."""
if _error_recorder is not None:
try:
_error_recorder(
@@ -238,4 +239,50 @@ def capture_error(
except Exception as log_exc:
logger.warning("Bug report session logging error: %s", log_exc)
def capture_error(
exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
Returns:
Task ID of the created bug report, or None if deduplicated/disabled
"""
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
tb_str, affected_file, affected_line = _extract_traceback_info(exc)
git_ctx = _get_git_context()
_log_error_event(exc, source, error_hash, affected_file, affected_line, git_ctx)
task_id = _create_bug_report(
exc,
source,
context,
error_hash,
tb_str,
affected_file,
affected_line,
git_ctx,
)
_notify_bug_report(exc, source)
_record_to_session(exc, source)
return task_id

View File

@@ -144,6 +144,65 @@ class ShellHand:
return None
@staticmethod
def _build_run_env(env: dict | None) -> dict:
"""Merge *env* overrides into a copy of the current environment."""
import os
run_env = os.environ.copy()
if env:
run_env.update(env)
return run_env
async def _execute_subprocess(
self,
command: str,
effective_timeout: int,
cwd: str | None,
run_env: dict,
start: float,
) -> ShellResult:
"""Run *command* as a subprocess with timeout enforcement."""
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
)
async def run(
self,
command: str,
@@ -164,7 +223,6 @@ class ShellHand:
"""
start = time.time()
# Validate
validation_error = self._validate_command(command)
if validation_error:
return ShellResult(
@@ -178,52 +236,8 @@ class ShellHand:
cwd = working_dir or self._working_dir
try:
import os
run_env = os.environ.copy()
if env:
run_env.update(env)
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
)
run_env = self._build_run_env(env)
return await self._execute_subprocess(command, effective_timeout, cwd, run_env, start)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning("Shell command failed: %s%s", command, exc)

View File

@@ -13,7 +13,7 @@ import logging
from dataclasses import dataclass, field
from enum import Enum, auto
from config import settings
from config import normalize_ollama_url, settings
logger = logging.getLogger(__name__)
@@ -307,7 +307,7 @@ class MultiModalManager:
import json
import urllib.request
url = self.ollama_url.replace("localhost", "127.0.0.1")
url = normalize_ollama_url(self.ollama_url)
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",
@@ -462,7 +462,7 @@ class MultiModalManager:
logger.info("Pulling model: %s", model_name)
url = self.ollama_url.replace("localhost", "127.0.0.1")
url = normalize_ollama_url(self.ollama_url)
req = urllib.request.Request(
f"{url}/api/pull",
method="POST",

View File

@@ -388,6 +388,101 @@ class CascadeRouter:
return None
def _select_model(
self, provider: Provider, model: str | None, content_type: ContentType
) -> tuple[str | None, bool]:
"""Select the best model for the request, with vision fallback.
Returns:
Tuple of (selected_model, is_fallback_model).
"""
selected_model = model or provider.get_default_model()
is_fallback = False
if content_type != ContentType.TEXT and selected_model:
if provider.type == "ollama" and self._mm_manager:
from infrastructure.models.multimodal import ModelCapability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
fallback = self._get_fallback_model(provider, selected_model, content_type)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
return selected_model, is_fallback
async def _attempt_with_retry(
self,
provider: Provider,
messages: list[dict],
model: str | None,
temperature: float,
max_tokens: int | None,
content_type: ContentType,
) -> dict:
"""Try a provider with retries, returning the result dict.
Raises:
RuntimeError: If all retry attempts fail.
Returns error strings collected during retries via the exception message.
"""
errors: list[str] = []
for attempt in range(self.config.max_retries_per_provider):
try:
return await self._try_provider(
provider=provider,
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s",
provider.name,
attempt + 1,
error_msg,
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
raise RuntimeError("; ".join(errors))
def _is_provider_available(self, provider: Provider) -> bool:
"""Check if a provider should be tried (enabled + circuit breaker)."""
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
return False
if provider.status == ProviderStatus.UNHEALTHY:
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
return False
return True
async def complete(
self,
messages: list[dict],
@@ -414,7 +509,6 @@ class CascadeRouter:
Raises:
RuntimeError: If all providers fail
"""
# Detect content type for multi-modal routing
content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
@@ -422,93 +516,34 @@ class CascadeRouter:
errors = []
for provider in self.providers:
# Skip disabled providers
if not provider.enabled:
logger.debug("Skipping %s (disabled)", provider.name)
if not self._is_provider_available(provider):
continue
# Skip unhealthy providers (circuit breaker)
if provider.status == ProviderStatus.UNHEALTHY:
# Check if circuit breaker can close
if self._can_close_circuit(provider):
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
logger.info("Circuit breaker half-open for %s", provider.name)
else:
logger.debug("Skipping %s (circuit open)", provider.name)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
# Determine which model to use
selected_model = model or provider.get_default_model()
is_fallback_model = False
try:
result = await self._attempt_with_retry(
provider,
messages,
selected_model,
temperature,
max_tokens,
content_type,
)
except RuntimeError as exc:
errors.append(str(exc))
self._record_failure(provider)
continue
# For non-text content, check if model supports it
if content_type != ContentType.TEXT and selected_model:
if provider.type == "ollama" and self._mm_manager:
from infrastructure.models.multimodal import ModelCapability
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
# Check if selected model supports the required capability
if content_type == ContentType.VISION:
supports = self._mm_manager.model_supports(
selected_model, ModelCapability.VISION
)
if not supports:
# Find fallback model
fallback = self._get_fallback_model(
provider, selected_model, content_type
)
if fallback:
logger.info(
"Model %s doesn't support vision, falling back to %s",
selected_model,
fallback,
)
selected_model = fallback
is_fallback_model = True
else:
logger.warning(
"No vision-capable model found on %s, trying anyway",
provider.name,
)
# Try this provider
for attempt in range(self.config.max_retries_per_provider):
try:
result = await self._try_provider(
provider=provider,
messages=messages,
model=selected_model,
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
# Success! Update metrics and return
self._record_success(provider, result.get("latency_ms", 0))
return {
"content": result["content"],
"provider": provider.name,
"model": result.get(
"model", selected_model or provider.get_default_model()
),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
}
except Exception as exc:
error_msg = str(exc)
logger.warning(
"Provider %s attempt %d failed: %s", provider.name, attempt + 1, error_msg
)
errors.append(f"{provider.name}: {error_msg}")
if attempt < self.config.max_retries_per_provider - 1:
await asyncio.sleep(self.config.retry_delay_seconds)
# All retries failed for this provider
self._record_failure(provider)
# All providers failed
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
async def _try_provider(
@@ -574,7 +609,7 @@ class CascadeRouter:
"""Call Ollama API with multi-modal support."""
import aiohttp
url = f"{provider.url}/api/chat"
url = f"{provider.url or settings.ollama_url}/api/chat"
# Transform messages for Ollama format (including images)
transformed_messages = self._transform_messages_for_ollama(messages)
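Not part of the diff: the per-provider retry pattern that _attempt_with_retry factors out, reduced to a runnable sketch. The call_provider stub, retry count, and delay are stand-ins for the provider call and the config values shown above.

import asyncio

async def attempt_with_retry(call_provider, max_retries: int = 3, delay_s: float = 0.1):
    errors: list[str] = []
    for attempt in range(max_retries):
        try:
            return await call_provider()
        except Exception as exc:
            errors.append(str(exc))
            if attempt < max_retries - 1:
                await asyncio.sleep(delay_s)
    # All attempts failed: raise one error that complete() can record before it
    # moves on to the next provider in the cascade.
    raise RuntimeError("; ".join(errors))

async def flaky():
    raise ConnectionError("ollama refused connection")

try:
    asyncio.run(attempt_with_retry(flaky, max_retries=2))
except RuntimeError as exc:
    print(exc)   # ollama refused connection; ollama refused connection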

View File

@@ -1 +1 @@
"""Timmy — Core AI agent (Ollama/AirLLM backends, CLI, prompts)."""
"""Timmy — Core AI agent (Ollama/Grok/Claude backends, CLI, prompts)."""

View File

@@ -26,12 +26,12 @@ from timmy.prompts import get_system_prompt
from timmy.tools import create_full_toolkit
if TYPE_CHECKING:
from timmy.backends import ClaudeBackend, GrokBackend, TimmyAirLLMAgent
from timmy.backends import ClaudeBackend, GrokBackend
logger = logging.getLogger(__name__)
# Union type for callers that want to hint the return type.
TimmyAgent = Union[Agent, "TimmyAirLLMAgent", "GrokBackend", "ClaudeBackend"]
TimmyAgent = Union[Agent, "GrokBackend", "ClaudeBackend"]
# Models known to be too small for reliable tool calling.
# These hallucinate tool calls as text, invoke tools randomly,
@@ -63,7 +63,7 @@ def _pull_model(model_name: str) -> bool:
logger.info("Pulling model: %s", model_name)
url = settings.ollama_url.replace("localhost", "127.0.0.1")
url = settings.normalized_ollama_url
req = urllib.request.Request(
f"{url}/api/pull",
method="POST",
@@ -172,107 +172,34 @@ def _warmup_model(model_name: str) -> bool:
def _resolve_backend(requested: str | None) -> str:
"""Return the backend name to use, resolving 'auto' and explicit overrides.
"""Return the backend name to use.
Priority (highest → lowest):
Priority (highest -> lowest):
1. CLI flag passed directly to create_timmy()
2. TIMMY_MODEL_BACKEND env var / .env setting
3. 'ollama' (safe default — no surprises)
'auto' triggers Apple Silicon detection: uses AirLLM if both
is_apple_silicon() and airllm_available() return True.
3. 'ollama' (safe default -- no surprises)
"""
if requested is not None:
return requested
configured = settings.timmy_model_backend # "ollama" | "airllm" | "grok" | "claude" | "auto"
if configured != "auto":
return configured
# "auto" path — lazy import to keep startup fast and tests clean.
from timmy.backends import airllm_available, is_apple_silicon
if is_apple_silicon() and airllm_available():
return "airllm"
return "ollama"
return settings.timmy_model_backend # "ollama" | "grok" | "claude"
def create_timmy(
db_file: str = "timmy.db",
backend: str | None = None,
model_size: str | None = None,
*,
skip_mcp: bool = False,
session_id: str = "unknown",
) -> TimmyAgent:
"""Instantiate the agent — Ollama or AirLLM, same public interface.
def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
"""Assemble the tools list based on model capability and MCP flags.
Args:
db_file: SQLite file for Agno conversation memory (Ollama path only).
backend: "ollama" | "airllm" | "auto" | None (reads config/env).
model_size: AirLLM size — "8b" | "70b" | "405b" | None (reads config).
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
Use for background tasks (thinking, QA) where MCP's
stdio cancel-scope lifecycle conflicts with asyncio
task cancellation.
Returns an Agno Agent or backend-specific agent — all expose
print_response(message, stream).
Returns a list of Toolkit / MCPTools objects, or an empty list.
"""
resolved = _resolve_backend(backend)
size = model_size or "70b"
if resolved == "claude":
from timmy.backends import ClaudeBackend
return ClaudeBackend()
if resolved == "grok":
from timmy.backends import GrokBackend
return GrokBackend()
if resolved == "airllm":
from timmy.backends import TimmyAirLLMAgent
return TimmyAirLLMAgent(model_size=size)
# Default: Ollama via Agno.
# Resolve model with automatic pulling and fallback
model_name, is_fallback = _resolve_model_with_fallback(
requested_model=None,
require_vision=False,
auto_pull=True,
)
# If Ollama is completely unreachable, fail loudly.
# Sovereignty: never silently send data to a cloud API.
# Use --backend claude explicitly if you want cloud inference.
if not _check_model_available(model_name):
logger.error(
"Ollama unreachable and no local models available. "
"Start Ollama with 'ollama serve' or use --backend claude explicitly."
)
if is_fallback:
logger.info("Using fallback model %s (requested was unavailable)", model_name)
use_tools = _model_supports_tools(model_name)
# Conditionally include tools — small models get none
toolkit = create_full_toolkit() if use_tools else None
if not use_tools:
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
return []
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
tools_list: list = []
if toolkit:
tools_list.append(toolkit)
tools_list: list = [create_full_toolkit()]
# Add MCP tool servers (lazy-connected on first arun()).
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
# scopes that conflict with asyncio background task cancellation (#72).
if use_tools and not skip_mcp:
if not skip_mcp:
try:
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
@@ -286,34 +213,46 @@ def create_timmy(
except Exception as exc:
logger.debug("MCP tools unavailable: %s", exc)
# Select prompt tier based on tool capability
return tools_list
def _build_prompt(use_tools: bool, session_id: str) -> str:
"""Build the full system prompt with optional memory context."""
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
# Try to load memory context
try:
from timmy.memory_system import memory_system
memory_context = memory_system.get_system_context()
if memory_context:
# Truncate if too long — smaller budget for small models
# since the expanded prompt (roster, guardrails) uses more tokens
# Smaller budget for small models — expanded prompt uses more tokens
max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]"
full_prompt = (
return (
f"{base_prompt}\n\n"
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
f"{memory_context}"
)
else:
full_prompt = base_prompt
except Exception as exc:
logger.warning("Failed to load memory context: %s", exc)
full_prompt = base_prompt
return base_prompt
def _create_ollama_agent(
*,
db_file: str,
model_name: str,
tools_list: list,
full_prompt: str,
use_tools: bool,
) -> Agent:
"""Construct the Agno Agent with Ollama backend and warm up the model."""
model_kwargs = {}
if settings.ollama_num_ctx > 0:
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
agent = Agent(
name="Agent",
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
@@ -330,6 +269,67 @@ def create_timmy(
return agent
def create_timmy(
db_file: str = "timmy.db",
backend: str | None = None,
*,
skip_mcp: bool = False,
session_id: str = "unknown",
) -> TimmyAgent:
"""Instantiate the agent — Ollama, Grok, or Claude.
Args:
db_file: SQLite file for Agno conversation memory (Ollama path only).
backend: "ollama" | "grok" | "claude" | None (reads config/env).
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
Use for background tasks (thinking, QA) where MCP's
stdio cancel-scope lifecycle conflicts with asyncio
task cancellation.
Returns an Agno Agent or backend-specific agent — all expose
print_response(message, stream).
"""
resolved = _resolve_backend(backend)
if resolved == "claude":
from timmy.backends import ClaudeBackend
return ClaudeBackend()
if resolved == "grok":
from timmy.backends import GrokBackend
return GrokBackend()
# Default: Ollama via Agno.
model_name, is_fallback = _resolve_model_with_fallback(
requested_model=None,
require_vision=False,
auto_pull=True,
)
if not _check_model_available(model_name):
logger.error(
"Ollama unreachable and no local models available. "
"Start Ollama with 'ollama serve' or use --backend claude explicitly."
)
if is_fallback:
logger.info("Using fallback model %s (requested was unavailable)", model_name)
use_tools = _model_supports_tools(model_name)
tools_list = _build_tools_list(use_tools, skip_mcp, model_name)
full_prompt = _build_prompt(use_tools, session_id)
return _create_ollama_agent(
db_file=db_file,
model_name=model_name,
tools_list=tools_list,
full_prompt=full_prompt,
use_tools=use_tools,
)
class TimmyWithMemory:
"""Agent wrapper with explicit three-tier memory management."""

View File

@@ -95,6 +95,126 @@ def _parse_steps(plan_text: str) -> list[str]:
return [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
# ---------------------------------------------------------------------------
# Extracted helpers
# ---------------------------------------------------------------------------
def _extract_content(run_result) -> str:
"""Extract text content from an agent run result."""
return run_result.content if hasattr(run_result, "content") else str(run_result)
def _clean(text: str) -> str:
"""Clean a model response using session's response cleaner."""
from timmy.session import _clean_response
return _clean_response(text)
async def _plan_task(
agent, task: str, session_id: str, max_steps: int
) -> tuple[list[str], bool] | str:
"""Run the planning phase — returns (steps, was_truncated) or error string."""
plan_prompt = (
f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = _extract_content(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
return f"Planning failed: {exc}"
steps = _parse_steps(plan_text)
if not steps:
return "Planning produced no steps."
planned_count = len(steps)
steps = steps[:max_steps]
return steps, planned_count > len(steps)
async def _execute_step(
agent,
task: str,
step_desc: str,
step_num: int,
total_steps: int,
recent_results: list[str],
session_id: str,
) -> AgenticStep:
"""Execute a single step, returning an AgenticStep."""
step_start = time.monotonic()
context = (
f"Task: {task}\n"
f"Step {step_num}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent_results[-2:] if recent_results else []}\n\n"
f"Execute this step and report what you did."
)
step_run = await asyncio.to_thread(
agent.run, context, stream=False, session_id=f"{session_id}_step{step_num}"
)
step_result = _clean(_extract_content(step_run))
return AgenticStep(
step_num=step_num,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
async def _adapt_step(
agent,
step_desc: str,
step_num: int,
error: Exception,
step_start: float,
session_id: str,
) -> AgenticStep:
"""Attempt adaptation after a step failure."""
adapt_prompt = (
f"Step {step_num} failed with error: {error}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
adapt_run = await asyncio.to_thread(
agent.run, adapt_prompt, stream=False, session_id=f"{session_id}_adapt{step_num}"
)
adapt_result = _clean(_extract_content(adapt_run))
return AgenticStep(
step_num=step_num,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
def _summarize(result: AgenticResult, total_steps: int, was_truncated: bool) -> None:
"""Fill in summary and final status on the result object (mutates in place)."""
completed = sum(1 for s in result.steps if s.status == "completed")
adapted = sum(1 for s in result.steps if s.status == "adapted")
failed = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed}/{total_steps} steps"]
if adapted:
parts.append(f"{adapted} adapted")
if failed:
parts.append(f"{failed} failed")
result.summary = f"{result.task}: {', '.join(parts)}."
if was_truncated or len(result.steps) < total_steps or failed:
result.status = "partial"
else:
result.status = "completed"
# ---------------------------------------------------------------------------
# Core loop
# ---------------------------------------------------------------------------
@@ -125,88 +245,41 @@ async def run_agentic_loop(
task_id = str(uuid.uuid4())[:8]
start_time = time.monotonic()
agent = _get_loop_agent()
result = AgenticResult(task_id=task_id, task=task, summary="")
# ── Phase 1: Planning ──────────────────────────────────────────────────
plan_prompt = (
f"Break this task into numbered steps (max {max_steps}). "
f"Return ONLY a numbered list, nothing else.\n\n"
f"Task: {task}"
)
try:
plan_run = await asyncio.to_thread(
agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
)
plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.error("Agentic loop: planning failed: %s", exc)
# Phase 1: Planning
plan = await _plan_task(agent, task, session_id, max_steps)
if isinstance(plan, str):
result.status = "failed"
result.summary = f"Planning failed: {exc}"
result.summary = plan
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result
steps = _parse_steps(plan_text)
if not steps:
result.status = "failed"
result.summary = "Planning produced no steps."
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
return result
# Enforce max_steps — track if we truncated
planned_steps = len(steps)
steps = steps[:max_steps]
steps, was_truncated = plan
total_steps = len(steps)
was_truncated = planned_steps > total_steps
# Broadcast plan
await _broadcast_progress(
"agentic.plan_ready",
{
"task_id": task_id,
"task": task,
"steps": steps,
"total": total_steps,
},
{"task_id": task_id, "task": task, "steps": steps, "total": total_steps},
)
# ── Phase 2: Execution ─────────────────────────────────────────────────
# Phase 2: Execution
completed_results: list[str] = []
for i, step_desc in enumerate(steps, 1):
step_start = time.monotonic()
recent = completed_results[-2:] if completed_results else []
context = (
f"Task: {task}\n"
f"Step {i}/{total_steps}: {step_desc}\n"
f"Recent progress: {recent}\n\n"
f"Execute this step and report what you did."
)
try:
step_run = await asyncio.to_thread(
agent.run, context, stream=False, session_id=f"{session_id}_step{i}"
)
step_result = step_run.content if hasattr(step_run, "content") else str(step_run)
# Clean the response
from timmy.session import _clean_response
step_result = _clean_response(step_result)
step = AgenticStep(
step_num=i,
description=step_desc,
result=step_result,
status="completed",
duration_ms=int((time.monotonic() - step_start) * 1000),
step = await _execute_step(
agent,
task,
step_desc,
i,
total_steps,
completed_results,
session_id,
)
result.steps.append(step)
completed_results.append(f"Step {i}: {step_result[:200]}")
# Broadcast progress
completed_results.append(f"Step {i}: {step.result[:200]}")
await _broadcast_progress(
"agentic.step_complete",
{
@@ -214,46 +287,18 @@ async def run_agentic_loop(
"step": i,
"total": total_steps,
"description": step_desc,
"result": step_result[:200],
"result": step.result[:200],
},
)
if on_progress:
await on_progress(step_desc, i, total_steps)
except Exception as exc: # broad catch intentional: agent.run can raise any error
logger.warning("Agentic loop step %d failed: %s", i, exc)
# ── Adaptation: ask model to adapt ─────────────────────────────
adapt_prompt = (
f"Step {i} failed with error: {exc}\n"
f"Original step was: {step_desc}\n"
f"Adapt the plan and try an alternative approach for this step."
)
try:
adapt_run = await asyncio.to_thread(
agent.run,
adapt_prompt,
stream=False,
session_id=f"{session_id}_adapt{i}",
)
adapt_result = (
adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run)
)
from timmy.session import _clean_response
adapt_result = _clean_response(adapt_result)
step = AgenticStep(
step_num=i,
description=f"[Adapted] {step_desc}",
result=adapt_result,
status="adapted",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
step = await _adapt_step(agent, step_desc, i, exc, step_start, session_id)
result.steps.append(step)
completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}")
completed_results.append(f"Step {i} (adapted): {step.result[:200]}")
await _broadcast_progress(
"agentic.step_adapted",
{
@@ -262,46 +307,26 @@ async def run_agentic_loop(
"total": total_steps,
"description": step_desc,
"error": str(exc),
"adaptation": adapt_result[:200],
"adaptation": step.result[:200],
},
)
if on_progress:
await on_progress(f"[Adapted] {step_desc}", i, total_steps)
except Exception as adapt_exc: # broad catch intentional: agent.run can raise any error
except Exception as adapt_exc: # broad catch intentional
logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
step = AgenticStep(
step_num=i,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
result.steps.append(
AgenticStep(
step_num=i,
description=step_desc,
result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
status="failed",
duration_ms=int((time.monotonic() - step_start) * 1000),
)
)
result.steps.append(step)
completed_results.append(f"Step {i}: FAILED")
# ── Phase 3: Summary ───────────────────────────────────────────────────
completed_count = sum(1 for s in result.steps if s.status == "completed")
adapted_count = sum(1 for s in result.steps if s.status == "adapted")
failed_count = sum(1 for s in result.steps if s.status == "failed")
parts = [f"Completed {completed_count}/{total_steps} steps"]
if adapted_count:
parts.append(f"{adapted_count} adapted")
if failed_count:
parts.append(f"{failed_count} failed")
result.summary = f"{task}: {', '.join(parts)}."
# Determine final status
if was_truncated:
result.status = "partial"
elif len(result.steps) < total_steps:
result.status = "partial"
elif any(s.status == "failed" for s in result.steps):
result.status = "partial"
else:
result.status = "completed"
# Phase 3: Summary
_summarize(result, total_steps, was_truncated)
result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
await _broadcast_progress(

View File

@@ -1,11 +1,10 @@
"""LLM backends — AirLLM (local big models), Grok (xAI), and Claude (Anthropic).
"""LLM backends — Grok (xAI) and Claude (Anthropic).
Provides drop-in replacements for the Agno Agent that expose the same
run(message, stream) → RunResult interface used by the dashboard and the
print_response(message, stream) interface used by the CLI.
Backends:
- TimmyAirLLMAgent: Local 8B/70B/405B via AirLLM (Apple Silicon or PyTorch)
- GrokBackend: xAI Grok API via OpenAI-compatible SDK (opt-in premium)
- ClaudeBackend: Anthropic Claude API — lightweight cloud fallback
@@ -16,21 +15,11 @@ import logging
import platform
import time
from dataclasses import dataclass
from typing import Literal
from timmy.prompts import get_system_prompt
logger = logging.getLogger(__name__)
# HuggingFace model IDs for each supported size.
_AIRLLM_MODELS: dict[str, str] = {
"8b": "meta-llama/Meta-Llama-3.1-8B-Instruct",
"70b": "meta-llama/Meta-Llama-3.1-70B-Instruct",
"405b": "meta-llama/Meta-Llama-3.1-405B-Instruct",
}
ModelSize = Literal["8b", "70b", "405b"]
@dataclass
class RunResult:
@@ -45,108 +34,6 @@ def is_apple_silicon() -> bool:
return platform.system() == "Darwin" and platform.machine() == "arm64"
def airllm_available() -> bool:
"""Return True when the airllm package is importable."""
try:
import airllm # noqa: F401
return True
except ImportError:
return False
class TimmyAirLLMAgent:
"""Thin AirLLM wrapper compatible with both dashboard and CLI call sites.
Exposes:
run(message, stream) → RunResult(content=...) [dashboard]
print_response(message, stream) → None [CLI]
Maintains a rolling 10-turn in-memory history so Timmy remembers the
conversation within a session — no SQLite needed at this layer.
"""
def __init__(self, model_size: str = "70b") -> None:
model_id = _AIRLLM_MODELS.get(model_size)
if model_id is None:
raise ValueError(
f"Unknown model size {model_size!r}. Choose from: {list(_AIRLLM_MODELS)}"
)
if is_apple_silicon():
from airllm import AirLLMMLX # type: ignore[import]
self._model = AirLLMMLX(model_id)
else:
from airllm import AutoModel # type: ignore[import]
self._model = AutoModel.from_pretrained(model_id)
self._history: list[str] = []
self._model_size = model_size
# ── public interface (mirrors Agno Agent) ────────────────────────────────
def run(self, message: str, *, stream: bool = False) -> RunResult:
"""Run inference and return a structured result (matches Agno Agent.run()).
`stream` is accepted for API compatibility; AirLLM always generates
the full output in one pass.
"""
prompt = self._build_prompt(message)
input_tokens = self._model.tokenizer(
[prompt],
return_tensors="pt",
padding=True,
truncation=True,
max_length=2048,
)
output = self._model.generate(
**input_tokens,
max_new_tokens=512,
use_cache=True,
do_sample=True,
temperature=0.7,
)
# Decode only the newly generated tokens, not the prompt.
input_len = input_tokens["input_ids"].shape[1]
response = self._model.tokenizer.decode(
output[0][input_len:], skip_special_tokens=True
).strip()
self._history.append(f"User: {message}")
self._history.append(f"Timmy: {response}")
return RunResult(content=response)
def print_response(self, message: str, *, stream: bool = True) -> None:
"""Run inference and render the response to stdout (CLI interface)."""
result = self.run(message, stream=stream)
self._render(result.content)
# ── private helpers ──────────────────────────────────────────────────────
def _build_prompt(self, message: str) -> str:
context = get_system_prompt(tools_enabled=False, session_id="airllm") + "\n\n"
# Include the last 10 turns (5 exchanges) for continuity.
if self._history:
context += "\n".join(self._history[-10:]) + "\n\n"
return context + f"User: {message}\nTimmy:"
@staticmethod
def _render(text: str) -> None:
"""Print response with rich markdown when available, plain text otherwise."""
try:
from rich.console import Console
from rich.markdown import Markdown
Console().print(Markdown(text))
except ImportError:
print(text)
# ── Grok (xAI) Backend ─────────────────────────────────────────────────────
# Premium cloud augmentation — opt-in only, never the default path.
@@ -187,7 +74,7 @@ class GrokBackend:
Uses the OpenAI-compatible SDK to connect to xAI's API.
Only activated when GROK_ENABLED=true and XAI_API_KEY is set.
Exposes the same interface as TimmyAirLLMAgent and Agno Agent:
Exposes the same interface as Agno Agent:
run(message, stream) → RunResult [dashboard]
print_response(message, stream) → None [CLI]
health_check() → dict [monitoring]
@@ -437,8 +324,7 @@ CLAUDE_MODELS: dict[str, str] = {
class ClaudeBackend:
"""Anthropic Claude backend — cloud fallback when local models are offline.
Uses the official Anthropic SDK. Same interface as GrokBackend and
TimmyAirLLMAgent:
Uses the official Anthropic SDK. Same interface as GrokBackend:
run(message, stream) → RunResult [dashboard]
print_response(message, stream) → None [CLI]
health_check() → dict [monitoring]

View File

@@ -22,13 +22,13 @@ _BACKEND_OPTION = typer.Option(
None,
"--backend",
"-b",
help="Inference backend: 'ollama' (default) | 'airllm' | 'auto'",
help="Inference backend: 'ollama' (default) | 'grok' | 'claude'",
)
_MODEL_SIZE_OPTION = typer.Option(
None,
"--model-size",
"-s",
help="AirLLM model size when --backend airllm: '8b' | '70b' | '405b'",
help="Model size (reserved for future use).",
)

View File

@@ -98,6 +98,73 @@ def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
return {row[1] for row in cursor.fetchall()}
def _migrate_episodes(conn: sqlite3.Connection) -> None:
"""Migrate episodes table rows into the unified memories table."""
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection) -> None:
"""Migrate chunks table rows into the unified memories table."""
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
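# For orientation (a sketch, not the literal SQL executed): a legacy chunks table
# with only rowid, text, and filepath columns resolves the fallbacks above to
# id -> CAST(rowid AS TEXT), content -> text, embedding -> NULL,
# created_at -> datetime('now'), so the migration is equivalent to:
#
#   INSERT INTO memories (id, content, memory_type, source, embedding, created_at, access_count)
#   SELECT CAST(rowid AS TEXT), text, 'vault_chunk', filepath, NULL, datetime('now'), 0
#   FROM chunks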
def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
"""Drop a legacy table if it exists."""
try:
conn.execute(f"DROP TABLE {table}") # noqa: S608
logger.info("Migration: Dropped old %s table", table)
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop %s: %s", table, exc)
def _migrate_schema(conn: sqlite3.Connection) -> None:
"""Migrate from old three-table schema to unified memories table.
@@ -110,78 +177,16 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
# Check if we need to migrate (old schema exists)
if not has_memories and (has_episodes or has_chunks or has_facts):
if not has_memories and (tables & {"episodes", "chunks", "facts"}):
logger.info("Migration: Creating unified memories table")
# Schema will be created by _ensure_schema above
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old tables
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
if "episodes" in tables and has_memories:
_migrate_episodes(conn)
if "chunks" in tables and has_memories:
_migrate_chunks(conn)
if "facts" in tables:
_drop_legacy_table(conn, "facts")
conn.commit()
@@ -298,6 +303,86 @@ def store_memory(
return entry
def _build_memory_filter(
context_type: str | None,
agent_id: str | None,
session_id: str | None,
) -> tuple[str, list]:
"""Build WHERE clause and params for memory queries."""
conditions: list[str] = []
params: list = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
return where_clause, params
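# Illustrative call with hypothetical arguments: filtering on type and session but
# not on agent produces a two-condition clause plus matching positional params.
#
#   _build_memory_filter("conversation", None, "sess42")
#   # -> ("WHERE memory_type = ? AND session_id = ?", ["conversation", "sess42"])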
def _fetch_memory_candidates(
where_clause: str, params: list, candidate_limit: int
) -> list[sqlite3.Row]:
"""Fetch candidate memory rows from the database."""
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(candidate_limit)
with get_connection() as conn:
return conn.execute(query_sql, params).fetchall()
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
"""Convert a database row to a MemoryEntry."""
return MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
def _score_and_rank(
rows: list[sqlite3.Row],
query: str,
query_embedding: list[float],
min_relevance: float,
limit: int,
) -> list[MemoryEntry]:
"""Score candidates by similarity and return top results."""
results = []
for row in rows:
entry = _row_to_entry(row)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
else:
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results[:limit]
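# cosine_similarity() and _keyword_overlap() are defined elsewhere in this module.
# For reference, a conventional cosine similarity looks like the sketch below
# (an assumption about the helper, not its actual source):
def _cosine_similarity_sketch(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two equal-length vectors; 0.0 if either is all-zero."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(y * y for y in b) ** 0.5
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0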
def search_memories(
query: str,
limit: int = 10,
@@ -320,66 +405,9 @@ def search_memories(
List of MemoryEntry objects sorted by relevance
"""
query_embedding = embed_text(query)
# Build query with filters
conditions = []
params = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
# Fetch candidates (we'll do in-memory similarity for now)
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(limit * 3) # Get more candidates for ranking
with get_connection() as conn:
rows = conn.execute(query_sql, params).fetchall()
# Compute similarity scores
results = []
for row in rows:
entry = MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
else:
# Fallback: check for keyword overlap
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
# Sort by relevance and return top results
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results[:limit]
where_clause, params = _build_memory_filter(context_type, agent_id, session_id)
rows = _fetch_memory_candidates(where_clause, params, limit * 3)
return _score_and_rank(rows, query, query_embedding, min_relevance, limit)
def delete_memory(memory_id: str) -> bool:
@@ -636,7 +664,7 @@ class HotMemory:
if len(lines) > 1:
return "\n".join(lines)
except Exception:
pass
logger.debug("DB context read failed, falling back to file")
# Fallback to file if DB unavailable
if self.path.exists():

View File

@@ -323,6 +323,75 @@ def session_history(query: str, role: str = "", limit: int = 10) -> str:
_LOW_CONFIDENCE_THRESHOLD = 0.5
def _categorize_entries(
entries: list[dict],
) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
"""Split session entries into messages, errors, timmy msgs, user msgs."""
messages = [e for e in entries if e.get("type") == "message"]
errors = [e for e in entries if e.get("type") == "error"]
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
user_msgs = [e for e in messages if e.get("role") == "user"]
return messages, errors, timmy_msgs, user_msgs
def _find_low_confidence(timmy_msgs: list[dict]) -> list[dict]:
"""Return Timmy responses below the confidence threshold."""
return [
m
for m in timmy_msgs
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
]
def _find_repeated_topics(user_msgs: list[dict], top_n: int = 5) -> list[tuple[str, int]]:
"""Identify frequently mentioned words in user messages."""
topic_counts: dict[str, int] = {}
for m in user_msgs:
for word in (m.get("content") or "").lower().split():
cleaned = word.strip(".,!?\"'()[]")
if len(cleaned) > 3:
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
return sorted(
((w, c) for w, c in topic_counts.items() if c >= 3),
key=lambda x: x[1],
reverse=True,
)[:top_n]
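# Worked example with hypothetical messages: words of three characters or fewer
# are ignored, and a word needs at least three mentions to qualify as a topic.
#
#   msgs = [{"content": "docker build fails"},
#           {"content": "docker compose up"},
#           {"content": "why does docker hang"}]
#   _find_repeated_topics(msgs)   # -> [("docker", 3)]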
def _format_reflection_section(
title: str,
items: list[dict],
formatter: object,
empty_msg: str,
) -> list[str]:
"""Format a titled section with items, or an empty-state message."""
if items:
lines = [f"### {title} ({len(items)})"]
for item in items[:5]:
lines.append(formatter(item)) # type: ignore[operator]
lines.append("")
return lines
return [f"### {title}\n{empty_msg}\n"]
def _build_insights(
low_conf: list[dict],
errors: list[dict],
repeated: list[tuple[str, int]],
) -> list[str]:
"""Generate actionable insight bullets from analysis results."""
insights: list[str] = []
if low_conf:
insights.append("Consider studying topics where confidence was low.")
if errors:
insights.append("Review error patterns for recurring infrastructure issues.")
if repeated:
insights.append(
f'User frequently asks about "{repeated[0][0]}" — consider deepening knowledge here.'
)
return insights or ["Conversations look healthy. Keep up the good work."]
def self_reflect(limit: int = 30) -> str:
"""Review recent conversations and reflect on Timmy's own behavior.
@@ -343,35 +412,12 @@ def self_reflect(limit: int = 30) -> str:
if not entries:
return "No conversation history to reflect on yet."
# Categorize entries
messages = [e for e in entries if e.get("type") == "message"]
errors = [e for e in entries if e.get("type") == "error"]
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
user_msgs = [e for e in messages if e.get("role") == "user"]
# 1. Low-confidence responses
low_conf = [
m
for m in timmy_msgs
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
]
# 2. Identify repeated user topics (simple word frequency)
topic_counts: dict[str, int] = {}
for m in user_msgs:
for word in (m.get("content") or "").lower().split():
cleaned = word.strip(".,!?\"'()[]")
if len(cleaned) > 3:
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
repeated = sorted(
((w, c) for w, c in topic_counts.items() if c >= 3),
key=lambda x: x[1],
reverse=True,
)[:5]
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries)
low_conf = _find_low_confidence(timmy_msgs)
repeated = _find_repeated_topics(user_msgs)
# Build reflection report
sections: list[str] = ["## Self-Reflection Report\n"]
sections.append(
f"Reviewed {len(entries)} recent entries: "
f"{len(user_msgs)} user messages, "
@@ -379,32 +425,27 @@ def self_reflect(limit: int = 30) -> str:
f"{len(errors)} errors.\n"
)
# Low confidence
if low_conf:
sections.append(f"### Low-Confidence Responses ({len(low_conf)})")
for m in low_conf[:5]:
ts = (m.get("timestamp") or "?")[:19]
conf = m.get("confidence", 0)
text = (m.get("content") or "")[:120]
sections.append(f"- [{ts}] confidence={conf:.0%}: {text}")
sections.append("")
else:
sections.append(
"### Low-Confidence Responses\nNone found — all responses above threshold.\n"
sections.extend(
_format_reflection_section(
"Low-Confidence Responses",
low_conf,
lambda m: (
f"- [{(m.get('timestamp') or '?')[:19]}] "
f"confidence={m.get('confidence', 0):.0%}: "
f"{(m.get('content') or '')[:120]}"
),
"None found — all responses above threshold.",
)
)
sections.extend(
_format_reflection_section(
"Errors",
errors,
lambda e: f"- [{(e.get('timestamp') or '?')[:19]}] {(e.get('error') or '')[:120]}",
"No errors recorded.",
)
)
# Errors
if errors:
sections.append(f"### Errors ({len(errors)})")
for e in errors[:5]:
ts = (e.get("timestamp") or "?")[:19]
err = (e.get("error") or "")[:120]
sections.append(f"- [{ts}] {err}")
sections.append("")
else:
sections.append("### Errors\nNo errors recorded.\n")
# Repeated topics
if repeated:
sections.append("### Recurring Topics")
for word, count in repeated:
@@ -413,22 +454,8 @@ def self_reflect(limit: int = 30) -> str:
else:
sections.append("### Recurring Topics\nNo strong patterns detected.\n")
# Actionable summary
insights: list[str] = []
if low_conf:
insights.append("Consider studying topics where confidence was low.")
if errors:
insights.append("Review error patterns for recurring infrastructure issues.")
if repeated:
top_topic = repeated[0][0]
insights.append(
f'User frequently asks about "{top_topic}" — consider deepening knowledge here.'
)
if not insights:
insights.append("Conversations look healthy. Keep up the good work.")
sections.append("### Insights")
for insight in insights:
for insight in _build_insights(low_conf, errors, repeated):
sections.append(f"- {insight}")
return "\n".join(sections)

View File

@@ -232,6 +232,90 @@ class ThinkingEngine:
return False # Disabled — never idle
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]:
"""Assemble the context needed for a thinking cycle.
Returns:
(memory_context, system_context, recent_thoughts)
"""
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
return memory_context, system_context, recent_thoughts
async def _generate_novel_thought(
self,
prompt: str | None,
memory_context: str,
system_context: str,
recent_thoughts: list["Thought"],
) -> tuple[str | None, str]:
"""Run the dedup-retry loop to produce a novel thought.
Returns:
(content, seed_type) — content is None if no novel thought produced.
"""
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None, seed_type
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None, seed_type
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
return content, seed_type # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None, seed_type
return None, seed_type
async def _process_thinking_result(self, thought: "Thought") -> None:
"""Run all post-hooks after a thought is stored."""
self._maybe_check_memory()
await self._maybe_distill()
await self._maybe_file_issues()
await self._check_workspace()
self._maybe_check_memory_status()
self._update_memory(thought)
self._log_event(thought)
self._write_journal(thought)
await self._broadcast(thought)
async def think_once(self, prompt: str | None = None) -> Thought | None:
"""Execute one thinking cycle.
@@ -257,91 +341,21 @@ class ThinkingEngine:
)
return None
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
content: str | None = None
seed_type: str = "freeform"
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
if prompt:
seed_type = "prompted"
seed_context = f"Journal prompt: {prompt}"
else:
seed_type, seed_context = self._gather_seed()
continuity = self._build_continuity_context()
full_prompt = _THINKING_PROMPT.format(
memory_context=memory_context,
system_context=system_context,
seed_context=seed_context,
continuity_context=continuity,
)
try:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None
content = raw.strip()
# Dedup: reject thoughts too similar to recent ones
if not self._is_too_similar(content, recent_thoughts):
break # Good — novel thought
if attempt < self._MAX_DEDUP_RETRIES:
logger.info(
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
attempt + 1,
self._MAX_DEDUP_RETRIES + 1,
)
content = None # Will retry
else:
logger.warning(
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
prompt,
memory_context,
system_context,
recent_thoughts,
)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
# Post-hook: check memory status periodically
self._maybe_check_memory()
# Post-hook: distill facts from recent thoughts periodically
await self._maybe_distill()
# Post-hook: file Gitea issues for actionable observations
await self._maybe_file_issues()
# Post-hook: check workspace for new messages from Hermes
await self._check_workspace()
# Post-hook: proactive memory status audit
self._maybe_check_memory_status()
# Post-hook: update MEMORY.md with latest reflection
self._update_memory(thought)
# Log to swarm event system
self._log_event(thought)
# Append to daily journal file
self._write_journal(thought)
# Broadcast to WebSocket clients
await self._broadcast(thought)
await self._process_thinking_result(thought)
logger.info(
"Thought [%s] (%s): %s",
@@ -758,23 +772,10 @@ class ThinkingEngine:
except Exception as exc:
logger.debug("Thought issue filing skipped: %s", exc)
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
# ── System snapshot helpers ────────────────────────────────────────────
Returns a short multi-line string with current time, thought count,
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
parts: list[str] = []
# Current local time
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
parts.append(
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
)
# Thought count today (cheap DB query)
def _snap_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count, or *None* on failure."""
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
@@ -782,66 +783,94 @@ class ThinkingEngine:
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
parts.append(f"Thoughts today: {count}")
return f"Thoughts today: {count}"
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
pass
return None
# Recent chat activity (in-memory, no I/O)
def _snap_chat_activity(self) -> list[str]:
"""Return chat-activity lines (in-memory, no I/O)."""
try:
from infrastructure.chat_store import message_log
messages = message_log.all()
if messages:
parts.append(f"Chat messages this session: {len(messages)}")
last = messages[-1]
parts.append(f'Last chat ({last.role}): "{last.content[:80]}"')
else:
parts.append("No chat messages this session")
return [
f"Chat messages this session: {len(messages)}",
f'Last chat ({last.role}): "{last.content[:80]}"',
]
return ["No chat messages this session"]
except Exception as exc:
logger.debug("Chat activity query failed: %s", exc)
pass
return []
# Task queue (lightweight DB query)
def _snap_task_queue(self) -> str | None:
"""Return a one-line task queue summary, or *None*."""
try:
from swarm.task_queue.models import get_task_summary_for_briefing
summary = get_task_summary_for_briefing()
running = summary.get("running", 0)
pending = summary.get("pending_approval", 0)
done = summary.get("completed", 0)
failed = summary.get("failed", 0)
s = get_task_summary_for_briefing()
running, pending = s.get("running", 0), s.get("pending_approval", 0)
done, failed = s.get("completed", 0), s.get("failed", 0)
if running or pending or done or failed:
parts.append(
return (
f"Tasks: {running} running, {pending} pending, "
f"{done} completed, {failed} failed"
)
except Exception as exc:
logger.debug("Task queue query failed: %s", exc)
pass
return None
# Workspace updates (file-based communication with Hermes)
def _snap_workspace(self) -> list[str]:
"""Return workspace-update lines (file-based Hermes comms)."""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
lines: list[str] = []
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr:
# Count entries (assuming each entry starts with a timestamp or header)
line_count = len([line for line in new_corr.splitlines() if line.strip()])
parts.append(
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
lines.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
new_inbox = updates.get("new_inbox_files", [])
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
files_str += f", ... (+{len(new_inbox) - 5} more)"
parts.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
return lines
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)
pass
return []
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
Returns a short multi-line string with current time, thought count,
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
parts: list[str] = [
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
]
thought_line = self._snap_thought_count(now)
if thought_line:
parts.append(thought_line)
parts.extend(self._snap_chat_activity())
task_line = self._snap_task_queue()
if task_line:
parts.append(task_line)
parts.extend(self._snap_workspace())
return "\n".join(parts) if parts else ""
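# Example of an assembled snapshot (illustrative values; any section that fails
# or is empty simply drops out):
#
#   Local time: 9:17 PM EDT, Thursday March 19
#   Thoughts today: 4
#   Chat messages this session: 12
#   Last chat (user): "can you check the task queue?"
#   Tasks: 1 running, 0 pending, 3 completed, 0 failed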
@@ -1110,21 +1139,37 @@ class ThinkingEngine:
lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines)
_thinking_agent = None # cached agent — avoids per-call resource leaks (#525)
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Creates a lightweight agent with skip_mcp=True to avoid the cancel-scope
Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
errors that occur when MCP stdio transports are spawned inside asyncio
background tasks (#72). The thinking engine doesn't need Gitea or
filesystem tools — it only needs the LLM.
background tasks (#72) and to prevent per-call resource leaks (httpx
clients, SQLite connections, model warmups) that caused the thinking
loop to die every ~10 min (#525).
Individual calls are capped at 120 s so a hung Ollama never blocks
the scheduler indefinitely.
Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
downstream parsers (fact distillation, issue filing) receive clean text.
"""
from timmy.agent import create_timmy
import asyncio
if self._thinking_agent is None:
from timmy.agent import create_timmy
self._thinking_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(120):
run = await self._thinking_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Thinking LLM call timed out after 120 s")
return ""
agent = create_timmy(skip_mcp=True)
run = await agent.arun(prompt, stream=False)
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
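# _THINK_TAG_RE is defined elsewhere in this module; a typical pattern for this
# kind of stripping (an assumption, shown only for context) would be:
#
#   _THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
#
# so "<think>internal notes</think>Sir, done." collapses to "Sir, done.".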

View File

@@ -26,7 +26,7 @@ def get_system_info() -> dict[str, Any]:
- python_version: Python version
- platform: OS platform
- model: Current Ollama model (queried from API)
- model_backend: Configured backend (ollama/airllm/grok)
- model_backend: Configured backend (ollama/grok/claude)
- ollama_url: Ollama host URL
- repo_root: Repository root path
- grok_enabled: Whether GROK is enabled
@@ -127,54 +127,48 @@ def check_ollama_health() -> dict[str, Any]:
return result
def get_memory_status() -> dict[str, Any]:
"""Get the status of Timmy's memory system.
Returns:
Dict with memory tier information
"""
from config import settings
repo_root = Path(settings.repo_root)
# Check tier 1: Hot memory
def _hot_memory_info(repo_root: Path) -> dict[str, Any]:
"""Tier 1: Hot memory (MEMORY.md) status."""
memory_md = repo_root / "MEMORY.md"
tier1_exists = memory_md.exists()
tier1_content = ""
if tier1_exists:
tier1_content = memory_md.read_text()[:500] # First 500 chars
tier1_content = memory_md.read_text()[:500]
# Check tier 2: Vault
vault_path = repo_root / "memory" / "self"
tier2_exists = vault_path.exists()
tier2_files = []
if tier2_exists:
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()]
tier1_info: dict[str, Any] = {
info: dict[str, Any] = {
"exists": tier1_exists,
"path": str(memory_md),
"preview": " ".join(tier1_content[:200].split()) if tier1_content else None,
}
if tier1_exists:
lines = memory_md.read_text().splitlines()
tier1_info["line_count"] = len(lines)
tier1_info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
info["line_count"] = len(lines)
info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
return info
def _vault_info(repo_root: Path) -> dict[str, Any]:
"""Tier 2: Vault (memory/ directory tree) status."""
vault_path = repo_root / "memory" / "self"
tier2_exists = vault_path.exists()
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()] if tier2_exists else []
# Vault — scan all subdirs under memory/
vault_root = repo_root / "memory"
vault_info: dict[str, Any] = {
info: dict[str, Any] = {
"exists": tier2_exists,
"path": str(vault_path),
"file_count": len(tier2_files),
"files": tier2_files[:10],
}
if vault_root.exists():
vault_info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
vault_info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
return info
# Tier 3: Semantic memory row count
tier3_info: dict[str, Any] = {"available": False}
def _semantic_memory_info(repo_root: Path) -> dict[str, Any]:
"""Tier 3: Semantic memory (vector DB) status."""
info: dict[str, Any] = {"available": False}
try:
sem_db = repo_root / "data" / "memory.db"
if sem_db.exists():
@@ -184,14 +178,16 @@ def get_memory_status() -> dict[str, Any]:
).fetchone()
if row and row[0]:
count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
tier3_info["available"] = True
tier3_info["vector_count"] = count[0] if count else 0
info["available"] = True
info["vector_count"] = count[0] if count else 0
except Exception as exc:
logger.debug("Memory status query failed: %s", exc)
pass
return info
# Self-coding journal stats
journal_info: dict[str, Any] = {"available": False}
def _journal_info(repo_root: Path) -> dict[str, Any]:
"""Self-coding journal statistics."""
info: dict[str, Any] = {"available": False}
try:
journal_db = repo_root / "data" / "self_coding.db"
if journal_db.exists():
@@ -203,7 +199,7 @@ def get_memory_status() -> dict[str, Any]:
if rows:
counts = {r["outcome"]: r["cnt"] for r in rows}
total = sum(counts.values())
journal_info = {
info = {
"available": True,
"total_attempts": total,
"successes": counts.get("success", 0),
@@ -212,13 +208,24 @@ def get_memory_status() -> dict[str, Any]:
}
except Exception as exc:
logger.debug("Journal stats query failed: %s", exc)
pass
return info
def get_memory_status() -> dict[str, Any]:
"""Get the status of Timmy's memory system.
Returns:
Dict with memory tier information
"""
from config import settings
repo_root = Path(settings.repo_root)
return {
"tier1_hot_memory": tier1_info,
"tier2_vault": vault_info,
"tier3_semantic": tier3_info,
"self_coding_journal": journal_info,
"tier1_hot_memory": _hot_memory_info(repo_root),
"tier2_vault": _vault_info(repo_root),
"tier3_semantic": _semantic_memory_info(repo_root),
"self_coding_journal": _journal_info(repo_root),
}
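# Illustrative read of the returned structure (keys come from the helpers above;
# concrete values depend on local repo state):
#
#   status = get_memory_status()
#   status["tier1_hot_memory"]["exists"]        # bool: MEMORY.md present?
#   status["tier2_vault"]["file_count"]         # int: files under memory/self
#   status["tier3_semantic"]["available"]       # bool: vector DB reachable?
#   status["self_coding_journal"]["available"]  # bool: journal DB has rows?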

View File

@@ -18,7 +18,6 @@ except ImportError:
# agno is a core dependency (always installed) — do NOT stub it, or its
# internal import chains break under xdist parallel workers.
for _mod in [
"airllm",
"mcp",
"mcp.client",
"mcp.client.stdio",

View File

@@ -10,12 +10,10 @@ Categories:
M3xx iOS keyboard & zoom prevention
M4xx HTMX robustness (double-submit, sync)
M5xx Safe-area / notch support
M6xx AirLLM backend interface contract
"""
import re
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# ── helpers ───────────────────────────────────────────────────────────────────
@@ -206,147 +204,3 @@ def test_M505_dvh_units_used():
"""Dynamic viewport height (dvh) accounts for collapsing browser chrome."""
css = _css()
assert "dvh" in css
# ── M6xx — AirLLM backend interface contract ──────────────────────────────────
def test_M601_airllm_agent_has_run_method():
"""TimmyAirLLMAgent must expose run() so the dashboard route can call it."""
from timmy.backends import TimmyAirLLMAgent
assert hasattr(TimmyAirLLMAgent, "run"), (
"TimmyAirLLMAgent is missing run() — dashboard will fail with AirLLM backend"
)
def test_M602_airllm_run_returns_content_attribute():
"""run() must return an object with a .content attribute (Agno RunResponse compat)."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
mock_model = MagicMock()
mock_tokenizer = MagicMock()
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 5]
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
mock_tokenizer.decode.return_value = "Sir, affirmative."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(10))]
agent._model = mock_model
result = agent.run("test")
assert hasattr(result, "content"), "run() result must have a .content attribute"
assert isinstance(result.content, str)
def test_M603_airllm_run_updates_history():
"""run() must update _history so multi-turn context is preserved."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
mock_model = MagicMock()
mock_tokenizer = MagicMock()
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 5]
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
mock_tokenizer.decode.return_value = "Acknowledged."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(10))]
agent._model = mock_model
assert len(agent._history) == 0
agent.run("hello")
assert len(agent._history) == 2
assert any("hello" in h for h in agent._history)
def test_M604_airllm_print_response_delegates_to_run():
"""print_response must use run() so both interfaces share one inference path."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import RunResult, TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
with (
patch.object(agent, "run", return_value=RunResult(content="ok")) as mock_run,
patch.object(agent, "_render"),
):
agent.print_response("hello", stream=True)
mock_run.assert_called_once_with("hello", stream=True)
def test_M605_health_status_passes_model_to_template(client):
"""Health status partial must receive the configured model name, not a hardcoded string."""
from config import settings
with patch(
"dashboard.routes.health.check_ollama",
new_callable=AsyncMock,
return_value=True,
):
response = client.get("/health/status")
# Model name should come from settings, not be hardcoded
assert response.status_code == 200
model_short = settings.ollama_model.split(":")[0]
assert model_short in response.text
# ── M7xx — XSS prevention ─────────────────────────────────────────────────────
def _mobile_html() -> str:
"""Read the mobile template source."""
path = Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "mobile.html"
return path.read_text()
def _swarm_live_html() -> str:
"""Read the swarm live template source."""
path = (
Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "swarm_live.html"
)
return path.read_text()
def test_M701_mobile_chat_no_raw_message_interpolation():
"""mobile.html must not interpolate ${message} directly into innerHTML — XSS risk."""
html = _mobile_html()
# The vulnerable pattern is `${message}` inside a template literal assigned to innerHTML
# After the fix, message must only appear via textContent assignment
assert "textContent = message" in html or "textContent=message" in html, (
"mobile.html still uses innerHTML + ${message} interpolation — XSS vulnerability"
)
def test_M702_mobile_chat_user_input_not_in_innerhtml_template_literal():
"""${message} must not appear inside a backtick string that is assigned to innerHTML."""
html = _mobile_html()
# Find all innerHTML += `...` blocks and verify none contain ${message}
blocks = re.findall(r"innerHTML\s*\+=?\s*`([^`]*)`", html, re.DOTALL)
for block in blocks:
assert "${message}" not in block, (
"innerHTML template literal still contains ${message} — XSS vulnerability"
)
def test_M703_swarm_live_agent_name_not_interpolated_in_innerhtml():
"""swarm_live.html must not put ${agent.name} inside innerHTML template literals."""
html = _swarm_live_html()
blocks = re.findall(r"innerHTML\s*=\s*agents\.map\([^;]+\)\.join\([^)]*\)", html, re.DOTALL)
assert len(blocks) == 0, (
"swarm_live.html still uses innerHTML=agents.map(…) with interpolated agent data — XSS vulnerability"
)
def test_M704_swarm_live_uses_textcontent_for_agent_data():
"""swarm_live.html must use textContent (not innerHTML) to set agent name/description."""
html = _swarm_live_html()
assert "textContent" in html, (
"swarm_live.html does not use textContent — agent data may be raw-interpolated into DOM"
)

View File

@@ -5,9 +5,14 @@ from datetime import UTC, datetime, timedelta
from unittest.mock import patch
from infrastructure.error_capture import (
_create_bug_report,
_dedup_cache,
_extract_traceback_info,
_get_git_context,
_is_duplicate,
_log_error_event,
_notify_bug_report,
_record_to_session,
_stack_hash,
capture_error,
)
@@ -193,3 +198,91 @@ class TestCaptureError:
def teardown_method(self):
_dedup_cache.clear()
class TestExtractTracebackInfo:
"""Test _extract_traceback_info helper."""
def test_returns_three_tuple(self):
try:
raise ValueError("extract test")
except ValueError as e:
tb_str, affected_file, affected_line = _extract_traceback_info(e)
assert "ValueError" in tb_str
assert "extract test" in tb_str
assert affected_file.endswith(".py")
assert affected_line > 0
def test_file_points_to_raise_site(self):
try:
_make_exception()
except ValueError as e:
_, affected_file, _ = _extract_traceback_info(e)
assert "test_error_capture" in affected_file
class TestLogErrorEvent:
"""Test _log_error_event helper."""
def test_does_not_crash_on_missing_deps(self):
try:
raise RuntimeError("log test")
except RuntimeError as e:
_log_error_event(e, "test", "abc123", "file.py", 42, {"branch": "main"})
class TestCreateBugReport:
"""Test _create_bug_report helper."""
def test_does_not_crash_on_missing_deps(self):
try:
raise RuntimeError("report test")
except RuntimeError as e:
result = _create_bug_report(
e, "test", None, "abc123", "traceback...", "file.py", 42, {}
)
# May return None if swarm deps unavailable — that's fine
assert result is None or isinstance(result, str)
def test_with_context(self):
try:
raise RuntimeError("ctx test")
except RuntimeError as e:
result = _create_bug_report(e, "test", {"path": "/api"}, "abc", "tb", "f.py", 1, {})
assert result is None or isinstance(result, str)
class TestNotifyBugReport:
"""Test _notify_bug_report helper."""
def test_does_not_crash(self):
try:
raise RuntimeError("notify test")
except RuntimeError as e:
_notify_bug_report(e, "test")
class TestRecordToSession:
"""Test _record_to_session helper."""
def test_does_not_crash_without_recorder(self):
try:
raise RuntimeError("session test")
except RuntimeError as e:
_record_to_session(e, "test")
def test_calls_registered_recorder(self):
from infrastructure.error_capture import register_error_recorder
calls = []
register_error_recorder(lambda **kwargs: calls.append(kwargs))
try:
try:
raise RuntimeError("callback test")
except RuntimeError as e:
_record_to_session(e, "test_source")
assert len(calls) == 1
assert "RuntimeError" in calls[0]["error"]
assert calls[0]["context"] == "test_source"
finally:
register_error_recorder(None)

View File

@@ -0,0 +1,86 @@
"""Tests for scripts/cycle_retro.py issue auto-detection."""
from __future__ import annotations
# Import the module under test — it's a script so we import the helpers directly
import importlib
import subprocess
from pathlib import Path
from unittest.mock import patch
import pytest
SCRIPTS_DIR = Path(__file__).resolve().parent.parent.parent / "scripts"
@pytest.fixture(autouse=True)
def _add_scripts_to_path(monkeypatch):
monkeypatch.syspath_prepend(str(SCRIPTS_DIR))
@pytest.fixture()
def mod():
"""Import cycle_retro as a module."""
return importlib.import_module("cycle_retro")
class TestDetectIssueFromBranch:
def test_kimi_issue_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="kimi/issue-492\n"):
assert mod.detect_issue_from_branch() == 492
def test_plain_issue_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="issue-123\n"):
assert mod.detect_issue_from_branch() == 123
def test_issue_slash_number(self, mod):
with patch.object(subprocess, "check_output", return_value="fix/issue/55\n"):
assert mod.detect_issue_from_branch() == 55
def test_no_issue_in_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="main\n"):
assert mod.detect_issue_from_branch() is None
def test_feature_branch(self, mod):
with patch.object(subprocess, "check_output", return_value="feature/add-widget\n"):
assert mod.detect_issue_from_branch() is None
def test_git_not_available(self, mod):
with patch.object(subprocess, "check_output", side_effect=FileNotFoundError):
assert mod.detect_issue_from_branch() is None
def test_git_fails(self, mod):
with patch.object(
subprocess,
"check_output",
side_effect=subprocess.CalledProcessError(1, "git"),
):
assert mod.detect_issue_from_branch() is None
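# A minimal implementation consistent with the cases above might look like this
# sketch (the real scripts/cycle_retro.py may differ):
#
#   import re
#   import subprocess
#
#   def detect_issue_from_branch() -> int | None:
#       try:
#           branch = subprocess.check_output(
#               ["git", "rev-parse", "--abbrev-ref", "HEAD"], text=True
#           ).strip()
#       except (FileNotFoundError, subprocess.CalledProcessError):
#           return None
#       match = re.search(r"issue[-/](\d+)", branch)
#       return int(match.group(1)) if match else None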
class TestBackfillExtractIssueNumber:
"""Tests for backfill_retro.extract_issue_number PR-number filtering."""
@pytest.fixture()
def backfill(self):
return importlib.import_module("backfill_retro")
def test_body_has_issue(self, backfill):
assert backfill.extract_issue_number("fix: foo (#491)", "Fixes #490", pr_number=491) == 490
def test_title_skips_pr_number(self, backfill):
assert backfill.extract_issue_number("fix: foo (#491)", "", pr_number=491) is None
def test_title_with_issue_and_pr(self, backfill):
# [loop-cycle-538] refactor: ... (#459) (#481)
assert (
backfill.extract_issue_number(
"[loop-cycle-538] refactor: remove dead airllm (#459) (#481)",
"",
pr_number=481,
)
== 459
)
def test_no_pr_number_provided(self, backfill):
assert backfill.extract_issue_number("fix: foo (#491)", "") == 491
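# A sketch consistent with these cases (the real backfill_retro.extract_issue_number
# may differ): scan the body first, then the title, for "#N" references and return
# the first number that is not the PR's own number.
#
#   import re
#
#   def extract_issue_number(title: str, body: str, pr_number: int | None = None) -> int | None:
#       for text in (body, title):
#           for num in (int(m) for m in re.findall(r"#(\d+)", text)):
#               if pr_number is None or num != pr_number:
#                   return num
#       return None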

View File

@@ -81,7 +81,6 @@ def test_create_timmy_respects_custom_ollama_url():
mock_settings.ollama_url = custom_url
mock_settings.ollama_num_ctx = 4096
mock_settings.timmy_model_backend = "ollama"
mock_settings.airllm_model_size = "70b"
from timmy.agent import create_timmy
@@ -91,33 +90,6 @@ def test_create_timmy_respects_custom_ollama_url():
assert kwargs["host"] == custom_url
# ── AirLLM path ──────────────────────────────────────────────────────────────
def test_create_timmy_airllm_returns_airllm_agent():
"""backend='airllm' must return a TimmyAirLLMAgent, not an Agno Agent."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.agent import create_timmy
from timmy.backends import TimmyAirLLMAgent
result = create_timmy(backend="airllm", model_size="8b")
assert isinstance(result, TimmyAirLLMAgent)
def test_create_timmy_airllm_does_not_call_agno_agent():
"""When using the airllm backend, Agno Agent should never be instantiated."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.backends.is_apple_silicon", return_value=False),
):
from timmy.agent import create_timmy
create_timmy(backend="airllm", model_size="8b")
MockAgent.assert_not_called()
def test_create_timmy_explicit_ollama_ignores_autodetect():
"""backend='ollama' must always use Ollama, even on Apple Silicon."""
with (
@@ -141,7 +113,6 @@ def test_create_timmy_explicit_ollama_ignores_autodetect():
def test_resolve_backend_explicit_takes_priority():
from timmy.agent import _resolve_backend
assert _resolve_backend("airllm") == "airllm"
assert _resolve_backend("ollama") == "ollama"
@@ -152,39 +123,6 @@ def test_resolve_backend_defaults_to_ollama_without_config():
assert _resolve_backend(None) == "ollama"
def test_resolve_backend_auto_uses_airllm_on_apple_silicon():
"""'auto' on Apple Silicon with airllm stubbed → 'airllm'."""
with (
patch("timmy.backends.is_apple_silicon", return_value=True),
patch("timmy.agent.settings") as mock_settings,
):
mock_settings.timmy_model_backend = "auto"
mock_settings.airllm_model_size = "70b"
mock_settings.ollama_model = "llama3.2"
from timmy.agent import _resolve_backend
assert _resolve_backend(None) == "airllm"
def test_resolve_backend_auto_falls_back_on_non_apple():
"""'auto' on non-Apple Silicon → 'ollama'."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("timmy.agent.settings") as mock_settings,
):
mock_settings.timmy_model_backend = "auto"
mock_settings.airllm_model_size = "70b"
mock_settings.ollama_model = "llama3.2"
from timmy.agent import _resolve_backend
assert _resolve_backend(None) == "ollama"
# ── _model_supports_tools ────────────────────────────────────────────────────
def test_model_supports_tools_llama32_returns_false():
"""llama3.2 (3B) is too small for reliable tool calling."""
from timmy.agent import _model_supports_tools
@@ -259,7 +197,6 @@ def test_create_timmy_includes_tools_for_large_model():
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.ollama_num_ctx = 4096
mock_settings.timmy_model_backend = "ollama"
mock_settings.airllm_model_size = "70b"
mock_settings.telemetry_enabled = False
from timmy.agent import create_timmy
@@ -444,6 +381,150 @@ def test_get_effective_ollama_model_walks_fallback_chain():
assert result == "fb-2"
# ── _build_tools_list ─────────────────────────────────────────────────────
def test_build_tools_list_empty_when_tools_disabled():
    """Small models get an empty tools list."""
    from timmy.agent import _build_tools_list
    result = _build_tools_list(use_tools=False, skip_mcp=False, model_name="llama3.2")
    assert result == []

def test_build_tools_list_includes_toolkit_when_enabled():
    """Tool-capable models get the full toolkit."""
    mock_toolkit = MagicMock()
    with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
        from timmy.agent import _build_tools_list
        result = _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
        assert mock_toolkit in result

def test_build_tools_list_skips_mcp_when_flagged():
    """skip_mcp=True must not call MCP factories."""
    mock_toolkit = MagicMock()
    with (
        patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
        patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea,
        patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs,
    ):
        from timmy.agent import _build_tools_list
        _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
        mock_gitea.assert_not_called()
        mock_fs.assert_not_called()

def test_build_tools_list_includes_mcp_when_not_skipped():
    """skip_mcp=False should attempt MCP tool creation."""
    mock_toolkit = MagicMock()
    with (
        patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
        patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea,
        patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs,
    ):
        from timmy.agent import _build_tools_list
        _build_tools_list(use_tools=True, skip_mcp=False, model_name="llama3.1")
        mock_gitea.assert_called_once()
        mock_fs.assert_called_once()
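
# NOTE: hypothetical reference sketch of the decision table the
# _build_tools_list tests above imply: no tools for small models, the full
# toolkit for capable ones, and MCP toolkits only when skip_mcp is False.
# The lazy imports mirror the patch targets used above; everything else is a
# guess about the real helper's shape.
def _build_tools_list_sketch(use_tools, skip_mcp, model_name):
    # model_name is kept only to mirror the signature the tests call with.
    if not use_tools:
        return []  # small models such as llama3.2 get no tools at all
    from timmy.agent import create_full_toolkit

    tools = [create_full_toolkit()]
    if not skip_mcp:
        from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools

        for factory in (create_gitea_mcp_tools, create_filesystem_mcp_tools):
            toolkit = factory()  # factories may return None when unavailable
            if toolkit is not None:
                tools.append(toolkit)
    return tools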
# ── _build_prompt ─────────────────────────────────────────────────────────
def test_build_prompt_includes_base_prompt():
    """Prompt should always contain the base system prompt."""
    from timmy.agent import _build_prompt
    result = _build_prompt(use_tools=False, session_id="test")
    assert "Timmy" in result

def test_build_prompt_appends_memory_context():
    """Memory context should be appended when available."""
    mock_memory = MagicMock()
    mock_memory.get_system_context.return_value = "User prefers dark mode."
    with patch("timmy.memory_system.memory_system", mock_memory):
        from timmy.agent import _build_prompt
        result = _build_prompt(use_tools=True, session_id="test")
        assert "GROUNDED CONTEXT" in result
        assert "dark mode" in result

def test_build_prompt_truncates_long_memory():
    """Long memory context should be truncated."""
    mock_memory = MagicMock()
    mock_memory.get_system_context.return_value = "x" * 10000
    with patch("timmy.memory_system.memory_system", mock_memory):
        from timmy.agent import _build_prompt
        result = _build_prompt(use_tools=False, session_id="test")
        assert "[truncated]" in result

def test_build_prompt_survives_memory_failure():
    """Prompt should fall back to base when memory fails."""
    mock_memory = MagicMock()
    mock_memory.get_system_context.side_effect = RuntimeError("db locked")
    with patch("timmy.memory_system.memory_system", mock_memory):
        from timmy.agent import _build_prompt
        result = _build_prompt(use_tools=True, session_id="test")
        assert "Timmy" in result
        # Memory context should NOT be appended (the db locked error was caught)
        assert "db locked" not in result
# ── _create_ollama_agent ──────────────────────────────────────────────────
def test_create_ollama_agent_passes_correct_kwargs():
    """_create_ollama_agent must pass the expected kwargs to Agent."""
    with (
        patch("timmy.agent.Agent") as MockAgent,
        patch("timmy.agent.Ollama"),
        patch("timmy.agent.SqliteDb"),
        patch("timmy.agent._warmup_model", return_value=True),
    ):
        from timmy.agent import _create_ollama_agent
        _create_ollama_agent(
            db_file="test.db",
            model_name="llama3.1",
            tools_list=[MagicMock()],
            full_prompt="test prompt",
            use_tools=True,
        )
        kwargs = MockAgent.call_args.kwargs
        assert kwargs["description"] == "test prompt"
        assert kwargs["markdown"] is False

def test_create_ollama_agent_none_tools_when_empty():
    """Empty tools_list should pass tools=None to Agent."""
    with (
        patch("timmy.agent.Agent") as MockAgent,
        patch("timmy.agent.Ollama"),
        patch("timmy.agent.SqliteDb"),
        patch("timmy.agent._warmup_model", return_value=True),
    ):
        from timmy.agent import _create_ollama_agent
        _create_ollama_agent(
            db_file="test.db",
            model_name="llama3.2",
            tools_list=[],
            full_prompt="test prompt",
            use_tools=False,
        )
        kwargs = MockAgent.call_args.kwargs
        assert kwargs["tools"] is None
def test_no_hardcoded_fallback_constants_in_agent():
    """agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
    import timmy.agent as agent_mod

View File

@@ -1,10 +1,7 @@
"""Tests for src/timmy/backends.py — AirLLM wrapper and helpers."""
"""Tests for src/timmy/backends.py — backend helpers and classes."""
import sys
from unittest.mock import MagicMock, patch
import pytest
# ── is_apple_silicon ──────────────────────────────────────────────────────────
@@ -38,183 +35,6 @@ def test_is_apple_silicon_false_on_intel_mac():
        assert is_apple_silicon() is False
# ── airllm_available ─────────────────────────────────────────────────────────
def test_airllm_available_true_when_stub_in_sys_modules():
    # conftest already stubs 'airllm' — importable → True.
    from timmy.backends import airllm_available
    assert airllm_available() is True

def test_airllm_available_false_when_not_importable():
    # Temporarily remove the stub to simulate airllm not installed.
    saved = sys.modules.pop("airllm", None)
    try:
        from timmy.backends import airllm_available
        assert airllm_available() is False
    finally:
        if saved is not None:
            sys.modules["airllm"] = saved
# ── TimmyAirLLMAgent construction ────────────────────────────────────────────
def test_airllm_agent_raises_on_unknown_size():
    from timmy.backends import TimmyAirLLMAgent
    with pytest.raises(ValueError, match="Unknown model size"):
        TimmyAirLLMAgent(model_size="3b")

def test_airllm_agent_uses_automodel_on_non_apple():
    """Non-Apple-Silicon path uses AutoModel.from_pretrained."""
    with patch("timmy.backends.is_apple_silicon", return_value=False):
        from timmy.backends import TimmyAirLLMAgent
        TimmyAirLLMAgent(model_size="8b")
        # sys.modules["airllm"] is a MagicMock; AutoModel.from_pretrained was called.
        assert sys.modules["airllm"].AutoModel.from_pretrained.called

def test_airllm_agent_uses_mlx_on_apple_silicon():
    """Apple Silicon path uses AirLLMMLX, not AutoModel."""
    with patch("timmy.backends.is_apple_silicon", return_value=True):
        from timmy.backends import TimmyAirLLMAgent
        TimmyAirLLMAgent(model_size="8b")
        assert sys.modules["airllm"].AirLLMMLX.called

def test_airllm_agent_resolves_correct_model_id_for_70b():
    with patch("timmy.backends.is_apple_silicon", return_value=False):
        from timmy.backends import _AIRLLM_MODELS, TimmyAirLLMAgent
        TimmyAirLLMAgent(model_size="70b")
        sys.modules["airllm"].AutoModel.from_pretrained.assert_called_with(_AIRLLM_MODELS["70b"])
# ── TimmyAirLLMAgent.print_response ──────────────────────────────────────────
def _make_agent(model_size: str = "8b") -> "TimmyAirLLMAgent":  # noqa: F821
    """Helper: create an agent with a fully mocked underlying model."""
    with patch("timmy.backends.is_apple_silicon", return_value=False):
        from timmy.backends import TimmyAirLLMAgent
        agent = TimmyAirLLMAgent(model_size=model_size)
    # Replace the underlying model with a clean mock that returns predictable output.
    mock_model = MagicMock()
    mock_tokenizer = MagicMock()
    # tokenizer() returns a dict-like object with an "input_ids" tensor mock.
    input_ids_mock = MagicMock()
    input_ids_mock.shape = [1, 10]  # shape[1] = prompt token count = 10
    token_dict = {"input_ids": input_ids_mock}
    mock_tokenizer.return_value = token_dict
    mock_tokenizer.decode.return_value = "Sir, affirmative."
    mock_model.tokenizer = mock_tokenizer
    # generate() returns a list of token sequences (15 tokens total).
    mock_model.generate.return_value = [list(range(15))]
    agent._model = mock_model
    return agent
def test_print_response_calls_generate():
    agent = _make_agent()
    agent.print_response("What is sovereignty?", stream=True)
    agent._model.generate.assert_called_once()

def test_print_response_decodes_only_generated_tokens():
    agent = _make_agent()
    agent.print_response("Hello", stream=False)
    # decode should be called with tokens starting at index 10 (prompt length).
    decode_call = agent._model.tokenizer.decode.call_args
    token_slice = decode_call[0][0]
    assert list(token_slice) == list(range(10, 15))

def test_print_response_updates_history():
    agent = _make_agent()
    agent.print_response("First message")
    assert any("First message" in turn for turn in agent._history)
    assert any("Timmy:" in turn for turn in agent._history)

def test_print_response_history_included_in_second_prompt():
    agent = _make_agent()
    agent.print_response("First")
    # Build the prompt for the second call — history should appear.
    prompt = agent._build_prompt("Second")
    assert "First" in prompt
    assert "Second" in prompt

def test_print_response_stream_flag_accepted():
    """stream=False should not raise — it's accepted for API compatibility."""
    agent = _make_agent()
    agent.print_response("hello", stream=False)  # no error
# ── Prompt formatting tests ────────────────────────────────────────────────
def test_airllm_prompt_contains_formatted_model_name():
    """AirLLM prompt should have actual model name, not literal {model_name}."""
    with (
        patch("timmy.backends.is_apple_silicon", return_value=False),
        patch("config.settings") as mock_settings,
    ):
        mock_settings.ollama_model = "llama3.2:3b"
        from timmy.backends import TimmyAirLLMAgent
        agent = TimmyAirLLMAgent(model_size="8b")
        prompt = agent._build_prompt("test message")
        # Should contain the actual model name, not the placeholder
        assert "{model_name}" not in prompt
        assert "llama3.2:3b" in prompt

def test_airllm_prompt_gets_lite_tier():
    """AirLLM should get LITE tier prompt (tools_enabled=False)."""
    with (
        patch("timmy.backends.is_apple_silicon", return_value=False),
        patch("config.settings") as mock_settings,
    ):
        mock_settings.ollama_model = "test-model"
        from timmy.backends import TimmyAirLLMAgent
        agent = TimmyAirLLMAgent(model_size="8b")
        prompt = agent._build_prompt("test message")
        # LITE tier should NOT have TOOL USAGE section
        assert "TOOL USAGE" not in prompt
        # LITE tier should have the basic rules
        assert "Be brief by default" in prompt

def test_airllm_prompt_contains_session_id():
    """AirLLM prompt should have session_id formatted, not placeholder."""
    with (
        patch("timmy.backends.is_apple_silicon", return_value=False),
        patch("config.settings") as mock_settings,
    ):
        mock_settings.ollama_model = "test-model"
        from timmy.backends import TimmyAirLLMAgent
        agent = TimmyAirLLMAgent(model_size="8b")
        prompt = agent._build_prompt("test message")
        # Should contain the session_id, not the placeholder
        assert '{session_id}"' not in prompt
        assert 'session "airllm"' in prompt
# ── ClaudeBackend ─────────────────────────────────────────────────────────

View File

@@ -107,19 +107,7 @@ def test_chat_new_session_uses_unique_id():
def test_chat_passes_backend_option():
    """chat --backend airllm must forward the backend to create_timmy."""
    mock_run_output = MagicMock()
    mock_run_output.content = "OK"
    mock_run_output.status = "COMPLETED"
    mock_run_output.active_requirements = []
    mock_timmy = MagicMock()
    mock_timmy.run.return_value = mock_run_output
    with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
        runner.invoke(app, ["chat", "test", "--backend", "airllm"])
        mock_create.assert_called_once_with(backend="airllm", model_size=None, session_id="cli")
    pass
def test_chat_cleans_response():