forked from Rockachopa/Timmy-time-dashboard
feat: Timmy system introspection, delegation, and session logging (#74)
* test: remove hardcoded sleeps, add pytest-timeout
- Replace fixed time.sleep() calls with intelligent polling or WebDriverWait
- Add pytest-timeout dependency and --timeout=30 to prevent hangs
- Fixes test flakiness and improves test suite speed
* feat: add Aider AI tool to Forge's toolkit
- Add Aider tool that calls local Ollama (qwen2.5:14b) for AI coding assist
- Register tool in Forge's code toolkit
- Add functional tests for the Aider tool
* config: add opencode.json with local Ollama provider for sovereign AI
* feat: Timmy fixes and improvements
## Bug Fixes
- Fix read_file path resolution: add ~ expansion, proper relative path handling
- Add repo_root to config.py with auto-detection from .git location
- Fix hardcoded llama3.2 - now dynamic from settings.ollama_model
## Timmy's Requests
- Add communication protocol to AGENTS.md (read context first, explain changes)
- Create DECISIONS.md for architectural decision documentation
- Add reasoning guidance to system prompts (step-by-step, state uncertainty)
- Update tests to reflect correct model name (llama3.1:8b-instruct)
## Testing
- All 177 dashboard tests pass
- All 32 prompt/tool tests pass
* feat: Timmy system introspection, delegation, and session logging
## System Introspection (Sovereign Self-Knowledge)
- Add get_system_info() tool - Timmy can now query his runtime environment
- Add check_ollama_health() - verify Ollama status
- Add get_memory_status() - check memory tier status
- True introspection vs hardcoded prompts
## Path Resolution Fix
- Fix all toolkits to use settings.repo_root consistently
- Now uses Path(settings.repo_root) instead of Path.cwd()
## Inter-Agent Delegation
- Add delegate_task() tool - Timmy can dispatch to Seer, Forge, Echo, etc.
- Add list_swarm_agents() - query available agents
## Session Logging
- Add SessionLogger for comprehensive interaction logging
- Records messages, tool calls, errors, decisions
- Writes to /logs/session_{date}.jsonl
## Tests
- Add tests for introspection tools
- Add tests for delegation tools
- Add tests for session logging
- Add tests for path resolution
- All 18 new tests pass
- All 177 dashboard tests pass
---------
Co-authored-by: Alexander Payne <apayne@MM.local>
This commit is contained in:
committed by
GitHub
parent
5e60a6453b
commit
a975a845c5
187
src/timmy/session_logger.py
Normal file
187
src/timmy/session_logger.py
Normal file
@@ -0,0 +1,187 @@
|
||||
"""Session logging for Timmy - captures interactions, errors, and decisions.
|
||||
|
||||
Timmy requested: "I'd love to see a detailed log of all my interactions,
|
||||
including any mistakes or errors that occur during the session."
|
||||
"""
|
||||
|
||||
import json
import logging
from collections import Counter
from datetime import date, datetime
from pathlib import Path
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SessionLogger:
|
||||
"""Logs Timmy's interactions to a session file."""
|
||||
|
||||
def __init__(self, logs_dir: str | Path | None = None):
|
||||
"""Initialize session logger.
|
||||
|
||||
Args:
|
||||
logs_dir: Directory for log files. Defaults to /logs in repo root.
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
if logs_dir is None:
|
||||
self.logs_dir = Path(settings.repo_root) / "logs"
|
||||
else:
|
||||
self.logs_dir = Path(logs_dir)
|
||||
|
||||
# Create logs directory if it doesn't exist
|
||||
self.logs_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Session file path
|
||||
self.session_file = self.logs_dir / f"session_{date.today().isoformat()}.jsonl"
|
||||
|
||||
# In-memory buffer
|
||||
self._buffer: list[dict] = []
|
||||
|
||||
def record_message(self, role: str, content: str) -> None:
|
||||
"""Record a user message.
|
||||
|
||||
Args:
|
||||
role: "user" or "timmy"
|
||||
content: The message content
|
||||
"""
|
||||
self._buffer.append(
|
||||
{
|
||||
"type": "message",
|
||||
"role": role,
|
||||
"content": content,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
)
|
||||
|
||||
def record_tool_call(self, tool_name: str, args: dict, result: str) -> None:
|
||||
"""Record a tool call.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool called
|
||||
args: Arguments passed to the tool
|
||||
result: Result from the tool
|
||||
"""
|
||||
# Truncate long results
|
||||
result_preview = result[:500] if isinstance(result, str) else str(result)[:500]
|
||||
|
||||
self._buffer.append(
|
||||
{
|
||||
"type": "tool_call",
|
||||
"tool": tool_name,
|
||||
"args": args,
|
||||
"result": result_preview,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
)
|
||||
|
||||
def record_error(self, error: str, context: str | None = None) -> None:
|
||||
"""Record an error.
|
||||
|
||||
Args:
|
||||
error: Error message
|
||||
context: Optional context about what was happening
|
||||
"""
|
||||
self._buffer.append(
|
||||
{
|
||||
"type": "error",
|
||||
"error": error,
|
||||
"context": context,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
)
|
||||
|
||||
def record_decision(self, decision: str, rationale: str | None = None) -> None:
|
||||
"""Record a decision Timmy made.
|
||||
|
||||
Args:
|
||||
decision: What was decided
|
||||
rationale: Why that decision was made
|
||||
"""
|
||||
self._buffer.append(
|
||||
{
|
||||
"type": "decision",
|
||||
"decision": decision,
|
||||
"rationale": rationale,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
)
|
||||
|
||||
def flush(self) -> Path:
|
||||
"""Flush buffer to disk.
|
||||
|
||||
Returns:
|
||||
Path to the session file
|
||||
"""
|
||||
if not self._buffer:
|
||||
return self.session_file
|
||||
|
||||
with open(self.session_file, "a") as f:
|
||||
for entry in self._buffer:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
|
||||
logger.info("Flushed %d entries to %s", len(self._buffer), self.session_file)
|
||||
self._buffer.clear()
|
||||
|
||||
return self.session_file
|
||||
|
||||
def get_session_summary(self) -> dict[str, Any]:
|
||||
"""Get a summary of the current session.
|
||||
|
||||
Returns:
|
||||
Dict with session statistics
|
||||
"""
|
||||
if not self.session_file.exists():
|
||||
return {
|
||||
"exists": False,
|
||||
"entries": 0,
|
||||
}
|
||||
|
||||
entries = []
|
||||
with open(self.session_file) as f:
|
||||
for line in f:
|
||||
if line.strip():
|
||||
entries.append(json.loads(line))
|
||||
|
||||
return {
|
||||
"exists": True,
|
||||
"file": str(self.session_file),
|
||||
"entries": len(entries),
|
||||
"messages": sum(1 for e in entries if e.get("type") == "message"),
|
||||
"tool_calls": sum(1 for e in entries if e.get("type") == "tool_call"),
|
||||
"errors": sum(1 for e in entries if e.get("type") == "error"),
|
||||
"decisions": sum(1 for e in entries if e.get("type") == "decision"),
|
||||
}
|
||||
|
||||
|
||||
# Process-wide session logger, created lazily on first use.
_session_logger: SessionLogger | None = None


def get_session_logger() -> SessionLogger:
    """Return the global session logger, creating it on first call."""
    global _session_logger
    if _session_logger is not None:
        return _session_logger
    _session_logger = SessionLogger()
    return _session_logger
|
||||
|
||||
|
||||
def get_session_summary() -> dict[str, Any]:
    """Get summary of current session logs.

    Returns:
        Dict with session statistics (entries, messages, errors, etc.)
    """
    # Delegate to the singleton; no local alias, which also avoids
    # shadowing the module-level ``logger``.
    return get_session_logger().get_session_summary()
|
||||
|
||||
|
||||
def flush_session_logs() -> str:
    """Flush current session logs to disk.

    Returns:
        Path to the log file, as a string.
    """
    session_logger = get_session_logger()
    return str(session_logger.flush())
|
||||
@@ -154,7 +154,9 @@ def create_research_tools(base_dir: str | Path | None = None):
|
||||
toolkit.register(search_tools.web_search, name="web_search")
|
||||
|
||||
# File reading
|
||||
base_path = Path(base_dir) if base_dir else Path.cwd()
|
||||
from config import settings
|
||||
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
file_tools = FileTools(base_dir=base_path)
|
||||
toolkit.register(file_tools.read_file, name="read_file")
|
||||
toolkit.register(file_tools.list_files, name="list_files")
|
||||
@@ -180,7 +182,9 @@ def create_code_tools(base_dir: str | Path | None = None):
|
||||
toolkit.register(python_tools.run_python_code, name="python")
|
||||
|
||||
# File operations
|
||||
base_path = Path(base_dir) if base_dir else Path.cwd()
|
||||
from config import settings
|
||||
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
file_tools = FileTools(base_dir=base_path)
|
||||
toolkit.register(file_tools.read_file, name="read_file")
|
||||
toolkit.register(file_tools.save_file, name="write_file")
|
||||
@@ -262,7 +266,9 @@ def create_data_tools(base_dir: str | Path | None = None):
|
||||
toolkit.register(python_tools.run_python_code, name="python")
|
||||
|
||||
# File reading
|
||||
base_path = Path(base_dir) if base_dir else Path.cwd()
|
||||
from config import settings
|
||||
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
file_tools = FileTools(base_dir=base_path)
|
||||
toolkit.register(file_tools.read_file, name="read_file")
|
||||
toolkit.register(file_tools.list_files, name="list_files")
|
||||
@@ -284,7 +290,7 @@ def create_writing_tools(base_dir: str | Path | None = None):
|
||||
toolkit = Toolkit(name="writing")
|
||||
|
||||
# File operations
|
||||
base_path = Path(base_dir) if base_dir else Path.cwd()
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
file_tools = FileTools(base_dir=base_path)
|
||||
toolkit.register(file_tools.read_file, name="read_file")
|
||||
toolkit.register(file_tools.save_file, name="write_file")
|
||||
@@ -311,7 +317,7 @@ def create_security_tools(base_dir: str | Path | None = None):
|
||||
toolkit.register(search_tools.web_search, name="web_search")
|
||||
|
||||
# File reading for logs/configs
|
||||
base_path = Path(base_dir) if base_dir else Path.cwd()
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
file_tools = FileTools(base_dir=base_path)
|
||||
toolkit.register(file_tools.read_file, name="read_file")
|
||||
toolkit.register(file_tools.list_files, name="list_files")
|
||||
@@ -333,7 +339,7 @@ def create_devops_tools(base_dir: str | Path | None = None):
|
||||
toolkit.register(shell_tools.run_shell_command, name="shell")
|
||||
|
||||
# File operations for config management
|
||||
base_path = Path(base_dir) if base_dir else Path.cwd()
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
file_tools = FileTools(base_dir=base_path)
|
||||
toolkit.register(file_tools.read_file, name="read_file")
|
||||
toolkit.register(file_tools.save_file, name="write_file")
|
||||
@@ -425,8 +431,10 @@ def create_full_toolkit(base_dir: str | Path | None = None):
|
||||
shell_tools = ShellTools()
|
||||
toolkit.register(shell_tools.run_shell_command, name="shell")
|
||||
|
||||
# File operations
|
||||
base_path = Path(base_dir) if base_dir else Path.cwd()
|
||||
# File operations - use repo_root from settings
|
||||
from config import settings
|
||||
|
||||
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
|
||||
file_tools = FileTools(base_dir=base_path)
|
||||
toolkit.register(file_tools.read_file, name="read_file")
|
||||
toolkit.register(file_tools.save_file, name="write_file")
|
||||
@@ -453,6 +461,29 @@ def create_full_toolkit(base_dir: str | Path | None = None):
|
||||
except Exception:
|
||||
logger.debug("Memory search not available")
|
||||
|
||||
# System introspection - query runtime environment (sovereign self-knowledge)
|
||||
try:
|
||||
from timmy.tools_intro import (
|
||||
get_system_info,
|
||||
check_ollama_health,
|
||||
get_memory_status,
|
||||
)
|
||||
|
||||
toolkit.register(get_system_info, name="get_system_info")
|
||||
toolkit.register(check_ollama_health, name="check_ollama_health")
|
||||
toolkit.register(get_memory_status, name="get_memory_status")
|
||||
except Exception:
|
||||
logger.debug("Introspection tools not available")
|
||||
|
||||
# Inter-agent delegation - dispatch tasks to swarm agents
|
||||
try:
|
||||
from timmy.tools_delegation import delegate_task, list_swarm_agents
|
||||
|
||||
toolkit.register(delegate_task, name="delegate_task")
|
||||
toolkit.register(list_swarm_agents, name="list_swarm_agents")
|
||||
except Exception:
|
||||
logger.debug("Delegation tools not available")
|
||||
|
||||
return toolkit
|
||||
|
||||
|
||||
@@ -549,6 +580,21 @@ def get_all_available_tools() -> dict[str, dict]:
|
||||
"description": "Premium frontier reasoning via xAI Grok (opt-in, Lightning-payable)",
|
||||
"available_in": ["timmy"],
|
||||
},
|
||||
"get_system_info": {
|
||||
"name": "System Info",
|
||||
"description": "Introspect runtime environment - discover model, Python version, config",
|
||||
"available_in": ["timmy"],
|
||||
},
|
||||
"check_ollama_health": {
|
||||
"name": "Ollama Health",
|
||||
"description": "Check if Ollama is accessible and what models are available",
|
||||
"available_in": ["timmy"],
|
||||
},
|
||||
"get_memory_status": {
|
||||
"name": "Memory Status",
|
||||
"description": "Check status of Timmy's memory tiers (hot memory, vault)",
|
||||
"available_in": ["timmy"],
|
||||
},
|
||||
"aider": {
|
||||
"name": "Aider AI Assistant",
|
||||
"description": "Local AI coding assistant using Ollama (qwen2.5:14b or deepseek-coder)",
|
||||
|
||||
96
src/timmy/tools_delegation/__init__.py
Normal file
96
src/timmy/tools_delegation/__init__.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Inter-agent delegation tools for Timmy.
|
||||
|
||||
Allows Timmy to dispatch tasks to other swarm agents (Seer, Forge, Echo, etc.)
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def delegate_task(
    agent_name: str, task_description: str, priority: str = "normal"
) -> dict[str, Any]:
    """Dispatch a task to another swarm agent.

    Args:
        agent_name: Name of the agent to delegate to (seer, forge, echo, helm, quill, mace)
        task_description: What you want the agent to do
        priority: Task priority - "low", "normal", "high" (unknown values fall back to "normal")

    Returns:
        Dict with task_id, status, and message on success; success=False and
        an error message on failure.
    """
    # Validate the agent name before touching the coordinator, so bad input
    # fails fast even when the swarm subsystem is unavailable.
    valid_agents = ["seer", "forge", "echo", "helm", "quill", "mace"]
    agent_name = agent_name.lower().strip()

    if agent_name not in valid_agents:
        return {
            "success": False,
            "error": f"Unknown agent: {agent_name}. Valid agents: {', '.join(valid_agents)}",
            "task_id": None,
        }

    # Normalize priority; unknown values silently fall back to "normal".
    valid_priorities = ["low", "normal", "high"]
    if priority not in valid_priorities:
        priority = "normal"

    try:
        # Imported inside the try so a missing/broken swarm package is
        # reported as a structured error instead of an uncaught ImportError.
        from swarm.coordinator import coordinator

        # Submit task to coordinator
        task = coordinator.post_task(
            description=task_description,
            assigned_agent=agent_name,
            priority=priority,
        )

        # Only add an ellipsis when the description was actually truncated.
        preview = (
            task_description
            if len(task_description) <= 100
            else task_description[:100] + "..."
        )
        return {
            "success": True,
            "task_id": task.task_id,
            "agent": agent_name,
            "status": "submitted",
            "message": f"Task submitted to {agent_name}: {preview}",
        }

    except Exception as e:
        logger.error("Failed to delegate task to %s: %s", agent_name, e)
        return {
            "success": False,
            "error": str(e),
            "task_id": None,
        }
|
||||
|
||||
|
||||
def list_swarm_agents() -> dict[str, Any]:
    """List all available swarm agents and their status.

    Returns:
        Dict with "success", an "agents" list of {name, status, capabilities}
        entries, and "error" on failure.
    """
    try:
        # Imported inside the try so an unavailable swarm subsystem is
        # reported as a structured error instead of an uncaught ImportError.
        from swarm.coordinator import coordinator

        agents = coordinator.list_swarm_agents()

        return {
            "success": True,
            "agents": [
                {
                    "name": a.name,
                    "status": a.status,
                    "capabilities": a.capabilities,
                }
                for a in agents
            ],
        }

    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "agents": [],
        }
|
||||
142
src/timmy/tools_intro/__init__.py
Normal file
142
src/timmy/tools_intro/__init__.py
Normal file
@@ -0,0 +1,142 @@
|
||||
"""System introspection tools for Timmy to query his own environment.
|
||||
|
||||
This provides true sovereignty - Timmy introspects his environment rather than
|
||||
being told about it in the system prompt.
|
||||
"""
|
||||
|
||||
import platform
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
def get_system_info() -> dict[str, Any]:
    """Introspect the runtime environment to discover system information.

    Returns:
        Dict containing:
            - python_version: Python version
            - platform: OS platform
            - model: active Ollama model name
            - model_backend: configured backend (ollama/airllm/grok)
            - ollama_url: Ollama host URL
            - repo_root: repository root path
            - grok_enabled: whether GROK is enabled
            - spark_enabled: whether Spark is enabled
            - memory_vault_exists: whether the memory vault is initialized
    """
    from config import settings

    version = sys.version_info
    vault_path = Path(settings.repo_root) / "memory" / "self"

    return {
        "python_version": f"{version.major}.{version.minor}.{version.micro}",
        "platform": platform.system(),
        "model_backend": settings.timmy_model_backend,
        "ollama_url": settings.ollama_url,
        "repo_root": settings.repo_root,
        "grok_enabled": settings.grok_enabled,
        "spark_enabled": settings.spark_enabled,
        "model": _get_ollama_model(),
        "memory_vault_exists": vault_path.exists(),
    }
|
||||
|
||||
|
||||
def _get_ollama_model() -> str:
    """Return the model Timmy is configured to use.

    The previous implementation queried the Ollama /api/tags endpoint, but
    every code path (model found, model absent, request error) returned
    ``settings.ollama_model`` anyway, so the network round-trip was pure
    overhead. The configured model is now returned directly; use
    check_ollama_health() to verify the model is actually available.
    """
    from config import settings

    return settings.ollama_model
|
||||
|
||||
|
||||
def check_ollama_health() -> dict[str, Any]:
    """Check if Ollama is accessible and healthy.

    Returns:
        Dict with accessibility status, configured model, the list of
        available models, and any error encountered.
    """
    from config import settings

    result: dict[str, Any] = {
        "accessible": False,
        "model": settings.ollama_model,
        "available_models": [],
        "error": None,
    }

    try:
        # A quick probe of the tags endpoint doubles as a liveness check.
        response = httpx.get(f"{settings.ollama_url}/api/tags", timeout=5)
        if response.status_code == 200:
            result["accessible"] = True
            listed = response.json().get("models", [])
            result["available_models"] = [entry.get("name", "") for entry in listed]
    except Exception as exc:
        result["error"] = str(exc)

    return result
|
||||
|
||||
|
||||
def get_memory_status() -> dict[str, Any]:
    """Get the status of Timmy's memory system.

    Returns:
        Dict describing tier 1 (hot memory, MEMORY.md) and tier 2
        (the vault directory under memory/self).
    """
    from config import settings

    repo_root = Path(settings.repo_root)

    # Tier 1: hot memory — a single MEMORY.md at the repo root.
    memory_md = repo_root / "MEMORY.md"
    hot_exists = memory_md.exists()
    hot_preview = memory_md.read_text()[:500] if hot_exists else ""

    # Tier 2: vault — a directory of per-topic files.
    vault_path = repo_root / "memory" / "self"
    vault_exists = vault_path.exists()
    vault_files: list[str] = (
        [entry.name for entry in vault_path.iterdir() if entry.is_file()]
        if vault_exists
        else []
    )

    return {
        "tier1_hot_memory": {
            "exists": hot_exists,
            "path": str(memory_md),
            "preview": hot_preview[:200] if hot_preview else None,
        },
        "tier2_vault": {
            "exists": vault_exists,
            "path": str(vault_path),
            "file_count": len(vault_files),
            "files": vault_files[:10],  # cap the listing at the first 10 files
        },
    }
|
||||
66
tests/timmy/test_delegation.py
Normal file
66
tests/timmy/test_delegation.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Tests for inter-agent delegation tools."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
def test_delegate_task_valid_agent():
    """Delegating to a known agent should submit a task and report its id."""
    from timmy.tools_delegation import delegate_task

    with patch("swarm.coordinator.coordinator") as fake_coordinator:
        fake_task = MagicMock()
        fake_task.task_id = "task_123"
        fake_coordinator.post_task.return_value = fake_task

        outcome = delegate_task("seer", "analyze this data")

    assert outcome["success"] is True
    assert outcome["task_id"] == "task_123"
    assert outcome["agent"] == "seer"


def test_delegate_task_invalid_agent():
    """An unrecognized agent name should produce a structured error."""
    from timmy.tools_delegation import delegate_task

    outcome = delegate_task("nonexistent", "do something")

    assert outcome["success"] is False
    assert "error" in outcome
    assert "Unknown agent" in outcome["error"]


def test_delegate_task_priority():
    """The priority keyword should be forwarded to the coordinator."""
    from timmy.tools_delegation import delegate_task

    with patch("swarm.coordinator.coordinator") as fake_coordinator:
        fake_task = MagicMock()
        fake_task.task_id = "task_456"
        fake_coordinator.post_task.return_value = fake_task

        outcome = delegate_task("forge", "write code", priority="high")

    assert outcome["success"] is True
    fake_coordinator.post_task.assert_called_once()
    assert fake_coordinator.post_task.call_args.kwargs.get("priority") == "high"


def test_list_swarm_agents():
    """Listing agents should surface name, status, and capabilities."""
    from timmy.tools_delegation import list_swarm_agents

    with patch("swarm.coordinator.coordinator") as fake_coordinator:
        fake_agent = MagicMock()
        fake_agent.name = "seer"
        fake_agent.status = "idle"
        fake_agent.capabilities = ["analysis"]
        fake_coordinator.list_swarm_agents.return_value = [fake_agent]

        outcome = list_swarm_agents()

    assert outcome["success"] is True
    assert len(outcome["agents"]) == 1
    assert outcome["agents"][0]["name"] == "seer"
|
||||
62
tests/timmy/test_introspection.py
Normal file
62
tests/timmy/test_introspection.py
Normal file
@@ -0,0 +1,62 @@
|
||||
"""Tests for system introspection tools."""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_get_system_info_returns_dict():
    """System info should return a dictionary with the core keys."""
    from timmy.tools_intro import get_system_info

    info = get_system_info()

    assert isinstance(info, dict)
    assert "python_version" in info
    assert "platform" in info
    assert "model" in info
    assert "repo_root" in info


def test_get_system_info_contains_model():
    """System info should include the model name from settings."""
    from timmy.tools_intro import get_system_info
    from config import settings

    info = get_system_info()

    assert "model" in info
    # Model should come from settings
    assert info["model"] == settings.ollama_model


def test_get_system_info_contains_repo_root():
    """System info should include repo_root."""
    from pathlib import Path

    from timmy.tools_intro import get_system_info
    from config import settings

    info = get_system_info()

    assert "repo_root" in info
    assert info["repo_root"] == settings.repo_root
    # Portable sanity check: repo_root must be an existing directory.
    # (Previously this asserted the literal folder name
    # "Timmy-time-dashboard", which breaks on forks, renames, and
    # non-default checkout locations.)
    assert Path(info["repo_root"]).is_dir()


def test_check_ollama_health_returns_dict():
    """Ollama health check should return a dictionary."""
    from timmy.tools_intro import check_ollama_health

    result = check_ollama_health()

    assert isinstance(result, dict)
    assert "accessible" in result
    assert "model" in result


def test_get_memory_status_returns_dict():
    """Memory status should return a dictionary with tier info."""
    from timmy.tools_intro import get_memory_status

    status = get_memory_status()

    assert isinstance(status, dict)
    assert "tier1_hot_memory" in status
    assert "tier2_vault" in status
|
||||
95
tests/timmy/test_session_logging.py
Normal file
95
tests/timmy/test_session_logging.py
Normal file
@@ -0,0 +1,95 @@
|
||||
"""Tests for session logging."""
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def test_session_logger_records_message():
    """Recorded messages should appear in the flushed log file."""
    from timmy.session_logger import SessionLogger

    with tempfile.TemporaryDirectory() as workdir:
        session = SessionLogger(logs_dir=workdir)

        session.record_message("user", "Hello Timmy")
        session.record_message("timmy", "Hello user")

        written = session.flush()

        assert written.exists()
        text = written.read_text()
        assert "Hello Timmy" in text
        assert "message" in text


def test_session_logger_records_tool_call():
    """Recorded tool calls should appear in the flushed log file."""
    from timmy.session_logger import SessionLogger

    with tempfile.TemporaryDirectory() as workdir:
        session = SessionLogger(logs_dir=workdir)

        session.record_tool_call("read_file", {"path": "test.py"}, "file content")

        written = session.flush()

        assert written.exists()
        text = written.read_text()
        assert "read_file" in text
        assert "tool_call" in text


def test_session_logger_records_error():
    """Recorded errors should appear in the flushed log file."""
    from timmy.session_logger import SessionLogger

    with tempfile.TemporaryDirectory() as workdir:
        session = SessionLogger(logs_dir=workdir)

        session.record_error("File not found", "Reading config")

        written = session.flush()

        assert written.exists()
        text = written.read_text()
        assert "File not found" in text
        assert "error" in text


def test_session_logger_records_decision():
    """Recorded decisions should appear in the flushed log file."""
    from timmy.session_logger import SessionLogger

    with tempfile.TemporaryDirectory() as workdir:
        session = SessionLogger(logs_dir=workdir)

        session.record_decision("Use OOP pattern", "More maintainable")

        written = session.flush()

        assert written.exists()
        text = written.read_text()
        assert "Use OOP pattern" in text
        assert "decision" in text


def test_session_summary():
    """The summary should reflect everything flushed so far."""
    from timmy.session_logger import SessionLogger

    with tempfile.TemporaryDirectory() as workdir:
        session = SessionLogger(logs_dir=workdir)

        session.record_message("user", "Hello")
        session.record_message("timmy", "Hi")
        session.record_error("Test error")

        # Flush to create the session file
        session.flush()

        summary = session.get_session_summary()

        assert summary["exists"] is True
        assert summary["entries"] >= 3
|
||||
43
tests/tools/test_path_resolution.py
Normal file
43
tests/tools/test_path_resolution.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Tests for path resolution in file operations."""
|
||||
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def test_resolve_path_expands_tilde():
    """Path resolution should expand ~ to the current user's home directory."""
    from creative.tools.file_ops import _resolve_path

    result = _resolve_path("~/test")

    # Compare against the actual home directory instead of the previous
    # hardcoded "/Users/" prefix, which only held on macOS.
    assert str(result).startswith(str(Path.home()))
    assert result.name == "test"


def test_resolve_path_relative_to_repo():
    """Relative paths should resolve under the configured repo root."""
    from creative.tools.file_ops import _resolve_path
    from config import settings

    result = _resolve_path("src/config.py")

    # Anchor on the configured repo root rather than the literal folder
    # name "Timmy-time-dashboard", which breaks on forks and renamed
    # checkouts.
    assert str(result).startswith(str(Path(settings.repo_root)))
    assert result.name == "config.py"


def test_resolve_path_absolute():
    """Absolute paths should work as-is."""
    from creative.tools.file_ops import _resolve_path

    result = _resolve_path("/etc/hosts")

    assert result.name == "hosts"


def test_resolve_path_with_custom_base():
    """Custom base_dir should override repo root."""
    from creative.tools.file_ops import _resolve_path

    result = _resolve_path("test.py", base_dir="/tmp")

    # Handle macOS /private/tmp vs /tmp
    assert result.name == "test.py"
    assert "tmp" in result.as_posix()
|
||||
Reference in New Issue
Block a user