feat: migrate to Agno native HITL tool confirmation flow (#158)

Replace the hand-rolled regex-based tool extraction and manual dispatch
(tool_executor.py) with Agno's built-in Human-In-The-Loop confirmation:

- Toolkit(requires_confirmation_tools=...) marks dangerous tools
- agent.run() returns RunOutput with status=paused when confirmation needed
- RunRequirement.confirm()/reject() + agent.continue_run() resumes execution

Dashboard and Discord vendor both use the native flow. DuckDuckGo import
isolated so its absence doesn't kill all tools. Test stubs cleaned up
(agno is a real dependency, only truly optional packages stubbed).

1384 tests pass in parallel (~14s).

Co-authored-by: Trip T <trip@local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-09 21:54:04 -04:00
committed by GitHub
parent 574031a55c
commit 904a7c564e
18 changed files with 1317 additions and 85 deletions

View File

@@ -29,6 +29,13 @@ class Settings(BaseSettings):
# Discord bot token — set via DISCORD_TOKEN env var or the /discord/setup endpoint
discord_token: str = ""
# ── Discord action confirmation ──────────────────────────────────────────
# When True, dangerous tools (shell, write_file, python) require user
# confirmation via Discord button before executing.
discord_confirm_actions: bool = True
# Seconds to wait for user confirmation before auto-rejecting.
discord_confirm_timeout: int = 120
# ── AirLLM / backend selection ───────────────────────────────────────────
# "ollama" — always use Ollama (default, safe everywhere)
# "airllm" — always use AirLLM (requires pip install ".[bigbrain]")

View File

@@ -1,4 +1,5 @@
import asyncio
import json
import logging
from datetime import datetime
@@ -7,12 +8,22 @@ from fastapi.responses import HTMLResponse
from dashboard.store import message_log
from dashboard.templating import templates
from timmy.session import chat as agent_chat
from timmy.session import _clean_response, chat_with_tools, continue_chat
from timmy.tool_safety import (
format_action_description,
get_impact_level,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/agents", tags=["agents"])
MAX_MESSAGE_LENGTH = 10_000 # chars — reject before hitting the model
# In-memory store for paused runs (approval_id -> run context).
# Each entry holds the RunOutput, the RunRequirement ref, and tool metadata.
_pending_runs: dict[str, dict] = {}
@router.get("")
async def list_agents():
@@ -62,25 +73,72 @@ async def clear_history(request: Request):
@router.post("/default/chat", response_class=HTMLResponse)
async def chat_agent(request: Request, message: str = Form(...)):
"""Chat — synchronous response."""
"""Chat — synchronous response with native Agno tool confirmation."""
message = message.strip()
if not message:
from fastapi import HTTPException
raise HTTPException(status_code=400, detail="Message cannot be empty")
if len(message) > MAX_MESSAGE_LENGTH:
from fastapi import HTTPException
raise HTTPException(status_code=422, detail="Message too long")
timestamp = datetime.now().strftime("%H:%M:%S")
response_text = None
error_text = None
try:
response_text = await asyncio.to_thread(agent_chat, message)
run_output = await asyncio.to_thread(chat_with_tools, message)
except Exception as exc:
logger.error("Chat error: %s", exc)
error_text = f"Chat error: {exc}"
run_output = None
# Check if Agno paused the run for tool confirmation
tool_actions = []
if run_output is not None:
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if is_paused and getattr(run_output, "active_requirements", None):
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
from timmy.approvals import create_item
item = create_item(
title=f"Dashboard: {tool_name}",
description=format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=get_impact_level(tool_name),
)
_pending_runs[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
}
tool_actions.append(
{
"approval_id": item.id,
"tool_name": tool_name,
"description": format_action_description(tool_name, tool_args),
"impact": get_impact_level(tool_name),
}
)
raw_content = run_output.content if hasattr(run_output, "content") else ""
response_text = _clean_response(raw_content or "")
if not response_text and not tool_actions:
response_text = None # let error template show if needed
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if response_text is not None:
if response_text:
message_log.append(
role="agent", content=response_text, timestamp=timestamp, source="browser"
)
@@ -97,5 +155,84 @@ async def chat_agent(request: Request, message: str = Form(...)):
"timestamp": timestamp,
"task_id": None,
"queue_info": None,
"tool_actions": tool_actions,
},
)
@router.post("/default/tool/{approval_id}/approve", response_class=HTMLResponse)
async def approve_tool(request: Request, approval_id: str):
    """Confirm a paused tool and resume execution via Agno."""
    from timmy.approvals import approve

    pending = _pending_runs.pop(approval_id, None)
    if pending is None:
        # Unknown or already-handled approval id — nothing to resume.
        return HTMLResponse(
            "<p class='text-danger'>Action not found or already processed.</p>",
            status_code=404,
        )
    approve(approval_id)
    tool_name = pending["tool_name"]
    # Mark the requirement confirmed; Agno executes the tool during continue_run.
    pending["requirement"].confirm()
    try:
        resumed = await asyncio.to_thread(continue_chat, pending["run_output"])
        # Prefer the matching tool's own (truthy) result from the resumed run.
        tool_result = next(
            (
                te.result
                for te in getattr(resumed, "tools", None) or []
                if getattr(te, "tool_name", None) == tool_name
                and getattr(te, "result", None)
            ),
            "",
        )
        if not tool_result:
            tool_result = getattr(resumed, "content", None) or "Tool executed successfully."
    except Exception as exc:
        logger.error("Tool execution failed: %s", exc)
        tool_result = f"Error: {exc}"
    return templates.TemplateResponse(
        request,
        "partials/chat_tool_result.html",
        {
            "approval_id": approval_id,
            "tool_name": tool_name,
            "status": "approved",
            "result": str(tool_result)[:2000],
        },
    )
@router.post("/default/tool/{approval_id}/reject", response_class=HTMLResponse)
async def reject_tool(request: Request, approval_id: str):
    """Reject a pending tool action."""
    from timmy.approvals import reject

    pending = _pending_runs.pop(approval_id, None)
    tool_name = pending["tool_name"] if pending else "action"
    if pending:
        # Tell Agno the requirement was refused, then resume the run so the
        # agent is aware the tool did not execute.
        pending["requirement"].reject(note="User rejected from dashboard")
        try:
            await asyncio.to_thread(continue_chat, pending["run_output"])
        except Exception:
            pass  # best-effort: the rejection itself already succeeded
    reject(approval_id)
    return templates.TemplateResponse(
        request,
        "partials/chat_tool_result.html",
        {
            "approval_id": approval_id,
            "tool_name": tool_name,
            "status": "rejected",
            "result": "",
        },
    )

View File

@@ -12,12 +12,39 @@
<small class="text-muted">Position in queue: {{ queue_info.position }}/{{ queue_info.total }}</small>
</div>
{% endif %}
{% if tool_actions %}
{% for action in tool_actions %}
<div class="approval-card pending" id="tool-{{ action.approval_id }}">
<div class="d-flex justify-content-between align-items-start mb-1">
<div class="approval-card-title">{{ action.tool_name }}</div>
<span class="impact-badge impact-{{ action.impact }}">{{ action.impact }}</span>
</div>
<div class="approval-card-desc">{{ action.description | e }}</div>
<div class="approval-actions">
<button class="btn-approve"
hx-post="/agents/default/tool/{{ action.approval_id }}/approve"
hx-target="#tool-{{ action.approval_id }}"
hx-swap="outerHTML">
APPROVE
</button>
<button class="btn-reject"
hx-post="/agents/default/tool/{{ action.approval_id }}/reject"
hx-target="#tool-{{ action.approval_id }}"
hx-swap="outerHTML">
REJECT
</button>
</div>
</div>
{% endfor %}
{% endif %}
<script>
(function() {
var script = document.currentScript;
var prev = script.previousElementSibling;
// Skip queue-status div to find the agent message div
if (prev && prev.classList.contains('queue-status')) prev = prev.previousElementSibling;
// Skip approval cards and queue-status div to find the agent message div
while (prev && (prev.classList.contains('approval-card') || prev.classList.contains('queue-status'))) {
prev = prev.previousElementSibling;
}
var el = prev ? prev.querySelector('.timmy-md') : null;
if (el && typeof marked !== 'undefined' && typeof DOMPurify !== 'undefined') {
el.innerHTML = DOMPurify.sanitize(marked.parse(el.textContent));

View File

@@ -0,0 +1,15 @@
{# Result card for a tool-confirmation decision.
   Context: approval_id, tool_name, status ("approved" or "rejected"), result.
   Rendered by the /agents/default/tool/{id}/approve and /reject routes and
   swapped in over the pending approval card (HTMX hx-swap="outerHTML"). #}
<div class="approval-card {{ status }}" id="tool-{{ approval_id }}">
  <div class="approval-card-title">{{ tool_name }}</div>
  {% if status == "approved" %}
  <div class="text-success" style="font-size:0.82rem; font-family:'JetBrains Mono',monospace;">
    &#x2713; Executed
  </div>
  {% if result %}
  <pre class="tool-result" style="margin-top:0.5rem; padding:0.5rem; background:var(--bg-card, #1a1a2e); border-radius:4px; font-size:0.78rem; white-space:pre-wrap; overflow-x:auto; max-height:300px;">{{ result | e }}</pre>
  {% endif %}
  {% elif status == "rejected" %}
  <div class="text-danger" style="font-size:0.82rem; font-family:'JetBrains Mono',monospace;">
    &#x2717; Rejected
  </div>
  {% endif %}
</div>

View File

@@ -10,6 +10,7 @@ Architecture:
DiscordVendor
├── _client (discord.Client) — handles gateway events
├── _thread_map — channel_id -> active thread
├── _pending_actions — approval_id -> action details
└── _message_handler — bridges to Timmy agent
"""
@@ -17,7 +18,7 @@ import asyncio
import json
import logging
from pathlib import Path
from typing import Optional
from typing import Any, Optional
from integrations.chat_bridge.base import (
ChatMessage,
@@ -27,29 +28,75 @@ from integrations.chat_bridge.base import (
PlatformState,
PlatformStatus,
)
from timmy.session import _clean_response, chat_with_tools, continue_chat
from timmy.tool_safety import format_action_description as _format_action_description
from timmy.tool_safety import get_impact_level as _get_impact_level
logger = logging.getLogger(__name__)
_STATE_FILE = Path(__file__).parent.parent.parent.parent / "discord_state.json"
# Module-level agent singleton — reused across all Discord messages.
# Mirrors the pattern from timmy.session._agent.
_discord_agent = None
# ---------------------------------------------------------------------------
# Discord UI components (guarded — discord.py is optional)
# ---------------------------------------------------------------------------
try:
import discord as _discord_lib
_DISCORD_UI_AVAILABLE = True
except ImportError:
_DISCORD_UI_AVAILABLE = False
def _get_discord_agent():
    """Lazily create and cache the module-level Timmy agent singleton.

    Returns:
        The shared agent instance, or ``None`` when discord.py is not
        installed (the vendor cannot run without it, so no agent is built).

    Raises:
        Exception: re-raised from ``create_timmy()`` when agent creation fails.
    """
    global _discord_agent
    if _discord_agent is None:
        from timmy.agent import create_timmy

        if _DISCORD_UI_AVAILABLE:
            try:
                _discord_agent = create_timmy()
                logger.info("Discord: Timmy agent initialized (singleton)")
            except Exception as exc:
                logger.error("Discord: Failed to create Timmy agent: %s", exc)
                raise
        else:
            # Previously this fell through silently and callers received None
            # with no diagnostic; keep the None return (backward compatible)
            # but make the cause visible in the logs.
            logger.warning(
                "Discord: discord.py not installed — agent not created; returning None"
            )
    return _discord_agent
if _DISCORD_UI_AVAILABLE:

    class ActionConfirmView(_discord_lib.ui.View):
        """Discord UI View with Approve and Reject buttons.

        Defined only when discord.py imported successfully: the base class and
        the ``@ui.button`` decorators evaluate ``_discord_lib`` at class-body
        execution time, so an unconditional definition raises NameError when
        the library is absent (the except branch never binds ``_discord_lib``).
        """

        def __init__(self, approval_id: str, vendor: "DiscordVendor"):
            from config import settings

            # Auto-reject after the configured confirmation window.
            super().__init__(timeout=settings.discord_confirm_timeout)
            self.approval_id = approval_id
            self.vendor = vendor

        @_discord_lib.ui.button(label="Approve", style=_discord_lib.ButtonStyle.green)
        async def approve_button(self, interaction, button):
            await self.vendor._on_action_approved(self.approval_id, interaction)

        @_discord_lib.ui.button(label="Reject", style=_discord_lib.ButtonStyle.red)
        async def reject_button(self, interaction, button):
            await self.vendor._on_action_rejected(self.approval_id, interaction)

        async def on_timeout(self):
            """Auto-reject on timeout."""
            action = self.vendor._pending_actions.pop(self.approval_id, None)
            if not action:
                return
            try:
                from timmy.approvals import reject

                reject(self.approval_id)
                # Reject the requirement and resume so the agent knows
                req = action.get("requirement")
                if req:
                    req.reject(note="Timed out — auto-rejected")
                    await asyncio.to_thread(
                        continue_chat, action["run_output"], action.get("session_id")
                    )
                await action["target"].send(
                    f"Action `{action['tool_name']}` timed out and was auto-rejected."
                )
            except Exception:
                pass

else:
    # Graceful degradation mirroring the DuckDuckGoTools pattern:
    # _send_confirmation only instantiates the view inside an
    # _DISCORD_UI_AVAILABLE check, so a None placeholder is safe.
    ActionConfirmView = None  # type: ignore[assignment, misc]
# ---------------------------------------------------------------------------
# DiscordVendor
# ---------------------------------------------------------------------------
class DiscordVendor(ChatPlatform):
@@ -66,6 +113,7 @@ class DiscordVendor(ChatPlatform):
self._task: Optional[asyncio.Task] = None
self._guild_count: int = 0
self._active_threads: dict[str, str] = {} # channel_id -> thread_id
self._pending_actions: dict[str, dict] = {} # approval_id -> action details
# ── ChatPlatform interface ─────────────────────────────────────────────
@@ -289,6 +337,108 @@ class DiscordVendor(ChatPlatform):
f"&permissions={permissions}"
)
# ── Action confirmation ────────────────────────────────────────────────
async def _send_confirmation(
self, target: Any, tool_name: str, tool_args: dict, approval_id: str
) -> None:
"""Send a confirmation message with Approve/Reject buttons."""
description = _format_action_description(tool_name, tool_args)
impact = _get_impact_level(tool_name)
if _DISCORD_UI_AVAILABLE:
import discord
embed = discord.Embed(
title="Action Confirmation Required",
description=description,
color=discord.Color.orange(),
)
embed.add_field(name="Tool", value=f"`{tool_name}`", inline=True)
embed.add_field(name="Impact", value=impact, inline=True)
embed.set_footer(text=f"Approval ID: {approval_id[:8]}")
view = ActionConfirmView(approval_id=approval_id, vendor=self)
msg = await target.send(embed=embed, view=view)
else:
# Fallback when discord.py UI components not available
msg = await target.send(
f"**Action Confirmation Required**\n"
f"{description}\n"
f"Tool: `{tool_name}` | Impact: {impact}\n"
f"_Reply 'approve {approval_id[:8]}' or 'reject {approval_id[:8]}'_"
)
self._pending_actions[approval_id] = {
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"message": msg,
}
async def _on_action_approved(self, approval_id: str, interaction: Any) -> None:
"""Confirm the tool and resume via Agno's continue_run."""
action = self._pending_actions.pop(approval_id, None)
if not action:
await interaction.response.send_message("Action already processed.", ephemeral=True)
return
from timmy.approvals import approve
approve(approval_id)
await interaction.response.send_message("Approved. Executing...", ephemeral=True)
target = action["target"]
tool_name = action["tool_name"]
# Confirm the requirement — Agno will execute the tool on continue_run
req = action["requirement"]
req.confirm()
try:
result_run = await asyncio.to_thread(
continue_chat, action["run_output"], action.get("session_id")
)
# Extract tool result from the resumed run
tool_result = ""
for te in getattr(result_run, "tools", None) or []:
if getattr(te, "tool_name", None) == tool_name and getattr(te, "result", None):
tool_result = te.result
break
if not tool_result:
tool_result = getattr(result_run, "content", None) or "Tool executed successfully."
result_text = f"**{tool_name}** result:\n```\n{str(tool_result)[:1800]}\n```"
for chunk in _chunk_message(result_text, 2000):
await target.send(chunk)
except Exception as exc:
logger.error("Discord: tool execution failed: %s", exc)
await target.send(f"**{tool_name}** failed: `{exc}`")
async def _on_action_rejected(self, approval_id: str, interaction: Any) -> None:
"""Reject the pending action and notify the agent."""
action = self._pending_actions.pop(approval_id, None)
if not action:
await interaction.response.send_message("Action already processed.", ephemeral=True)
return
from timmy.approvals import reject
reject(approval_id)
# Reject the requirement and resume so the agent knows
req = action["requirement"]
req.reject(note="User rejected from Discord")
try:
await asyncio.to_thread(continue_chat, action["run_output"], action.get("session_id"))
except Exception:
pass
await interaction.response.send_message(
f"Rejected. `{action['tool_name']}` will not execute.", ephemeral=True
)
# ── Internal ───────────────────────────────────────────────────────────
async def _run_client(self, token: str) -> None:
@@ -354,38 +504,67 @@ class DiscordVendor(ChatPlatform):
session_id = f"discord_{message.channel.id}"
# Run Timmy agent with typing indicator and timeout
run_output = None
response = None
try:
agent = _get_discord_agent()
# Show typing indicator while the agent processes
async with target.typing():
run = await asyncio.wait_for(
asyncio.to_thread(agent.run, content, stream=False, session_id=session_id),
run_output = await asyncio.wait_for(
asyncio.to_thread(chat_with_tools, content, session_id),
timeout=300,
)
response = run.content if hasattr(run, "content") else str(run)
except asyncio.TimeoutError:
logger.error("Discord: agent.run() timed out after 300s")
logger.error("Discord: chat_with_tools() timed out after 300s")
response = "Sorry, that took too long. Please try a simpler request."
except Exception as exc:
logger.error("Discord: agent.run() failed: %s", exc)
logger.error("Discord: chat_with_tools() failed: %s", exc)
response = (
"I'm having trouble reaching my language model right now. Please try again shortly."
)
# Strip hallucinated tool-call JSON and chain-of-thought narration
from timmy.session import _clean_response
# Check if Agno paused the run for tool confirmation
if run_output is not None:
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
response = _clean_response(response)
if is_paused and getattr(run_output, "active_requirements", None):
from config import settings
if settings.discord_confirm_actions:
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
from timmy.approvals import create_item
item = create_item(
title=f"Discord: {tool_name}",
description=_format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=_get_impact_level(tool_name),
)
self._pending_actions[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"session_id": session_id,
}
await self._send_confirmation(target, tool_name, tool_args, item.id)
raw_content = run_output.content if hasattr(run_output, "content") else ""
response = _clean_response(raw_content or "")
# Discord has a 2000 character limit — send with error handling
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
if response and response.strip():
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
async def _get_or_create_thread(self, message):
"""Get the active thread for a channel, or create one.

View File

@@ -26,7 +26,7 @@ _agent = None
# Matches raw JSON tool calls: {"name": "python", "parameters": {...}}
_TOOL_CALL_JSON = re.compile(
r'\{\s*"name"\s*:\s*"[^"]+?"\s*,\s*"parameters"\s*:\s*\{.*?\}\s*\}',
r'\{\s*"name"\s*:\s*"[^"]+?"\s*,\s*"(?:parameters|arguments)"\s*:\s*\{.*?\}\s*\}',
re.DOTALL,
)
@@ -93,6 +93,78 @@ def chat(message: str, session_id: Optional[str] = None) -> str:
return response_text
def chat_with_tools(message: str, session_id: Optional[str] = None):
    """Send a message and return the full Agno RunOutput.

    Callers should check ``run_output.status``:
    - ``RunStatus.paused`` — tools need confirmation (see ``run_output.requirements``)
    - ``RunStatus.completed`` — response ready in ``run_output.content``

    Returns:
        An Agno ``RunOutput`` object (or a lightweight surrogate on error).
    """
    agent = _get_agent()
    _extract_facts(message)
    try:
        return agent.run(
            message, stream=False, session_id=session_id or _DEFAULT_SESSION_ID
        )
    except Exception as exc:
        logger.error("Session: agent.run() failed: %s", exc)
        # Duck-typed stand-in so callers can handle success/failure uniformly.
        return _ErrorRunOutput(
            "I'm having trouble reaching my language model right now. Please try again shortly."
        )
def continue_chat(run_output, session_id: Optional[str] = None):
    """Resume a paused run after tool confirmation / rejection.

    Args:
        run_output: The paused ``RunOutput`` returned by ``chat_with_tools()``.
        session_id: Session to resume under; defaults to the shared session.

    Returns:
        A new ``RunOutput`` with the resumed execution results (or a
        lightweight surrogate when resumption fails).
    """
    agent = _get_agent()
    try:
        return agent.continue_run(
            run_response=run_output,
            stream=False,
            session_id=session_id or _DEFAULT_SESSION_ID,
        )
    except Exception as exc:
        logger.error("Session: agent.continue_run() failed: %s", exc)
        return _ErrorRunOutput(f"Error continuing run: {exc}")
class _ErrorRunOutput:
"""Lightweight stand-in for RunOutput when the model is unreachable."""
def __init__(self, message: str):
self.content = message
self.status = "ERROR"
self.requirements = []
self.tools = []
@property
def active_requirements(self):
return []
def chat_raw(message: str, session_id: Optional[str] = None) -> tuple[str, str]:
    """Send a message and return both cleaned and raw responses.

    Backward-compatible wrapper around :func:`chat_with_tools`.

    Returns:
        (cleaned_response, raw_response) — cleaned has tool-call JSON and
        chain-of-thought stripped; raw is the model's original output.
    """
    run = chat_with_tools(message, session_id)
    raw_response = getattr(run, "content", None) or ""
    return _clean_response(raw_response), raw_response
def reset_session(session_id: Optional[str] = None) -> None:
"""Reset a session (clear conversation context).

123
src/timmy/tool_safety.py Normal file
View File

@@ -0,0 +1,123 @@
"""Tool safety classification and tool-call extraction helpers.
Classifies tools into tiers based on their potential impact:
- DANGEROUS: Can modify filesystem, execute code, or change system state.
Requires user confirmation before execution.
- SAFE: Read-only or purely computational. Executes without confirmation.
Also provides shared helpers for extracting hallucinated tool calls from
model output and formatting them for human review. Used by both the
Discord vendor and the dashboard chat route.
"""
import json
import re
# ---------------------------------------------------------------------------
# Tool classification
# ---------------------------------------------------------------------------

# Tools that require confirmation before execution.
DANGEROUS_TOOLS = frozenset(
    {"shell", "python", "write_file", "aider", "plan_and_execute"}
)

# Tools that are safe to execute without confirmation.
SAFE_TOOLS = frozenset(
    {
        "web_search",
        "calculator",
        "memory_search",
        "memory_read",
        "memory_write",
        "read_file",
        "list_files",
        "consult_grok",
        "get_system_info",
        "check_ollama_health",
        "get_memory_status",
        "list_swarm_agents",
    }
)


def requires_confirmation(tool_name: str) -> bool:
    """Check if a tool requires user confirmation before execution.

    Unknown tools default to requiring confirmation (safe-by-default):
    anything not explicitly listed in SAFE_TOOLS is confirmed.
    """
    return tool_name not in SAFE_TOOLS
# ---------------------------------------------------------------------------
# Tool call extraction from model output
# ---------------------------------------------------------------------------

_TOOL_CALL_RE = re.compile(
    r'\{\s*"name"\s*:\s*"([^"]+?)"\s*,\s*"(?:parameters|arguments)"\s*:\s*(\{.*?\})\s*\}',
    re.DOTALL,
)


def extract_tool_calls(text: str) -> list[tuple[str, dict]]:
    """Extract hallucinated tool calls from model output.

    Returns list of (tool_name, arguments_dict) tuples.
    Handles both ``"arguments"`` and ``"parameters"`` JSON keys; matches
    whose argument payload is not valid JSON are silently skipped.
    """
    if not text:
        return []
    calls: list[tuple[str, dict]] = []
    for m in _TOOL_CALL_RE.finditer(text):
        try:
            parsed = json.loads(m.group(2))
        except json.JSONDecodeError:
            continue  # malformed args — skip this match
        calls.append((m.group(1), parsed))
    return calls
# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------


def format_action_description(tool_name: str, tool_args: dict) -> str:
    """Format a human-readable description of a tool action."""
    if tool_name == "shell":
        command = tool_args.get("command") or tool_args.get("args", "")
        if isinstance(command, list):
            command = " ".join(command)
        return f"Run shell command:\n`{command}`"
    if tool_name == "write_file":
        target = tool_args.get("file_name", "unknown")
        n_chars = len(tool_args.get("contents", ""))
        return f"Write file: `{target}` ({n_chars} chars)"
    if tool_name == "python":
        snippet = tool_args.get("code", "")[:200]
        return f"Execute Python:\n```python\n{snippet}\n```"
    # Generic fallback: show (truncated) pretty-printed args.
    rendered = json.dumps(tool_args, indent=2)[:300]
    return f"Execute `{tool_name}` with args:\n```json\n{rendered}\n```"
def get_impact_level(tool_name: str) -> str:
    """Return the impact level for a tool (high, medium, or low)."""
    if tool_name in ("shell", "python"):
        return "high"  # arbitrary code / command execution
    if tool_name in ("write_file", "aider", "plan_and_execute"):
        return "medium"  # filesystem or multi-step changes
    return "low"

View File

@@ -26,7 +26,6 @@ logger = logging.getLogger(__name__)
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
@@ -36,6 +35,15 @@ except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# DuckDuckGo is optional — don't let it kill all tools
try:
from agno.tools.duckduckgo import DuckDuckGoTools
_DUCKDUCKGO_AVAILABLE = True
except ImportError:
_DUCKDUCKGO_AVAILABLE = False
DuckDuckGoTools = None # type: ignore[assignment, misc]
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@@ -142,8 +150,9 @@ def create_research_tools(base_dir: str | Path | None = None):
toolkit = Toolkit(name="research")
# Web search via DuckDuckGo
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
# File reading
from config import settings
@@ -262,8 +271,9 @@ def create_data_tools(base_dir: str | Path | None = None):
toolkit.register(file_tools.list_files, name="list_files")
# Web search for finding datasets
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
return toolkit
@@ -301,8 +311,9 @@ def create_security_tools(base_dir: str | Path | None = None):
toolkit.register(shell_tools.run_shell_command, name="shell")
# Web search for threat intelligence
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
# File reading for logs/configs
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
@@ -403,11 +414,20 @@ def create_full_toolkit(base_dir: str | Path | None = None):
if not _AGNO_TOOLS_AVAILABLE:
# Return None when tools aren't available (tests)
return None
toolkit = Toolkit(name="full")
# Web search
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
from timmy.tool_safety import DANGEROUS_TOOLS
toolkit = Toolkit(
name="full",
requires_confirmation_tools=list(DANGEROUS_TOOLS),
)
# Web search (optional — degrades gracefully if ddgs not installed)
if _DUCKDUCKGO_AVAILABLE:
search_tools = DuckDuckGoTools()
toolkit.register(search_tools.web_search, name="web_search")
else:
logger.info("DuckDuckGo tools unavailable (ddgs not installed) — skipping web_search")
# Python execution
python_tools = PythonTools()

View File

@@ -14,20 +14,14 @@ try:
except ImportError:
import conftest_markers # noqa: F401
# ── Stub heavy optional dependencies so tests run without them installed ──────
# Uses setdefault: real module is used if already installed, mock otherwise.
# Stub heavy optional dependencies so tests run without them installed.
# Uses setdefault: real module is used if already installed, mock otherwise.
# Note: only stub packages that are truly optional and may not be installed.
# Packages like typer, httpx, fastapi are required deps — never stub those.
# ── Stub heavy optional dependencies so unit tests run without them ────────────
# Only stub truly optional packages that may not be installed.
# agno is a core dependency (always installed) — do NOT stub it, or its
# internal import chains break under xdist parallel workers.
for _mod in [
"agno",
"agno.agent",
"agno.models",
"agno.models.ollama",
"agno.db",
"agno.db.sqlite",
"airllm",
"mcp",
"mcp.registry",
"telegram",
"telegram.ext",
"discord",
@@ -40,6 +34,13 @@ for _mod in [
]:
sys.modules.setdefault(_mod, MagicMock())
# timmy.agents.base expects mcp.registry.tool_registry.get_handler(...);
# when mcp.registry was stubbed above, give it a registry whose lookups
# return None so imports don't fail.
_mcp_reg = sys.modules.get("mcp.registry")
if _mcp_reg is not None and not hasattr(_mcp_reg, "tool_registry"):
    _registry_stub = MagicMock()
    _registry_stub.get_handler.return_value = None
    _mcp_reg.tool_registry = _registry_stub
# ── Test mode setup ──────────────────────────────────────────────────────────
os.environ["TIMMY_TEST_MODE"] = "1"
os.environ["TIMMY_DISABLE_CSRF"] = "1"

View File

@@ -0,0 +1,203 @@
"""Tests for dashboard tool confirmation flow using native Agno RunOutput."""
from unittest.mock import MagicMock, patch
import pytest
def _mock_completed_run(content="Just a reply."):
"""Create a mock RunOutput for a completed (no tool) run."""
run = MagicMock()
run.content = content
run.status = "COMPLETED"
run.active_requirements = []
return run
def _mock_paused_run(tool_name="shell", tool_args=None, content="Sure, I can do that."):
"""Create a mock RunOutput for a paused run needing tool confirmation."""
tool_args = tool_args or {"command": "echo hello"}
te = MagicMock()
te.tool_name = tool_name
te.tool_args = tool_args
req = MagicMock()
req.needs_confirmation = True
req.tool_execution = te
run = MagicMock()
run.content = content
run.status = "PAUSED"
run.active_requirements = [req]
return run, req
def _mock_approval_item(item_id="test-approval-123"):
"""Create a mock ApprovalItem."""
item = MagicMock()
item.id = item_id
return item
# ── Chat returns tool actions ────────────────────────────────────────────────
def test_chat_with_tool_call_shows_approval_card(client):
    """When Agno pauses for tool confirmation, the response includes an approval card."""
    paused, _req = _mock_paused_run()
    with (
        patch("dashboard.routes.agents.chat_with_tools", return_value=paused),
        patch("timmy.approvals.create_item", return_value=_mock_approval_item()),
    ):
        response = client.post("/agents/default/chat", data={"message": "run echo hello"})
    assert response.status_code == 200
    for marker in ("APPROVE", "REJECT", "shell"):
        assert marker in response.text
def test_chat_without_tool_call_has_no_approval_card(client):
    """Normal responses without tool calls should not show approval buttons."""
    completed = _mock_completed_run()
    with patch("dashboard.routes.agents.chat_with_tools", return_value=completed):
        response = client.post("/agents/default/chat", data={"message": "hello"})
    assert response.status_code == 200
    for marker in ("APPROVE", "REJECT"):
        assert marker not in response.text
def test_chat_tool_card_contains_impact_badge(client):
    """The approval card carries the tool's impact CSS class (shell -> impact-high)."""
    paused, _ = _mock_paused_run()
    with patch("dashboard.routes.agents.chat_with_tools", return_value=paused):
        with patch("timmy.approvals.create_item", return_value=_mock_approval_item()):
            resp = client.post("/agents/default/chat", data={"message": "run it"})
    assert "impact-high" in resp.text
def test_chat_tool_card_has_htmx_approve_endpoint(client):
    """Approval card buttons must point HTMX at the tool approve/reject routes."""
    paused, _ = _mock_paused_run()
    with patch("dashboard.routes.agents.chat_with_tools", return_value=paused):
        with patch("timmy.approvals.create_item", return_value=_mock_approval_item()):
            resp = client.post("/agents/default/chat", data={"message": "run it"})
    assert 'hx-post="/agents/default/tool/' in resp.text
    assert '/approve"' in resp.text
    assert '/reject"' in resp.text
# ── Approve endpoint ─────────────────────────────────────────────────────────
def _create_pending_tool(client, approval_id="test-approval-123"):
"""Helper: send a chat that creates a pending tool, return the approval_id."""
run, _req = _mock_paused_run()
item = _mock_approval_item(approval_id)
with (
patch("dashboard.routes.agents.chat_with_tools", return_value=run),
patch("timmy.approvals.create_item", return_value=item),
):
response = client.post("/agents/default/chat", data={"message": "run it"})
assert 'hx-post="/agents/default/tool/' in response.text
return approval_id
def test_approve_executes_tool_and_returns_result(client):
    """Approving resumes the run via continue_chat and renders the tool's output."""
    approval_id = _create_pending_tool(client)
    # The resumed run carries the executed tool and its captured result.
    resumed = MagicMock()
    executed = MagicMock()
    executed.tool_name = "shell"
    executed.result = "hello\n"
    resumed.tools = [executed]
    resumed.content = "Done."
    with patch("dashboard.routes.agents.continue_chat", return_value=resumed):
        with patch("timmy.approvals.approve"):
            resp = client.post(f"/agents/default/tool/{approval_id}/approve")
    assert resp.status_code == 200
    assert "hello" in resp.text
def test_approve_unknown_id_returns_404(client):
    """An approval id that was never issued yields 404."""
    resp = client.post("/agents/default/tool/nonexistent-id/approve")
    assert resp.status_code == 404
def test_approve_same_id_twice_returns_404(client):
    """An approval id is single-use: the second approve attempt 404s."""
    approval_id = _create_pending_tool(client)
    resumed = _mock_completed_run("ok")
    with patch("dashboard.routes.agents.continue_chat", return_value=resumed):
        with patch("timmy.approvals.approve"):
            client.post(f"/agents/default/tool/{approval_id}/approve")
    # Replaying the consumed id must no longer resolve.
    resp = client.post(f"/agents/default/tool/{approval_id}/approve")
    assert resp.status_code == 404
# ── Reject endpoint ──────────────────────────────────────────────────────────
def test_reject_returns_rejected_card(client):
    """Rejecting a pending tool renders a 'Rejected' status card."""
    approval_id = _create_pending_tool(client)
    with patch("dashboard.routes.agents.continue_chat", return_value=_mock_completed_run()):
        with patch("timmy.approvals.reject"):
            resp = client.post(f"/agents/default/tool/{approval_id}/reject")
    assert resp.status_code == 200
    assert "Rejected" in resp.text
def test_reject_unknown_id_still_returns_200(client):
    """Rejecting an unknown id is a harmless no-op (idempotent, no crash)."""
    with patch("timmy.approvals.reject"):
        resp = client.post("/agents/default/tool/nonexistent-id/reject")
    assert resp.status_code == 200
# ── Safe tools skip confirmation ─────────────────────────────────────────────
def test_safe_tool_does_not_show_approval(client):
    """Safe tools auto-execute (run completes), so no approval card appears."""
    finished = _mock_completed_run("Here is the file content.")
    with patch("dashboard.routes.agents.chat_with_tools", return_value=finished):
        resp = client.post("/agents/default/chat", data={"message": "read the readme"})
    assert "APPROVE" not in resp.text
    assert "REJECT" not in resp.text

View File

@@ -1,4 +1,4 @@
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
# ── Index ─────────────────────────────────────────────────────────────────────
@@ -92,11 +92,17 @@ def test_agents_list_metadata(client):
# ── Chat ──────────────────────────────────────────────────────────────────────
def _mock_run(content="Operational and ready."):
"""Create a mock RunOutput for a completed run."""
run = MagicMock()
run.content = content
run.status = "COMPLETED"
run.active_requirements = []
return run
def test_chat_agent_success(client):
with patch(
"dashboard.routes.agents.agent_chat",
return_value="Operational and ready.",
):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run()):
response = client.post("/agents/default/chat", data={"message": "status?"})
assert response.status_code == 200
@@ -105,15 +111,19 @@ def test_chat_agent_success(client):
def test_chat_agent_shows_user_message(client):
with patch("dashboard.routes.agents.agent_chat", return_value="Acknowledged."):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run("Acknowledged.")):
response = client.post("/agents/default/chat", data={"message": "hello there"})
assert "hello there" in response.text
def test_chat_agent_ollama_offline(client):
# Without Ollama, chat returns an error but still shows the user message.
response = client.post("/agents/default/chat", data={"message": "ping"})
# When Ollama is unreachable, chat shows the user message + error.
with patch(
"dashboard.routes.agents.chat_with_tools",
side_effect=Exception("Ollama unreachable"),
):
response = client.post("/agents/default/chat", data={"message": "ping"})
assert response.status_code == 200
assert "ping" in response.text
@@ -134,7 +144,9 @@ def test_history_empty_shows_init_message(client):
def test_history_records_user_and_agent_messages(client):
with patch("dashboard.routes.agents.agent_chat", return_value="I am operational."):
with patch(
"dashboard.routes.agents.chat_with_tools", return_value=_mock_run("I am operational.")
):
client.post("/agents/default/chat", data={"message": "status check"})
response = client.get("/agents/default/history")
@@ -142,14 +154,18 @@ def test_history_records_user_and_agent_messages(client):
def test_history_records_error_when_offline(client):
client.post("/agents/default/chat", data={"message": "ping"})
with patch(
"dashboard.routes.agents.chat_with_tools",
side_effect=Exception("Ollama unreachable"),
):
client.post("/agents/default/chat", data={"message": "ping"})
response = client.get("/agents/default/history")
assert "ping" in response.text
def test_history_clear_resets_to_init_message(client):
with patch("dashboard.routes.agents.agent_chat", return_value="Acknowledged."):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run("Acknowledged.")):
client.post("/agents/default/chat", data={"message": "hello"})
response = client.delete("/agents/default/history")
@@ -158,7 +174,7 @@ def test_history_clear_resets_to_init_message(client):
def test_history_empty_after_clear(client):
with patch("dashboard.routes.agents.agent_chat", return_value="OK."):
with patch("dashboard.routes.agents.chat_with_tools", return_value=_mock_run("OK.")):
client.post("/agents/default/chat", data={"message": "test"})
client.delete("/agents/default/history")

View File

@@ -1,13 +1,35 @@
"""Shared fixtures for functional/E2E tests."""
import importlib
import os
import subprocess
import sys
import time
import urllib.request
from unittest.mock import MagicMock
import pytest
# ── Un-stub agno for functional tests ─────────────────────────────────────────
# Root conftest stubs agno with MagicMock for unit tests.
# Functional tests need real agno (tool execution, agent creation, etc.).
_agno_mods = [
"agno",
"agno.agent",
"agno.models",
"agno.models.ollama",
"agno.db",
"agno.db.sqlite",
"agno.tools",
"agno.tools.shell",
"agno.tools.python",
"agno.tools.file",
]
for _mod in _agno_mods:
if _mod in sys.modules and isinstance(sys.modules[_mod], MagicMock):
del sys.modules[_mod]
import agno # noqa: E402 — force real import
# Default dashboard URL - override with DASHBOARD_URL env var
DASHBOARD_URL = os.environ.get("DASHBOARD_URL", "http://localhost:8000")

View File

@@ -0,0 +1,275 @@
"""Tests for Discord action confirmation system using native Agno RunOutput.
Covers tool safety classification, formatting, impact levels,
and the confirmation flow in _handle_message.
"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# _format_action_description (imported from tool_safety)
# ---------------------------------------------------------------------------
class TestFormatActionDescription:
    """Human-readable summaries the discord vendor builds for each tool call."""

    def test_shell_command_string(self):
        from integrations.chat_bridge.vendors.discord import _format_action_description

        summary = _format_action_description("shell", {"command": "ls -la /tmp"})
        assert "ls -la /tmp" in summary

    def test_shell_command_list(self):
        from integrations.chat_bridge.vendors.discord import _format_action_description

        # argv-style lists are joined back into a single command line
        summary = _format_action_description("shell", {"args": ["mkdir", "-p", "/tmp/test"]})
        assert "mkdir -p /tmp/test" in summary

    def test_write_file(self):
        from integrations.chat_bridge.vendors.discord import _format_action_description

        summary = _format_action_description(
            "write_file", {"file_name": "/tmp/foo.md", "contents": "hello world"}
        )
        assert "/tmp/foo.md" in summary
        assert "11 chars" in summary  # len("hello world") reported, not the payload itself

    def test_python_code(self):
        from integrations.chat_bridge.vendors.discord import _format_action_description

        summary = _format_action_description("python", {"code": "print(42)"})
        assert "print(42)" in summary

    def test_unknown_tool(self):
        from integrations.chat_bridge.vendors.discord import _format_action_description

        summary = _format_action_description("custom_tool", {"key": "value"})
        assert "custom_tool" in summary
# ---------------------------------------------------------------------------
# _get_impact_level (imported from tool_safety)
# ---------------------------------------------------------------------------
class TestGetImpactLevel:
    """Impact tiers exposed through the discord vendor's _get_impact_level."""

    def test_high_impact(self):
        from integrations.chat_bridge.vendors.discord import _get_impact_level

        for tool in ("shell", "python"):
            assert _get_impact_level(tool) == "high"

    def test_medium_impact(self):
        from integrations.chat_bridge.vendors.discord import _get_impact_level

        for tool in ("write_file", "aider"):
            assert _get_impact_level(tool) == "medium"

    def test_low_impact(self):
        from integrations.chat_bridge.vendors.discord import _get_impact_level

        # Unknown tools fall through to the lowest impact tier.
        for tool in ("web_search", "unknown"):
            assert _get_impact_level(tool) == "low"
# ---------------------------------------------------------------------------
# Tool safety classification
# ---------------------------------------------------------------------------
class TestToolSafety:
    """Classification of tools into confirm-required vs. safe-to-auto-run."""

    @staticmethod
    def _needs_confirm(tool):
        # Imported lazily, matching the file's per-test import convention.
        from timmy.tool_safety import requires_confirmation

        return requires_confirmation(tool)

    def test_shell_requires_confirmation(self):
        assert self._needs_confirm("shell") is True

    def test_python_requires_confirmation(self):
        assert self._needs_confirm("python") is True

    def test_write_file_requires_confirmation(self):
        assert self._needs_confirm("write_file") is True

    def test_read_file_is_safe(self):
        assert self._needs_confirm("read_file") is False

    def test_calculator_is_safe(self):
        assert self._needs_confirm("calculator") is False

    def test_web_search_is_safe(self):
        assert self._needs_confirm("web_search") is False

    def test_unknown_tool_requires_confirmation(self):
        # Fail-closed: anything unrecognized needs a human.
        assert self._needs_confirm("unknown_tool") is True
# ---------------------------------------------------------------------------
# _handle_message confirmation flow (native Agno RunOutput)
# ---------------------------------------------------------------------------
def _mock_paused_run(tool_name="shell", tool_args=None, content="I will create the dir."):
"""Create a mock RunOutput for a paused run needing tool confirmation."""
tool_args = tool_args or {"args": ["mkdir", "/tmp/test"]}
te = MagicMock()
te.tool_name = tool_name
te.tool_args = tool_args
req = MagicMock()
req.needs_confirmation = True
req.tool_execution = te
run = MagicMock()
run.content = content
run.status = "PAUSED"
run.active_requirements = [req]
return run
def _mock_completed_run(content="Hello! How can I help?"):
"""Create a mock RunOutput for a completed (no tool) run."""
run = MagicMock()
run.content = content
run.status = "COMPLETED"
run.active_requirements = []
return run
class TestHandleMessageConfirmation:
    """End-to-end _handle_message behavior around native Agno confirmation pauses.

    Each test wires up a DiscordVendor with a fully mocked discord.py surface
    (client, message, channel, typing context manager) and stubs chat_with_tools
    to return a canned RunOutput, then checks whether a confirmation prompt is
    sent. No real Discord connection or model call is made.
    """

    @pytest.mark.asyncio
    async def test_dangerous_tool_sends_confirmation(self, monkeypatch):
        """When Agno pauses for tool confirmation, should send confirmation prompt."""
        from integrations.chat_bridge.vendors.discord import DiscordVendor

        vendor = DiscordVendor()
        # Mock chat_with_tools returning a paused RunOutput
        paused_run = _mock_paused_run()
        monkeypatch.setattr(
            "integrations.chat_bridge.vendors.discord.chat_with_tools",
            lambda msg, sid=None: paused_run,
        )
        # Minimal bot identity so the handler can tell its own messages apart.
        vendor._client = MagicMock()
        vendor._client.user = MagicMock()
        vendor._client.user.id = 12345
        # Incoming guild message triggering the dangerous tool.
        message = MagicMock()
        message.content = "create a directory"
        message.channel = MagicMock()
        message.channel.guild = MagicMock()
        monkeypatch.setattr(vendor, "_get_or_create_thread", AsyncMock(return_value=None))
        # channel.typing() is used as an async context manager — stub both ends.
        ctx = AsyncMock()
        ctx.__aenter__ = AsyncMock(return_value=None)
        ctx.__aexit__ = AsyncMock(return_value=False)
        message.channel.typing = MagicMock(return_value=ctx)
        message.channel.send = AsyncMock()
        # Mock approvals
        mock_item = MagicMock()
        mock_item.id = "test-approval-id-1234"
        monkeypatch.setattr(
            "timmy.approvals.create_item",
            lambda **kwargs: mock_item,
        )
        vendor._send_confirmation = AsyncMock()
        await vendor._handle_message(message)
        # Should have called _send_confirmation for the shell tool
        vendor._send_confirmation.assert_called_once()
        call_args = vendor._send_confirmation.call_args
        assert call_args[0][1] == "shell"  # tool_name
        assert call_args[0][3] == "test-approval-id-1234"  # approval_id

    @pytest.mark.asyncio
    async def test_no_tool_calls_sends_normal_response(self, monkeypatch):
        """When Agno returns a completed run, should send text directly."""
        from integrations.chat_bridge.vendors.discord import DiscordVendor

        vendor = DiscordVendor()
        completed_run = _mock_completed_run()
        monkeypatch.setattr(
            "integrations.chat_bridge.vendors.discord.chat_with_tools",
            lambda msg, sid=None: completed_run,
        )
        vendor._client = MagicMock()
        vendor._client.user = MagicMock()
        vendor._client.user.id = 12345
        message = MagicMock()
        message.content = "hello"
        message.channel = MagicMock()
        message.channel.guild = MagicMock()
        monkeypatch.setattr(vendor, "_get_or_create_thread", AsyncMock(return_value=None))
        ctx = AsyncMock()
        ctx.__aenter__ = AsyncMock(return_value=None)
        ctx.__aexit__ = AsyncMock(return_value=False)
        message.channel.typing = MagicMock(return_value=ctx)
        message.channel.send = AsyncMock()
        await vendor._handle_message(message)
        # Should send the text response directly (no confirmation)
        message.channel.send.assert_called()
        # Inspect the last send — earlier sends (if any) may be bookkeeping.
        sent_text = message.channel.send.call_args_list[-1][0][0]
        assert "Hello" in sent_text

    @pytest.mark.asyncio
    async def test_confirmation_disabled_via_config(self, monkeypatch):
        """When discord_confirm_actions=False, no confirmation prompts sent."""
        from config import settings
        from integrations.chat_bridge.vendors.discord import DiscordVendor

        # Flip the feature flag off for this test only (monkeypatch restores it).
        monkeypatch.setattr(settings, "discord_confirm_actions", False)
        vendor = DiscordVendor()
        paused_run = _mock_paused_run()
        monkeypatch.setattr(
            "integrations.chat_bridge.vendors.discord.chat_with_tools",
            lambda msg, sid=None: paused_run,
        )
        vendor._client = MagicMock()
        vendor._client.user = MagicMock()
        vendor._client.user.id = 12345
        message = MagicMock()
        message.content = "do something"
        message.channel = MagicMock()
        message.channel.guild = MagicMock()
        monkeypatch.setattr(vendor, "_get_or_create_thread", AsyncMock(return_value=None))
        ctx = AsyncMock()
        ctx.__aenter__ = AsyncMock(return_value=None)
        ctx.__aexit__ = AsyncMock(return_value=False)
        message.channel.typing = MagicMock(return_value=ctx)
        message.channel.send = AsyncMock()
        vendor._send_confirmation = AsyncMock()
        await vendor._handle_message(message)
        # Should NOT call _send_confirmation
        vendor._send_confirmation.assert_not_called()

View File

@@ -232,16 +232,21 @@ def test_model_supports_tools_unknown_model_gets_tools():
def test_create_timmy_no_tools_for_small_model():
"""llama3.2 should get no tools."""
"""Small models (llama3.2) should get no tools."""
mock_toolkit = MagicMock()
with patch("timmy.agent.Agent") as MockAgent, patch("timmy.agent.Ollama"), patch(
"timmy.agent.SqliteDb"
), patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit), patch(
"timmy.agent._resolve_model_with_fallback", return_value=("llama3.2:3b", False)
), patch(
"timmy.agent._check_model_available", return_value=True
):
from timmy.agent import create_timmy
create_timmy()
kwargs = MockAgent.call_args.kwargs
# Default model is llama3.2 → tools should be None
# llama3.2 is in _SMALL_MODEL_PATTERNS → tools should be None
assert kwargs["tools"] is None

View File

@@ -5,11 +5,15 @@ This caused socket read errors in production. The agno Ollama class uses
``timeout`` (not ``request_timeout``).
"""
import importlib
from unittest.mock import MagicMock, patch
def test_base_agent_sets_timeout():
"""BaseAgent creates Ollama with timeout=300."""
# Ensure module is loaded before patching — prevents xdist ordering issues
importlib.import_module("timmy.agents.base")
with patch("timmy.agents.base.Ollama") as mock_ollama, patch("timmy.agents.base.Agent"):
mock_ollama.return_value = MagicMock()

View File

@@ -132,6 +132,18 @@ def test_clean_response_strips_json_tool_calls():
assert "The result is 577." in clean
def test_clean_response_strips_arguments_format():
"""JSON tool calls using 'arguments' key (OpenAI format) should also be removed."""
from timmy.session import _clean_response
dirty = 'Here is the result. {"name": "shell", "arguments": {"args": ["mkdir", "-p", "/tmp/test"]}} The directory was created.'
clean = _clean_response(dirty)
assert '{"name"' not in clean
assert '"arguments"' not in clean
assert "The directory was created." in clean
def test_clean_response_strips_function_calls():
"""Function-call-style text should be removed."""
from timmy.session import _clean_response

View File

@@ -111,21 +111,20 @@ class TestPersonaToolkits:
}
assert set(PERSONA_TOOLKITS.keys()) == expected
def test_get_tools_for_known_persona_raises_without_agno(self):
"""Agno is mocked but not a real package, so create_*_tools raises ImportError."""
with pytest.raises(ImportError, match="Agno tools not available"):
get_tools_for_persona("echo")
def test_get_tools_for_known_persona_returns_toolkit(self):
"""Known personas should return a Toolkit with registered tools."""
result = get_tools_for_persona("echo")
assert result is not None
def test_get_tools_for_unknown_persona(self):
result = get_tools_for_persona("nonexistent")
assert result is None
def test_creative_personas_return_none(self):
"""Creative personas (pixel, lyra, reel) use stub toolkits that
return None when Agno is unavailable."""
def test_creative_personas_return_toolkit(self):
"""Creative personas (pixel, lyra, reel) return toolkits."""
for persona_id in ("pixel", "lyra", "reel"):
result = get_tools_for_persona(persona_id)
assert result is None
assert result is not None
# ── Tool catalog ─────────────────────────────────────────────────────────────

View File

@@ -0,0 +1,115 @@
"""Tests for timmy.tool_safety — classification, extraction, and formatting."""
import pytest
from timmy.tool_safety import (
extract_tool_calls,
format_action_description,
get_impact_level,
requires_confirmation,
)
# ---------------------------------------------------------------------------
# requires_confirmation
# ---------------------------------------------------------------------------
class TestRequiresConfirmation:
    """Classification of tool names into confirm-required vs. auto-run."""

    def test_dangerous_tools(self):
        for name in ("shell", "python", "write_file", "aider", "plan_and_execute"):
            assert requires_confirmation(name) is True

    def test_safe_tools(self):
        for name in ("web_search", "calculator", "read_file", "list_files"):
            assert requires_confirmation(name) is False

    def test_unknown_defaults_to_dangerous(self):
        # Fail-closed: anything unrecognized must require confirmation.
        assert requires_confirmation("totally_unknown") is True
# ---------------------------------------------------------------------------
# extract_tool_calls
# ---------------------------------------------------------------------------
class TestExtractToolCalls:
    """Parsing of inline JSON tool-call payloads out of model output text."""

    def test_arguments_format(self):
        # OpenAI-style "arguments" key
        text = (
            'Creating dir. {"name": "shell", "arguments": {"args": ["mkdir", "-p", "/tmp/test"]}}'
        )
        calls = extract_tool_calls(text)
        assert len(calls) == 1
        tool_name, tool_args = calls[0]
        assert tool_name == "shell"
        assert tool_args["args"] == ["mkdir", "-p", "/tmp/test"]

    def test_parameters_format(self):
        # Alternate "parameters" key
        calls = extract_tool_calls(
            'Result: {"name": "python", "parameters": {"code": "print(1+1)"}}'
        )
        assert len(calls) == 1
        assert calls[0][0] == "python"

    def test_multiple_calls(self):
        text = (
            'Step 1: {"name": "shell", "arguments": {"args": ["mkdir", "/tmp/a"]}} '
            'Step 2: {"name": "write_file", "arguments": {"file_name": "/tmp/a/f.md", "contents": "hi"}}'
        )
        assert len(extract_tool_calls(text)) == 2

    def test_empty_and_none(self):
        # Degenerate inputs all yield an empty call list.
        for blank in ("", None, "Just normal text."):
            assert extract_tool_calls(blank) == []

    def test_malformed_json(self):
        broken = '{"name": "shell", "arguments": {not valid json}}'
        assert extract_tool_calls(broken) == []
# ---------------------------------------------------------------------------
# format_action_description
# ---------------------------------------------------------------------------
class TestFormatActionDescription:
    """Readable one-line summaries of a pending tool action."""

    def test_shell_command(self):
        summary = format_action_description("shell", {"command": "ls -la /tmp"})
        assert "ls -la /tmp" in summary

    def test_shell_args_list(self):
        # argv-style lists are joined into a single command line
        summary = format_action_description("shell", {"args": ["mkdir", "-p", "/tmp/t"]})
        assert "mkdir -p /tmp/t" in summary

    def test_write_file(self):
        summary = format_action_description(
            "write_file", {"file_name": "/tmp/f.md", "contents": "hello world"}
        )
        assert "/tmp/f.md" in summary
        assert "11 chars" in summary  # len("hello world"), not the payload itself

    def test_python(self):
        summary = format_action_description("python", {"code": "print(42)"})
        assert "print(42)" in summary

    def test_unknown_tool(self):
        summary = format_action_description("custom_tool", {"key": "value"})
        assert "custom_tool" in summary
# ---------------------------------------------------------------------------
# get_impact_level
# ---------------------------------------------------------------------------
class TestGetImpactLevel:
    """Impact tiers used to colour/label confirmation prompts."""

    def test_high(self):
        for name in ("shell", "python"):
            assert get_impact_level(name) == "high"

    def test_medium(self):
        for name in ("write_file", "aider"):
            assert get_impact_level(name) == "medium"

    def test_low(self):
        # Unrecognized tools default to the lowest tier.
        for name in ("web_search", "unknown"):
            assert get_impact_level(name) == "low"