feat: replace custom Gitea client with MCP servers

Replace the bespoke GiteaHand httpx client and tools_gitea.py wrappers
with official MCP tool servers (gitea-mcp + filesystem MCP), wired into
Agno via MCPTools. Switch all session functions to async (arun/acontinue_run)
so MCP tools auto-connect. Delete ~1070 lines of custom Gitea code.

- Create src/timmy/mcp_tools.py with MCP factories + standalone issue bridge
- Wire MCPTools into agent.py tool list (Gitea + filesystem)
- Switch session.py chat/chat_with_tools/continue_chat to async
- Update all callers (dashboard routes, Discord vendor, CLI, thinking engine)
- Add gitea_token fallback from ~/.config/gitea/token
- Add MCP session cleanup to app shutdown hook
- Update tool_safety.py for MCP tool names
- 11 new tests, all 1417 passing, coverage 74.2%

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Trip T
2026-03-12 21:40:32 -04:00
parent 41d6ebaf6a
commit 78167675f2
24 changed files with 664 additions and 1170 deletions

View File

@@ -7,6 +7,8 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Central configuration — all env-var access goes through this class."""
# Display name for the primary agent — override with AGENT_NAME env var
agent_name: str = "Agent"
@@ -219,12 +221,17 @@ class Settings(BaseSettings):
# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# Timmy can file issues when he notices bugs or improvement opportunities.
# These values are passed as env vars to the gitea-mcp server process.
gitea_url: str = "http://localhost:3000"
gitea_token: str = "" # GITEA_TOKEN env var; falls back to ~/.config/gitea/token
gitea_repo: str = "rockachopa/Timmy-time-dashboard" # owner/repo
gitea_enabled: bool = True
gitea_timeout: int = 30
# ── MCP Servers ────────────────────────────────────────────────────
# External tool servers connected via Model Context Protocol (stdio).
mcp_gitea_command: str = "gitea-mcp -t stdio"
mcp_filesystem_command: str = "npx -y @modelcontextprotocol/server-filesystem"
mcp_timeout: int = 15
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
# Self-test orchestrator that probes capabilities alongside the thinking loop.
@@ -314,6 +321,18 @@ class Settings(BaseSettings):
path = os.path.dirname(path)
return os.getcwd()
def model_post_init(self, __context) -> None:
"""Post-init: resolve gitea_token from file if not set via env."""
if not self.gitea_token:
token_path = os.path.expanduser("~/.config/gitea/token")
try:
if os.path.isfile(token_path):
token = open(token_path).read().strip() # noqa: SIM115
if token:
self.gitea_token = token
except OSError:
pass
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",

View File

@@ -391,6 +391,14 @@ async def lifespan(app: FastAPI):
await discord_bot.stop()
await telegram_bot.stop()
# Close MCP tool server sessions
try:
from timmy.mcp_tools import close_mcp_sessions
await close_mcp_sessions()
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
for task in [briefing_task, thinking_task, chat_task, loop_qa_task]:
if task:
task.cancel()

View File

@@ -1,4 +1,3 @@
import asyncio
import json
import logging
from datetime import datetime
@@ -90,7 +89,7 @@ async def chat_agent(request: Request, message: str = Form(...)):
error_text = None
try:
run_output = await asyncio.to_thread(chat_with_tools, message)
run_output = await chat_with_tools(message)
except Exception as exc:
logger.error("Chat error: %s", exc)
error_text = f"Chat error: {exc}"
@@ -181,7 +180,7 @@ async def approve_tool(request: Request, approval_id: str):
req.confirm()
try:
result_run = await asyncio.to_thread(continue_chat, pending["run_output"])
result_run = await continue_chat(pending["run_output"])
# Extract tool result from the resumed run
tool_result = ""
for te in getattr(result_run, "tools", None) or []:
@@ -220,7 +219,7 @@ async def reject_tool(request: Request, approval_id: str):
req.reject(note="User rejected from dashboard")
# Resume so the agent knows the tool was rejected
try:
await asyncio.to_thread(continue_chat, pending["run_output"])
await continue_chat(pending["run_output"])
except Exception:
pass

View File

@@ -88,7 +88,7 @@ async def api_chat(request: Request):
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
response_text = agent_chat(
response_text = await agent_chat(
context_prefix + last_user_msg,
session_id="mobile",
)

View File

@@ -1,295 +0,0 @@
"""Gitea Hand — issue tracking and self-improvement channel for Timmy.
Provides Gitea API capabilities with:
- Token auth (env var, config, or ~/.config/gitea/token fallback)
- Structured result parsing
- Dedup checks before creating issues
- Graceful degradation: log warning, return fallback, never crash
Follows project conventions:
- Config via ``from config import settings``
- Singleton pattern for module-level import
- Async httpx client (like Paperclip client pattern)
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
@dataclass
class GiteaResult:
"""Result from a Gitea API operation."""
operation: str
success: bool
data: dict = field(default_factory=dict)
error: str = ""
latency_ms: float = 0.0
def _resolve_token() -> str:
"""Resolve Gitea API token from settings or filesystem fallback."""
if settings.gitea_token:
return settings.gitea_token
try:
return _TOKEN_FILE.read_text().strip()
except (FileNotFoundError, PermissionError):
return ""
def _title_similar(a: str, b: str, threshold: float = 0.6) -> bool:
"""Check if two issue titles are similar enough to be duplicates."""
return SequenceMatcher(None, a.lower(), b.lower()).ratio() > threshold
class GiteaHand:
"""Gitea API hand for Timmy.
All methods degrade gracefully — if Gitea is unreachable or the
API call fails, the hand returns a ``GiteaResult(success=False)``
rather than raising.
"""
def __init__(
self,
base_url: str | None = None,
token: str | None = None,
repo: str | None = None,
timeout: int | None = None,
) -> None:
self._base_url = (base_url or settings.gitea_url).rstrip("/")
self._token = token or _resolve_token()
self._repo = repo or settings.gitea_repo
self._timeout = timeout or settings.gitea_timeout
if not self._token:
logger.warning(
"Gitea token not configured — set GITEA_TOKEN or place token in %s",
_TOKEN_FILE,
)
else:
logger.info(
"GiteaHand initialised — %s/%s",
self._base_url,
self._repo,
)
@property
def available(self) -> bool:
"""Check if Gitea integration is configured and enabled."""
return bool(settings.gitea_enabled and self._token and self._repo)
def _get_client(self):
"""Create a fresh async HTTP client for the current event loop.
Always creates a new client rather than caching, because tool
functions call us via ``asyncio.run()`` which creates a new loop
each time — a cached client from a previous loop would raise
"Event loop is closed".
"""
import httpx
return httpx.AsyncClient(
base_url=self._base_url,
headers={
"Authorization": f"token {self._token}",
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=self._timeout,
)
async def _request(self, method: str, path: str, **kwargs) -> GiteaResult:
"""Make an API request with full error handling."""
start = time.time()
operation = f"{method.upper()} {path}"
if not self.available:
return GiteaResult(
operation=operation,
success=False,
error="Gitea not configured (missing token or repo)",
)
client = self._get_client()
try:
resp = await client.request(method, path, **kwargs)
latency = (time.time() - start) * 1000
if resp.status_code >= 400:
error_body = resp.text[:500]
logger.warning(
"Gitea API %s returned %d: %s",
operation,
resp.status_code,
error_body,
)
return GiteaResult(
operation=operation,
success=False,
error=f"HTTP {resp.status_code}: {error_body}",
latency_ms=latency,
)
return GiteaResult(
operation=operation,
success=True,
data=resp.json() if resp.text else {},
latency_ms=latency,
)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning("Gitea API %s failed: %s", operation, exc)
return GiteaResult(
operation=operation,
success=False,
error=str(exc),
latency_ms=latency,
)
finally:
await client.aclose()
# ── Issue operations ─────────────────────────────────────────────────
async def create_issue(
self,
title: str,
body: str = "",
labels: list[str] | None = None,
) -> GiteaResult:
"""Create an issue in the configured repository.
Args:
title: Issue title (required).
body: Issue body in markdown.
labels: Optional list of label names (must exist in repo).
Returns:
GiteaResult with issue data (number, html_url, etc.).
"""
owner, repo = self._repo.split("/", 1)
payload: dict = {"title": title, "body": body}
# Resolve label names to IDs if provided
if labels:
label_ids = await self._resolve_label_ids(owner, repo, labels)
if label_ids:
payload["labels"] = label_ids
return await self._request(
"POST",
f"/api/v1/repos/{owner}/{repo}/issues",
json=payload,
)
async def list_issues(
self,
state: str = "open",
labels: list[str] | None = None,
limit: int = 50,
) -> GiteaResult:
"""List issues in the configured repository.
Args:
state: Filter by state ("open", "closed", "all").
labels: Filter by label names.
limit: Max issues to return.
Returns:
GiteaResult with list of issue dicts.
"""
owner, repo = self._repo.split("/", 1)
params: dict = {"state": state, "limit": limit, "type": "issues"}
if labels:
params["labels"] = ",".join(labels)
return await self._request(
"GET",
f"/api/v1/repos/{owner}/{repo}/issues",
params=params,
)
async def get_issue(self, number: int) -> GiteaResult:
"""Get a single issue by number."""
owner, repo = self._repo.split("/", 1)
return await self._request(
"GET",
f"/api/v1/repos/{owner}/{repo}/issues/{number}",
)
async def add_comment(self, number: int, body: str) -> GiteaResult:
"""Add a comment to an issue."""
owner, repo = self._repo.split("/", 1)
return await self._request(
"POST",
f"/api/v1/repos/{owner}/{repo}/issues/{number}/comments",
json={"body": body},
)
async def close_issue(self, number: int) -> GiteaResult:
"""Close an issue."""
owner, repo = self._repo.split("/", 1)
return await self._request(
"PATCH",
f"/api/v1/repos/{owner}/{repo}/issues/{number}",
json={"state": "closed"},
)
# ── Dedup helper ─────────────────────────────────────────────────────
async def find_duplicate(self, title: str, threshold: float = 0.6) -> dict | None:
"""Check if an open issue with a similar title already exists.
Returns the matching issue dict, or None if no duplicate found.
"""
result = await self.list_issues(state="open", limit=100)
if not result.success or not isinstance(result.data, list):
return None
for issue in result.data:
existing_title = issue.get("title", "")
if _title_similar(title, existing_title, threshold):
return issue
return None
# ── Label helper ─────────────────────────────────────────────────────
async def _resolve_label_ids(self, owner: str, repo: str, label_names: list[str]) -> list[int]:
"""Resolve label names to IDs. Returns empty list on failure."""
result = await self._request(
"GET",
f"/api/v1/repos/{owner}/{repo}/labels",
)
if not result.success or not isinstance(result.data, list):
return []
name_to_id = {label["name"].lower(): label["id"] for label in result.data}
return [name_to_id[name.lower()] for name in label_names if name.lower() in name_to_id]
# ── Status ───────────────────────────────────────────────────────────
def info(self) -> dict:
"""Return a status summary for the dashboard."""
return {
"base_url": self._base_url,
"repo": self._repo,
"available": self.available,
"has_token": bool(self._token),
}
# ── Module-level singleton ──────────────────────────────────────────────────
gitea_hand = GiteaHand()

View File

@@ -82,9 +82,7 @@ if _DISCORD_UI_AVAILABLE:
req = action.get("requirement")
if req:
req.reject(note="Timed out — auto-rejected")
await asyncio.to_thread(
continue_chat, action["run_output"], action.get("session_id")
)
await continue_chat(action["run_output"], action.get("session_id"))
await action["target"].send(
f"Action `{action['tool_name']}` timed out and was auto-rejected."
@@ -427,9 +425,7 @@ class DiscordVendor(ChatPlatform):
req.confirm()
try:
result_run = await asyncio.to_thread(
continue_chat, action["run_output"], action.get("session_id")
)
result_run = await continue_chat(action["run_output"], action.get("session_id"))
# Extract tool result from the resumed run
tool_result = ""
for te in getattr(result_run, "tools", None) or []:
@@ -461,7 +457,7 @@ class DiscordVendor(ChatPlatform):
req = action["requirement"]
req.reject(note="User rejected from Discord")
try:
await asyncio.to_thread(continue_chat, action["run_output"], action.get("session_id"))
await continue_chat(action["run_output"], action.get("session_id"))
except Exception:
pass
@@ -539,7 +535,7 @@ class DiscordVendor(ChatPlatform):
try:
async with target.typing():
run_output = await asyncio.wait_for(
asyncio.to_thread(chat_with_tools, content, session_id),
chat_with_tools(content, session_id),
timeout=300,
)
except TimeoutError:

View File

@@ -245,10 +245,30 @@ def create_timmy(
use_tools = _model_supports_tools(model_name)
# Conditionally include tools — small models get none
tools = create_full_toolkit() if use_tools else None
toolkit = create_full_toolkit() if use_tools else None
if not use_tools:
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
tools_list: list = []
if toolkit:
tools_list.append(toolkit)
# Add MCP tool servers (lazy-connected on first arun())
if use_tools:
try:
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
gitea_mcp = create_gitea_mcp_tools()
if gitea_mcp:
tools_list.append(gitea_mcp)
fs_mcp = create_filesystem_mcp_tools()
if fs_mcp:
tools_list.append(fs_mcp)
except Exception as exc:
logger.debug("MCP tools unavailable: %s", exc)
# Select prompt tier based on tool capability
base_prompt = get_system_prompt(tools_enabled=use_tools)
@@ -278,7 +298,7 @@ def create_timmy(
add_history_to_context=True,
num_history_runs=20,
markdown=True,
tools=[tools] if tools else None,
tools=tools_list if tools_list else None,
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)

View File

@@ -172,6 +172,8 @@ def interview(
Asks Timmy a series of questions about his identity, capabilities,
values, and operation to verify he is working correctly.
"""
import asyncio
from timmy.interview import InterviewEntry, format_transcript, run_interview
from timmy.session import chat
@@ -179,7 +181,9 @@ def interview(
# Force agent creation by calling chat once with a warm-up prompt
try:
chat("Hello, Timmy. We're about to start your interview.", session_id="interview")
asyncio.run(
chat("Hello, Timmy. We're about to start your interview.", session_id="interview")
)
except Exception as exc:
typer.echo(f"Warning: Initialization issue — {exc}", err=True)
@@ -191,7 +195,7 @@ def interview(
typer.echo("Starting interview...\n")
transcript = run_interview(
chat_fn=lambda msg: chat(msg, session_id="interview"),
chat_fn=lambda msg: asyncio.run(chat(msg, session_id="interview")),
on_answer=_on_answer,
)

222
src/timmy/mcp_tools.py Normal file
View File

@@ -0,0 +1,222 @@
"""MCP tool server factories for Agno agent integration.
Provides factory functions that create ``MCPTools`` instances for external
tool servers (Gitea, Filesystem) using stdio transport. Also provides a
standalone async helper for filing Gitea issues from the thinking engine
without going through the full LLM loop.
Usage::
from timmy.mcp_tools import create_gitea_mcp_tools, create_filesystem_mcp_tools
# In agent creation (added to tools list):
gitea_tools = create_gitea_mcp_tools()
fs_tools = create_filesystem_mcp_tools()
# Direct issue filing (thinking engine):
from timmy.mcp_tools import create_gitea_issue_via_mcp
result = await create_gitea_issue_via_mcp("Bug title", "Body", "bug")
"""
from __future__ import annotations
import logging
import sqlite3
import uuid
from datetime import datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
# Module-level cache for the standalone issue-filing session
_issue_session = None
def create_gitea_mcp_tools():
"""Create an MCPTools instance for the Gitea MCP server.
Returns None if Gitea is disabled or not configured (no token).
The returned MCPTools is lazy — Agno connects it on first ``arun()``.
"""
if not settings.gitea_enabled or not settings.gitea_token:
logger.debug("Gitea MCP: disabled or no token configured")
return None
try:
from agno.tools.mcp import MCPTools
# Build command — gitea-mcp expects "-t stdio" for stdio transport
command = settings.mcp_gitea_command
tools = MCPTools(
command=command,
env={
"GITEA_ACCESS_TOKEN": settings.gitea_token,
"GITEA_HOST": settings.gitea_url,
},
include_tools=[
"create_issue",
"list_repo_issues",
"create_issue_comment",
"edit_issue",
],
timeout=settings.mcp_timeout,
)
logger.info("Gitea MCP tools created (lazy connect)")
return tools
except Exception as exc:
logger.warning("Failed to create Gitea MCP tools: %s", exc)
return None
def create_filesystem_mcp_tools():
"""Create an MCPTools instance for the filesystem MCP server.
Returns None if the command is not configured.
Scoped to the project repo_root directory.
"""
try:
from agno.tools.mcp import MCPTools
command = f"{settings.mcp_filesystem_command} {settings.repo_root}"
tools = MCPTools(
command=command,
include_tools=[
"read_file",
"write_file",
"list_directory",
"search_files",
"get_file_info",
"directory_tree",
],
timeout=settings.mcp_timeout,
)
logger.info("Filesystem MCP tools created (lazy connect)")
return tools
except Exception as exc:
logger.warning("Failed to create filesystem MCP tools: %s", exc)
return None
def _bridge_to_work_order(title: str, body: str, category: str) -> None:
"""Create a local work order so the dashboard tracks the issue."""
try:
db_path = Path(settings.repo_root) / "data" / "work_orders.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute(
"""CREATE TABLE IF NOT EXISTS work_orders (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
priority TEXT DEFAULT 'medium',
category TEXT DEFAULT 'suggestion',
submitter TEXT DEFAULT 'dashboard',
related_files TEXT DEFAULT '',
status TEXT DEFAULT 'submitted',
result TEXT DEFAULT '',
rejection_reason TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT
)"""
)
conn.execute(
"INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
str(uuid.uuid4()),
title,
body,
category,
"timmy-thinking",
datetime.utcnow().isoformat(),
),
)
conn.commit()
conn.close()
except Exception as exc:
logger.debug("Work order bridge failed: %s", exc)
async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "") -> str:
"""File a Gitea issue via the MCP server (standalone, no LLM loop).
Used by the thinking engine's ``_maybe_file_issues()`` post-hook.
Manages its own MCPTools session with lazy connect + graceful failure.
Args:
title: Issue title.
body: Issue body (markdown).
labels: Comma-separated label names.
Returns:
Confirmation string or error explanation.
"""
if not settings.gitea_enabled or not settings.gitea_token:
return "Gitea integration is not configured."
try:
from agno.tools.mcp import MCPTools
global _issue_session
if _issue_session is None:
_issue_session = MCPTools(
command=settings.mcp_gitea_command,
env={
"GITEA_ACCESS_TOKEN": settings.gitea_token,
"GITEA_HOST": settings.gitea_url,
},
timeout=settings.mcp_timeout,
)
# Ensure connected
if not getattr(_issue_session, "_connected", False):
await _issue_session.connect()
_issue_session._connected = True
# Append auto-filing signature
full_body = body
if full_body:
full_body += "\n\n"
full_body += "---\n*Auto-filed by Timmy's thinking engine*"
# Parse owner/repo from settings
owner, repo = settings.gitea_repo.split("/", 1)
# Build tool arguments
args = {
"owner": owner,
"repo": repo,
"title": title,
"body": full_body,
}
# Call the MCP tool directly via the session
result = await _issue_session.call_tool("create_issue", arguments=args)
# Bridge to local work order
label_list = [tag.strip() for tag in labels.split(",") if tag.strip()] if labels else []
category = "bug" if "bug" in label_list else "suggestion"
_bridge_to_work_order(title, body, category)
logger.info("Created Gitea issue via MCP: %s", title[:60])
return f"Created issue: {title}\n{result}"
except Exception as exc:
logger.warning("MCP issue creation failed: %s", exc)
return f"Failed to create issue via MCP: {exc}"
async def close_mcp_sessions() -> None:
"""Close any open MCP sessions. Called during app shutdown."""
global _issue_session
if _issue_session is not None:
try:
await _issue_session.disconnect()
except Exception as exc:
logger.debug("MCP session disconnect error: %s", exc)
_issue_session = None

View File

@@ -59,11 +59,12 @@ def _get_agent():
return _agent
def chat(message: str, session_id: str | None = None) -> str:
async def chat(message: str, session_id: str | None = None) -> str:
"""Send a message to Timmy and get a response.
Uses a persistent agent and session_id so Agno's SQLite history
provides multi-turn conversation context.
provides multi-turn conversation context. Uses ``arun()`` so MCP
tool servers are auto-connected.
Args:
message: The user's message.
@@ -80,10 +81,10 @@ def chat(message: str, session_id: str | None = None) -> str:
# Run with session_id so Agno retrieves history from SQLite
try:
run = agent.run(message, stream=False, session_id=sid)
run = await agent.arun(message, stream=False, session_id=sid)
response_text = run.content if hasattr(run, "content") else str(run)
except Exception as exc:
logger.error("Session: agent.run() failed: %s", exc)
logger.error("Session: agent.arun() failed: %s", exc)
return "I'm having trouble reaching my language model right now. Please try again shortly."
# Post-processing: clean up any leaked tool calls or chain-of-thought
@@ -92,13 +93,15 @@ def chat(message: str, session_id: str | None = None) -> str:
return response_text
def chat_with_tools(message: str, session_id: str | None = None):
async def chat_with_tools(message: str, session_id: str | None = None):
"""Send a message and return the full Agno RunOutput.
Callers should check ``run_output.status``:
- ``RunStatus.paused`` — tools need confirmation (see ``run_output.requirements``)
- ``RunStatus.completed`` — response ready in ``run_output.content``
Uses ``arun()`` so MCP tool servers are auto-connected.
Returns:
An Agno ``RunOutput`` object (or a lightweight surrogate on error).
"""
@@ -107,16 +110,16 @@ def chat_with_tools(message: str, session_id: str | None = None):
_extract_facts(message)
try:
return agent.run(message, stream=False, session_id=sid)
return await agent.arun(message, stream=False, session_id=sid)
except Exception as exc:
logger.error("Session: agent.run() failed: %s", exc)
logger.error("Session: agent.arun() failed: %s", exc)
# Return a duck-typed object that callers can handle uniformly
return _ErrorRunOutput(
"I'm having trouble reaching my language model right now. Please try again shortly."
)
def continue_chat(run_output, session_id: str | None = None):
async def continue_chat(run_output, session_id: str | None = None):
"""Resume a paused run after tool confirmation / rejection.
Args:
@@ -129,9 +132,9 @@ def continue_chat(run_output, session_id: str | None = None):
agent = _get_agent()
try:
return agent.continue_run(run_response=run_output, stream=False, session_id=sid)
return await agent.acontinue_run(run_response=run_output, stream=False, session_id=sid)
except Exception as exc:
logger.error("Session: agent.continue_run() failed: %s", exc)
logger.error("Session: agent.acontinue_run() failed: %s", exc)
return _ErrorRunOutput(f"Error continuing run: {exc}")
@@ -149,7 +152,7 @@ class _ErrorRunOutput:
return []
def chat_raw(message: str, session_id: str | None = None) -> tuple[str, str]:
async def chat_raw(message: str, session_id: str | None = None) -> tuple[str, str]:
"""Send a message and return both cleaned and raw responses.
Backward-compatible wrapper around :func:`chat_with_tools`.
@@ -158,7 +161,7 @@ def chat_raw(message: str, session_id: str | None = None) -> tuple[str, str]:
(cleaned_response, raw_response) — cleaned has tool-call JSON and
chain-of-thought stripped; raw is the model's original output.
"""
run = chat_with_tools(message, session_id)
run = await chat_with_tools(message, session_id)
raw_response = run.content if hasattr(run, "content") and run.content else ""
cleaned = _clean_response(raw_response)
return cleaned, raw_response

View File

@@ -213,7 +213,7 @@ class ThinkingEngine:
)
try:
content = self._call_agent(prompt)
content = await self._call_agent(prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None
@@ -226,10 +226,10 @@ class ThinkingEngine:
self._last_thought_id = thought.id
# Post-hook: distill facts from recent thoughts periodically
self._maybe_distill()
await self._maybe_distill()
# Post-hook: file Gitea issues for actionable observations
self._maybe_file_issues()
await self._maybe_file_issues()
# Post-hook: update MEMORY.md with latest reflection
self._update_memory(thought)
@@ -324,7 +324,7 @@ class ThinkingEngine:
# ── Private helpers ──────────────────────────────────────────────────
def _maybe_distill(self) -> None:
async def _maybe_distill(self) -> None:
"""Every N thoughts, extract lasting insights and store as facts.
Reads the last N thoughts, asks the LLM to extract any durable facts
@@ -355,7 +355,7 @@ class ThinkingEngine:
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
raw = self._call_agent(distill_prompt)
raw = await self._call_agent(distill_prompt)
if not raw or not raw.strip():
return
@@ -381,13 +381,12 @@ class ThinkingEngine:
except Exception as exc:
logger.debug("Thought distillation skipped: %s", exc)
def _maybe_file_issues(self) -> None:
async def _maybe_file_issues(self) -> None:
"""Every N thoughts, classify recent thoughts and file Gitea issues.
Asks the LLM to review recent thoughts for actionable items —
bugs, broken features, stale state, or improvement opportunities.
Creates Gitea issues for anything worth tracking, with dedup to
avoid flooding.
Creates Gitea issues via MCP for anything worth tracking.
Only runs when:
- Gitea is enabled and configured
@@ -404,9 +403,7 @@ class ThinkingEngine:
return
# Check Gitea availability before spending LLM tokens
from infrastructure.hands.gitea import gitea_hand
if not gitea_hand.available:
if not settings.gitea_enabled or not settings.gitea_token:
return
recent = self.get_recent_thoughts(limit=interval)
@@ -437,7 +434,7 @@ class ThinkingEngine:
f"Recent thoughts:\n{thought_text}\n\nJSON array:"
)
raw = self._call_agent(classify_prompt)
raw = await self._call_agent(classify_prompt)
if not raw or not raw.strip():
return
@@ -452,7 +449,7 @@ class ThinkingEngine:
if not isinstance(items, list) or not items:
return
from timmy.tools_gitea import create_gitea_issue
from timmy.mcp_tools import create_gitea_issue_via_mcp
for item in items[:2]: # Safety cap
if not isinstance(item, dict):
@@ -464,7 +461,7 @@ class ThinkingEngine:
continue
label = category if category in ("bug", "feature") else ""
result = create_gitea_issue(title=title, body=body, labels=label)
result = await create_gitea_issue_via_mcp(title=title, body=body, labels=label)
logger.info("Thought→Issue: %s%s", title[:60], result[:80])
except Exception as exc:
@@ -711,7 +708,7 @@ class ThinkingEngine:
lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines)
def _call_agent(self, prompt: str) -> str:
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Uses a separate session_id to avoid polluting user chat history.
@@ -719,13 +716,13 @@ class ThinkingEngine:
try:
from timmy.session import chat
return chat(prompt, session_id="thinking")
return await chat(prompt, session_id="thinking")
except Exception:
# Fallback: create a fresh agent
from timmy.agent import create_timmy
agent = create_timmy()
run = agent.run(prompt, stream=False)
run = await agent.arun(prompt, stream=False)
return run.content if hasattr(run, "content") else str(run)
def _store_thought(self, content: str, seed_type: str) -> Thought:

View File

@@ -43,8 +43,16 @@ SAFE_TOOLS = frozenset(
"check_ollama_health",
"get_memory_status",
"list_swarm_agents",
"create_gitea_issue",
"list_gitea_issues",
# MCP Gitea tools
"create_issue",
"list_repo_issues",
"create_issue_comment",
"edit_issue",
# MCP filesystem tools (read-only)
"list_directory",
"search_files",
"get_file_info",
"directory_tree",
}
)

View File

@@ -566,15 +566,8 @@ def create_full_toolkit(base_dir: str | Path | None = None):
except Exception:
logger.debug("Delegation tools not available")
# Gitea issue management — sovereign self-improvement channel
try:
from timmy.tools_gitea import create_gitea_issue, list_gitea_issues
toolkit.register(create_gitea_issue, name="create_gitea_issue")
toolkit.register(list_gitea_issues, name="list_gitea_issues")
logger.info("Gitea issue tools registered")
except Exception:
logger.debug("Gitea tools not available")
# Gitea issue management is now provided by the gitea-mcp server
# (wired in as MCPTools in agent.py, not registered here)
return toolkit

View File

@@ -1,209 +0,0 @@
"""Gitea tool functions — Timmy's self-improvement channel.
Provides sync tool wrappers around the async GiteaHand for use as
Agno-registered agent tools. When Timmy notices a bug, stale state,
or improvement opportunity, he can file a Gitea issue directly.
Usage::
from timmy.tools_gitea import create_gitea_issue, list_gitea_issues
# In agent conversation or thinking post-hook:
result = create_gitea_issue(
title="memory_forget tool returns error for valid queries",
body="The memory_forget operation fails silently...",
labels="bug",
)
"""
from __future__ import annotations
import asyncio
import logging
import sqlite3
import uuid
from datetime import datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
def _run_async(coro):
"""Run an async coroutine from sync context (tool functions must be sync).
When no event loop is running, uses ``asyncio.run()``.
When called from within an existing loop (e.g. FastAPI), spawns a
new thread to avoid "cannot call asyncio.run from running loop".
"""
try:
asyncio.get_running_loop()
except RuntimeError:
# No running loop — safe to use asyncio.run()
return asyncio.run(coro)
# Already in an async context — run in a new thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, coro).result(timeout=60)
def _bridge_to_work_order(title: str, body: str, category: str) -> None:
"""Also create a local work order so the dashboard tracks it."""
try:
db_path = Path(settings.repo_root) / "data" / "work_orders.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute(
"""CREATE TABLE IF NOT EXISTS work_orders (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
priority TEXT DEFAULT 'medium',
category TEXT DEFAULT 'suggestion',
submitter TEXT DEFAULT 'dashboard',
related_files TEXT DEFAULT '',
status TEXT DEFAULT 'submitted',
result TEXT DEFAULT '',
rejection_reason TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT
)"""
)
conn.execute(
"INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
str(uuid.uuid4()),
title,
body,
category,
"timmy-thinking",
datetime.utcnow().isoformat(),
),
)
conn.commit()
conn.close()
except Exception as exc:
logger.debug("Work order bridge failed: %s", exc)
def create_gitea_issue(title: str, body: str = "", labels: str = "") -> str:
"""Create an issue in the project's Gitea repository.
Use this when you notice a bug, broken feature, stale state, or
improvement opportunity in your own codebase. Issues are tracked
in Gitea and also bridged to the local work order queue.
Args:
title: Short, descriptive issue title.
body: Detailed description in markdown (symptoms, expected
behaviour, relevant files).
labels: Comma-separated label names (e.g. "bug,thinking-engine").
Returns:
Confirmation with issue URL, or explanation if skipped/failed.
"""
from infrastructure.hands.gitea import gitea_hand
if not gitea_hand.available:
return (
"Gitea integration is not configured. "
"Set GITEA_TOKEN or place a token in ~/.config/gitea/token."
)
label_list = [tag.strip() for tag in labels.split(",") if tag.strip()] if labels else []
async def _create():
# Dedup check — don't file if a similar issue is already open
duplicate = await gitea_hand.find_duplicate(title)
if duplicate:
number = duplicate.get("number", "?")
url = duplicate.get("html_url", "")
return f"Skipped — similar issue already open: #{number} ({url})"
# Append auto-filing signature
full_body = body
if full_body:
full_body += "\n\n"
full_body += "---\n🤖 *Auto-filed by Timmy's thinking engine*"
result = await gitea_hand.create_issue(
title=title,
body=full_body,
labels=label_list or None,
)
if not result.success:
return f"Failed to create issue: {result.error}"
issue_number = result.data.get("number", "?")
issue_url = result.data.get("html_url", "")
# Bridge to local work order system
category = "bug" if "bug" in label_list else "suggestion"
_bridge_to_work_order(title, body, category)
# Emit event if bus is available
try:
from infrastructure.events.bus import emit
asyncio.ensure_future(
emit(
"gitea.issue.created",
source="timmy-tools",
data={
"issue_number": issue_number,
"title": title,
"url": issue_url,
},
)
)
except Exception:
pass
logger.info("Created Gitea issue #%s: %s", issue_number, title)
return f"Created issue #{issue_number}: {title}\n{issue_url}"
return _run_async(_create())
def list_gitea_issues(state: str = "open") -> str:
"""List issues in the project's Gitea repository.
Use this to check what issues are already filed before creating
new ones, or to review the current backlog.
Args:
state: Filter by state — "open" (default), "closed", or "all".
Returns:
Formatted list of issues with number, title, and labels.
"""
from infrastructure.hands.gitea import gitea_hand
if not gitea_hand.available:
return "Gitea integration is not configured."
async def _list():
result = await gitea_hand.list_issues(state=state, limit=25)
if not result.success:
return f"Failed to list issues: {result.error}"
issues = result.data
if not isinstance(issues, list) or not issues:
return f"No {state} issues found."
lines = [f"## {state.capitalize()} Issues ({len(issues)})\n"]
for issue in issues:
number = issue.get("number", "?")
title = issue.get("title", "Untitled")
labels = ", ".join(label.get("name", "") for label in issue.get("labels", []))
label_str = f" [{labels}]" if labels else ""
lines.append(f"- **#{number}** {title}{label_str}")
return "\n".join(lines)
return _run_async(_list())