Merge pull request 'feat: replace custom Gitea with MCP servers' (#14) from claude/sharp-mcnulty into main

Reviewed-on: http://localhost:3000/rockachopa/Timmy-time-dashboard/pulls/14
This commit is contained in:
rockachopa
2026-03-12 21:45:55 -04:00
24 changed files with 664 additions and 1170 deletions

View File

@@ -44,6 +44,7 @@ airllm = { version = ">=2.9.0", optional = true }
pyttsx3 = { version = ">=2.90", optional = true }
sentence-transformers = { version = ">=2.0.0", optional = true }
numpy = { version = ">=1.24.0", optional = true }
mcp = { version = ">=1.0.0", optional = true }
requests = { version = ">=2.31.0", optional = true }
GitPython = { version = ">=3.1.40", optional = true }
pytest = { version = ">=8.0.0", optional = true }
@@ -62,6 +63,7 @@ voice = ["pyttsx3"]
celery = ["celery"]
embeddings = ["sentence-transformers", "numpy"]
git = ["GitPython"]
mcp = ["mcp"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "pytest-xdist", "selenium"]
[tool.poetry.group.dev.dependencies]

View File

@@ -7,6 +7,8 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Central configuration — all env-var access goes through this class."""
# Display name for the primary agent — override with AGENT_NAME env var
agent_name: str = "Agent"
@@ -219,12 +221,17 @@ class Settings(BaseSettings):
# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# Timmy can file issues when he notices bugs or improvement opportunities.
# These values are passed as env vars to the gitea-mcp server process.
gitea_url: str = "http://localhost:3000"
gitea_token: str = "" # GITEA_TOKEN env var; falls back to ~/.config/gitea/token
gitea_repo: str = "rockachopa/Timmy-time-dashboard" # owner/repo
gitea_enabled: bool = True
gitea_timeout: int = 30
# ── MCP Servers ────────────────────────────────────────────────────
# External tool servers connected via Model Context Protocol (stdio).
mcp_gitea_command: str = "gitea-mcp -t stdio"
mcp_filesystem_command: str = "npx -y @modelcontextprotocol/server-filesystem"
mcp_timeout: int = 15
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
# Self-test orchestrator that probes capabilities alongside the thinking loop.
@@ -314,6 +321,18 @@ class Settings(BaseSettings):
path = os.path.dirname(path)
return os.getcwd()
def model_post_init(self, __context) -> None:
"""Post-init: resolve gitea_token from file if not set via env."""
if not self.gitea_token:
token_path = os.path.expanduser("~/.config/gitea/token")
try:
if os.path.isfile(token_path):
token = open(token_path).read().strip() # noqa: SIM115
if token:
self.gitea_token = token
except OSError:
pass
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",

View File

@@ -391,6 +391,14 @@ async def lifespan(app: FastAPI):
await discord_bot.stop()
await telegram_bot.stop()
# Close MCP tool server sessions
try:
from timmy.mcp_tools import close_mcp_sessions
await close_mcp_sessions()
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
for task in [briefing_task, thinking_task, chat_task, loop_qa_task]:
if task:
task.cancel()

View File

@@ -1,4 +1,3 @@
import asyncio
import json
import logging
from datetime import datetime
@@ -90,7 +89,7 @@ async def chat_agent(request: Request, message: str = Form(...)):
error_text = None
try:
run_output = await asyncio.to_thread(chat_with_tools, message)
run_output = await chat_with_tools(message)
except Exception as exc:
logger.error("Chat error: %s", exc)
error_text = f"Chat error: {exc}"
@@ -181,7 +180,7 @@ async def approve_tool(request: Request, approval_id: str):
req.confirm()
try:
result_run = await asyncio.to_thread(continue_chat, pending["run_output"])
result_run = await continue_chat(pending["run_output"])
# Extract tool result from the resumed run
tool_result = ""
for te in getattr(result_run, "tools", None) or []:
@@ -220,7 +219,7 @@ async def reject_tool(request: Request, approval_id: str):
req.reject(note="User rejected from dashboard")
# Resume so the agent knows the tool was rejected
try:
await asyncio.to_thread(continue_chat, pending["run_output"])
await continue_chat(pending["run_output"])
except Exception:
pass

View File

@@ -88,7 +88,7 @@ async def api_chat(request: Request):
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
response_text = agent_chat(
response_text = await agent_chat(
context_prefix + last_user_msg,
session_id="mobile",
)

View File

@@ -1,295 +0,0 @@
"""Gitea Hand — issue tracking and self-improvement channel for Timmy.
Provides Gitea API capabilities with:
- Token auth (env var, config, or ~/.config/gitea/token fallback)
- Structured result parsing
- Dedup checks before creating issues
- Graceful degradation: log warning, return fallback, never crash
Follows project conventions:
- Config via ``from config import settings``
- Singleton pattern for module-level import
- Async httpx client (like Paperclip client pattern)
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
@dataclass
class GiteaResult:
"""Result from a Gitea API operation."""
operation: str
success: bool
data: dict = field(default_factory=dict)
error: str = ""
latency_ms: float = 0.0
def _resolve_token() -> str:
"""Resolve Gitea API token from settings or filesystem fallback."""
if settings.gitea_token:
return settings.gitea_token
try:
return _TOKEN_FILE.read_text().strip()
except (FileNotFoundError, PermissionError):
return ""
def _title_similar(a: str, b: str, threshold: float = 0.6) -> bool:
"""Check if two issue titles are similar enough to be duplicates."""
return SequenceMatcher(None, a.lower(), b.lower()).ratio() > threshold
class GiteaHand:
"""Gitea API hand for Timmy.
All methods degrade gracefully — if Gitea is unreachable or the
API call fails, the hand returns a ``GiteaResult(success=False)``
rather than raising.
"""
def __init__(
self,
base_url: str | None = None,
token: str | None = None,
repo: str | None = None,
timeout: int | None = None,
) -> None:
self._base_url = (base_url or settings.gitea_url).rstrip("/")
self._token = token or _resolve_token()
self._repo = repo or settings.gitea_repo
self._timeout = timeout or settings.gitea_timeout
if not self._token:
logger.warning(
"Gitea token not configured — set GITEA_TOKEN or place token in %s",
_TOKEN_FILE,
)
else:
logger.info(
"GiteaHand initialised — %s/%s",
self._base_url,
self._repo,
)
@property
def available(self) -> bool:
"""Check if Gitea integration is configured and enabled."""
return bool(settings.gitea_enabled and self._token and self._repo)
def _get_client(self):
"""Create a fresh async HTTP client for the current event loop.
Always creates a new client rather than caching, because tool
functions call us via ``asyncio.run()`` which creates a new loop
each time — a cached client from a previous loop would raise
"Event loop is closed".
"""
import httpx
return httpx.AsyncClient(
base_url=self._base_url,
headers={
"Authorization": f"token {self._token}",
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=self._timeout,
)
async def _request(self, method: str, path: str, **kwargs) -> GiteaResult:
"""Make an API request with full error handling."""
start = time.time()
operation = f"{method.upper()} {path}"
if not self.available:
return GiteaResult(
operation=operation,
success=False,
error="Gitea not configured (missing token or repo)",
)
client = self._get_client()
try:
resp = await client.request(method, path, **kwargs)
latency = (time.time() - start) * 1000
if resp.status_code >= 400:
error_body = resp.text[:500]
logger.warning(
"Gitea API %s returned %d: %s",
operation,
resp.status_code,
error_body,
)
return GiteaResult(
operation=operation,
success=False,
error=f"HTTP {resp.status_code}: {error_body}",
latency_ms=latency,
)
return GiteaResult(
operation=operation,
success=True,
data=resp.json() if resp.text else {},
latency_ms=latency,
)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning("Gitea API %s failed: %s", operation, exc)
return GiteaResult(
operation=operation,
success=False,
error=str(exc),
latency_ms=latency,
)
finally:
await client.aclose()
# ── Issue operations ─────────────────────────────────────────────────
async def create_issue(
self,
title: str,
body: str = "",
labels: list[str] | None = None,
) -> GiteaResult:
"""Create an issue in the configured repository.
Args:
title: Issue title (required).
body: Issue body in markdown.
labels: Optional list of label names (must exist in repo).
Returns:
GiteaResult with issue data (number, html_url, etc.).
"""
owner, repo = self._repo.split("/", 1)
payload: dict = {"title": title, "body": body}
# Resolve label names to IDs if provided
if labels:
label_ids = await self._resolve_label_ids(owner, repo, labels)
if label_ids:
payload["labels"] = label_ids
return await self._request(
"POST",
f"/api/v1/repos/{owner}/{repo}/issues",
json=payload,
)
async def list_issues(
self,
state: str = "open",
labels: list[str] | None = None,
limit: int = 50,
) -> GiteaResult:
"""List issues in the configured repository.
Args:
state: Filter by state ("open", "closed", "all").
labels: Filter by label names.
limit: Max issues to return.
Returns:
GiteaResult with list of issue dicts.
"""
owner, repo = self._repo.split("/", 1)
params: dict = {"state": state, "limit": limit, "type": "issues"}
if labels:
params["labels"] = ",".join(labels)
return await self._request(
"GET",
f"/api/v1/repos/{owner}/{repo}/issues",
params=params,
)
async def get_issue(self, number: int) -> GiteaResult:
"""Get a single issue by number."""
owner, repo = self._repo.split("/", 1)
return await self._request(
"GET",
f"/api/v1/repos/{owner}/{repo}/issues/{number}",
)
async def add_comment(self, number: int, body: str) -> GiteaResult:
"""Add a comment to an issue."""
owner, repo = self._repo.split("/", 1)
return await self._request(
"POST",
f"/api/v1/repos/{owner}/{repo}/issues/{number}/comments",
json={"body": body},
)
async def close_issue(self, number: int) -> GiteaResult:
"""Close an issue."""
owner, repo = self._repo.split("/", 1)
return await self._request(
"PATCH",
f"/api/v1/repos/{owner}/{repo}/issues/{number}",
json={"state": "closed"},
)
# ── Dedup helper ─────────────────────────────────────────────────────
async def find_duplicate(self, title: str, threshold: float = 0.6) -> dict | None:
"""Check if an open issue with a similar title already exists.
Returns the matching issue dict, or None if no duplicate found.
"""
result = await self.list_issues(state="open", limit=100)
if not result.success or not isinstance(result.data, list):
return None
for issue in result.data:
existing_title = issue.get("title", "")
if _title_similar(title, existing_title, threshold):
return issue
return None
# ── Label helper ─────────────────────────────────────────────────────
async def _resolve_label_ids(self, owner: str, repo: str, label_names: list[str]) -> list[int]:
"""Resolve label names to IDs. Returns empty list on failure."""
result = await self._request(
"GET",
f"/api/v1/repos/{owner}/{repo}/labels",
)
if not result.success or not isinstance(result.data, list):
return []
name_to_id = {label["name"].lower(): label["id"] for label in result.data}
return [name_to_id[name.lower()] for name in label_names if name.lower() in name_to_id]
# ── Status ───────────────────────────────────────────────────────────
def info(self) -> dict:
"""Return a status summary for the dashboard."""
return {
"base_url": self._base_url,
"repo": self._repo,
"available": self.available,
"has_token": bool(self._token),
}
# ── Module-level singleton ──────────────────────────────────────────────────
gitea_hand = GiteaHand()

View File

@@ -82,9 +82,7 @@ if _DISCORD_UI_AVAILABLE:
req = action.get("requirement")
if req:
req.reject(note="Timed out — auto-rejected")
await asyncio.to_thread(
continue_chat, action["run_output"], action.get("session_id")
)
await continue_chat(action["run_output"], action.get("session_id"))
await action["target"].send(
f"Action `{action['tool_name']}` timed out and was auto-rejected."
@@ -427,9 +425,7 @@ class DiscordVendor(ChatPlatform):
req.confirm()
try:
result_run = await asyncio.to_thread(
continue_chat, action["run_output"], action.get("session_id")
)
result_run = await continue_chat(action["run_output"], action.get("session_id"))
# Extract tool result from the resumed run
tool_result = ""
for te in getattr(result_run, "tools", None) or []:
@@ -461,7 +457,7 @@ class DiscordVendor(ChatPlatform):
req = action["requirement"]
req.reject(note="User rejected from Discord")
try:
await asyncio.to_thread(continue_chat, action["run_output"], action.get("session_id"))
await continue_chat(action["run_output"], action.get("session_id"))
except Exception:
pass
@@ -539,7 +535,7 @@ class DiscordVendor(ChatPlatform):
try:
async with target.typing():
run_output = await asyncio.wait_for(
asyncio.to_thread(chat_with_tools, content, session_id),
chat_with_tools(content, session_id),
timeout=300,
)
except TimeoutError:

View File

@@ -245,10 +245,30 @@ def create_timmy(
use_tools = _model_supports_tools(model_name)
# Conditionally include tools — small models get none
tools = create_full_toolkit() if use_tools else None
toolkit = create_full_toolkit() if use_tools else None
if not use_tools:
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
tools_list: list = []
if toolkit:
tools_list.append(toolkit)
# Add MCP tool servers (lazy-connected on first arun())
if use_tools:
try:
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
gitea_mcp = create_gitea_mcp_tools()
if gitea_mcp:
tools_list.append(gitea_mcp)
fs_mcp = create_filesystem_mcp_tools()
if fs_mcp:
tools_list.append(fs_mcp)
except Exception as exc:
logger.debug("MCP tools unavailable: %s", exc)
# Select prompt tier based on tool capability
base_prompt = get_system_prompt(tools_enabled=use_tools)
@@ -278,7 +298,7 @@ def create_timmy(
add_history_to_context=True,
num_history_runs=20,
markdown=True,
tools=[tools] if tools else None,
tools=tools_list if tools_list else None,
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)

View File

@@ -172,6 +172,8 @@ def interview(
Asks Timmy a series of questions about his identity, capabilities,
values, and operation to verify he is working correctly.
"""
import asyncio
from timmy.interview import InterviewEntry, format_transcript, run_interview
from timmy.session import chat
@@ -179,7 +181,9 @@ def interview(
# Force agent creation by calling chat once with a warm-up prompt
try:
chat("Hello, Timmy. We're about to start your interview.", session_id="interview")
asyncio.run(
chat("Hello, Timmy. We're about to start your interview.", session_id="interview")
)
except Exception as exc:
typer.echo(f"Warning: Initialization issue — {exc}", err=True)
@@ -191,7 +195,7 @@ def interview(
typer.echo("Starting interview...\n")
transcript = run_interview(
chat_fn=lambda msg: chat(msg, session_id="interview"),
chat_fn=lambda msg: asyncio.run(chat(msg, session_id="interview")),
on_answer=_on_answer,
)

222
src/timmy/mcp_tools.py Normal file
View File

@@ -0,0 +1,222 @@
"""MCP tool server factories for Agno agent integration.
Provides factory functions that create ``MCPTools`` instances for external
tool servers (Gitea, Filesystem) using stdio transport. Also provides a
standalone async helper for filing Gitea issues from the thinking engine
without going through the full LLM loop.
Usage::
from timmy.mcp_tools import create_gitea_mcp_tools, create_filesystem_mcp_tools
# In agent creation (added to tools list):
gitea_tools = create_gitea_mcp_tools()
fs_tools = create_filesystem_mcp_tools()
# Direct issue filing (thinking engine):
from timmy.mcp_tools import create_gitea_issue_via_mcp
result = await create_gitea_issue_via_mcp("Bug title", "Body", "bug")
"""
from __future__ import annotations
import logging
import sqlite3
import uuid
from datetime import datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
# Module-level cache for the standalone issue-filing session
_issue_session = None
def create_gitea_mcp_tools():
"""Create an MCPTools instance for the Gitea MCP server.
Returns None if Gitea is disabled or not configured (no token).
The returned MCPTools is lazy — Agno connects it on first ``arun()``.
"""
if not settings.gitea_enabled or not settings.gitea_token:
logger.debug("Gitea MCP: disabled or no token configured")
return None
try:
from agno.tools.mcp import MCPTools
# Build command — gitea-mcp expects "-t stdio" for stdio transport
command = settings.mcp_gitea_command
tools = MCPTools(
command=command,
env={
"GITEA_ACCESS_TOKEN": settings.gitea_token,
"GITEA_HOST": settings.gitea_url,
},
include_tools=[
"create_issue",
"list_repo_issues",
"create_issue_comment",
"edit_issue",
],
timeout=settings.mcp_timeout,
)
logger.info("Gitea MCP tools created (lazy connect)")
return tools
except Exception as exc:
logger.warning("Failed to create Gitea MCP tools: %s", exc)
return None
def create_filesystem_mcp_tools():
"""Create an MCPTools instance for the filesystem MCP server.
Returns None if the command is not configured.
Scoped to the project repo_root directory.
"""
try:
from agno.tools.mcp import MCPTools
command = f"{settings.mcp_filesystem_command} {settings.repo_root}"
tools = MCPTools(
command=command,
include_tools=[
"read_file",
"write_file",
"list_directory",
"search_files",
"get_file_info",
"directory_tree",
],
timeout=settings.mcp_timeout,
)
logger.info("Filesystem MCP tools created (lazy connect)")
return tools
except Exception as exc:
logger.warning("Failed to create filesystem MCP tools: %s", exc)
return None
def _bridge_to_work_order(title: str, body: str, category: str) -> None:
"""Create a local work order so the dashboard tracks the issue."""
try:
db_path = Path(settings.repo_root) / "data" / "work_orders.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute(
"""CREATE TABLE IF NOT EXISTS work_orders (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
priority TEXT DEFAULT 'medium',
category TEXT DEFAULT 'suggestion',
submitter TEXT DEFAULT 'dashboard',
related_files TEXT DEFAULT '',
status TEXT DEFAULT 'submitted',
result TEXT DEFAULT '',
rejection_reason TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT
)"""
)
conn.execute(
"INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
str(uuid.uuid4()),
title,
body,
category,
"timmy-thinking",
datetime.utcnow().isoformat(),
),
)
conn.commit()
conn.close()
except Exception as exc:
logger.debug("Work order bridge failed: %s", exc)
async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "") -> str:
"""File a Gitea issue via the MCP server (standalone, no LLM loop).
Used by the thinking engine's ``_maybe_file_issues()`` post-hook.
Manages its own MCPTools session with lazy connect + graceful failure.
Args:
title: Issue title.
body: Issue body (markdown).
labels: Comma-separated label names.
Returns:
Confirmation string or error explanation.
"""
if not settings.gitea_enabled or not settings.gitea_token:
return "Gitea integration is not configured."
try:
from agno.tools.mcp import MCPTools
global _issue_session
if _issue_session is None:
_issue_session = MCPTools(
command=settings.mcp_gitea_command,
env={
"GITEA_ACCESS_TOKEN": settings.gitea_token,
"GITEA_HOST": settings.gitea_url,
},
timeout=settings.mcp_timeout,
)
# Ensure connected
if not getattr(_issue_session, "_connected", False):
await _issue_session.connect()
_issue_session._connected = True
# Append auto-filing signature
full_body = body
if full_body:
full_body += "\n\n"
full_body += "---\n*Auto-filed by Timmy's thinking engine*"
# Parse owner/repo from settings
owner, repo = settings.gitea_repo.split("/", 1)
# Build tool arguments
args = {
"owner": owner,
"repo": repo,
"title": title,
"body": full_body,
}
# Call the MCP tool directly via the session
result = await _issue_session.call_tool("create_issue", arguments=args)
# Bridge to local work order
label_list = [tag.strip() for tag in labels.split(",") if tag.strip()] if labels else []
category = "bug" if "bug" in label_list else "suggestion"
_bridge_to_work_order(title, body, category)
logger.info("Created Gitea issue via MCP: %s", title[:60])
return f"Created issue: {title}\n{result}"
except Exception as exc:
logger.warning("MCP issue creation failed: %s", exc)
return f"Failed to create issue via MCP: {exc}"
async def close_mcp_sessions() -> None:
"""Close any open MCP sessions. Called during app shutdown."""
global _issue_session
if _issue_session is not None:
try:
await _issue_session.disconnect()
except Exception as exc:
logger.debug("MCP session disconnect error: %s", exc)
_issue_session = None

View File

@@ -59,11 +59,12 @@ def _get_agent():
return _agent
def chat(message: str, session_id: str | None = None) -> str:
async def chat(message: str, session_id: str | None = None) -> str:
"""Send a message to Timmy and get a response.
Uses a persistent agent and session_id so Agno's SQLite history
provides multi-turn conversation context.
provides multi-turn conversation context. Uses ``arun()`` so MCP
tool servers are auto-connected.
Args:
message: The user's message.
@@ -80,10 +81,10 @@ def chat(message: str, session_id: str | None = None) -> str:
# Run with session_id so Agno retrieves history from SQLite
try:
run = agent.run(message, stream=False, session_id=sid)
run = await agent.arun(message, stream=False, session_id=sid)
response_text = run.content if hasattr(run, "content") else str(run)
except Exception as exc:
logger.error("Session: agent.run() failed: %s", exc)
logger.error("Session: agent.arun() failed: %s", exc)
return "I'm having trouble reaching my language model right now. Please try again shortly."
# Post-processing: clean up any leaked tool calls or chain-of-thought
@@ -92,13 +93,15 @@ def chat(message: str, session_id: str | None = None) -> str:
return response_text
def chat_with_tools(message: str, session_id: str | None = None):
async def chat_with_tools(message: str, session_id: str | None = None):
"""Send a message and return the full Agno RunOutput.
Callers should check ``run_output.status``:
- ``RunStatus.paused`` — tools need confirmation (see ``run_output.requirements``)
- ``RunStatus.completed`` — response ready in ``run_output.content``
Uses ``arun()`` so MCP tool servers are auto-connected.
Returns:
An Agno ``RunOutput`` object (or a lightweight surrogate on error).
"""
@@ -107,16 +110,16 @@ def chat_with_tools(message: str, session_id: str | None = None):
_extract_facts(message)
try:
return agent.run(message, stream=False, session_id=sid)
return await agent.arun(message, stream=False, session_id=sid)
except Exception as exc:
logger.error("Session: agent.run() failed: %s", exc)
logger.error("Session: agent.arun() failed: %s", exc)
# Return a duck-typed object that callers can handle uniformly
return _ErrorRunOutput(
"I'm having trouble reaching my language model right now. Please try again shortly."
)
def continue_chat(run_output, session_id: str | None = None):
async def continue_chat(run_output, session_id: str | None = None):
"""Resume a paused run after tool confirmation / rejection.
Args:
@@ -129,9 +132,9 @@ def continue_chat(run_output, session_id: str | None = None):
agent = _get_agent()
try:
return agent.continue_run(run_response=run_output, stream=False, session_id=sid)
return await agent.acontinue_run(run_response=run_output, stream=False, session_id=sid)
except Exception as exc:
logger.error("Session: agent.continue_run() failed: %s", exc)
logger.error("Session: agent.acontinue_run() failed: %s", exc)
return _ErrorRunOutput(f"Error continuing run: {exc}")
@@ -149,7 +152,7 @@ class _ErrorRunOutput:
return []
def chat_raw(message: str, session_id: str | None = None) -> tuple[str, str]:
async def chat_raw(message: str, session_id: str | None = None) -> tuple[str, str]:
"""Send a message and return both cleaned and raw responses.
Backward-compatible wrapper around :func:`chat_with_tools`.
@@ -158,7 +161,7 @@ def chat_raw(message: str, session_id: str | None = None) -> tuple[str, str]:
(cleaned_response, raw_response) — cleaned has tool-call JSON and
chain-of-thought stripped; raw is the model's original output.
"""
run = chat_with_tools(message, session_id)
run = await chat_with_tools(message, session_id)
raw_response = run.content if hasattr(run, "content") and run.content else ""
cleaned = _clean_response(raw_response)
return cleaned, raw_response

View File

@@ -213,7 +213,7 @@ class ThinkingEngine:
)
try:
content = self._call_agent(prompt)
content = await self._call_agent(prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None
@@ -226,10 +226,10 @@ class ThinkingEngine:
self._last_thought_id = thought.id
# Post-hook: distill facts from recent thoughts periodically
self._maybe_distill()
await self._maybe_distill()
# Post-hook: file Gitea issues for actionable observations
self._maybe_file_issues()
await self._maybe_file_issues()
# Post-hook: update MEMORY.md with latest reflection
self._update_memory(thought)
@@ -324,7 +324,7 @@ class ThinkingEngine:
# ── Private helpers ──────────────────────────────────────────────────
def _maybe_distill(self) -> None:
async def _maybe_distill(self) -> None:
"""Every N thoughts, extract lasting insights and store as facts.
Reads the last N thoughts, asks the LLM to extract any durable facts
@@ -355,7 +355,7 @@ class ThinkingEngine:
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
raw = self._call_agent(distill_prompt)
raw = await self._call_agent(distill_prompt)
if not raw or not raw.strip():
return
@@ -381,13 +381,12 @@ class ThinkingEngine:
except Exception as exc:
logger.debug("Thought distillation skipped: %s", exc)
def _maybe_file_issues(self) -> None:
async def _maybe_file_issues(self) -> None:
"""Every N thoughts, classify recent thoughts and file Gitea issues.
Asks the LLM to review recent thoughts for actionable items —
bugs, broken features, stale state, or improvement opportunities.
Creates Gitea issues for anything worth tracking, with dedup to
avoid flooding.
Creates Gitea issues via MCP for anything worth tracking.
Only runs when:
- Gitea is enabled and configured
@@ -404,9 +403,7 @@ class ThinkingEngine:
return
# Check Gitea availability before spending LLM tokens
from infrastructure.hands.gitea import gitea_hand
if not gitea_hand.available:
if not settings.gitea_enabled or not settings.gitea_token:
return
recent = self.get_recent_thoughts(limit=interval)
@@ -437,7 +434,7 @@ class ThinkingEngine:
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
raw = self._call_agent(classify_prompt)
raw = await self._call_agent(classify_prompt)
if not raw or not raw.strip():
return
@@ -452,7 +449,7 @@ class ThinkingEngine:
if not isinstance(items, list) or not items:
return
from timmy.tools_gitea import create_gitea_issue
from timmy.mcp_tools import create_gitea_issue_via_mcp
for item in items[:2]: # Safety cap
if not isinstance(item, dict):
@@ -464,7 +461,7 @@ class ThinkingEngine:
continue
label = category if category in ("bug", "feature") else ""
result = create_gitea_issue(title=title, body=body, labels=label)
result = await create_gitea_issue_via_mcp(title=title, body=body, labels=label)
logger.info("Thought→Issue: %s%s", title[:60], result[:80])
except Exception as exc:
@@ -711,7 +708,7 @@ class ThinkingEngine:
lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines)
def _call_agent(self, prompt: str) -> str:
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Uses a separate session_id to avoid polluting user chat history.
@@ -719,13 +716,13 @@ class ThinkingEngine:
try:
from timmy.session import chat
return chat(prompt, session_id="thinking")
return await chat(prompt, session_id="thinking")
except Exception:
# Fallback: create a fresh agent
from timmy.agent import create_timmy
agent = create_timmy()
run = agent.run(prompt, stream=False)
run = await agent.arun(prompt, stream=False)
return run.content if hasattr(run, "content") else str(run)
def _store_thought(self, content: str, seed_type: str) -> Thought:

View File

@@ -43,8 +43,16 @@ SAFE_TOOLS = frozenset(
"check_ollama_health",
"get_memory_status",
"list_swarm_agents",
"create_gitea_issue",
"list_gitea_issues",
# MCP Gitea tools
"create_issue",
"list_repo_issues",
"create_issue_comment",
"edit_issue",
# MCP filesystem tools (read-only)
"list_directory",
"search_files",
"get_file_info",
"directory_tree",
}
)

View File

@@ -566,15 +566,8 @@ def create_full_toolkit(base_dir: str | Path | None = None):
except Exception:
logger.debug("Delegation tools not available")
# Gitea issue management — sovereign self-improvement channel
try:
from timmy.tools_gitea import create_gitea_issue, list_gitea_issues
toolkit.register(create_gitea_issue, name="create_gitea_issue")
toolkit.register(list_gitea_issues, name="list_gitea_issues")
logger.info("Gitea issue tools registered")
except Exception:
logger.debug("Gitea tools not available")
# Gitea issue management is now provided by the gitea-mcp server
# (wired in as MCPTools in agent.py, not registered here)
return toolkit

View File

@@ -1,209 +0,0 @@
"""Gitea tool functions — Timmy's self-improvement channel.
Provides sync tool wrappers around the async GiteaHand for use as
Agno-registered agent tools. When Timmy notices a bug, stale state,
or improvement opportunity, he can file a Gitea issue directly.
Usage::
from timmy.tools_gitea import create_gitea_issue, list_gitea_issues
# In agent conversation or thinking post-hook:
result = create_gitea_issue(
title="memory_forget tool returns error for valid queries",
body="The memory_forget operation fails silently...",
labels="bug",
)
"""
from __future__ import annotations
import asyncio
import logging
import sqlite3
import uuid
from datetime import datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
def _run_async(coro):
"""Run an async coroutine from sync context (tool functions must be sync).
When no event loop is running, uses ``asyncio.run()``.
When called from within an existing loop (e.g. FastAPI), spawns a
new thread to avoid "cannot call asyncio.run from running loop".
"""
try:
asyncio.get_running_loop()
except RuntimeError:
# No running loop — safe to use asyncio.run()
return asyncio.run(coro)
# Already in an async context — run in a new thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, coro).result(timeout=60)
def _bridge_to_work_order(title: str, body: str, category: str) -> None:
"""Also create a local work order so the dashboard tracks it."""
try:
db_path = Path(settings.repo_root) / "data" / "work_orders.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute(
"""CREATE TABLE IF NOT EXISTS work_orders (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
priority TEXT DEFAULT 'medium',
category TEXT DEFAULT 'suggestion',
submitter TEXT DEFAULT 'dashboard',
related_files TEXT DEFAULT '',
status TEXT DEFAULT 'submitted',
result TEXT DEFAULT '',
rejection_reason TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT
)"""
)
conn.execute(
"INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
str(uuid.uuid4()),
title,
body,
category,
"timmy-thinking",
datetime.utcnow().isoformat(),
),
)
conn.commit()
conn.close()
except Exception as exc:
logger.debug("Work order bridge failed: %s", exc)
def create_gitea_issue(title: str, body: str = "", labels: str = "") -> str:
"""Create an issue in the project's Gitea repository.
Use this when you notice a bug, broken feature, stale state, or
improvement opportunity in your own codebase. Issues are tracked
in Gitea and also bridged to the local work order queue.
Args:
title: Short, descriptive issue title.
body: Detailed description in markdown (symptoms, expected
behaviour, relevant files).
labels: Comma-separated label names (e.g. "bug,thinking-engine").
Returns:
Confirmation with issue URL, or explanation if skipped/failed.
"""
from infrastructure.hands.gitea import gitea_hand
if not gitea_hand.available:
return (
"Gitea integration is not configured. "
"Set GITEA_TOKEN or place a token in ~/.config/gitea/token."
)
label_list = [tag.strip() for tag in labels.split(",") if tag.strip()] if labels else []
async def _create():
# Dedup check — don't file if a similar issue is already open
duplicate = await gitea_hand.find_duplicate(title)
if duplicate:
number = duplicate.get("number", "?")
url = duplicate.get("html_url", "")
return f"Skipped — similar issue already open: #{number} ({url})"
# Append auto-filing signature
full_body = body
if full_body:
full_body += "\n\n"
full_body += "---\n🤖 *Auto-filed by Timmy's thinking engine*"
result = await gitea_hand.create_issue(
title=title,
body=full_body,
labels=label_list or None,
)
if not result.success:
return f"Failed to create issue: {result.error}"
issue_number = result.data.get("number", "?")
issue_url = result.data.get("html_url", "")
# Bridge to local work order system
category = "bug" if "bug" in label_list else "suggestion"
_bridge_to_work_order(title, body, category)
# Emit event if bus is available
try:
from infrastructure.events.bus import emit
asyncio.ensure_future(
emit(
"gitea.issue.created",
source="timmy-tools",
data={
"issue_number": issue_number,
"title": title,
"url": issue_url,
},
)
)
except Exception:
pass
logger.info("Created Gitea issue #%s: %s", issue_number, title)
return f"Created issue #{issue_number}: {title}\n{issue_url}"
return _run_async(_create())
def list_gitea_issues(state: str = "open") -> str:
"""List issues in the project's Gitea repository.
Use this to check what issues are already filed before creating
new ones, or to review the current backlog.
Args:
state: Filter by state — "open" (default), "closed", or "all".
Returns:
Formatted list of issues with number, title, and labels.
"""
from infrastructure.hands.gitea import gitea_hand
if not gitea_hand.available:
return "Gitea integration is not configured."
async def _list():
result = await gitea_hand.list_issues(state=state, limit=25)
if not result.success:
return f"Failed to list issues: {result.error}"
issues = result.data
if not isinstance(issues, list) or not issues:
return f"No {state} issues found."
lines = [f"## {state.capitalize()} Issues ({len(issues)})\n"]
for issue in issues:
number = issue.get("number", "?")
title = issue.get("title", "Untitled")
labels = ", ".join(label.get("name", "") for label in issue.get("labels", []))
label_str = f" [{labels}]" if labels else ""
lines.append(f"- **#{number}** {title}{label_str}")
return "\n".join(lines)
return _run_async(_list())

View File

@@ -33,6 +33,12 @@ for _mod in [
]:
sys.modules.setdefault(_mod, MagicMock())
# agno.tools.mcp requires the real mcp package; stub the sub-module so
# patch("agno.tools.mcp.MCPTools") resolves without installing mcp.
if "agno.tools.mcp" not in sys.modules:
_agno_mcp_stub = MagicMock()
sys.modules["agno.tools.mcp"] = _agno_mcp_stub
# mcp.registry needs a tool_registry with get_handler (used by timmy.agents.base)
_mcp_reg = sys.modules.get("mcp.registry")
if _mcp_reg is not None and not hasattr(_mcp_reg, "tool_registry"):

View File

@@ -1,6 +1,6 @@
"""Tests for dashboard tool confirmation flow using native Agno RunOutput."""
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
def _mock_completed_run(content="Just a reply."):
@@ -47,7 +47,7 @@ def test_chat_with_tool_call_shows_approval_card(client):
item = _mock_approval_item()
with (
patch("dashboard.routes.agents.chat_with_tools", return_value=run),
patch("dashboard.routes.agents.chat_with_tools", new_callable=AsyncMock, return_value=run),
patch("timmy.approvals.create_item", return_value=item),
):
response = client.post("/agents/default/chat", data={"message": "run echo hello"})
@@ -76,7 +76,7 @@ def test_chat_tool_card_contains_impact_badge(client):
item = _mock_approval_item()
with (
patch("dashboard.routes.agents.chat_with_tools", return_value=run),
patch("dashboard.routes.agents.chat_with_tools", new_callable=AsyncMock, return_value=run),
patch("timmy.approvals.create_item", return_value=item),
):
response = client.post("/agents/default/chat", data={"message": "run it"})
@@ -90,7 +90,7 @@ def test_chat_tool_card_has_htmx_approve_endpoint(client):
item = _mock_approval_item()
with (
patch("dashboard.routes.agents.chat_with_tools", return_value=run),
patch("dashboard.routes.agents.chat_with_tools", new_callable=AsyncMock, return_value=run),
patch("timmy.approvals.create_item", return_value=item),
):
response = client.post("/agents/default/chat", data={"message": "run it"})
@@ -109,7 +109,7 @@ def _create_pending_tool(client, approval_id="test-approval-123"):
item = _mock_approval_item(approval_id)
with (
patch("dashboard.routes.agents.chat_with_tools", return_value=run),
patch("dashboard.routes.agents.chat_with_tools", new_callable=AsyncMock, return_value=run),
patch("timmy.approvals.create_item", return_value=item),
):
response = client.post("/agents/default/chat", data={"message": "run it"})
@@ -131,7 +131,9 @@ def test_approve_executes_tool_and_returns_result(client):
result_run.content = "Done."
with (
patch("dashboard.routes.agents.continue_chat", return_value=result_run),
patch(
"dashboard.routes.agents.continue_chat", new_callable=AsyncMock, return_value=result_run
),
patch("timmy.approvals.approve"),
):
response = client.post(f"/agents/default/tool/{approval_id}/approve")
@@ -153,7 +155,9 @@ def test_approve_same_id_twice_returns_404(client):
result_run = _mock_completed_run("ok")
with (
patch("dashboard.routes.agents.continue_chat", return_value=result_run),
patch(
"dashboard.routes.agents.continue_chat", new_callable=AsyncMock, return_value=result_run
),
patch("timmy.approvals.approve"),
):
client.post(f"/agents/default/tool/{approval_id}/approve")
@@ -171,7 +175,11 @@ def test_reject_returns_rejected_card(client):
approval_id = _create_pending_tool(client)
with (
patch("dashboard.routes.agents.continue_chat", return_value=_mock_completed_run()),
patch(
"dashboard.routes.agents.continue_chat",
new_callable=AsyncMock,
return_value=_mock_completed_run(),
),
patch("timmy.approvals.reject"),
):
response = client.post(f"/agents/default/tool/{approval_id}/reject")

View File

@@ -104,7 +104,9 @@ def _mock_run(content="Operational and ready."):
def test_chat_agent_success(client):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run()):
with patch(
"dashboard.routes.agents.chat_with_tools", new_callable=AsyncMock, return_value=_mock_run()
):
response = client.post("/agents/default/chat", data={"message": "status?"})
assert response.status_code == 200
@@ -113,7 +115,11 @@ def test_chat_agent_success(client):
def test_chat_agent_shows_user_message(client):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run("Acknowledged.")):
with patch(
"dashboard.routes.agents.chat_with_tools",
new_callable=AsyncMock,
return_value=_mock_run("Acknowledged."),
):
response = client.post("/agents/default/chat", data={"message": "hello there"})
assert "hello there" in response.text
@@ -123,6 +129,7 @@ def test_chat_agent_ollama_offline(client):
# When Ollama is unreachable, chat shows the user message + error.
with patch(
"dashboard.routes.agents.chat_with_tools",
new_callable=AsyncMock,
side_effect=Exception("Ollama unreachable"),
):
response = client.post("/agents/default/chat", data={"message": "ping"})
@@ -147,7 +154,9 @@ def test_history_empty_shows_init_message(client):
def test_history_records_user_and_agent_messages(client):
with patch(
"dashboard.routes.agents.chat_with_tools", return_value=_mock_run("I am operational.")
"dashboard.routes.agents.chat_with_tools",
new_callable=AsyncMock,
return_value=_mock_run("I am operational."),
):
client.post("/agents/default/chat", data={"message": "status check"})
@@ -158,6 +167,7 @@ def test_history_records_user_and_agent_messages(client):
def test_history_records_error_when_offline(client):
with patch(
"dashboard.routes.agents.chat_with_tools",
new_callable=AsyncMock,
side_effect=Exception("Ollama unreachable"),
):
client.post("/agents/default/chat", data={"message": "ping"})
@@ -167,7 +177,11 @@ def test_history_records_error_when_offline(client):
def test_history_clear_resets_to_init_message(client):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run("Acknowledged.")):
with patch(
"dashboard.routes.agents.chat_with_tools",
new_callable=AsyncMock,
return_value=_mock_run("Acknowledged."),
):
client.post("/agents/default/chat", data={"message": "hello"})
response = client.delete("/agents/default/history")
@@ -176,7 +190,11 @@ def test_history_clear_resets_to_init_message(client):
def test_history_empty_after_clear(client):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run("OK.")):
with patch(
"dashboard.routes.agents.chat_with_tools",
new_callable=AsyncMock,
return_value=_mock_run("OK."),
):
client.post("/agents/default/chat", data={"message": "test"})
client.delete("/agents/default/history")

View File

@@ -160,7 +160,7 @@ class TestHandleMessageConfirmation:
paused_run = _mock_paused_run()
monkeypatch.setattr(
"integrations.chat_bridge.vendors.discord.chat_with_tools",
lambda msg, sid=None: paused_run,
AsyncMock(return_value=paused_run),
)
vendor._client = MagicMock()
@@ -208,7 +208,7 @@ class TestHandleMessageConfirmation:
completed_run = _mock_completed_run()
monkeypatch.setattr(
"integrations.chat_bridge.vendors.discord.chat_with_tools",
lambda msg, sid=None: completed_run,
AsyncMock(return_value=completed_run),
)
vendor._client = MagicMock()
@@ -247,7 +247,7 @@ class TestHandleMessageConfirmation:
paused_run = _mock_paused_run()
monkeypatch.setattr(
"integrations.chat_bridge.vendors.discord.chat_with_tools",
lambda msg, sid=None: paused_run,
AsyncMock(return_value=paused_run),
)
vendor._client = MagicMock()

View File

@@ -1,352 +0,0 @@
"""Tests for the Gitea Hand.
Covers:
- GiteaResult dataclass defaults
- Token resolution (settings vs filesystem fallback)
- Availability checks
- Title similarity dedup
- HTTP request handling (mocked)
- Info summary
- Graceful degradation on failure
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# GiteaResult dataclass
# ---------------------------------------------------------------------------
def test_gitea_result_defaults():
"""GiteaResult should have sensible defaults."""
from infrastructure.hands.gitea import GiteaResult
r = GiteaResult(operation="GET /issues", success=True)
assert r.data == {}
assert r.error == ""
assert r.latency_ms == 0.0
def test_gitea_result_with_data():
"""GiteaResult should carry data."""
from infrastructure.hands.gitea import GiteaResult
r = GiteaResult(
operation="POST /issues",
success=True,
data={"number": 42, "html_url": "http://localhost:3000/issues/42"},
latency_ms=15.3,
)
assert r.data["number"] == 42
assert r.success is True
# ---------------------------------------------------------------------------
# Title similarity
# ---------------------------------------------------------------------------
def test_title_similar_matches():
"""Similar titles should be detected as duplicates."""
from infrastructure.hands.gitea import _title_similar
assert _title_similar("memory_forget tool fails", "Memory_forget tool fails") is True
assert (
_title_similar(
"MEMORY.md not updating after thoughts",
"MEMORY.md hasn't updated since March 8",
)
is True
)
def test_title_similar_rejects_different():
"""Different titles should not match."""
from infrastructure.hands.gitea import _title_similar
assert _title_similar("fix login page CSS", "add dark mode toggle") is False
assert _title_similar("memory bug", "completely different topic") is False
# ---------------------------------------------------------------------------
# Token resolution
# ---------------------------------------------------------------------------
def test_resolve_token_from_settings():
"""Token from settings should be preferred."""
from infrastructure.hands.gitea import _resolve_token
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_token = "from-settings"
assert _resolve_token() == "from-settings"
def test_resolve_token_from_file(tmp_path):
"""Token should fall back to filesystem."""
from infrastructure.hands.gitea import _resolve_token
token_file = tmp_path / "token"
token_file.write_text("from-file\n")
with (
patch("infrastructure.hands.gitea.settings") as mock_settings,
patch("infrastructure.hands.gitea._TOKEN_FILE", token_file),
):
mock_settings.gitea_token = ""
assert _resolve_token() == "from-file"
def test_resolve_token_missing(tmp_path):
"""Empty string when no token available."""
from infrastructure.hands.gitea import _resolve_token
with (
patch("infrastructure.hands.gitea.settings") as mock_settings,
patch("infrastructure.hands.gitea._TOKEN_FILE", tmp_path / "nonexistent"),
):
mock_settings.gitea_token = ""
assert _resolve_token() == ""
# ---------------------------------------------------------------------------
# Availability
# ---------------------------------------------------------------------------
def test_available_when_configured():
"""Hand should be available when token and repo are set."""
from infrastructure.hands.gitea import GiteaHand
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
assert hand.available is True
def test_not_available_without_token():
"""Hand should not be available without token."""
from infrastructure.hands.gitea import GiteaHand
with (
patch("infrastructure.hands.gitea.settings") as mock_settings,
patch("infrastructure.hands.gitea._resolve_token", return_value=""),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = ""
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="")
assert hand.available is False
def test_not_available_when_disabled():
"""Hand should not be available when disabled."""
from infrastructure.hands.gitea import GiteaHand
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = False
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
assert hand.available is False
# ---------------------------------------------------------------------------
# HTTP request handling
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_request_returns_error_when_unavailable():
"""_request should return error when not configured."""
from infrastructure.hands.gitea import GiteaHand
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = False
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = ""
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="")
result = await hand._request("GET", "/test")
assert result.success is False
assert "not configured" in result.error
@pytest.mark.asyncio
async def test_create_issue_success():
"""create_issue should POST and return issue data."""
from infrastructure.hands.gitea import GiteaHand
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.text = '{"number": 1, "html_url": "http://localhost:3000/issues/1"}'
mock_response.json.return_value = {
"number": 1,
"html_url": "http://localhost:3000/issues/1",
}
mock_client = AsyncMock()
mock_client.request = AsyncMock(return_value=mock_response)
mock_client.aclose = AsyncMock()
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
hand._get_client = MagicMock(return_value=mock_client)
result = await hand.create_issue("Test issue", "Body text")
assert result.success is True
assert result.data["number"] == 1
# Verify the API call
mock_client.request.assert_called_once()
call_args = mock_client.request.call_args
assert call_args[0] == ("POST", "/api/v1/repos/owner/repo/issues")
@pytest.mark.asyncio
async def test_create_issue_handles_http_error():
"""create_issue should handle HTTP errors gracefully."""
from infrastructure.hands.gitea import GiteaHand
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
mock_client = AsyncMock()
mock_client.request = AsyncMock(return_value=mock_response)
mock_client.aclose = AsyncMock()
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
hand._get_client = MagicMock(return_value=mock_client)
result = await hand.create_issue("Test issue")
assert result.success is False
assert "500" in result.error
@pytest.mark.asyncio
async def test_create_issue_handles_connection_error():
"""create_issue should handle connection errors gracefully."""
from infrastructure.hands.gitea import GiteaHand
mock_client = AsyncMock()
mock_client.request = AsyncMock(side_effect=ConnectionError("refused"))
mock_client.aclose = AsyncMock()
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
hand._get_client = MagicMock(return_value=mock_client)
result = await hand.create_issue("Test issue")
assert result.success is False
assert "refused" in result.error
@pytest.mark.asyncio
async def test_find_duplicate_detects_match():
"""find_duplicate should detect similar open issues."""
from infrastructure.hands.gitea import GiteaHand, GiteaResult
existing_issues = [
{"number": 5, "title": "MEMORY.md not updating", "html_url": "http://example.com/5"},
{"number": 6, "title": "Add dark mode", "html_url": "http://example.com/6"},
]
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
# Mock list_issues
hand.list_issues = AsyncMock(
return_value=GiteaResult(
operation="GET",
success=True,
data=existing_issues,
)
)
dup = await hand.find_duplicate("MEMORY.md hasn't updated")
assert dup is not None
assert dup["number"] == 5
@pytest.mark.asyncio
async def test_find_duplicate_no_match():
"""find_duplicate should return None when no similar issue exists."""
from infrastructure.hands.gitea import GiteaHand, GiteaResult
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
hand.list_issues = AsyncMock(
return_value=GiteaResult(
operation="GET",
success=True,
data=[
{"number": 1, "title": "Completely unrelated issue"},
],
)
)
dup = await hand.find_duplicate("memory_forget tool throws error")
assert dup is None
# ---------------------------------------------------------------------------
# Info summary
# ---------------------------------------------------------------------------
def test_info_returns_summary():
"""info() should return a dict with status."""
from infrastructure.hands.gitea import GiteaHand
with patch("infrastructure.hands.gitea.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_token = "test-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_timeout = 30
hand = GiteaHand(token="test-token")
info = hand.info()
assert "base_url" in info
assert "repo" in info
assert "available" in info
assert info["available"] is True

View File

@@ -246,7 +246,7 @@ def test_create_timmy_includes_tools_for_large_model():
create_timmy()
kwargs = MockAgent.call_args.kwargs
assert kwargs["tools"] == [mock_toolkit]
assert mock_toolkit in kwargs["tools"]
def test_create_timmy_no_unsupported_agent_kwargs():

View File

@@ -0,0 +1,256 @@
"""Tests for the MCP tools module (factories + issue bridge)."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.mcp_tools import (
_bridge_to_work_order,
close_mcp_sessions,
create_filesystem_mcp_tools,
create_gitea_issue_via_mcp,
create_gitea_mcp_tools,
)
# ---------------------------------------------------------------------------
# create_gitea_mcp_tools
# ---------------------------------------------------------------------------
def test_gitea_mcp_returns_none_when_disabled():
"""Gitea MCP factory returns None when gitea_enabled=False."""
with patch("timmy.mcp_tools.settings") as mock_settings:
mock_settings.gitea_enabled = False
mock_settings.gitea_token = "some-token"
result = create_gitea_mcp_tools()
assert result is None
def test_gitea_mcp_returns_none_when_no_token():
"""Gitea MCP factory returns None when gitea_token is empty."""
with patch("timmy.mcp_tools.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
result = create_gitea_mcp_tools()
assert result is None
def test_gitea_mcp_returns_tools_when_configured():
"""Gitea MCP factory returns an MCPTools instance when properly configured."""
mock_mcp = MagicMock()
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_mcp) as mock_cls,
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok123"
mock_settings.mcp_gitea_command = "gitea-mcp -t stdio"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.mcp_timeout = 15
result = create_gitea_mcp_tools()
assert result is mock_mcp
mock_cls.assert_called_once()
call_kwargs = mock_cls.call_args[1]
assert call_kwargs["command"] == "gitea-mcp -t stdio"
assert call_kwargs["env"]["GITEA_ACCESS_TOKEN"] == "tok123"
assert "create_issue" in call_kwargs["include_tools"]
def test_gitea_mcp_graceful_on_import_error():
"""Gitea MCP factory returns None if agno.tools.mcp isn't available."""
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch.dict("sys.modules", {"agno.tools.mcp": None}),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok123"
# This should gracefully return None (import will fail)
result = create_gitea_mcp_tools()
assert result is None
# ---------------------------------------------------------------------------
# create_filesystem_mcp_tools
# ---------------------------------------------------------------------------
def test_filesystem_mcp_returns_tools():
"""Filesystem MCP factory returns an MCPTools instance."""
mock_mcp = MagicMock()
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_mcp) as mock_cls,
):
mock_settings.mcp_filesystem_command = "npx -y @modelcontextprotocol/server-filesystem"
mock_settings.repo_root = "/home/user/project"
mock_settings.mcp_timeout = 15
result = create_filesystem_mcp_tools()
assert result is mock_mcp
call_kwargs = mock_cls.call_args[1]
assert "/home/user/project" in call_kwargs["command"]
assert "read_file" in call_kwargs["include_tools"]
# ---------------------------------------------------------------------------
# create_gitea_issue_via_mcp
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_issue_via_mcp_returns_not_configured():
"""Issue creation returns message when Gitea is not configured."""
with patch("timmy.mcp_tools.settings") as mock_settings:
mock_settings.gitea_enabled = False
mock_settings.gitea_token = ""
result = await create_gitea_issue_via_mcp("Test", "Body")
assert "not configured" in result
@pytest.mark.asyncio
async def test_issue_via_mcp_calls_tool():
"""Issue creation calls the MCP tool with correct arguments."""
import timmy.mcp_tools as mcp_mod
mock_session = MagicMock()
mock_session.connect = AsyncMock()
mock_session.call_tool = AsyncMock(return_value="Issue #42 created")
mock_session._connected = False
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_session),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok123"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.mcp_gitea_command = "gitea-mcp -t stdio"
mock_settings.mcp_timeout = 15
mock_settings.repo_root = "/tmp/test"
# Reset module-level cache
mcp_mod._issue_session = None
result = await create_gitea_issue_via_mcp("Bug title", "Bug body", "bug")
assert "Bug title" in result
mock_session.connect.assert_awaited_once()
mock_session.call_tool.assert_awaited_once()
call_args = mock_session.call_tool.call_args
assert call_args[0][0] == "create_issue"
assert call_args[1]["arguments"]["owner"] == "owner"
assert call_args[1]["arguments"]["repo"] == "repo"
# Clean up
mcp_mod._issue_session = None
@pytest.mark.asyncio
async def test_issue_via_mcp_graceful_failure():
"""Issue creation returns error string on MCP failure."""
import timmy.mcp_tools as mcp_mod
mock_session = MagicMock()
mock_session.connect = AsyncMock(side_effect=ConnectionError("no process"))
mock_session._connected = False
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_session),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok123"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.mcp_gitea_command = "gitea-mcp -t stdio"
mock_settings.mcp_timeout = 15
mock_settings.repo_root = "/tmp/test"
mcp_mod._issue_session = None
result = await create_gitea_issue_via_mcp("Test", "Body")
assert "Failed" in result
mcp_mod._issue_session = None
# ---------------------------------------------------------------------------
# _bridge_to_work_order
# ---------------------------------------------------------------------------
def test_bridge_to_work_order(tmp_path):
"""Work order bridge creates a record in SQLite."""
import sqlite3
with patch("timmy.mcp_tools.settings") as mock_settings:
mock_settings.repo_root = str(tmp_path)
_bridge_to_work_order("Test title", "Test body", "bug")
db_path = tmp_path / "data" / "work_orders.db"
assert db_path.exists()
conn = sqlite3.connect(str(db_path))
rows = conn.execute("SELECT title, category FROM work_orders").fetchall()
conn.close()
assert len(rows) == 1
assert rows[0][0] == "Test title"
assert rows[0][1] == "bug"
# ---------------------------------------------------------------------------
# close_mcp_sessions
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_close_mcp_sessions():
"""close_mcp_sessions disconnects the cached session."""
import timmy.mcp_tools as mcp_mod
mock_session = MagicMock()
mock_session.disconnect = AsyncMock()
mcp_mod._issue_session = mock_session
await close_mcp_sessions()
mock_session.disconnect.assert_awaited_once()
assert mcp_mod._issue_session is None
@pytest.mark.asyncio
async def test_close_mcp_sessions_noop_when_none():
"""close_mcp_sessions is a no-op when no session exists."""
import timmy.mcp_tools as mcp_mod
mcp_mod._issue_session = None
await close_mcp_sessions() # Should not raise
assert mcp_mod._issue_session is None
# ---------------------------------------------------------------------------
# Tool safety integration
# ---------------------------------------------------------------------------
def test_mcp_tools_classified_in_safety():
"""MCP tool names are correctly classified in tool_safety."""
from timmy.tool_safety import DANGEROUS_TOOLS, SAFE_TOOLS, requires_confirmation
# Gitea MCP tools should be safe
assert "create_issue" in SAFE_TOOLS
assert "list_repo_issues" in SAFE_TOOLS
# Filesystem read-only MCP tools should be safe
assert "list_directory" in SAFE_TOOLS
assert "search_files" in SAFE_TOOLS
# write_file is dangerous (filesystem MCP)
assert "write_file" in DANGEROUS_TOOLS
# Verify requires_confirmation logic
assert not requires_confirmation("create_issue")
assert not requires_confirmation("list_directory")
assert requires_confirmation("write_file")

View File

@@ -1,6 +1,6 @@
"""Tests for timmy.session — persistent chat session with response sanitization."""
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -24,67 +24,72 @@ def _reset_session_singleton():
# ---------------------------------------------------------------------------
def test_chat_returns_string():
@pytest.mark.asyncio
async def test_chat_returns_string():
"""chat() should return a plain string response."""
mock_agent = MagicMock()
mock_agent.run.return_value = MagicMock(content="Hello, sir.")
mock_agent.arun = AsyncMock(return_value=MagicMock(content="Hello, sir."))
with patch("timmy.session._get_agent", return_value=mock_agent):
from timmy.session import chat
result = chat("Hi Timmy")
result = await chat("Hi Timmy")
assert isinstance(result, str)
assert "Hello, sir." in result
def test_chat_passes_session_id():
"""chat() should pass the session_id to agent.run()."""
@pytest.mark.asyncio
async def test_chat_passes_session_id():
"""chat() should pass the session_id to agent.arun()."""
mock_agent = MagicMock()
mock_agent.run.return_value = MagicMock(content="OK.")
mock_agent.arun = AsyncMock(return_value=MagicMock(content="OK."))
with patch("timmy.session._get_agent", return_value=mock_agent):
from timmy.session import chat
chat("test", session_id="my-session")
await chat("test", session_id="my-session")
_, kwargs = mock_agent.run.call_args
_, kwargs = mock_agent.arun.call_args
assert kwargs["session_id"] == "my-session"
def test_chat_uses_default_session_id():
@pytest.mark.asyncio
async def test_chat_uses_default_session_id():
"""chat() should use 'dashboard' as the default session_id."""
mock_agent = MagicMock()
mock_agent.run.return_value = MagicMock(content="OK.")
mock_agent.arun = AsyncMock(return_value=MagicMock(content="OK."))
with patch("timmy.session._get_agent", return_value=mock_agent):
from timmy.session import chat
chat("test")
await chat("test")
_, kwargs = mock_agent.run.call_args
_, kwargs = mock_agent.arun.call_args
assert kwargs["session_id"] == "dashboard"
def test_chat_singleton_agent_reused():
@pytest.mark.asyncio
async def test_chat_singleton_agent_reused():
"""Calling chat() multiple times should reuse the same agent instance."""
mock_agent = MagicMock()
mock_agent.run.return_value = MagicMock(content="OK.")
mock_agent.arun = AsyncMock(return_value=MagicMock(content="OK."))
with patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_factory:
from timmy.session import chat
chat("first message")
chat("second message")
await chat("first message")
await chat("second message")
# Factory called only once (singleton)
mock_factory.assert_called_once()
def test_chat_extracts_user_name():
@pytest.mark.asyncio
async def test_chat_extracts_user_name():
"""chat() should extract user name from message and persist to memory."""
mock_agent = MagicMock()
mock_agent.run.return_value = MagicMock(content="Nice to meet you!")
mock_agent.arun = AsyncMock(return_value=MagicMock(content="Nice to meet you!"))
mock_mem = MagicMock()
@@ -94,15 +99,16 @@ def test_chat_extracts_user_name():
):
from timmy.session import chat
chat("my name is Alex")
await chat("my name is Alex")
mock_mem.update_user_fact.assert_called_once_with("Name", "Alex")
def test_chat_graceful_degradation_on_memory_failure():
@pytest.mark.asyncio
async def test_chat_graceful_degradation_on_memory_failure():
"""chat() should still work if the conversation manager raises."""
mock_agent = MagicMock()
mock_agent.run.return_value = MagicMock(content="I'm operational.")
mock_agent.arun = AsyncMock(return_value=MagicMock(content="I'm operational."))
with (
patch("timmy.session._get_agent", return_value=mock_agent),
@@ -112,7 +118,7 @@ def test_chat_graceful_degradation_on_memory_failure():
from timmy.session import chat
result = chat("test message")
result = await chat("test message")
assert "operational" in result

View File

@@ -1,215 +0,0 @@
"""Tests for Gitea tool functions.
Covers:
- create_gitea_issue tool (success, dedup skip, unavailable)
- list_gitea_issues tool (success, empty, unavailable)
- Work order bridge
- Tool safety classification
"""
from unittest.mock import AsyncMock, MagicMock, patch
# ---------------------------------------------------------------------------
# Tool safety classification
# ---------------------------------------------------------------------------
def test_gitea_tools_are_safe():
"""Gitea tools should be classified as safe (no confirmation needed)."""
from timmy.tool_safety import requires_confirmation
assert requires_confirmation("create_gitea_issue") is False
assert requires_confirmation("list_gitea_issues") is False
# ---------------------------------------------------------------------------
# create_gitea_issue
# ---------------------------------------------------------------------------
# All patches target infrastructure.hands.gitea.gitea_hand because
# tools_gitea.py uses deferred imports inside function bodies.
def test_create_issue_unavailable():
"""Should return message when Gitea is not configured."""
mock_hand = MagicMock()
mock_hand.available = False
with patch("infrastructure.hands.gitea.gitea_hand", mock_hand):
from timmy.tools_gitea import create_gitea_issue
result = create_gitea_issue("Test issue", "Body")
assert "not configured" in result
def test_create_issue_success():
"""Should create issue and return confirmation."""
from infrastructure.hands.gitea import GiteaResult
mock_hand = MagicMock()
mock_hand.available = True
mock_hand.find_duplicate = AsyncMock(return_value=None)
mock_hand.create_issue = AsyncMock(
return_value=GiteaResult(
operation="POST",
success=True,
data={"number": 42, "html_url": "http://localhost:3000/issues/42"},
)
)
with (
patch("infrastructure.hands.gitea.gitea_hand", mock_hand),
patch("timmy.tools_gitea._bridge_to_work_order"),
):
from timmy.tools_gitea import create_gitea_issue
result = create_gitea_issue("Test bug", "Bug description", "bug")
assert "#42" in result
assert "Test bug" in result
def test_create_issue_dedup_skip():
"""Should skip when similar issue already exists."""
mock_hand = MagicMock()
mock_hand.available = True
mock_hand.find_duplicate = AsyncMock(
return_value={"number": 10, "html_url": "http://localhost:3000/issues/10"}
)
with patch("infrastructure.hands.gitea.gitea_hand", mock_hand):
from timmy.tools_gitea import create_gitea_issue
result = create_gitea_issue("Existing issue")
assert "Skipped" in result
assert "#10" in result
def test_create_issue_api_failure():
"""Should return error message on API failure."""
from infrastructure.hands.gitea import GiteaResult
mock_hand = MagicMock()
mock_hand.available = True
mock_hand.find_duplicate = AsyncMock(return_value=None)
mock_hand.create_issue = AsyncMock(
return_value=GiteaResult(
operation="POST",
success=False,
error="HTTP 500: Internal Server Error",
)
)
with (
patch("infrastructure.hands.gitea.gitea_hand", mock_hand),
patch("timmy.tools_gitea._bridge_to_work_order"),
):
from timmy.tools_gitea import create_gitea_issue
result = create_gitea_issue("Test issue")
assert "Failed" in result
# ---------------------------------------------------------------------------
# list_gitea_issues
# ---------------------------------------------------------------------------
def test_list_issues_unavailable():
"""Should return message when Gitea is not configured."""
mock_hand = MagicMock()
mock_hand.available = False
with patch("infrastructure.hands.gitea.gitea_hand", mock_hand):
from timmy.tools_gitea import list_gitea_issues
result = list_gitea_issues()
assert "not configured" in result
def test_list_issues_success():
"""Should return formatted issue list."""
from infrastructure.hands.gitea import GiteaResult
mock_hand = MagicMock()
mock_hand.available = True
mock_hand.list_issues = AsyncMock(
return_value=GiteaResult(
operation="GET",
success=True,
data=[
{"number": 1, "title": "Bug fix", "labels": [{"name": "bug"}]},
{"number": 2, "title": "Feature request", "labels": []},
],
)
)
with patch("infrastructure.hands.gitea.gitea_hand", mock_hand):
from timmy.tools_gitea import list_gitea_issues
result = list_gitea_issues("open")
assert "#1" in result
assert "Bug fix" in result
assert "[bug]" in result
assert "#2" in result
def test_list_issues_empty():
"""Should return empty message when no issues."""
from infrastructure.hands.gitea import GiteaResult
mock_hand = MagicMock()
mock_hand.available = True
mock_hand.list_issues = AsyncMock(
return_value=GiteaResult(
operation="GET",
success=True,
data=[],
)
)
with patch("infrastructure.hands.gitea.gitea_hand", mock_hand):
from timmy.tools_gitea import list_gitea_issues
result = list_gitea_issues()
assert "No open issues" in result
# ---------------------------------------------------------------------------
# Work order bridge
# ---------------------------------------------------------------------------
def test_bridge_to_work_order(tmp_path):
"""Should create a work order in the local database."""
from timmy.tools_gitea import _bridge_to_work_order
# Point to a "data" subdir inside tmp_path so the code creates it
data_dir = tmp_path / "data"
data_dir.mkdir()
db_path = data_dir / "work_orders.db"
with patch("timmy.tools_gitea.settings") as mock_settings:
mock_settings.repo_root = str(tmp_path)
_bridge_to_work_order("Test WO", "Description", "bug")
import sqlite3
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
rows = conn.execute("SELECT * FROM work_orders").fetchall()
conn.close()
assert len(rows) == 1
assert rows[0]["title"] == "Test WO"
assert rows[0]["submitter"] == "timmy-thinking"
assert rows[0]["category"] == "bug"
def test_bridge_to_work_order_graceful_failure():
"""Should not raise when bridge fails."""
from timmy.tools_gitea import _bridge_to_work_order
with patch("timmy.tools_gitea.settings") as mock_settings:
mock_settings.repo_root = "/nonexistent/path/that/cannot/exist"
# Should not raise
_bridge_to_work_order("Test", "Desc", "bug")