Compare commits
2 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7bb6f15c33 | ||
| b45b543f2d |
@@ -100,48 +100,25 @@ def _get_git_context() -> dict:
|
|||||||
return {"branch": "unknown", "commit": "unknown"}
|
return {"branch": "unknown", "commit": "unknown"}
|
||||||
|
|
||||||
|
|
||||||
def capture_error(
|
def _extract_origin(exc: Exception) -> tuple[str, int]:
|
||||||
exc: Exception,
|
"""Walk the traceback to find the deepest file and line number."""
|
||||||
source: str = "unknown",
|
|
||||||
context: dict | None = None,
|
|
||||||
) -> str | None:
|
|
||||||
"""Capture an error and optionally create a bug report.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
exc: The exception to capture
|
|
||||||
source: Module/component where the error occurred
|
|
||||||
context: Optional dict of extra context (request path, etc.)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Task ID of the created bug report, or None if deduplicated/disabled
|
|
||||||
"""
|
|
||||||
from config import settings
|
|
||||||
|
|
||||||
if not settings.error_feedback_enabled:
|
|
||||||
return None
|
|
||||||
|
|
||||||
error_hash = _stack_hash(exc)
|
|
||||||
|
|
||||||
if _is_duplicate(error_hash):
|
|
||||||
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Format the stack trace
|
|
||||||
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
|
|
||||||
|
|
||||||
# Extract file/line from traceback
|
|
||||||
tb_obj = exc.__traceback__
|
tb_obj = exc.__traceback__
|
||||||
affected_file = "unknown"
|
|
||||||
affected_line = 0
|
|
||||||
while tb_obj and tb_obj.tb_next:
|
while tb_obj and tb_obj.tb_next:
|
||||||
tb_obj = tb_obj.tb_next
|
tb_obj = tb_obj.tb_next
|
||||||
if tb_obj:
|
if tb_obj:
|
||||||
affected_file = tb_obj.tb_frame.f_code.co_filename
|
return tb_obj.tb_frame.f_code.co_filename, tb_obj.tb_lineno
|
||||||
affected_line = tb_obj.tb_lineno
|
return "unknown", 0
|
||||||
|
|
||||||
git_ctx = _get_git_context()
|
|
||||||
|
|
||||||
# 1. Log to event_log
|
def _log_error_event(
|
||||||
|
exc: Exception,
|
||||||
|
source: str,
|
||||||
|
error_hash: str,
|
||||||
|
affected_file: str,
|
||||||
|
affected_line: int,
|
||||||
|
git_ctx: dict,
|
||||||
|
) -> None:
|
||||||
|
"""Log the error to the event log (best-effort)."""
|
||||||
try:
|
try:
|
||||||
from swarm.event_log import EventType, log_event
|
from swarm.event_log import EventType, log_event
|
||||||
|
|
||||||
@@ -161,8 +138,18 @@ def capture_error(
|
|||||||
except Exception as log_exc:
|
except Exception as log_exc:
|
||||||
logger.debug("Failed to log error event: %s", log_exc)
|
logger.debug("Failed to log error event: %s", log_exc)
|
||||||
|
|
||||||
# 2. Create bug report task
|
|
||||||
task_id = None
|
def _create_bug_report(
|
||||||
|
exc: Exception,
|
||||||
|
source: str,
|
||||||
|
error_hash: str,
|
||||||
|
affected_file: str,
|
||||||
|
affected_line: int,
|
||||||
|
git_ctx: dict,
|
||||||
|
tb_str: str,
|
||||||
|
context: dict | None,
|
||||||
|
) -> str | None:
|
||||||
|
"""Create a bug report task and return its ID (best-effort)."""
|
||||||
try:
|
try:
|
||||||
from swarm.task_queue.models import create_task
|
from swarm.task_queue.models import create_task
|
||||||
|
|
||||||
@@ -193,29 +180,30 @@ def capture_error(
|
|||||||
auto_approve=True,
|
auto_approve=True,
|
||||||
task_type="bug_report",
|
task_type="bug_report",
|
||||||
)
|
)
|
||||||
task_id = task.id
|
|
||||||
|
|
||||||
# Log the creation event
|
|
||||||
try:
|
try:
|
||||||
from swarm.event_log import EventType, log_event
|
from swarm.event_log import EventType, log_event
|
||||||
|
|
||||||
log_event(
|
log_event(
|
||||||
EventType.BUG_REPORT_CREATED,
|
EventType.BUG_REPORT_CREATED,
|
||||||
source=source,
|
source=source,
|
||||||
task_id=task_id,
|
task_id=task.id,
|
||||||
data={
|
data={
|
||||||
"error_hash": error_hash,
|
"error_hash": error_hash,
|
||||||
"title": title[:100],
|
"title": title[:100],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
except Exception as log_exc:
|
||||||
logger.warning("Bug report screenshot error: %s", exc)
|
logger.warning("Bug report log error: %s", log_exc)
|
||||||
pass
|
|
||||||
|
|
||||||
|
return task.id
|
||||||
except Exception as task_exc:
|
except Exception as task_exc:
|
||||||
logger.debug("Failed to create bug report task: %s", task_exc)
|
logger.debug("Failed to create bug report task: %s", task_exc)
|
||||||
|
return None
|
||||||
|
|
||||||
# 3. Send notification
|
|
||||||
|
def _send_error_notification(exc: Exception, source: str) -> None:
|
||||||
|
"""Push a notification about the captured error (best-effort)."""
|
||||||
try:
|
try:
|
||||||
from infrastructure.notifications.push import notifier
|
from infrastructure.notifications.push import notifier
|
||||||
|
|
||||||
@@ -224,11 +212,12 @@ def capture_error(
|
|||||||
message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
|
message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
|
||||||
category="system",
|
category="system",
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
except Exception as notify_exc:
|
||||||
logger.warning("Bug report notification error: %s", exc)
|
logger.warning("Bug report notification error: %s", notify_exc)
|
||||||
pass
|
|
||||||
|
|
||||||
# 4. Record in session logger (via registered callback)
|
|
||||||
|
def _record_to_session(exc: Exception, source: str) -> None:
|
||||||
|
"""Forward the error to the registered session recorder (best-effort)."""
|
||||||
if _error_recorder is not None:
|
if _error_recorder is not None:
|
||||||
try:
|
try:
|
||||||
_error_recorder(
|
_error_recorder(
|
||||||
@@ -238,4 +227,44 @@ def capture_error(
|
|||||||
except Exception as log_exc:
|
except Exception as log_exc:
|
||||||
logger.warning("Bug report session logging error: %s", log_exc)
|
logger.warning("Bug report session logging error: %s", log_exc)
|
||||||
|
|
||||||
|
|
||||||
|
def capture_error(
|
||||||
|
exc: Exception,
|
||||||
|
source: str = "unknown",
|
||||||
|
context: dict | None = None,
|
||||||
|
) -> str | None:
|
||||||
|
"""Capture an error and optionally create a bug report.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
exc: The exception to capture
|
||||||
|
source: Module/component where the error occurred
|
||||||
|
context: Optional dict of extra context (request path, etc.)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Task ID of the created bug report, or None if deduplicated/disabled
|
||||||
|
"""
|
||||||
|
from config import settings
|
||||||
|
|
||||||
|
if not settings.error_feedback_enabled:
|
||||||
|
return None
|
||||||
|
|
||||||
|
error_hash = _stack_hash(exc)
|
||||||
|
|
||||||
|
if _is_duplicate(error_hash):
|
||||||
|
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
|
||||||
|
return None
|
||||||
|
|
||||||
|
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
|
||||||
|
affected_file, affected_line = _extract_origin(exc)
|
||||||
|
git_ctx = _get_git_context()
|
||||||
|
|
||||||
|
_log_error_event(exc, source, error_hash, affected_file, affected_line, git_ctx)
|
||||||
|
|
||||||
|
task_id = _create_bug_report(
|
||||||
|
exc, source, error_hash, affected_file, affected_line, git_ctx, tb_str, context
|
||||||
|
)
|
||||||
|
|
||||||
|
_send_error_notification(exc, source)
|
||||||
|
_record_to_session(exc, source)
|
||||||
|
|
||||||
return task_id
|
return task_id
|
||||||
|
|||||||
@@ -197,6 +197,90 @@ def _resolve_backend(requested: str | None) -> str:
|
|||||||
return "ollama"
|
return "ollama"
|
||||||
|
|
||||||
|
|
||||||
|
def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
|
||||||
|
"""Assemble the tools list based on model capability and MCP flags.
|
||||||
|
|
||||||
|
Returns a list of Toolkit / MCPTools objects, or an empty list.
|
||||||
|
"""
|
||||||
|
if not use_tools:
|
||||||
|
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
|
||||||
|
return []
|
||||||
|
|
||||||
|
tools_list: list = [create_full_toolkit()]
|
||||||
|
|
||||||
|
# Add MCP tool servers (lazy-connected on first arun()).
|
||||||
|
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
|
||||||
|
# scopes that conflict with asyncio background task cancellation (#72).
|
||||||
|
if not skip_mcp:
|
||||||
|
try:
|
||||||
|
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
|
||||||
|
|
||||||
|
gitea_mcp = create_gitea_mcp_tools()
|
||||||
|
if gitea_mcp:
|
||||||
|
tools_list.append(gitea_mcp)
|
||||||
|
|
||||||
|
fs_mcp = create_filesystem_mcp_tools()
|
||||||
|
if fs_mcp:
|
||||||
|
tools_list.append(fs_mcp)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("MCP tools unavailable: %s", exc)
|
||||||
|
|
||||||
|
return tools_list
|
||||||
|
|
||||||
|
|
||||||
|
def _build_prompt(use_tools: bool, session_id: str) -> str:
|
||||||
|
"""Build the full system prompt with optional memory context."""
|
||||||
|
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from timmy.memory_system import memory_system
|
||||||
|
|
||||||
|
memory_context = memory_system.get_system_context()
|
||||||
|
if memory_context:
|
||||||
|
# Smaller budget for small models — expanded prompt uses more tokens
|
||||||
|
max_context = 2000 if not use_tools else 8000
|
||||||
|
if len(memory_context) > max_context:
|
||||||
|
memory_context = memory_context[:max_context] + "\n... [truncated]"
|
||||||
|
return (
|
||||||
|
f"{base_prompt}\n\n"
|
||||||
|
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
|
||||||
|
f"{memory_context}"
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to load memory context: %s", exc)
|
||||||
|
|
||||||
|
return base_prompt
|
||||||
|
|
||||||
|
|
||||||
|
def _create_ollama_agent(
|
||||||
|
*,
|
||||||
|
db_file: str,
|
||||||
|
model_name: str,
|
||||||
|
tools_list: list,
|
||||||
|
full_prompt: str,
|
||||||
|
use_tools: bool,
|
||||||
|
) -> Agent:
|
||||||
|
"""Construct the Agno Agent with Ollama backend and warm up the model."""
|
||||||
|
model_kwargs = {}
|
||||||
|
if settings.ollama_num_ctx > 0:
|
||||||
|
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
|
||||||
|
|
||||||
|
agent = Agent(
|
||||||
|
name="Agent",
|
||||||
|
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
|
||||||
|
db=SqliteDb(db_file=db_file),
|
||||||
|
description=full_prompt,
|
||||||
|
add_history_to_context=True,
|
||||||
|
num_history_runs=20,
|
||||||
|
markdown=False,
|
||||||
|
tools=tools_list if tools_list else None,
|
||||||
|
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
||||||
|
telemetry=settings.telemetry_enabled,
|
||||||
|
)
|
||||||
|
_warmup_model(model_name)
|
||||||
|
return agent
|
||||||
|
|
||||||
|
|
||||||
def create_timmy(
|
def create_timmy(
|
||||||
db_file: str = "timmy.db",
|
db_file: str = "timmy.db",
|
||||||
backend: str | None = None,
|
backend: str | None = None,
|
||||||
@@ -238,16 +322,12 @@ def create_timmy(
|
|||||||
return TimmyAirLLMAgent(model_size=size)
|
return TimmyAirLLMAgent(model_size=size)
|
||||||
|
|
||||||
# Default: Ollama via Agno.
|
# Default: Ollama via Agno.
|
||||||
# Resolve model with automatic pulling and fallback
|
|
||||||
model_name, is_fallback = _resolve_model_with_fallback(
|
model_name, is_fallback = _resolve_model_with_fallback(
|
||||||
requested_model=None,
|
requested_model=None,
|
||||||
require_vision=False,
|
require_vision=False,
|
||||||
auto_pull=True,
|
auto_pull=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# If Ollama is completely unreachable, fail loudly.
|
|
||||||
# Sovereignty: never silently send data to a cloud API.
|
|
||||||
# Use --backend claude explicitly if you want cloud inference.
|
|
||||||
if not _check_model_available(model_name):
|
if not _check_model_available(model_name):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Ollama unreachable and no local models available. "
|
"Ollama unreachable and no local models available. "
|
||||||
@@ -258,76 +338,16 @@ def create_timmy(
|
|||||||
logger.info("Using fallback model %s (requested was unavailable)", model_name)
|
logger.info("Using fallback model %s (requested was unavailable)", model_name)
|
||||||
|
|
||||||
use_tools = _model_supports_tools(model_name)
|
use_tools = _model_supports_tools(model_name)
|
||||||
|
tools_list = _build_tools_list(use_tools, skip_mcp, model_name)
|
||||||
|
full_prompt = _build_prompt(use_tools, session_id)
|
||||||
|
|
||||||
# Conditionally include tools — small models get none
|
return _create_ollama_agent(
|
||||||
toolkit = create_full_toolkit() if use_tools else None
|
db_file=db_file,
|
||||||
if not use_tools:
|
model_name=model_name,
|
||||||
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
|
tools_list=tools_list,
|
||||||
|
full_prompt=full_prompt,
|
||||||
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
|
use_tools=use_tools,
|
||||||
tools_list: list = []
|
|
||||||
if toolkit:
|
|
||||||
tools_list.append(toolkit)
|
|
||||||
|
|
||||||
# Add MCP tool servers (lazy-connected on first arun()).
|
|
||||||
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
|
|
||||||
# scopes that conflict with asyncio background task cancellation (#72).
|
|
||||||
if use_tools and not skip_mcp:
|
|
||||||
try:
|
|
||||||
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
|
|
||||||
|
|
||||||
gitea_mcp = create_gitea_mcp_tools()
|
|
||||||
if gitea_mcp:
|
|
||||||
tools_list.append(gitea_mcp)
|
|
||||||
|
|
||||||
fs_mcp = create_filesystem_mcp_tools()
|
|
||||||
if fs_mcp:
|
|
||||||
tools_list.append(fs_mcp)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.debug("MCP tools unavailable: %s", exc)
|
|
||||||
|
|
||||||
# Select prompt tier based on tool capability
|
|
||||||
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
|
|
||||||
|
|
||||||
# Try to load memory context
|
|
||||||
try:
|
|
||||||
from timmy.memory_system import memory_system
|
|
||||||
|
|
||||||
memory_context = memory_system.get_system_context()
|
|
||||||
if memory_context:
|
|
||||||
# Truncate if too long — smaller budget for small models
|
|
||||||
# since the expanded prompt (roster, guardrails) uses more tokens
|
|
||||||
max_context = 2000 if not use_tools else 8000
|
|
||||||
if len(memory_context) > max_context:
|
|
||||||
memory_context = memory_context[:max_context] + "\n... [truncated]"
|
|
||||||
full_prompt = (
|
|
||||||
f"{base_prompt}\n\n"
|
|
||||||
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
|
|
||||||
f"{memory_context}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
full_prompt = base_prompt
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Failed to load memory context: %s", exc)
|
|
||||||
full_prompt = base_prompt
|
|
||||||
|
|
||||||
model_kwargs = {}
|
|
||||||
if settings.ollama_num_ctx > 0:
|
|
||||||
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
|
|
||||||
agent = Agent(
|
|
||||||
name="Agent",
|
|
||||||
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
|
|
||||||
db=SqliteDb(db_file=db_file),
|
|
||||||
description=full_prompt,
|
|
||||||
add_history_to_context=True,
|
|
||||||
num_history_runs=20,
|
|
||||||
markdown=False,
|
|
||||||
tools=tools_list if tools_list else None,
|
|
||||||
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
|
||||||
telemetry=settings.telemetry_enabled,
|
|
||||||
)
|
)
|
||||||
_warmup_model(model_name)
|
|
||||||
return agent
|
|
||||||
|
|
||||||
|
|
||||||
class TimmyWithMemory:
|
class TimmyWithMemory:
|
||||||
|
|||||||
@@ -5,9 +5,14 @@ from datetime import UTC, datetime, timedelta
|
|||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from infrastructure.error_capture import (
|
from infrastructure.error_capture import (
|
||||||
|
_create_bug_report,
|
||||||
_dedup_cache,
|
_dedup_cache,
|
||||||
|
_extract_origin,
|
||||||
_get_git_context,
|
_get_git_context,
|
||||||
_is_duplicate,
|
_is_duplicate,
|
||||||
|
_log_error_event,
|
||||||
|
_record_to_session,
|
||||||
|
_send_error_notification,
|
||||||
_stack_hash,
|
_stack_hash,
|
||||||
capture_error,
|
capture_error,
|
||||||
)
|
)
|
||||||
@@ -193,3 +198,87 @@ class TestCaptureError:
|
|||||||
|
|
||||||
def teardown_method(self):
|
def teardown_method(self):
|
||||||
_dedup_cache.clear()
|
_dedup_cache.clear()
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractOrigin:
|
||||||
|
"""Test _extract_origin helper."""
|
||||||
|
|
||||||
|
def test_returns_file_and_line(self):
|
||||||
|
try:
|
||||||
|
_make_exception()
|
||||||
|
except ValueError as e:
|
||||||
|
filename, lineno = _extract_origin(e)
|
||||||
|
assert filename.endswith("test_error_capture.py")
|
||||||
|
assert lineno > 0
|
||||||
|
|
||||||
|
def test_no_traceback_returns_defaults(self):
|
||||||
|
exc = ValueError("no tb")
|
||||||
|
exc.__traceback__ = None
|
||||||
|
assert _extract_origin(exc) == ("unknown", 0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestLogErrorEvent:
|
||||||
|
"""Test _log_error_event helper."""
|
||||||
|
|
||||||
|
def test_does_not_crash_when_event_log_missing(self):
|
||||||
|
try:
|
||||||
|
raise RuntimeError("log test")
|
||||||
|
except RuntimeError as e:
|
||||||
|
_log_error_event(e, "test", "abc123", "file.py", 42, {})
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateBugReport:
|
||||||
|
"""Test _create_bug_report helper."""
|
||||||
|
|
||||||
|
def test_returns_none_on_import_failure(self):
|
||||||
|
try:
|
||||||
|
raise RuntimeError("report test")
|
||||||
|
except RuntimeError as e:
|
||||||
|
with patch("infrastructure.error_capture.logger"):
|
||||||
|
result = _create_bug_report(e, "test", "abc", "f.py", 1, {}, "tb", None)
|
||||||
|
# Returns a task id or None depending on whether swarm is available
|
||||||
|
assert result is None or isinstance(result, str)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSendErrorNotification:
|
||||||
|
"""Test _send_error_notification helper."""
|
||||||
|
|
||||||
|
def test_does_not_crash_on_notifier_failure(self):
|
||||||
|
try:
|
||||||
|
raise RuntimeError("notify test")
|
||||||
|
except RuntimeError as e:
|
||||||
|
_send_error_notification(e, "test")
|
||||||
|
|
||||||
|
|
||||||
|
class TestRecordToSession:
|
||||||
|
"""Test _record_to_session helper."""
|
||||||
|
|
||||||
|
def test_noop_when_no_recorder(self):
|
||||||
|
import infrastructure.error_capture as ec
|
||||||
|
|
||||||
|
original = ec._error_recorder
|
||||||
|
try:
|
||||||
|
ec._error_recorder = None
|
||||||
|
try:
|
||||||
|
raise RuntimeError("session test")
|
||||||
|
except RuntimeError as e:
|
||||||
|
_record_to_session(e, "test") # should not crash
|
||||||
|
finally:
|
||||||
|
ec._error_recorder = original
|
||||||
|
|
||||||
|
def test_calls_registered_recorder(self):
|
||||||
|
import infrastructure.error_capture as ec
|
||||||
|
|
||||||
|
original = ec._error_recorder
|
||||||
|
calls = []
|
||||||
|
try:
|
||||||
|
ec._error_recorder = lambda **kwargs: calls.append(kwargs)
|
||||||
|
try:
|
||||||
|
raise RuntimeError("recorded")
|
||||||
|
except RuntimeError as e:
|
||||||
|
_record_to_session(e, "src")
|
||||||
|
assert len(calls) == 1
|
||||||
|
assert "RuntimeError: recorded" in calls[0]["error"]
|
||||||
|
assert calls[0]["context"] == "src"
|
||||||
|
finally:
|
||||||
|
ec._error_recorder = original
|
||||||
|
|||||||
@@ -444,6 +444,150 @@ def test_get_effective_ollama_model_walks_fallback_chain():
|
|||||||
assert result == "fb-2"
|
assert result == "fb-2"
|
||||||
|
|
||||||
|
|
||||||
|
# ── _build_tools_list ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_tools_list_empty_when_tools_disabled():
|
||||||
|
"""Small models get an empty tools list."""
|
||||||
|
from timmy.agent import _build_tools_list
|
||||||
|
|
||||||
|
result = _build_tools_list(use_tools=False, skip_mcp=False, model_name="llama3.2")
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_tools_list_includes_toolkit_when_enabled():
|
||||||
|
"""Tool-capable models get the full toolkit."""
|
||||||
|
mock_toolkit = MagicMock()
|
||||||
|
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
|
||||||
|
from timmy.agent import _build_tools_list
|
||||||
|
|
||||||
|
result = _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
|
||||||
|
assert mock_toolkit in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_tools_list_skips_mcp_when_flagged():
|
||||||
|
"""skip_mcp=True must not call MCP factories."""
|
||||||
|
mock_toolkit = MagicMock()
|
||||||
|
with (
|
||||||
|
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
|
||||||
|
patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea,
|
||||||
|
patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs,
|
||||||
|
):
|
||||||
|
from timmy.agent import _build_tools_list
|
||||||
|
|
||||||
|
_build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
|
||||||
|
mock_gitea.assert_not_called()
|
||||||
|
mock_fs.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_tools_list_includes_mcp_when_not_skipped():
|
||||||
|
"""skip_mcp=False should attempt MCP tool creation."""
|
||||||
|
mock_toolkit = MagicMock()
|
||||||
|
with (
|
||||||
|
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
|
||||||
|
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea,
|
||||||
|
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs,
|
||||||
|
):
|
||||||
|
from timmy.agent import _build_tools_list
|
||||||
|
|
||||||
|
_build_tools_list(use_tools=True, skip_mcp=False, model_name="llama3.1")
|
||||||
|
mock_gitea.assert_called_once()
|
||||||
|
mock_fs.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
# ── _build_prompt ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_prompt_includes_base_prompt():
|
||||||
|
"""Prompt should always contain the base system prompt."""
|
||||||
|
from timmy.agent import _build_prompt
|
||||||
|
|
||||||
|
result = _build_prompt(use_tools=False, session_id="test")
|
||||||
|
assert "Timmy" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_prompt_appends_memory_context():
|
||||||
|
"""Memory context should be appended when available."""
|
||||||
|
mock_memory = MagicMock()
|
||||||
|
mock_memory.get_system_context.return_value = "User prefers dark mode."
|
||||||
|
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||||
|
from timmy.agent import _build_prompt
|
||||||
|
|
||||||
|
result = _build_prompt(use_tools=True, session_id="test")
|
||||||
|
assert "GROUNDED CONTEXT" in result
|
||||||
|
assert "dark mode" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_prompt_truncates_long_memory():
|
||||||
|
"""Long memory context should be truncated."""
|
||||||
|
mock_memory = MagicMock()
|
||||||
|
mock_memory.get_system_context.return_value = "x" * 10000
|
||||||
|
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||||
|
from timmy.agent import _build_prompt
|
||||||
|
|
||||||
|
result = _build_prompt(use_tools=False, session_id="test")
|
||||||
|
assert "[truncated]" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_prompt_survives_memory_failure():
|
||||||
|
"""Prompt should fall back to base when memory fails."""
|
||||||
|
mock_memory = MagicMock()
|
||||||
|
mock_memory.get_system_context.side_effect = RuntimeError("db locked")
|
||||||
|
with patch("timmy.memory_system.memory_system", mock_memory):
|
||||||
|
from timmy.agent import _build_prompt
|
||||||
|
|
||||||
|
result = _build_prompt(use_tools=True, session_id="test")
|
||||||
|
assert "Timmy" in result
|
||||||
|
# Memory context should NOT be appended (the db locked error was caught)
|
||||||
|
assert "db locked" not in result
|
||||||
|
|
||||||
|
|
||||||
|
# ── _create_ollama_agent ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_ollama_agent_passes_correct_kwargs():
|
||||||
|
"""_create_ollama_agent must pass the expected kwargs to Agent."""
|
||||||
|
with (
|
||||||
|
patch("timmy.agent.Agent") as MockAgent,
|
||||||
|
patch("timmy.agent.Ollama"),
|
||||||
|
patch("timmy.agent.SqliteDb"),
|
||||||
|
patch("timmy.agent._warmup_model", return_value=True),
|
||||||
|
):
|
||||||
|
from timmy.agent import _create_ollama_agent
|
||||||
|
|
||||||
|
_create_ollama_agent(
|
||||||
|
db_file="test.db",
|
||||||
|
model_name="llama3.1",
|
||||||
|
tools_list=[MagicMock()],
|
||||||
|
full_prompt="test prompt",
|
||||||
|
use_tools=True,
|
||||||
|
)
|
||||||
|
kwargs = MockAgent.call_args.kwargs
|
||||||
|
assert kwargs["description"] == "test prompt"
|
||||||
|
assert kwargs["markdown"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_ollama_agent_none_tools_when_empty():
|
||||||
|
"""Empty tools_list should pass tools=None to Agent."""
|
||||||
|
with (
|
||||||
|
patch("timmy.agent.Agent") as MockAgent,
|
||||||
|
patch("timmy.agent.Ollama"),
|
||||||
|
patch("timmy.agent.SqliteDb"),
|
||||||
|
patch("timmy.agent._warmup_model", return_value=True),
|
||||||
|
):
|
||||||
|
from timmy.agent import _create_ollama_agent
|
||||||
|
|
||||||
|
_create_ollama_agent(
|
||||||
|
db_file="test.db",
|
||||||
|
model_name="llama3.2",
|
||||||
|
tools_list=[],
|
||||||
|
full_prompt="test prompt",
|
||||||
|
use_tools=False,
|
||||||
|
)
|
||||||
|
kwargs = MockAgent.call_args.kwargs
|
||||||
|
assert kwargs["tools"] is None
|
||||||
|
|
||||||
|
|
||||||
def test_no_hardcoded_fallback_constants_in_agent():
|
def test_no_hardcoded_fallback_constants_in_agent():
|
||||||
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
|
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
|
||||||
import timmy.agent as agent_mod
|
import timmy.agent as agent_mod
|
||||||
|
|||||||
Reference in New Issue
Block a user