forked from Rockachopa/Timmy-time-dashboard
Compare commits
2 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
735bfc7820 | ||
| 7c823ab59c |
@@ -197,6 +197,113 @@ def _resolve_backend(requested: str | None) -> str:
|
|||||||
return "ollama"
|
return "ollama"
|
||||||
|
|
||||||
|
|
||||||
|
def _build_tools_list(use_tools: bool, skip_mcp: bool) -> list:
|
||||||
|
"""Build the Agno tools list (toolkit + optional MCP servers).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
use_tools: Whether the model supports tool calling.
|
||||||
|
skip_mcp: If True, omit MCP tool servers.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of Toolkit / MCPTools, possibly empty.
|
||||||
|
"""
|
||||||
|
if not use_tools:
|
||||||
|
logger.info("Tools disabled (model too small for reliable tool calling)")
|
||||||
|
return []
|
||||||
|
|
||||||
|
toolkit = create_full_toolkit()
|
||||||
|
tools_list: list = [toolkit]
|
||||||
|
|
||||||
|
# Add MCP tool servers (lazy-connected on first arun()).
|
||||||
|
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
|
||||||
|
# scopes that conflict with asyncio background task cancellation (#72).
|
||||||
|
if not skip_mcp:
|
||||||
|
try:
|
||||||
|
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
|
||||||
|
|
||||||
|
gitea_mcp = create_gitea_mcp_tools()
|
||||||
|
if gitea_mcp:
|
||||||
|
tools_list.append(gitea_mcp)
|
||||||
|
|
||||||
|
fs_mcp = create_filesystem_mcp_tools()
|
||||||
|
if fs_mcp:
|
||||||
|
tools_list.append(fs_mcp)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("MCP tools unavailable: %s", exc)
|
||||||
|
|
||||||
|
return tools_list
|
||||||
|
|
||||||
|
|
||||||
|
def _build_prompt(use_tools: bool, session_id: str) -> str:
|
||||||
|
"""Build the full system prompt with optional memory context.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
use_tools: Whether tools are enabled (affects prompt tier and context budget).
|
||||||
|
session_id: Session identifier for the prompt.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Complete system prompt string.
|
||||||
|
"""
|
||||||
|
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from timmy.memory_system import memory_system
|
||||||
|
|
||||||
|
memory_context = memory_system.get_system_context()
|
||||||
|
if memory_context:
|
||||||
|
# Truncate if too long — smaller budget for small models
|
||||||
|
# since the expanded prompt (roster, guardrails) uses more tokens
|
||||||
|
max_context = 2000 if not use_tools else 8000
|
||||||
|
if len(memory_context) > max_context:
|
||||||
|
memory_context = memory_context[:max_context] + "\n... [truncated]"
|
||||||
|
return (
|
||||||
|
f"{base_prompt}\n\n"
|
||||||
|
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
|
||||||
|
f"{memory_context}"
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to load memory context: %s", exc)
|
||||||
|
|
||||||
|
return base_prompt
|
||||||
|
|
||||||
|
|
||||||
|
def _create_ollama_agent(
|
||||||
|
model_name: str,
|
||||||
|
db_file: str,
|
||||||
|
tools_list: list,
|
||||||
|
full_prompt: str,
|
||||||
|
use_tools: bool,
|
||||||
|
) -> Agent:
|
||||||
|
"""Construct the Agno Agent with an Ollama model.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model_name: Resolved Ollama model name.
|
||||||
|
db_file: SQLite file for conversation memory.
|
||||||
|
tools_list: Pre-built tools list (may be empty).
|
||||||
|
full_prompt: Complete system prompt.
|
||||||
|
use_tools: Whether tools are enabled.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configured Agno Agent.
|
||||||
|
"""
|
||||||
|
model_kwargs = {}
|
||||||
|
if settings.ollama_num_ctx > 0:
|
||||||
|
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
|
||||||
|
|
||||||
|
return Agent(
|
||||||
|
name="Agent",
|
||||||
|
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
|
||||||
|
db=SqliteDb(db_file=db_file),
|
||||||
|
description=full_prompt,
|
||||||
|
add_history_to_context=True,
|
||||||
|
num_history_runs=20,
|
||||||
|
markdown=False,
|
||||||
|
tools=tools_list if tools_list else None,
|
||||||
|
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
||||||
|
telemetry=settings.telemetry_enabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def create_timmy(
|
def create_timmy(
|
||||||
db_file: str = "timmy.db",
|
db_file: str = "timmy.db",
|
||||||
backend: str | None = None,
|
backend: str | None = None,
|
||||||
@@ -238,16 +345,12 @@ def create_timmy(
|
|||||||
return TimmyAirLLMAgent(model_size=size)
|
return TimmyAirLLMAgent(model_size=size)
|
||||||
|
|
||||||
# Default: Ollama via Agno.
|
# Default: Ollama via Agno.
|
||||||
# Resolve model with automatic pulling and fallback
|
|
||||||
model_name, is_fallback = _resolve_model_with_fallback(
|
model_name, is_fallback = _resolve_model_with_fallback(
|
||||||
requested_model=None,
|
requested_model=None,
|
||||||
require_vision=False,
|
require_vision=False,
|
||||||
auto_pull=True,
|
auto_pull=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# If Ollama is completely unreachable, fail loudly.
|
|
||||||
# Sovereignty: never silently send data to a cloud API.
|
|
||||||
# Use --backend claude explicitly if you want cloud inference.
|
|
||||||
if not _check_model_available(model_name):
|
if not _check_model_available(model_name):
|
||||||
logger.error(
|
logger.error(
|
||||||
"Ollama unreachable and no local models available. "
|
"Ollama unreachable and no local models available. "
|
||||||
@@ -258,74 +361,9 @@ def create_timmy(
|
|||||||
logger.info("Using fallback model %s (requested was unavailable)", model_name)
|
logger.info("Using fallback model %s (requested was unavailable)", model_name)
|
||||||
|
|
||||||
use_tools = _model_supports_tools(model_name)
|
use_tools = _model_supports_tools(model_name)
|
||||||
|
tools_list = _build_tools_list(use_tools, skip_mcp)
|
||||||
# Conditionally include tools — small models get none
|
full_prompt = _build_prompt(use_tools, session_id)
|
||||||
toolkit = create_full_toolkit() if use_tools else None
|
agent = _create_ollama_agent(model_name, db_file, tools_list, full_prompt, use_tools)
|
||||||
if not use_tools:
|
|
||||||
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
|
|
||||||
|
|
||||||
# Build the tools list — Agno accepts a list of Toolkit / MCPTools
|
|
||||||
tools_list: list = []
|
|
||||||
if toolkit:
|
|
||||||
tools_list.append(toolkit)
|
|
||||||
|
|
||||||
# Add MCP tool servers (lazy-connected on first arun()).
|
|
||||||
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
|
|
||||||
# scopes that conflict with asyncio background task cancellation (#72).
|
|
||||||
if use_tools and not skip_mcp:
|
|
||||||
try:
|
|
||||||
from timmy.mcp_tools import create_filesystem_mcp_tools, create_gitea_mcp_tools
|
|
||||||
|
|
||||||
gitea_mcp = create_gitea_mcp_tools()
|
|
||||||
if gitea_mcp:
|
|
||||||
tools_list.append(gitea_mcp)
|
|
||||||
|
|
||||||
fs_mcp = create_filesystem_mcp_tools()
|
|
||||||
if fs_mcp:
|
|
||||||
tools_list.append(fs_mcp)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.debug("MCP tools unavailable: %s", exc)
|
|
||||||
|
|
||||||
# Select prompt tier based on tool capability
|
|
||||||
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
|
|
||||||
|
|
||||||
# Try to load memory context
|
|
||||||
try:
|
|
||||||
from timmy.memory_system import memory_system
|
|
||||||
|
|
||||||
memory_context = memory_system.get_system_context()
|
|
||||||
if memory_context:
|
|
||||||
# Truncate if too long — smaller budget for small models
|
|
||||||
# since the expanded prompt (roster, guardrails) uses more tokens
|
|
||||||
max_context = 2000 if not use_tools else 8000
|
|
||||||
if len(memory_context) > max_context:
|
|
||||||
memory_context = memory_context[:max_context] + "\n... [truncated]"
|
|
||||||
full_prompt = (
|
|
||||||
f"{base_prompt}\n\n"
|
|
||||||
f"## GROUNDED CONTEXT (verified sources — cite when using)\n\n"
|
|
||||||
f"{memory_context}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
full_prompt = base_prompt
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Failed to load memory context: %s", exc)
|
|
||||||
full_prompt = base_prompt
|
|
||||||
|
|
||||||
model_kwargs = {}
|
|
||||||
if settings.ollama_num_ctx > 0:
|
|
||||||
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
|
|
||||||
agent = Agent(
|
|
||||||
name="Agent",
|
|
||||||
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
|
|
||||||
db=SqliteDb(db_file=db_file),
|
|
||||||
description=full_prompt,
|
|
||||||
add_history_to_context=True,
|
|
||||||
num_history_runs=20,
|
|
||||||
markdown=False,
|
|
||||||
tools=tools_list if tools_list else None,
|
|
||||||
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
|
||||||
telemetry=settings.telemetry_enabled,
|
|
||||||
)
|
|
||||||
_warmup_model(model_name)
|
_warmup_model(model_name)
|
||||||
return agent
|
return agent
|
||||||
|
|
||||||
|
|||||||
@@ -232,6 +232,90 @@ class ThinkingEngine:
|
|||||||
return False # Disabled — never idle
|
return False # Disabled — never idle
|
||||||
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
|
return datetime.now(UTC) - self._last_input_time > timedelta(minutes=timeout)
|
||||||
|
|
||||||
|
def _build_thinking_context(self) -> tuple[str, str, list["Thought"]]:
|
||||||
|
"""Assemble the context needed for a thinking cycle.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(memory_context, system_context, recent_thoughts)
|
||||||
|
"""
|
||||||
|
memory_context = self._load_memory_context()
|
||||||
|
system_context = self._gather_system_snapshot()
|
||||||
|
recent_thoughts = self.get_recent_thoughts(limit=5)
|
||||||
|
return memory_context, system_context, recent_thoughts
|
||||||
|
|
||||||
|
async def _generate_novel_thought(
|
||||||
|
self,
|
||||||
|
prompt: str | None,
|
||||||
|
memory_context: str,
|
||||||
|
system_context: str,
|
||||||
|
recent_thoughts: list["Thought"],
|
||||||
|
) -> tuple[str | None, str]:
|
||||||
|
"""Run the dedup-retry loop to produce a novel thought.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(content, seed_type) — content is None if no novel thought produced.
|
||||||
|
"""
|
||||||
|
seed_type: str = "freeform"
|
||||||
|
|
||||||
|
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
|
||||||
|
if prompt:
|
||||||
|
seed_type = "prompted"
|
||||||
|
seed_context = f"Journal prompt: {prompt}"
|
||||||
|
else:
|
||||||
|
seed_type, seed_context = self._gather_seed()
|
||||||
|
|
||||||
|
continuity = self._build_continuity_context()
|
||||||
|
|
||||||
|
full_prompt = _THINKING_PROMPT.format(
|
||||||
|
memory_context=memory_context,
|
||||||
|
system_context=system_context,
|
||||||
|
seed_context=seed_context,
|
||||||
|
continuity_context=continuity,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw = await self._call_agent(full_prompt)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
|
||||||
|
return None, seed_type
|
||||||
|
|
||||||
|
if not raw or not raw.strip():
|
||||||
|
logger.debug("Thinking cycle produced empty response, skipping")
|
||||||
|
return None, seed_type
|
||||||
|
|
||||||
|
content = raw.strip()
|
||||||
|
|
||||||
|
# Dedup: reject thoughts too similar to recent ones
|
||||||
|
if not self._is_too_similar(content, recent_thoughts):
|
||||||
|
return content, seed_type # Good — novel thought
|
||||||
|
|
||||||
|
if attempt < self._MAX_DEDUP_RETRIES:
|
||||||
|
logger.info(
|
||||||
|
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
|
||||||
|
attempt + 1,
|
||||||
|
self._MAX_DEDUP_RETRIES + 1,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"Thought still repetitive after %d retries, discarding",
|
||||||
|
self._MAX_DEDUP_RETRIES + 1,
|
||||||
|
)
|
||||||
|
return None, seed_type
|
||||||
|
|
||||||
|
return None, seed_type
|
||||||
|
|
||||||
|
async def _process_thinking_result(self, thought: "Thought") -> None:
|
||||||
|
"""Run all post-hooks after a thought is stored."""
|
||||||
|
self._maybe_check_memory()
|
||||||
|
await self._maybe_distill()
|
||||||
|
await self._maybe_file_issues()
|
||||||
|
await self._check_workspace()
|
||||||
|
self._maybe_check_memory_status()
|
||||||
|
self._update_memory(thought)
|
||||||
|
self._log_event(thought)
|
||||||
|
self._write_journal(thought)
|
||||||
|
await self._broadcast(thought)
|
||||||
|
|
||||||
async def think_once(self, prompt: str | None = None) -> Thought | None:
|
async def think_once(self, prompt: str | None = None) -> Thought | None:
|
||||||
"""Execute one thinking cycle.
|
"""Execute one thinking cycle.
|
||||||
|
|
||||||
@@ -257,91 +341,21 @@ class ThinkingEngine:
|
|||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
memory_context = self._load_memory_context()
|
memory_context, system_context, recent_thoughts = self._build_thinking_context()
|
||||||
system_context = self._gather_system_snapshot()
|
|
||||||
recent_thoughts = self.get_recent_thoughts(limit=5)
|
|
||||||
|
|
||||||
content: str | None = None
|
|
||||||
seed_type: str = "freeform"
|
|
||||||
|
|
||||||
for attempt in range(self._MAX_DEDUP_RETRIES + 1):
|
|
||||||
if prompt:
|
|
||||||
seed_type = "prompted"
|
|
||||||
seed_context = f"Journal prompt: {prompt}"
|
|
||||||
else:
|
|
||||||
seed_type, seed_context = self._gather_seed()
|
|
||||||
|
|
||||||
continuity = self._build_continuity_context()
|
|
||||||
|
|
||||||
full_prompt = _THINKING_PROMPT.format(
|
|
||||||
memory_context=memory_context,
|
|
||||||
system_context=system_context,
|
|
||||||
seed_context=seed_context,
|
|
||||||
continuity_context=continuity,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
raw = await self._call_agent(full_prompt)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
|
|
||||||
return None
|
|
||||||
|
|
||||||
if not raw or not raw.strip():
|
|
||||||
logger.debug("Thinking cycle produced empty response, skipping")
|
|
||||||
return None
|
|
||||||
|
|
||||||
content = raw.strip()
|
|
||||||
|
|
||||||
# Dedup: reject thoughts too similar to recent ones
|
|
||||||
if not self._is_too_similar(content, recent_thoughts):
|
|
||||||
break # Good — novel thought
|
|
||||||
|
|
||||||
if attempt < self._MAX_DEDUP_RETRIES:
|
|
||||||
logger.info(
|
|
||||||
"Thought too similar to recent (attempt %d/%d), retrying with new seed",
|
|
||||||
attempt + 1,
|
|
||||||
self._MAX_DEDUP_RETRIES + 1,
|
|
||||||
)
|
|
||||||
content = None # Will retry
|
|
||||||
else:
|
|
||||||
logger.warning(
|
|
||||||
"Thought still repetitive after %d retries, discarding",
|
|
||||||
self._MAX_DEDUP_RETRIES + 1,
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
content, seed_type = await self._generate_novel_thought(
|
||||||
|
prompt,
|
||||||
|
memory_context,
|
||||||
|
system_context,
|
||||||
|
recent_thoughts,
|
||||||
|
)
|
||||||
if not content:
|
if not content:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
thought = self._store_thought(content, seed_type)
|
thought = self._store_thought(content, seed_type)
|
||||||
self._last_thought_id = thought.id
|
self._last_thought_id = thought.id
|
||||||
|
|
||||||
# Post-hook: check memory status periodically
|
await self._process_thinking_result(thought)
|
||||||
self._maybe_check_memory()
|
|
||||||
|
|
||||||
# Post-hook: distill facts from recent thoughts periodically
|
|
||||||
await self._maybe_distill()
|
|
||||||
|
|
||||||
# Post-hook: file Gitea issues for actionable observations
|
|
||||||
await self._maybe_file_issues()
|
|
||||||
|
|
||||||
# Post-hook: check workspace for new messages from Hermes
|
|
||||||
await self._check_workspace()
|
|
||||||
|
|
||||||
# Post-hook: proactive memory status audit
|
|
||||||
self._maybe_check_memory_status()
|
|
||||||
|
|
||||||
# Post-hook: update MEMORY.md with latest reflection
|
|
||||||
self._update_memory(thought)
|
|
||||||
|
|
||||||
# Log to swarm event system
|
|
||||||
self._log_event(thought)
|
|
||||||
|
|
||||||
# Append to daily journal file
|
|
||||||
self._write_journal(thought)
|
|
||||||
|
|
||||||
# Broadcast to WebSocket clients
|
|
||||||
await self._broadcast(thought)
|
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Thought [%s] (%s): %s",
|
"Thought [%s] (%s): %s",
|
||||||
|
|||||||
@@ -454,3 +454,127 @@ def test_no_hardcoded_fallback_constants_in_agent():
|
|||||||
assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), (
|
assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), (
|
||||||
"Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models"
|
"Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── _build_tools_list helper ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_tools_list_returns_empty_when_no_tools():
|
||||||
|
"""When use_tools=False, _build_tools_list returns an empty list."""
|
||||||
|
from timmy.agent import _build_tools_list
|
||||||
|
|
||||||
|
result = _build_tools_list(use_tools=False, skip_mcp=False)
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_tools_list_includes_toolkit():
|
||||||
|
"""When use_tools=True, _build_tools_list includes the toolkit."""
|
||||||
|
mock_toolkit = MagicMock()
|
||||||
|
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
|
||||||
|
from timmy.agent import _build_tools_list
|
||||||
|
|
||||||
|
result = _build_tools_list(use_tools=True, skip_mcp=True)
|
||||||
|
|
||||||
|
assert mock_toolkit in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_tools_list_adds_mcp_when_not_skipped():
|
||||||
|
"""When skip_mcp=False, _build_tools_list attempts MCP tools."""
|
||||||
|
mock_toolkit = MagicMock()
|
||||||
|
mock_gitea = MagicMock()
|
||||||
|
with (
|
||||||
|
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
|
||||||
|
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=mock_gitea),
|
||||||
|
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None),
|
||||||
|
):
|
||||||
|
from timmy.agent import _build_tools_list
|
||||||
|
|
||||||
|
result = _build_tools_list(use_tools=True, skip_mcp=False)
|
||||||
|
|
||||||
|
assert mock_toolkit in result
|
||||||
|
assert mock_gitea in result
|
||||||
|
|
||||||
|
|
||||||
|
# ── _build_prompt helper ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_prompt_returns_base_when_no_memory():
|
||||||
|
"""_build_prompt returns base prompt when memory context is empty."""
|
||||||
|
with patch("timmy.memory_system.memory_system") as mock_mem:
|
||||||
|
mock_mem.get_system_context.return_value = ""
|
||||||
|
from timmy.agent import _build_prompt
|
||||||
|
|
||||||
|
result = _build_prompt(use_tools=True, session_id="test")
|
||||||
|
|
||||||
|
assert "Timmy" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_prompt_appends_memory_context():
|
||||||
|
"""_build_prompt appends memory context when available."""
|
||||||
|
with patch("timmy.memory_system.memory_system") as mock_mem:
|
||||||
|
mock_mem.get_system_context.return_value = "User likes pizza"
|
||||||
|
from timmy.agent import _build_prompt
|
||||||
|
|
||||||
|
result = _build_prompt(use_tools=True, session_id="test")
|
||||||
|
|
||||||
|
assert "User likes pizza" in result
|
||||||
|
assert "GROUNDED CONTEXT" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_build_prompt_truncates_long_memory_for_small_models():
|
||||||
|
"""_build_prompt truncates memory for small models (use_tools=False)."""
|
||||||
|
long_context = "x" * 5000
|
||||||
|
with patch("timmy.memory_system.memory_system") as mock_mem:
|
||||||
|
mock_mem.get_system_context.return_value = long_context
|
||||||
|
from timmy.agent import _build_prompt
|
||||||
|
|
||||||
|
result = _build_prompt(use_tools=False, session_id="test")
|
||||||
|
|
||||||
|
# Max context is 2000 for small models + truncation marker
|
||||||
|
assert "[truncated]" in result
|
||||||
|
|
||||||
|
|
||||||
|
# ── _create_ollama_agent helper ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_ollama_agent_passes_correct_kwargs():
|
||||||
|
"""_create_ollama_agent passes the expected kwargs to Agent()."""
|
||||||
|
with (
|
||||||
|
patch("timmy.agent.Agent") as MockAgent,
|
||||||
|
patch("timmy.agent.Ollama"),
|
||||||
|
patch("timmy.agent.SqliteDb"),
|
||||||
|
):
|
||||||
|
from timmy.agent import _create_ollama_agent
|
||||||
|
|
||||||
|
_create_ollama_agent(
|
||||||
|
model_name="test-model",
|
||||||
|
db_file="test.db",
|
||||||
|
tools_list=[MagicMock()],
|
||||||
|
full_prompt="Test prompt",
|
||||||
|
use_tools=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
kwargs = MockAgent.call_args.kwargs
|
||||||
|
assert kwargs["description"] == "Test prompt"
|
||||||
|
assert kwargs["tools"] is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_ollama_agent_none_tools_when_empty():
|
||||||
|
"""_create_ollama_agent passes tools=None when tools_list is empty."""
|
||||||
|
with (
|
||||||
|
patch("timmy.agent.Agent") as MockAgent,
|
||||||
|
patch("timmy.agent.Ollama"),
|
||||||
|
patch("timmy.agent.SqliteDb"),
|
||||||
|
):
|
||||||
|
from timmy.agent import _create_ollama_agent
|
||||||
|
|
||||||
|
_create_ollama_agent(
|
||||||
|
model_name="test-model",
|
||||||
|
db_file="test.db",
|
||||||
|
tools_list=[],
|
||||||
|
full_prompt="Test prompt",
|
||||||
|
use_tools=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
kwargs = MockAgent.call_args.kwargs
|
||||||
|
assert kwargs["tools"] is None
|
||||||
|
|||||||
Reference in New Issue
Block a user