feat: session_history tool — Timmy searches past conversations (#251)
All checks were successful
Tests / lint (pull_request) Successful in 3s
Tests / test (pull_request) Successful in 53s

Add search() method to SessionLogger and session_history() tool function.
- Case-insensitive substring search across all session JSONL files
- Optional role filter (user/timmy/system)
- Results returned newest-first with configurable limit
- Searches messages, tool calls, errors, decisions, and rationale
- Registered in introspection toolkit for orchestrator access
- 22 new tests covering search + tool formatting
This commit is contained in:
2026-03-15 15:10:31 -04:00
parent 7f656fcf22
commit e8ece54fd9
5 changed files with 559 additions and 7 deletions

View File

@@ -41,10 +41,32 @@ _SUMERIAN: dict[str, int] = {c: v * 6 for c, v in _SIMPLE.items()}
# Aleph=1..Tet=9, Yod=10..Tsade=90, Qoph=100..Tav=400
# Standard mapping for the 22 Hebrew letters extended to 26 Latin chars
_HEBREW: dict[str, int] = {
"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7, "H": 8, "I": 9,
"J": 10, "K": 20, "L": 30, "M": 40, "N": 50, "O": 60, "P": 70, "Q": 80,
"R": 90, "S": 100, "T": 200, "U": 300, "V": 400, "W": 500, "X": 600,
"Y": 700, "Z": 800,
"A": 1,
"B": 2,
"C": 3,
"D": 4,
"E": 5,
"F": 6,
"G": 7,
"H": 8,
"I": 9,
"J": 10,
"K": 20,
"L": 30,
"M": 40,
"N": 50,
"O": 60,
"P": 70,
"Q": 80,
"R": 90,
"S": 100,
"T": 200,
"U": 300,
"V": 400,
"W": 500,
"X": 600,
"Y": 700,
"Z": 800,
}
CIPHERS: dict[str, dict[str, int]] = {
@@ -317,9 +339,9 @@ def _format_number_analysis(n: int) -> str:
lines.append(f" Digit sum: {info['digit_sum']}")
if info.get("master_number"):
lines.append(f" ★ Master Number")
lines.append(" ★ Master Number")
if info.get("angel_number"):
lines.append(f" ★ Angel Number (repeating digits)")
lines.append(" ★ Angel Number (repeating digits)")
if info.get("significance"):
lines.append(f"\n Significance: {info['significance']}")

View File

@@ -155,6 +155,56 @@ class SessionLogger:
"decisions": sum(1 for e in entries if e.get("type") == "decision"),
}
def search(self, query: str, role: str | None = None, limit: int = 10) -> list[dict]:
"""Search across all session logs for entries matching a query.
Args:
query: Case-insensitive substring to search for.
role: Optional role filter ("user", "timmy", "system").
limit: Maximum number of results to return.
Returns:
List of matching entries (most recent first), each with
type, timestamp, and relevant content fields.
"""
query_lower = query.lower()
matches: list[dict] = []
# Collect all session files, sorted newest first
log_files = sorted(self.logs_dir.glob("session_*.jsonl"), reverse=True)
for log_file in log_files:
if len(matches) >= limit:
break
try:
with open(log_file) as f:
# Read all lines, reverse so newest entries come first
lines = [ln for ln in f if ln.strip()]
for line in reversed(lines):
if len(matches) >= limit:
break
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
# Role filter
if role and entry.get("role") != role:
continue
# Search in text-bearing fields
searchable = " ".join(
str(entry.get(k, ""))
for k in ("content", "error", "decision", "rationale", "result", "tool")
).lower()
if query_lower in searchable:
entry["_source_file"] = log_file.name
matches.append(entry)
except OSError:
continue
return matches
# Global session logger instance
_session_logger: SessionLogger | None = None
@@ -187,3 +237,53 @@ def flush_session_logs() -> str:
logger = get_session_logger()
path = logger.flush()
return str(path)
def session_history(query: str, role: str = "", limit: int = 10) -> str:
"""Search Timmy's past conversation history.
Find messages, tool calls, errors, and decisions from past sessions
that match the query. Results are returned most-recent first.
Args:
query: What to search for (case-insensitive substring match).
role: Optional filter by role — "user", "timmy", or "" for all.
limit: Maximum results to return (default 10).
Returns:
Formatted string of matching session entries.
"""
sl = get_session_logger()
# Flush buffer first so current session is searchable
sl.flush()
results = sl.search(query, role=role or None, limit=limit)
if not results:
return f"No session history found matching '{query}'."
lines = [f"Found {len(results)} result(s) for '{query}':\n"]
for entry in results:
ts = entry.get("timestamp", "?")[:19]
etype = entry.get("type", "?")
source = entry.get("_source_file", "")
if etype == "message":
who = entry.get("role", "?")
text = entry.get("content", "")[:200]
lines.append(f"[{ts}] {who}: {text}")
elif etype == "tool_call":
tool = entry.get("tool", "?")
result = entry.get("result", "")[:100]
lines.append(f"[{ts}] tool:{tool}{result}")
elif etype == "error":
err = entry.get("error", "")[:200]
lines.append(f"[{ts}] ERROR: {err}")
elif etype == "decision":
dec = entry.get("decision", "")[:200]
lines.append(f"[{ts}] DECIDED: {dec}")
else:
lines.append(f"[{ts}] {etype}: {json.dumps(entry)[:200]}")
if source:
lines[-1] += f" ({source})"
return "\n".join(lines)

View File

@@ -586,6 +586,14 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
logger.warning("Tool execution failed (Introspection tools registration): %s", exc)
logger.debug("Introspection tools not available")
try:
from timmy.session_logger import session_history
toolkit.register(session_history, name="session_history")
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (session_history registration): %s", exc)
logger.debug("session_history tool not available")
def _register_delegation_tools(toolkit: Toolkit) -> None:
"""Register inter-agent delegation tools."""
@@ -824,6 +832,11 @@ def _introspection_tool_catalog() -> dict:
"description": "Check status of memory tiers (hot memory, vault)",
"available_in": ["orchestrator"],
},
"session_history": {
"name": "Session History",
"description": "Search past conversation logs for messages, tool calls, errors, and decisions",
"available_in": ["orchestrator"],
},
}

View File

@@ -20,7 +20,6 @@ from timmy.gematria import (
reduce_number,
)
# ── Core cipher computation ──────────────────────────────────────────────────

View File

@@ -12,6 +12,7 @@ from timmy.session_logger import (
flush_session_logs,
get_session_logger,
get_session_summary,
session_history,
)
@@ -509,3 +510,420 @@ class TestIntegration:
assert logger.logs_dir == deep_logs_dir
assert deep_logs_dir.exists()
class TestSearch:
"""Tests for SessionLogger.search() method."""
def _write_session_file(self, logs_dir, filename, entries):
"""Helper: write JSONL entries to a session file."""
path = logs_dir / filename
with open(path, "w") as f:
for entry in entries:
f.write(json.dumps(entry) + "\n")
def test_search_finds_message_by_content(self, tmp_path):
"""Should find messages matching query in content field."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "Tell me about bitcoin",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "message",
"role": "timmy",
"content": "Bitcoin is a cryptocurrency",
"timestamp": "2026-03-01T10:01:00",
},
{
"type": "message",
"role": "user",
"content": "What about ethereum?",
"timestamp": "2026-03-01T10:02:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 2
assert all("bitcoin" in r.get("content", "").lower() for r in results)
def test_search_case_insensitive(self, tmp_path):
"""Should match regardless of case."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "BITCOIN price?",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 1
def test_search_role_filter(self, tmp_path):
"""Should filter by role when specified."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "Tell me about bitcoin",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "message",
"role": "timmy",
"content": "Bitcoin is great",
"timestamp": "2026-03-01T10:01:00",
},
],
)
results = sl.search("bitcoin", role="user")
assert len(results) == 1
assert results[0]["role"] == "user"
def test_search_respects_limit(self, tmp_path):
"""Should return at most 'limit' results."""
sl = SessionLogger(logs_dir=tmp_path)
entries = [
{
"type": "message",
"role": "user",
"content": f"bitcoin msg {i}",
"timestamp": f"2026-03-01T10:{i:02d}:00",
}
for i in range(20)
]
self._write_session_file(tmp_path, "session_2026-03-01.jsonl", entries)
results = sl.search("bitcoin", limit=5)
assert len(results) == 5
def test_search_returns_newest_first(self, tmp_path):
"""Should return most recent matches first."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "old bitcoin",
"timestamp": "2026-03-01T08:00:00",
},
],
)
self._write_session_file(
tmp_path,
"session_2026-03-02.jsonl",
[
{
"type": "message",
"role": "user",
"content": "new bitcoin",
"timestamp": "2026-03-02T10:00:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 2
# Newest file first (03-02 > 03-01)
assert "new" in results[0]["content"]
assert "old" in results[1]["content"]
def test_search_across_multiple_files(self, tmp_path):
"""Should search across all session files."""
sl = SessionLogger(logs_dir=tmp_path)
for day in range(1, 4):
self._write_session_file(
tmp_path,
f"session_2026-03-{day:02d}.jsonl",
[
{
"type": "message",
"role": "user",
"content": f"day {day} bitcoin",
"timestamp": f"2026-03-{day:02d}T10:00:00",
},
],
)
results = sl.search("bitcoin")
assert len(results) == 3
def test_search_finds_errors(self, tmp_path):
"""Should match errors by error field."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "error",
"error": "Connection refused to ollama",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "message",
"role": "user",
"content": "hello",
"timestamp": "2026-03-01T10:01:00",
},
],
)
results = sl.search("ollama")
assert len(results) == 1
assert results[0]["type"] == "error"
def test_search_finds_tool_calls(self, tmp_path):
"""Should match tool calls by tool name and result."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "tool_call",
"tool": "calculator",
"args": {"expr": "2+2"},
"result": "4",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("calculator")
assert len(results) == 1
assert results[0]["type"] == "tool_call"
def test_search_finds_decisions(self, tmp_path):
"""Should match decisions by decision and rationale fields."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "decision",
"decision": "Use local model",
"rationale": "Privacy matters",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("privacy")
assert len(results) == 1
assert results[0]["type"] == "decision"
def test_search_empty_query_matches_nothing(self, tmp_path):
"""Empty string matches everything (substring of any string)."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "hello",
"timestamp": "2026-03-01T10:00:00",
},
],
)
# Empty string is a substring of everything
results = sl.search("")
assert len(results) == 1
def test_search_no_results(self, tmp_path):
"""Should return empty list when nothing matches."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "hello world",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("xyznonexistent")
assert results == []
def test_search_no_session_files(self, tmp_path):
"""Should return empty list when no session files exist."""
sl = SessionLogger(logs_dir=tmp_path)
results = sl.search("anything")
assert results == []
def test_search_skips_malformed_json(self, tmp_path):
"""Should skip lines with bad JSON and continue."""
sl = SessionLogger(logs_dir=tmp_path)
path = tmp_path / "session_2026-03-01.jsonl"
with open(path, "w") as f:
f.write(
'{"type": "message", "role": "user", "content": "good line", "timestamp": "2026-03-01T10:00:00"}\n'
)
f.write("THIS IS NOT JSON\n")
f.write(
'{"type": "message", "role": "user", "content": "also good", "timestamp": "2026-03-01T10:01:00"}\n'
)
results = sl.search("good")
assert len(results) == 2
def test_search_includes_source_file(self, tmp_path):
"""Should tag each result with _source_file."""
sl = SessionLogger(logs_dir=tmp_path)
self._write_session_file(
tmp_path,
"session_2026-03-01.jsonl",
[
{
"type": "message",
"role": "user",
"content": "hello",
"timestamp": "2026-03-01T10:00:00",
},
],
)
results = sl.search("hello")
assert results[0]["_source_file"] == "session_2026-03-01.jsonl"
class TestSessionHistoryTool:
"""Tests for the session_history() tool function."""
def test_session_history_returns_formatted_string(self, tmp_path, mock_settings, monkeypatch):
"""Should return a formatted string of results."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
# Write a session file
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "message",
"role": "user",
"content": "bitcoin price",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
result = session_history("bitcoin")
assert "Found 1 result(s)" in result
assert "bitcoin price" in result
def test_session_history_no_results(self, tmp_path, mock_settings, monkeypatch):
"""Should return a 'no results' message."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
result = session_history("xyznonexistent")
assert "No session history found" in result
def test_session_history_role_filter(self, tmp_path, mock_settings, monkeypatch):
"""Should respect role filter parameter."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "message",
"role": "user",
"content": "hello bitcoin",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
f.write(
json.dumps(
{
"type": "message",
"role": "timmy",
"content": "bitcoin info",
"timestamp": "2026-03-01T10:01:00",
}
)
+ "\n"
)
result = session_history("bitcoin", role="timmy")
assert "Found 1 result(s)" in result
assert "timmy:" in result
def test_session_history_formats_errors(self, tmp_path, mock_settings, monkeypatch):
"""Should format error entries correctly."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{"type": "error", "error": "Ollama timeout", "timestamp": "2026-03-01T10:00:00"}
)
+ "\n"
)
result = session_history("ollama")
assert "ERROR:" in result
def test_session_history_formats_decisions(self, tmp_path, mock_settings, monkeypatch):
"""Should format decision entries correctly."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "decision",
"decision": "Use local model",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
result = session_history("local model")
assert "DECIDED:" in result
def test_session_history_formats_tool_calls(self, tmp_path, mock_settings, monkeypatch):
"""Should format tool call entries correctly."""
sl = SessionLogger(logs_dir=tmp_path)
monkeypatch.setattr("timmy.session_logger._session_logger", sl)
path = tmp_path / f"session_{date.today().isoformat()}.jsonl"
with open(path, "w") as f:
f.write(
json.dumps(
{
"type": "tool_call",
"tool": "calculator",
"result": "42",
"timestamp": "2026-03-01T10:00:00",
}
)
+ "\n"
)
result = session_history("calculator")
assert "tool:calculator" in result