Files
Timmy-time-dashboard/src/dashboard/routes/agents.py
Claude (Opus 4.6) cd1bc2bf6b
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
[claude] Add agent emotional state simulation (#1013) (#1144)
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-23 18:36:52 +00:00

312 lines
10 KiB
Python

import json
import logging
from datetime import datetime
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse
from dashboard.store import message_log
from dashboard.templating import templates
from timmy.session import _clean_response, chat_with_tools, continue_chat
from timmy.tool_safety import (
format_action_description,
get_impact_level,
)
from timmy.welcome import WELCOME_MESSAGE
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Every route declared in this file is mounted under the "/agents" prefix.
router = APIRouter(prefix="/agents", tags=["agents"])
# Upper bound on chat input, in characters — reject before hitting the model.
MAX_MESSAGE_LENGTH = 10_000
# In-memory store for paused runs (approval_id -> run context).
# Each entry holds the RunOutput, the RunRequirement ref, and tool metadata.
# Entries are added by _extract_tool_actions() and popped by the approve/reject
# handlers; being a plain dict, the contents do not survive a process restart.
# NOTE(review): nothing visible in this module expires entries for approvals
# that are never answered — confirm whether cleanup happens elsewhere.
_pending_runs: dict[str, dict] = {}
@router.get("")
async def list_agents():
    """List the agents exposed by the dashboard.

    Returns:
        A mapping with a single ``"agents"`` key holding one entry, the
        ``"default"`` agent. Its ``name`` and ``model`` are read from
        ``settings``; every other field is a fixed value.
    """
    # Resolved at call time rather than at module import.
    from config import settings

    default_agent = {
        "id": "default",
        "name": settings.agent_name,
        "status": "idle",
        "capabilities": "chat,reasoning,research,planning",
        "type": "local",
        "model": settings.ollama_model,
        "backend": "ollama",
        "version": "1.0.0",
    }
    return {"agents": [default_agent]}
def _collect_emotional_profiles() -> list:
    """Build one emotional-profile entry per loaded agent.

    Each entry is the object returned by
    ``agent.emotional_state.get_profile()`` with two extra keys set on it:
    ``"agent_id"`` (the key under which the loader returned the agent) and
    ``"agent_name"`` (``agent.name``).

    Returns:
        The list of tagged profiles, in the loader's iteration order.

    Raises:
        Exception: anything raised while importing the loader, loading the
            agents, or reading a profile propagates unchanged so that each
            endpoint can apply its own fallback.
    """
    # Imported lazily so a broken/missing loader only affects these endpoints.
    from timmy.agents.loader import load_agents

    profiles = []
    for agent_id, agent in load_agents().items():
        profile = agent.emotional_state.get_profile()
        profile["agent_id"] = agent_id
        profile["agent_name"] = agent.name
        profiles.append(profile)
    return profiles


@router.get("/emotional-profile", response_class=HTMLResponse)
async def emotional_profile(request: Request):
    """HTMX partial: render emotional profiles for all loaded agents.

    On any failure while collecting profiles a warning is logged and the
    partial is rendered with an empty profile list.
    """
    try:
        profiles = _collect_emotional_profiles()
    except Exception as exc:
        logger.warning("Failed to load emotional profiles: %s", exc)
        # All-or-nothing: a failure part-way through discards partial results.
        profiles = []
    return templates.TemplateResponse(
        request,
        "partials/emotional_profile.html",
        {"profiles": profiles},
    )


@router.get("/emotional-profile/json")
async def emotional_profile_json():
    """JSON API: return emotional profiles for all loaded agents.

    Returns:
        ``{"profiles": [...]}`` on success. On any failure a warning is logged
        and ``{"profiles": [], "error": "<exception text>"}`` is returned.
    """
    try:
        return {"profiles": _collect_emotional_profiles()}
    except Exception as exc:
        logger.warning("Failed to load emotional profiles: %s", exc)
        return {"profiles": [], "error": str(exc)}
@router.get("/default/panel", response_class=HTMLResponse)
async def agent_panel(request: Request):
    """Serve the chat panel partial used for HTMX main-panel swaps."""
    # The template is rendered without a concrete agent object.
    context = {"agent": None}
    return templates.TemplateResponse(request, "partials/agent_panel_chat.html", context)
@router.get("/default/history", response_class=HTMLResponse)
async def get_history(request: Request):
    """Render the history partial with every message currently in the log."""
    context = {
        "messages": message_log.all(),
        "welcome_message": WELCOME_MESSAGE,
    }
    return templates.TemplateResponse(request, "partials/history.html", context)
@router.delete("/default/history", response_class=HTMLResponse)
async def clear_history(request: Request):
    """Empty the message log and render the history partial with no messages."""
    message_log.clear()
    context = {
        "messages": [],
        "welcome_message": WELCOME_MESSAGE,
    }
    return templates.TemplateResponse(request, "partials/history.html", context)
def _validate_message(message: str) -> str:
    """Normalise chat input and reject unusable messages.

    Args:
        message: Raw text submitted by the user.

    Returns:
        The message with leading and trailing whitespace removed.

    Raises:
        HTTPException: status 400 when nothing is left after stripping,
            status 422 when the stripped text is longer than
            ``MAX_MESSAGE_LENGTH`` characters.
    """
    from fastapi import HTTPException

    cleaned = message.strip()
    length = len(cleaned)
    if length == 0:
        raise HTTPException(status_code=400, detail="Message cannot be empty")
    if length > MAX_MESSAGE_LENGTH:
        raise HTTPException(status_code=422, detail="Message too long")
    return cleaned
def _record_user_activity() -> None:
    """Notify the thinking engine that the user is active (best effort).

    Any failure — including the thinking module failing to import — is
    swallowed so that it can never block a chat request; it is only logged at
    DEBUG level.
    """
    try:
        from timmy.thinking import thinking_engine

        thinking_engine.record_user_input()
    except Exception:
        # Keep the original message but attach the traceback, otherwise the
        # log line gives no clue about what actually went wrong.
        logger.debug("Failed to record user input for thinking engine", exc_info=True)
def _extract_tool_actions(run_output) -> list[dict]:
    """If Agno paused the run for tool confirmation, build approval items.

    For every active requirement that needs confirmation, an approval item is
    created, the run context is stored in ``_pending_runs`` under the item's
    id, and a summary dict is produced for the chat template.

    Args:
        run_output: Result object of a chat run. Only its ``status`` and
            ``active_requirements`` attributes are read; both may be absent.

    Returns:
        One dict per confirmable tool call with the keys ``approval_id``,
        ``tool_name``, ``description`` and ``impact``. Empty when the run is
        not paused or has no active requirements.
    """
    status = getattr(run_output, "status", None)
    # Accept both the plain string and the stringified enum form of "paused".
    is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
    requirements = getattr(run_output, "active_requirements", None)
    if not (is_paused and requirements):
        return []

    # Deferred until a paused run is actually seen, so the common
    # (non-paused) path does no import work.
    from timmy.approvals import create_item

    tool_actions: list[dict] = []
    for req in requirements:
        if not getattr(req, "needs_confirmation", False):
            continue

        te = req.tool_execution
        tool_name = getattr(te, "tool_name", "unknown")
        tool_args = getattr(te, "tool_args", {}) or {}

        # Computed once and shared by the approval item and the template entry.
        description = format_action_description(tool_name, tool_args)
        impact = get_impact_level(tool_name)

        item = create_item(
            title=f"Dashboard: {tool_name}",
            description=description,
            proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
            impact=impact,
        )
        # Remember everything the approve/reject handlers need to resume the run.
        _pending_runs[item.id] = {
            "run_output": run_output,
            "requirement": req,
            "tool_name": tool_name,
            "tool_args": tool_args,
        }
        tool_actions.append(
            {
                "approval_id": item.id,
                "tool_name": tool_name,
                "description": description,
                "impact": impact,
            }
        )
    return tool_actions
def _log_exchange(
    message: str, response_text: str | None, error_text: str | None, timestamp: str
) -> None:
    """Record one chat round-trip in the in-memory message log.

    The user message is always appended. It is followed by at most one reply:
    the agent response when it is non-empty, otherwise the error text when
    that is non-empty. All entries share ``timestamp`` and the ``"browser"``
    source.
    """
    shared = {"timestamp": timestamp, "source": "browser"}
    message_log.append(role="user", content=message, **shared)

    if response_text:
        reply_role, reply_content = "agent", response_text
    elif error_text:
        reply_role, reply_content = "error", error_text
    else:
        # Neither a response nor an error: only the user message is logged.
        return
    message_log.append(role=reply_role, content=reply_content, **shared)
@router.post("/default/chat", response_class=HTMLResponse)
async def chat_agent(request: Request, message: str = Form(...)):
    """Chat — synchronous response with native Agno tool confirmation.

    The message is validated, sent to the agent, and the outcome (response
    text, error text and any tool calls awaiting approval) is logged and
    rendered as the chat-message partial.

    Raises:
        HTTPException: propagated from ``_validate_message`` for empty or
            over-long input.
    """
    message = _validate_message(message)
    _record_user_activity()
    timestamp = datetime.now().strftime("%H:%M:%S")

    response_text = None
    error_text = None
    tool_actions: list[dict] = []

    try:
        run_output = await chat_with_tools(message)
    except Exception as exc:
        # logger.exception keeps the traceback; the user only sees the message.
        logger.exception("Chat error: %s", exc)
        error_text = f"Chat error: {exc}"
        run_output = None

    if run_output is not None:
        tool_actions = _extract_tool_actions(run_output)
        raw_content = getattr(run_output, "content", "")
        response_text = _clean_response(raw_content or "")
        # An empty reply is only kept (as "") when there are tool actions to
        # show; otherwise it is normalised to None.
        if not response_text and not tool_actions:
            response_text = None

    _log_exchange(message, response_text, error_text, timestamp)
    return templates.TemplateResponse(
        request,
        "partials/chat_message.html",
        {
            "user_message": message,
            "response": response_text,
            "error": error_text,
            "timestamp": timestamp,
            "task_id": None,
            "queue_info": None,
            "tool_actions": tool_actions,
        },
    )
@router.post("/default/tool/{approval_id}/approve", response_class=HTMLResponse)
async def approve_tool(request: Request, approval_id: str):
    """Confirm a paused tool and resume execution via Agno.

    Args:
        request: Incoming request, passed through to the template.
        approval_id: Key of the paused run in ``_pending_runs``.

    Returns:
        A 404 HTML fragment when no pending run exists for ``approval_id``;
        otherwise the tool-result partial with status ``"approved"`` and the
        result text truncated to 2000 characters. A failure while resuming the
        run is reported in that result text as ``"Error: ..."``.
    """
    from timmy.approvals import approve

    # pop() removes the entry, so a repeated request for the same id gets the
    # 404 below instead of resuming the run a second time.
    pending = _pending_runs.pop(approval_id, None)
    if not pending:
        return HTMLResponse(
            "<p class='text-danger'>Action not found or already processed.</p>",
            status_code=404,
        )

    approve(approval_id)
    tool_name = pending["tool_name"]

    # Confirm the requirement — Agno will execute the tool on continue_run
    req = pending["requirement"]
    req.confirm()

    try:
        result_run = await continue_chat(pending["run_output"])
        # Extract tool result from the resumed run: first non-empty result
        # recorded for the approved tool.
        tool_result = ""
        for te in getattr(result_run, "tools", None) or []:
            if getattr(te, "tool_name", None) == tool_name and getattr(te, "result", None):
                tool_result = te.result
                break
        if not tool_result:
            # Fall back to the run's content, then to a generic success note.
            tool_result = getattr(result_run, "content", None) or "Tool executed successfully."
    except Exception as exc:
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception("Tool execution failed: %s", exc)
        tool_result = f"Error: {exc}"

    return templates.TemplateResponse(
        request,
        "partials/chat_tool_result.html",
        {
            "approval_id": approval_id,
            "tool_name": tool_name,
            "status": "approved",
            "result": str(tool_result)[:2000],
        },
    )
@router.post("/default/tool/{approval_id}/reject", response_class=HTMLResponse)
async def reject_tool(request: Request, approval_id: str):
    """Reject a pending tool action.

    When a paused run is stored for ``approval_id`` its requirement is
    rejected and the run is resumed so the agent learns about the rejection;
    a failure while resuming is logged and otherwise ignored. The approval
    item is marked rejected and the tool-result partial is rendered with
    status ``"rejected"`` whether or not a paused run was found.
    """
    from timmy.approvals import reject

    pending = _pending_runs.pop(approval_id, None)
    # Placeholder shown when the paused run is no longer known.
    tool_name = "action"

    if pending:
        tool_name = pending["tool_name"]
        req = pending["requirement"]
        req.reject(note="User rejected from dashboard")
        # Resume so the agent knows the tool was rejected
        try:
            await continue_chat(pending["run_output"])
        except Exception as exc:
            # Best effort: the rejection itself still goes through below.
            logger.warning("Agent tool rejection error: %s", exc, exc_info=True)

    reject(approval_id)
    return templates.TemplateResponse(
        request,
        "partials/chat_tool_result.html",
        {
            "approval_id": approval_id,
            "tool_name": tool_name,
            "status": "rejected",
            "result": "",
        },
    )