[loop-cycle-63] feat: session_history tool — Timmy searches past conversations (#251) (#258)

This commit is contained in:
2026-03-15 15:11:43 -04:00
parent dd34dc064f
commit 80aba0bf6d
3 changed files with 531 additions and 0 deletions

View File

@@ -155,6 +155,56 @@ class SessionLogger:
"decisions": sum(1 for e in entries if e.get("type") == "decision"),
}
def search(self, query: str, role: str | None = None, limit: int = 10) -> list[dict]:
"""Search across all session logs for entries matching a query.
Args:
query: Case-insensitive substring to search for.
role: Optional role filter ("user", "timmy", "system").
limit: Maximum number of results to return.
Returns:
List of matching entries (most recent first), each with
type, timestamp, and relevant content fields.
"""
query_lower = query.lower()
matches: list[dict] = []
# Collect all session files, sorted newest first
log_files = sorted(self.logs_dir.glob("session_*.jsonl"), reverse=True)
for log_file in log_files:
if len(matches) >= limit:
break
try:
with open(log_file) as f:
# Read all lines, reverse so newest entries come first
lines = [ln for ln in f if ln.strip()]
for line in reversed(lines):
if len(matches) >= limit:
break
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
# Role filter
if role and entry.get("role") != role:
continue
# Search in text-bearing fields
searchable = " ".join(
str(entry.get(k, ""))
for k in ("content", "error", "decision", "rationale", "result", "tool")
).lower()
if query_lower in searchable:
entry["_source_file"] = log_file.name
matches.append(entry)
except OSError:
continue
return matches
# Global session logger instance
_session_logger: SessionLogger | None = None
@@ -187,3 +237,53 @@ def flush_session_logs() -> str:
logger = get_session_logger()
path = logger.flush()
return str(path)
def session_history(query: str, role: str = "", limit: int = 10) -> str:
"""Search Timmy's past conversation history.
Find messages, tool calls, errors, and decisions from past sessions
that match the query. Results are returned most-recent first.
Args:
query: What to search for (case-insensitive substring match).
role: Optional filter by role — "user", "timmy", or "" for all.
limit: Maximum results to return (default 10).
Returns:
Formatted string of matching session entries.
"""
sl = get_session_logger()
# Flush buffer first so current session is searchable
sl.flush()
results = sl.search(query, role=role or None, limit=limit)
if not results:
return f"No session history found matching '{query}'."
lines = [f"Found {len(results)} result(s) for '{query}':\n"]
for entry in results:
ts = entry.get("timestamp", "?")[:19]
etype = entry.get("type", "?")
source = entry.get("_source_file", "")
if etype == "message":
who = entry.get("role", "?")
text = entry.get("content", "")[:200]
lines.append(f"[{ts}] {who}: {text}")
elif etype == "tool_call":
tool = entry.get("tool", "?")
result = entry.get("result", "")[:100]
lines.append(f"[{ts}] tool:{tool}{result}")
elif etype == "error":
err = entry.get("error", "")[:200]
lines.append(f"[{ts}] ERROR: {err}")
elif etype == "decision":
dec = entry.get("decision", "")[:200]
lines.append(f"[{ts}] DECIDED: {dec}")
else:
lines.append(f"[{ts}] {etype}: {json.dumps(entry)[:200]}")
if source:
lines[-1] += f" ({source})"
return "\n".join(lines)

View File

@@ -586,6 +586,14 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
logger.warning("Tool execution failed (Introspection tools registration): %s", exc)
logger.debug("Introspection tools not available")
try:
from timmy.session_logger import session_history
toolkit.register(session_history, name="session_history")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (session_history registration): %s", exc)
logger.debug("session_history tool not available")
def _register_delegation_tools(toolkit: Toolkit) -> None:
"""Register inter-agent delegation tools."""
@@ -824,6 +832,11 @@ def _introspection_tool_catalog() -> dict:
"description": "Check status of memory tiers (hot memory, vault)",
"available_in": ["orchestrator"],
},
"session_history": {
"name": "Session History",
"description": "Search past conversation logs for messages, tool calls, errors, and decisions",
"available_in": ["orchestrator"],
},
}

View File

@@ -12,6 +12,7 @@ from timmy.session_logger import (
flush_session_logs,
get_session_logger,
get_session_summary,
session_history,
)
@@ -509,3 +510,420 @@ class TestIntegration:
assert logger.logs_dir == deep_logs_dir
assert deep_logs_dir.exists()
class TestSearch:
"""Tests for SessionLogger.search() method."""
def _write_session_file(self, logs_dir, filename, entries):
"""Helper: write JSONL entries to a session file."""
path = logs_dir / filename
with open(path, "w") as f:
for entry in entries:
f.write(json.dumps(entry) + "\n")
def test_search_finds_message_by_content(self, tmp_path):
"""Should find messages matching query in content field."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "Tell me about bitcoin",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "message",
"role": "timmy",
"content": "Bitcoin is a cryptocurrency",
"timestamp": "2026-03-01T10:01:00",
},
{
"type": "message",
"role": "user",
"content": "What about ethereum?",
"timestamp": "2026-03-01T10:02:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 2
assert all("bitcoin" in r.get("content", "").lower() for r in results)
def test_search_case_insensitive(self, tmp_path):
"""Should match regardless of case."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "BITCOIN price?",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 1
def test_search_role_filter(self, tmp_path):
"""Should filter by role when specified."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "Tell me about bitcoin",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "message",
"role": "timmy",
"content": "Bitcoin is great",
"timestamp": "2026-03-01T10:01:00",
},
],
)
results = sl.search("bitcoin", role="user")
assert len(results) == 1
assert results[0]["role"] == "user"
def test_search_respects_limit(self, tmp_path):
"""Should return at most 'limit' results."""
sl = SessionLogger(logs_dir=tmp_path)
entries = [
{
"type": "message",
"role": "user",
"content": f"bitcoin msg {i}",
"timestamp": f"2026-03-01T10:{i:02d}:00",
}
for i in range(20)
]
self._write_session_file(tmp_path, "session_2026-03-01.jsonl", entries)
results = sl.search("bitcoin", limit=5)
assert len(results) == 5
def test_search_returns_newest_first(self, tmp_path):
"""Should return most recent matches first."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "old bitcoin",
"timestamp": "2026-03-01T08:00:00",
},
],
)
self._write_session_file(
tmp_path,
"session_2026-03-02.jsonl",
[
{
"type": "message",
"role": "user",
"content": "new bitcoin",
"timestamp": "2026-03-02T10:00:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 2
# Newest file first (03-02 > 03-01)
assert "new" in results[0]["content"]
assert "old" in results[1]["content"]
def test_search_across_multiple_files(self, tmp_path):
"""Should search across all session files."""
sl = SessionLogger(logs_dir=tmp_path)
for day in range(1, 4):
self._write_session_file(
tmp_path,
f"session_2026-03-{day:02d}.jsonl",
[
{
"type": "message",
"role": "user",
"content": f"day {day} bitcoin",
"timestamp": f"2026-03-{day:02d}T10:00:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 3
def test_search_finds_errors(self, tmp_path):
"""Should match errors by error field."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "error",
"error": "Connection refused to ollama",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "message",
"role": "user",
"content": "hello",
"timestamp": "2026-03-01T10:01:00",
},
],
)
results = sl.search("ollama")
assert len(results) == 1
assert results[0]["type"] == "error"
def test_search_finds_tool_calls(self, tmp_path):
"""Should match tool calls by tool name and result."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "tool_call",
"tool": "calculator",
"args": {"expr": "2+2"},
"result": "4",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("calculator")
assert len(results) == 1
assert results[0]["type"] == "tool_call"
def test_search_finds_decisions(self, tmp_path):
"""Should match decisions by decision and rationale fields."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "decision",
"decision": "Use local model",
"rationale": "Privacy matters",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("privacy")
assert len(results) == 1
assert results[0]["type"] == "decision"
def test_search_empty_query_matches_nothing(self, tmp_path):
"""Empty string matches everything (substring of any string)."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "hello",
"timestamp": "2026-03-01T10:00:00",
},
],
)
# Empty string is a substring of everything
results = sl.search("")
assert len(results) == 1
def test_search_no_results(self, tmp_path):
"""Should return empty list when nothing matches."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "hello world",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("xyznonexistent")
assert results == []
def test_search_no_session_files(self, tmp_path):
"""Should return empty list when no session files exist."""
sl = SessionLogger(logs_dir=tmp_path)
results = sl.search("anything")
assert results == []
def test_search_skips_malformed_json(self, tmp_path):
"""Should skip lines with bad JSON and continue."""
sl = SessionLogger(logs_dir=tmp_path)
path = tmp_path / "session_2026-03-01.jsonl"
with open(path, "w") as f:
f.write(
'{"type": "message", "role": "user", "content": "good line", "timestamp": "2026-03-01T10:00:00"}\n'
)
f.write("THIS IS NOT JSON\n")
f.write(
'{"type": "message", "role": "user", "content": "also good", "timestamp": "2026-03-01T10:01:00"}\n'
)
results = sl.search("good")
assert len(results) == 2
def test_search_includes_source_file(self, tmp_path):
"""Should tag each result with _source_file."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "hello",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("hello")
assert results[0]["_source_file"] == "session_2026-03-01.jsonl"
class TestSessionHistoryTool:
"""Tests for the session_history() tool function."""
def test_session_history_returns_formatted_string(self, tmp_path, mock_settings, monkeypatch):
"""Should return a formatted string of results."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
# Write a session file
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "message",
"role": "user",
"content": "bitcoin price",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
result = session_history("bitcoin")
assert "Found 1 result(s)" in result
assert "bitcoin price" in result
def test_session_history_no_results(self, tmp_path, mock_settings, monkeypatch):
"""Should return a 'no results' message."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
result = session_history("xyznonexistent")
assert "No session history found" in result
def test_session_history_role_filter(self, tmp_path, mock_settings, monkeypatch):
"""Should respect role filter parameter."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "message",
"role": "user",
"content": "hello bitcoin",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
f.write(
json.dumps(
{
"type": "message",
"role": "timmy",
"content": "bitcoin info",
"timestamp": "2026-03-01T10:01:00",
}
)
+ "\n"
)
result = session_history("bitcoin", role="timmy")
assert "Found 1 result(s)" in result
assert "timmy:" in result
def test_session_history_formats_errors(self, tmp_path, mock_settings, monkeypatch):
"""Should format error entries correctly."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{"type": "error", "error": "Ollama timeout", "timestamp": "2026-03-01T10:00:00"}
)
+ "\n"
)
result = session_history("ollama")
assert "ERROR:" in result
def test_session_history_formats_decisions(self, tmp_path, mock_settings, monkeypatch):
"""Should format decision entries correctly."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "decision",
"decision": "Use local model",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
result = session_history("local model")
assert "DECIDED:" in result
def test_session_history_formats_tool_calls(self, tmp_path, mock_settings, monkeypatch):
"""Should format tool call entries correctly."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "tool_call",
"tool": "calculator",
"result": "42",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
result = session_history("calculator")
assert "tool:calculator" in result