When third-party tools (Paperclip orchestrator, etc.) spawn hermes chat as a subprocess, their sessions pollute user session history and search. - hermes chat --source <tag> (also HERMES_SESSION_SOURCE env var) - exclude_sources parameter on list_sessions_rich() and search_messages() - Sessions with source=tool hidden from sessions list/browse/search - Third-party adapters pass --source tool to isolate agent sessions Cherry-picked from PR #3208 by HenkDz. Co-authored-by: Henkey <noonou7@gmail.com>
287 lines
11 KiB
Python
287 lines
11 KiB
Python
"""Tests for tools/session_search_tool.py — helper functions and search dispatcher."""
|
|
|
|
import json
|
|
import time
|
|
import pytest
|
|
|
|
from tools.session_search_tool import (
|
|
_format_timestamp,
|
|
_format_conversation,
|
|
_truncate_around_matches,
|
|
_HIDDEN_SESSION_SOURCES,
|
|
MAX_SESSION_CHARS,
|
|
SESSION_SEARCH_SCHEMA,
|
|
)
|
|
|
|
|
|
# =========================================================================
|
|
# Tool schema guidance
|
|
# =========================================================================
|
|
|
|
class TestHiddenSessionSources:
|
|
"""Verify the _HIDDEN_SESSION_SOURCES constant used for third-party isolation."""
|
|
|
|
def test_tool_source_is_hidden(self):
|
|
assert "tool" in _HIDDEN_SESSION_SOURCES
|
|
|
|
def test_standard_sources_not_hidden(self):
|
|
for src in ("cli", "telegram", "discord", "slack", "cron"):
|
|
assert src not in _HIDDEN_SESSION_SOURCES
|
|
|
|
|
|
class TestSessionSearchSchema:
|
|
def test_keeps_cross_session_recall_guidance_without_current_session_nudge(self):
|
|
description = SESSION_SEARCH_SCHEMA["description"]
|
|
assert "past conversations" in description
|
|
assert "recent turns of the current session" not in description
|
|
|
|
|
|
# =========================================================================
|
|
# _format_timestamp
|
|
# =========================================================================
|
|
|
|
class TestFormatTimestamp:
|
|
def test_unix_float(self):
|
|
ts = 1700000000.0 # Nov 14, 2023
|
|
result = _format_timestamp(ts)
|
|
assert "2023" in result or "November" in result
|
|
|
|
def test_unix_int(self):
|
|
result = _format_timestamp(1700000000)
|
|
assert isinstance(result, str)
|
|
assert len(result) > 5
|
|
|
|
def test_iso_string(self):
|
|
result = _format_timestamp("2024-01-15T10:30:00")
|
|
assert isinstance(result, str)
|
|
|
|
def test_none_returns_unknown(self):
|
|
assert _format_timestamp(None) == "unknown"
|
|
|
|
def test_numeric_string(self):
|
|
result = _format_timestamp("1700000000.0")
|
|
assert isinstance(result, str)
|
|
assert "unknown" not in result.lower()
|
|
|
|
|
|
# =========================================================================
|
|
# _format_conversation
|
|
# =========================================================================
|
|
|
|
class TestFormatConversation:
|
|
def test_basic_messages(self):
|
|
msgs = [
|
|
{"role": "user", "content": "Hello"},
|
|
{"role": "assistant", "content": "Hi there!"},
|
|
]
|
|
result = _format_conversation(msgs)
|
|
assert "[USER]: Hello" in result
|
|
assert "[ASSISTANT]: Hi there!" in result
|
|
|
|
def test_tool_message(self):
|
|
msgs = [
|
|
{"role": "tool", "content": "search results", "tool_name": "web_search"},
|
|
]
|
|
result = _format_conversation(msgs)
|
|
assert "[TOOL:web_search]" in result
|
|
|
|
def test_long_tool_output_truncated(self):
|
|
msgs = [
|
|
{"role": "tool", "content": "x" * 1000, "tool_name": "terminal"},
|
|
]
|
|
result = _format_conversation(msgs)
|
|
assert "[truncated]" in result
|
|
|
|
def test_assistant_with_tool_calls(self):
|
|
msgs = [
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{"function": {"name": "web_search"}},
|
|
{"function": {"name": "terminal"}},
|
|
],
|
|
},
|
|
]
|
|
result = _format_conversation(msgs)
|
|
assert "web_search" in result
|
|
assert "terminal" in result
|
|
|
|
def test_empty_messages(self):
|
|
result = _format_conversation([])
|
|
assert result == ""
|
|
|
|
|
|
# =========================================================================
|
|
# _truncate_around_matches
|
|
# =========================================================================
|
|
|
|
class TestTruncateAroundMatches:
|
|
def test_short_text_unchanged(self):
|
|
text = "Short text about docker"
|
|
result = _truncate_around_matches(text, "docker")
|
|
assert result == text
|
|
|
|
def test_long_text_truncated(self):
|
|
# Create text longer than MAX_SESSION_CHARS with query term in middle
|
|
padding = "x" * (MAX_SESSION_CHARS + 5000)
|
|
text = padding + " KEYWORD_HERE " + padding
|
|
result = _truncate_around_matches(text, "KEYWORD_HERE")
|
|
assert len(result) <= MAX_SESSION_CHARS + 100 # +100 for prefix/suffix markers
|
|
assert "KEYWORD_HERE" in result
|
|
|
|
def test_truncation_adds_markers(self):
|
|
text = "a" * 50000 + " target " + "b" * (MAX_SESSION_CHARS + 5000)
|
|
result = _truncate_around_matches(text, "target")
|
|
assert "truncated" in result.lower()
|
|
|
|
def test_no_match_takes_from_start(self):
|
|
text = "x" * (MAX_SESSION_CHARS + 5000)
|
|
result = _truncate_around_matches(text, "nonexistent")
|
|
# Should take from the beginning
|
|
assert result.startswith("x")
|
|
|
|
def test_match_at_beginning(self):
|
|
text = "KEYWORD " + "x" * (MAX_SESSION_CHARS + 5000)
|
|
result = _truncate_around_matches(text, "KEYWORD")
|
|
assert "KEYWORD" in result
|
|
|
|
|
|
# =========================================================================
|
|
# session_search (dispatcher)
|
|
# =========================================================================
|
|
|
|
class TestSessionSearch:
|
|
def test_no_db_returns_error(self):
|
|
from tools.session_search_tool import session_search
|
|
result = json.loads(session_search(query="test"))
|
|
assert result["success"] is False
|
|
assert "not available" in result["error"].lower()
|
|
|
|
def test_empty_query_returns_error(self):
|
|
from tools.session_search_tool import session_search
|
|
mock_db = object()
|
|
result = json.loads(session_search(query="", db=mock_db))
|
|
assert result["success"] is False
|
|
|
|
def test_whitespace_query_returns_error(self):
|
|
from tools.session_search_tool import session_search
|
|
mock_db = object()
|
|
result = json.loads(session_search(query=" ", db=mock_db))
|
|
assert result["success"] is False
|
|
|
|
def test_current_session_excluded(self):
|
|
"""session_search should never return the current session."""
|
|
from unittest.mock import MagicMock
|
|
from tools.session_search_tool import session_search
|
|
|
|
mock_db = MagicMock()
|
|
current_sid = "20260304_120000_abc123"
|
|
|
|
# Simulate FTS5 returning matches only from the current session
|
|
mock_db.search_messages.return_value = [
|
|
{"session_id": current_sid, "content": "test match", "source": "cli",
|
|
"session_started": 1709500000, "model": "test"},
|
|
]
|
|
mock_db.get_session.return_value = {"parent_session_id": None}
|
|
|
|
result = json.loads(session_search(
|
|
query="test", db=mock_db, current_session_id=current_sid,
|
|
))
|
|
assert result["success"] is True
|
|
assert result["count"] == 0
|
|
assert result["results"] == []
|
|
|
|
def test_current_session_excluded_keeps_others(self):
|
|
"""Other sessions should still be returned when current is excluded."""
|
|
from unittest.mock import MagicMock
|
|
from tools.session_search_tool import session_search
|
|
|
|
mock_db = MagicMock()
|
|
current_sid = "20260304_120000_abc123"
|
|
other_sid = "20260303_100000_def456"
|
|
|
|
mock_db.search_messages.return_value = [
|
|
{"session_id": current_sid, "content": "match 1", "source": "cli",
|
|
"session_started": 1709500000, "model": "test"},
|
|
{"session_id": other_sid, "content": "match 2", "source": "telegram",
|
|
"session_started": 1709400000, "model": "test"},
|
|
]
|
|
mock_db.get_session.return_value = {"parent_session_id": None}
|
|
mock_db.get_messages_as_conversation.return_value = [
|
|
{"role": "user", "content": "hello"},
|
|
{"role": "assistant", "content": "hi there"},
|
|
]
|
|
|
|
# Mock async_call_llm to raise RuntimeError → summarizer returns None
|
|
from unittest.mock import AsyncMock, patch as _patch
|
|
with _patch("tools.session_search_tool.async_call_llm",
|
|
new_callable=AsyncMock,
|
|
side_effect=RuntimeError("no provider")):
|
|
result = json.loads(session_search(
|
|
query="test", db=mock_db, current_session_id=current_sid,
|
|
))
|
|
|
|
assert result["success"] is True
|
|
# Current session should be skipped, only other_sid should appear
|
|
assert result["sessions_searched"] == 1
|
|
assert current_sid not in [r.get("session_id") for r in result.get("results", [])]
|
|
|
|
def test_current_child_session_excludes_parent_lineage(self):
|
|
"""Compression/delegation parents should be excluded for the active child session."""
|
|
from unittest.mock import MagicMock
|
|
from tools.session_search_tool import session_search
|
|
|
|
mock_db = MagicMock()
|
|
mock_db.search_messages.return_value = [
|
|
{"session_id": "parent_sid", "content": "match", "source": "cli",
|
|
"session_started": 1709500000, "model": "test"},
|
|
]
|
|
|
|
def _get_session(session_id):
|
|
if session_id == "child_sid":
|
|
return {"parent_session_id": "parent_sid"}
|
|
if session_id == "parent_sid":
|
|
return {"parent_session_id": None}
|
|
return None
|
|
|
|
mock_db.get_session.side_effect = _get_session
|
|
|
|
result = json.loads(session_search(
|
|
query="test", db=mock_db, current_session_id="child_sid",
|
|
))
|
|
|
|
assert result["success"] is True
|
|
assert result["count"] == 0
|
|
assert result["results"] == []
|
|
assert result["sessions_searched"] == 0
|
|
|
|
def test_current_root_session_excludes_child_lineage(self):
|
|
"""Delegation child hits should be excluded when they resolve to the current root session."""
|
|
from unittest.mock import MagicMock
|
|
from tools.session_search_tool import session_search
|
|
|
|
mock_db = MagicMock()
|
|
mock_db.search_messages.return_value = [
|
|
{"session_id": "child_sid", "content": "match", "source": "cli",
|
|
"session_started": 1709500000, "model": "test"},
|
|
]
|
|
|
|
def _get_session(session_id):
|
|
if session_id == "root_sid":
|
|
return {"parent_session_id": None}
|
|
if session_id == "child_sid":
|
|
return {"parent_session_id": "root_sid"}
|
|
return None
|
|
|
|
mock_db.get_session.side_effect = _get_session
|
|
|
|
result = json.loads(session_search(
|
|
query="test", db=mock_db, current_session_id="root_sid",
|
|
))
|
|
|
|
assert result["success"] is True
|
|
assert result["count"] == 0
|
|
assert result["results"] == []
|
|
assert result["sessions_searched"] == 0
|