Files
hermes-agent/tests/tools/test_session_search.py
Teknium db241ae6ce feat(sessions): add --source flag for third-party session isolation (#3255)
When third-party tools (Paperclip orchestrator, etc.) spawn hermes chat
as a subprocess, their sessions pollute user session history and search.

- hermes chat --source <tag> (also HERMES_SESSION_SOURCE env var)
- exclude_sources parameter on list_sessions_rich() and search_messages()
- Sessions with source=tool hidden from sessions list/browse/search
- Third-party adapters pass --source tool to isolate agent sessions

Cherry-picked from PR #3208 by HenkDz.

Co-authored-by: Henkey <noonou7@gmail.com>
2026-03-26 14:35:31 -07:00

287 lines
11 KiB
Python

"""Tests for tools/session_search_tool.py — helper functions and search dispatcher."""
import json
import time
import pytest
from tools.session_search_tool import (
_format_timestamp,
_format_conversation,
_truncate_around_matches,
_HIDDEN_SESSION_SOURCES,
MAX_SESSION_CHARS,
SESSION_SEARCH_SCHEMA,
)
# =========================================================================
# Tool schema guidance
# =========================================================================
class TestHiddenSessionSources:
"""Verify the _HIDDEN_SESSION_SOURCES constant used for third-party isolation."""
def test_tool_source_is_hidden(self):
assert "tool" in _HIDDEN_SESSION_SOURCES
def test_standard_sources_not_hidden(self):
for src in ("cli", "telegram", "discord", "slack", "cron"):
assert src not in _HIDDEN_SESSION_SOURCES
class TestSessionSearchSchema:
def test_keeps_cross_session_recall_guidance_without_current_session_nudge(self):
description = SESSION_SEARCH_SCHEMA["description"]
assert "past conversations" in description
assert "recent turns of the current session" not in description
# =========================================================================
# _format_timestamp
# =========================================================================
class TestFormatTimestamp:
def test_unix_float(self):
ts = 1700000000.0 # Nov 14, 2023
result = _format_timestamp(ts)
assert "2023" in result or "November" in result
def test_unix_int(self):
result = _format_timestamp(1700000000)
assert isinstance(result, str)
assert len(result) > 5
def test_iso_string(self):
result = _format_timestamp("2024-01-15T10:30:00")
assert isinstance(result, str)
def test_none_returns_unknown(self):
assert _format_timestamp(None) == "unknown"
def test_numeric_string(self):
result = _format_timestamp("1700000000.0")
assert isinstance(result, str)
assert "unknown" not in result.lower()
# =========================================================================
# _format_conversation
# =========================================================================
class TestFormatConversation:
def test_basic_messages(self):
msgs = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
]
result = _format_conversation(msgs)
assert "[USER]: Hello" in result
assert "[ASSISTANT]: Hi there!" in result
def test_tool_message(self):
msgs = [
{"role": "tool", "content": "search results", "tool_name": "web_search"},
]
result = _format_conversation(msgs)
assert "[TOOL:web_search]" in result
def test_long_tool_output_truncated(self):
msgs = [
{"role": "tool", "content": "x" * 1000, "tool_name": "terminal"},
]
result = _format_conversation(msgs)
assert "[truncated]" in result
def test_assistant_with_tool_calls(self):
msgs = [
{
"role": "assistant",
"content": "",
"tool_calls": [
{"function": {"name": "web_search"}},
{"function": {"name": "terminal"}},
],
},
]
result = _format_conversation(msgs)
assert "web_search" in result
assert "terminal" in result
def test_empty_messages(self):
result = _format_conversation([])
assert result == ""
# =========================================================================
# _truncate_around_matches
# =========================================================================
class TestTruncateAroundMatches:
def test_short_text_unchanged(self):
text = "Short text about docker"
result = _truncate_around_matches(text, "docker")
assert result == text
def test_long_text_truncated(self):
# Create text longer than MAX_SESSION_CHARS with query term in middle
padding = "x" * (MAX_SESSION_CHARS + 5000)
text = padding + " KEYWORD_HERE " + padding
result = _truncate_around_matches(text, "KEYWORD_HERE")
assert len(result) <= MAX_SESSION_CHARS + 100 # +100 for prefix/suffix markers
assert "KEYWORD_HERE" in result
def test_truncation_adds_markers(self):
text = "a" * 50000 + " target " + "b" * (MAX_SESSION_CHARS + 5000)
result = _truncate_around_matches(text, "target")
assert "truncated" in result.lower()
def test_no_match_takes_from_start(self):
text = "x" * (MAX_SESSION_CHARS + 5000)
result = _truncate_around_matches(text, "nonexistent")
# Should take from the beginning
assert result.startswith("x")
def test_match_at_beginning(self):
text = "KEYWORD " + "x" * (MAX_SESSION_CHARS + 5000)
result = _truncate_around_matches(text, "KEYWORD")
assert "KEYWORD" in result
# =========================================================================
# session_search (dispatcher)
# =========================================================================
class TestSessionSearch:
def test_no_db_returns_error(self):
from tools.session_search_tool import session_search
result = json.loads(session_search(query="test"))
assert result["success"] is False
assert "not available" in result["error"].lower()
def test_empty_query_returns_error(self):
from tools.session_search_tool import session_search
mock_db = object()
result = json.loads(session_search(query="", db=mock_db))
assert result["success"] is False
def test_whitespace_query_returns_error(self):
from tools.session_search_tool import session_search
mock_db = object()
result = json.loads(session_search(query=" ", db=mock_db))
assert result["success"] is False
def test_current_session_excluded(self):
"""session_search should never return the current session."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
current_sid = "20260304_120000_abc123"
# Simulate FTS5 returning matches only from the current session
mock_db.search_messages.return_value = [
{"session_id": current_sid, "content": "test match", "source": "cli",
"session_started": 1709500000, "model": "test"},
]
mock_db.get_session.return_value = {"parent_session_id": None}
result = json.loads(session_search(
query="test", db=mock_db, current_session_id=current_sid,
))
assert result["success"] is True
assert result["count"] == 0
assert result["results"] == []
def test_current_session_excluded_keeps_others(self):
"""Other sessions should still be returned when current is excluded."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
current_sid = "20260304_120000_abc123"
other_sid = "20260303_100000_def456"
mock_db.search_messages.return_value = [
{"session_id": current_sid, "content": "match 1", "source": "cli",
"session_started": 1709500000, "model": "test"},
{"session_id": other_sid, "content": "match 2", "source": "telegram",
"session_started": 1709400000, "model": "test"},
]
mock_db.get_session.return_value = {"parent_session_id": None}
mock_db.get_messages_as_conversation.return_value = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
]
# Mock async_call_llm to raise RuntimeError → summarizer returns None
from unittest.mock import AsyncMock, patch as _patch
with _patch("tools.session_search_tool.async_call_llm",
new_callable=AsyncMock,
side_effect=RuntimeError("no provider")):
result = json.loads(session_search(
query="test", db=mock_db, current_session_id=current_sid,
))
assert result["success"] is True
# Current session should be skipped, only other_sid should appear
assert result["sessions_searched"] == 1
assert current_sid not in [r.get("session_id") for r in result.get("results", [])]
def test_current_child_session_excludes_parent_lineage(self):
"""Compression/delegation parents should be excluded for the active child session."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = [
{"session_id": "parent_sid", "content": "match", "source": "cli",
"session_started": 1709500000, "model": "test"},
]
def _get_session(session_id):
if session_id == "child_sid":
return {"parent_session_id": "parent_sid"}
if session_id == "parent_sid":
return {"parent_session_id": None}
return None
mock_db.get_session.side_effect = _get_session
result = json.loads(session_search(
query="test", db=mock_db, current_session_id="child_sid",
))
assert result["success"] is True
assert result["count"] == 0
assert result["results"] == []
assert result["sessions_searched"] == 0
def test_current_root_session_excludes_child_lineage(self):
"""Delegation child hits should be excluded when they resolve to the current root session."""
from unittest.mock import MagicMock
from tools.session_search_tool import session_search
mock_db = MagicMock()
mock_db.search_messages.return_value = [
{"session_id": "child_sid", "content": "match", "source": "cli",
"session_started": 1709500000, "model": "test"},
]
def _get_session(session_id):
if session_id == "root_sid":
return {"parent_session_id": None}
if session_id == "child_sid":
return {"parent_session_id": "root_sid"}
return None
mock_db.get_session.side_effect = _get_session
result = json.loads(session_search(
query="test", db=mock_db, current_session_id="root_sid",
))
assert result["success"] is True
assert result["count"] == 0
assert result["results"] == []
assert result["sessions_searched"] == 0