"""Comprehensive unit tests for src/timmy/session_logger.py.""" import json from datetime import date from pathlib import Path from unittest.mock import MagicMock, patch import pytest from timmy.session_logger import ( SessionLogger, flush_session_logs, get_session_logger, get_session_summary, self_reflect, session_history, ) @pytest.fixture(autouse=True) def reset_global_singleton(monkeypatch): """Reset global singleton between tests.""" monkeypatch.setattr("timmy.session_logger._session_logger", None) @pytest.fixture def mock_settings(tmp_path): """Mock config.settings.repo_root to use tmp_path.""" mock_settings = MagicMock() mock_settings.repo_root = str(tmp_path) # Patch at the source where settings is imported from with patch("config.settings", mock_settings): yield mock_settings class TestSessionLoggerInit: """Tests for SessionLogger initialization.""" def test_init_with_custom_logs_dir(self, tmp_path): """Should initialize with custom logs_dir using tmp_path.""" logs_dir = tmp_path / "custom_logs" logger = SessionLogger(logs_dir=logs_dir) assert logger.logs_dir == logs_dir assert logs_dir.exists() def test_init_default_logs_dir(self, mock_settings, tmp_path): """Should default to /logs in repo root when logs_dir is None.""" logger = SessionLogger() assert logger.logs_dir == tmp_path / "logs" assert logger.logs_dir.exists() def test_init_creates_session_file_path(self, tmp_path): """Should create session file path with today's date.""" logger = SessionLogger(logs_dir=tmp_path) today_str = date.today().isoformat() expected_path = tmp_path / f"session_{today_str}.jsonl" assert logger.session_file == expected_path def test_init_initializes_empty_buffer(self, tmp_path): """Should initialize with empty buffer.""" logger = SessionLogger(logs_dir=tmp_path) assert logger._buffer == [] class TestRecordMessage: """Tests for record_message method.""" def test_record_message_without_confidence(self, tmp_path): """Should record message without confidence.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("user", "Hello Timmy") assert len(logger._buffer) == 1 entry = logger._buffer[0] assert entry["type"] == "message" assert entry["role"] == "user" assert entry["content"] == "Hello Timmy" assert "timestamp" in entry assert "confidence" not in entry def test_record_message_with_confidence(self, tmp_path): """Should record message with confidence.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("timmy", "Hello user", confidence=0.95) assert len(logger._buffer) == 1 entry = logger._buffer[0] assert entry["type"] == "message" assert entry["role"] == "timmy" assert entry["content"] == "Hello user" assert entry["confidence"] == 0.95 def test_record_message_multiple(self, tmp_path): """Should record multiple messages.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("user", "First message") logger.record_message("timmy", "Second message", confidence=0.8) logger.record_message("user", "Third message") assert len(logger._buffer) == 3 assert logger._buffer[0]["role"] == "user" assert logger._buffer[1]["role"] == "timmy" assert logger._buffer[2]["role"] == "user" class TestRecordToolCall: """Tests for record_tool_call method.""" def test_record_tool_call_basic(self, tmp_path): """Should record basic tool call.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_tool_call("read_file", {"path": "test.py"}, "file content") assert len(logger._buffer) == 1 entry = logger._buffer[0] assert entry["type"] == "tool_call" assert entry["tool"] == "read_file" assert entry["args"] == {"path": "test.py"} assert entry["result"] == "file content" assert "timestamp" in entry def test_record_tool_call_result_truncation_over_500_chars(self, tmp_path): """Should truncate result when over 500 characters.""" logger = SessionLogger(logs_dir=tmp_path) long_result = "a" * 600 logger.record_tool_call("search", {"query": "test"}, long_result) entry = logger._buffer[0] assert len(entry["result"]) == 500 assert entry["result"] == "a" * 500 def test_record_tool_call_result_exactly_500_chars(self, tmp_path): """Should not truncate result when exactly 500 characters.""" logger = SessionLogger(logs_dir=tmp_path) exact_result = "b" * 500 logger.record_tool_call("read_file", {"path": "file.txt"}, exact_result) entry = logger._buffer[0] assert len(entry["result"]) == 500 assert entry["result"] == exact_result def test_record_tool_call_result_under_500_chars(self, tmp_path): """Should not truncate result when under 500 characters.""" logger = SessionLogger(logs_dir=tmp_path) short_result = "short result" logger.record_tool_call("calc", {"expr": "1+1"}, short_result) entry = logger._buffer[0] assert entry["result"] == short_result def test_record_tool_call_non_string_result(self, tmp_path): """Should handle non-string result.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_tool_call("calc", {"expr": "1+1"}, 42) entry = logger._buffer[0] assert entry["result"] == "42" def test_record_tool_call_non_string_result_truncation(self, tmp_path): """Should truncate non-string result over 500 chars.""" logger = SessionLogger(logs_dir=tmp_path) long_list = list(range(200)) # Will be longer than 500 chars as string logger.record_tool_call("search", {"query": "test"}, long_list) entry = logger._buffer[0] assert len(entry["result"]) == 500 class TestRecordError: """Tests for record_error method.""" def test_record_error_without_context(self, tmp_path): """Should record error without context.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_error("File not found") assert len(logger._buffer) == 1 entry = logger._buffer[0] assert entry["type"] == "error" assert entry["error"] == "File not found" assert entry["context"] is None assert "timestamp" in entry def test_record_error_with_context(self, tmp_path): """Should record error with context.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_error("Connection failed", "Reading config file") assert len(logger._buffer) == 1 entry = logger._buffer[0] assert entry["type"] == "error" assert entry["error"] == "Connection failed" assert entry["context"] == "Reading config file" def test_record_error_multiple(self, tmp_path): """Should record multiple errors.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_error("Error 1", "Context 1") logger.record_error("Error 2") logger.record_error("Error 3", "Context 3") assert len(logger._buffer) == 3 assert logger._buffer[0]["error"] == "Error 1" assert logger._buffer[1]["context"] is None assert logger._buffer[2]["error"] == "Error 3" class TestRecordDecision: """Tests for record_decision method.""" def test_record_decision_without_rationale(self, tmp_path): """Should record decision without rationale.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_decision("Use OOP pattern") assert len(logger._buffer) == 1 entry = logger._buffer[0] assert entry["type"] == "decision" assert entry["decision"] == "Use OOP pattern" assert entry["rationale"] is None assert "timestamp" in entry def test_record_decision_with_rationale(self, tmp_path): """Should record decision with rationale.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_decision("Refactor to classes", "More maintainable code") assert len(logger._buffer) == 1 entry = logger._buffer[0] assert entry["type"] == "decision" assert entry["decision"] == "Refactor to classes" assert entry["rationale"] == "More maintainable code" def test_record_decision_multiple(self, tmp_path): """Should record multiple decisions.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_decision("Decision 1") logger.record_decision("Decision 2", "Reason 2") logger.record_decision("Decision 3", "Reason 3") assert len(logger._buffer) == 3 assert logger._buffer[0]["rationale"] is None assert logger._buffer[1]["rationale"] == "Reason 2" class TestFlush: """Tests for flush method.""" def test_flush_writes_jsonl(self, tmp_path): """Should write buffer to JSONL file.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("user", "Hello") logger.record_tool_call("read", {"path": "test.txt"}, "content") result_path = logger.flush() assert result_path == logger.session_file assert result_path.exists() # Verify JSONL format lines = result_path.read_text().strip().split("\n") assert len(lines) == 2 # Verify each line is valid JSON entry1 = json.loads(lines[0]) assert entry1["type"] == "message" assert entry1["content"] == "Hello" entry2 = json.loads(lines[1]) assert entry2["type"] == "tool_call" assert entry2["tool"] == "read" def test_flush_clears_buffer(self, tmp_path): """Should clear buffer after flush.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("user", "Hello") logger.record_error("Test error") assert len(logger._buffer) == 2 logger.flush() assert len(logger._buffer) == 0 assert logger._buffer == [] def test_flush_returns_path(self, tmp_path): """Should return path to session file.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("user", "Hello") result_path = logger.flush() assert isinstance(result_path, Path) assert result_path == logger.session_file def test_flush_empty_buffer_returns_path_without_writing(self, tmp_path): """Should return path without writing when buffer is empty.""" logger = SessionLogger(logs_dir=tmp_path) result_path = logger.flush() assert isinstance(result_path, Path) assert result_path == logger.session_file # File should not exist since buffer was empty assert not result_path.exists() def test_flush_appends_to_existing_file(self, tmp_path): """Should append to existing file on subsequent flushes.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("user", "First") logger.flush() logger.record_message("user", "Second") logger.record_error("Error") logger.flush() lines = logger.session_file.read_text().strip().split("\n") assert len(lines) == 3 entries = [json.loads(line) for line in lines] assert entries[0]["content"] == "First" assert entries[1]["content"] == "Second" assert entries[2]["type"] == "error" class TestGetSessionSummary: """Tests for get_session_summary method.""" def test_get_session_summary_file_not_exist(self, tmp_path): """Should return exists=False when file does not exist.""" logger = SessionLogger(logs_dir=tmp_path) summary = logger.get_session_summary() assert summary["exists"] is False assert summary["entries"] == 0 def test_get_session_summary_after_flush(self, tmp_path): """Should return summary after flushing entries.""" logger = SessionLogger(logs_dir=tmp_path) logger.record_message("user", "Hello") logger.record_message("timmy", "Hi") logger.record_tool_call("read", {"path": "test.txt"}, "content") logger.record_error("Test error") logger.record_decision("Use pattern") logger.flush() summary = logger.get_session_summary() assert summary["exists"] is True assert summary["file"] == str(logger.session_file) assert summary["entries"] == 5 assert summary["messages"] == 2 assert summary["tool_calls"] == 1 assert summary["errors"] == 1 assert summary["decisions"] == 1 def test_get_session_summary_empty_file(self, tmp_path): """Should handle empty file.""" logger = SessionLogger(logs_dir=tmp_path) # Create empty file logger.session_file.touch() summary = logger.get_session_summary() assert summary["exists"] is True assert summary["entries"] == 0 def test_get_session_summary_with_blank_lines(self, tmp_path): """Should handle file with blank lines.""" logger = SessionLogger(logs_dir=tmp_path) # Create file with blank lines logger.session_file.write_text( '{"type": "message", "content": "Hello"}\n\n{"type": "error", "error": "Oops"}\n' ) summary = logger.get_session_summary() assert summary["exists"] is True assert summary["entries"] == 2 class TestModuleLevelFunctions: """Tests for module-level functions.""" def test_get_session_logger_returns_singleton(self, mock_settings, tmp_path): """Should return same singleton instance.""" logger1 = get_session_logger() logger2 = get_session_logger() assert logger1 is logger2 assert isinstance(logger1, SessionLogger) def test_get_session_logger_creates_new_when_none(self, mock_settings, tmp_path): """Should create new logger when singleton is None.""" logger = get_session_logger() assert isinstance(logger, SessionLogger) assert logger.logs_dir == tmp_path / "logs" def test_get_session_summary_module_function(self, mock_settings, tmp_path): """Should get summary via module-level function.""" # First, create and flush some logs logger = get_session_logger() logger.record_message("user", "Test") logger.flush() # Call module-level function summary = get_session_summary() assert summary["exists"] is True assert summary["entries"] >= 1 assert summary["messages"] >= 1 def test_flush_session_logs_module_function(self, mock_settings, tmp_path): """Should flush logs via module-level function.""" logger = get_session_logger() logger.record_message("user", "Test message") # Call module-level function result_path_str = flush_session_logs() assert isinstance(result_path_str, str) assert result_path_str == str(logger.session_file) assert logger._buffer == [] def test_flush_session_logs_returns_string_path(self, mock_settings, tmp_path): """Should return string path, not Path object.""" get_session_logger() result = flush_session_logs() assert isinstance(result, str) class TestIntegration: """Integration tests for SessionLogger.""" def test_full_session_workflow(self, tmp_path): """Should handle full session workflow.""" logger = SessionLogger(logs_dir=tmp_path) # Simulate a session logger.record_message("user", "What is 2+2?") logger.record_decision("Use calculator tool", "User needs calculation") logger.record_tool_call("calculate", {"expr": "2+2"}, "4") logger.record_message("timmy", "2+2 is 4", confidence=0.99) # Verify buffer state assert len(logger._buffer) == 4 # Flush and verify logger.flush() assert len(logger._buffer) == 0 # Verify summary summary = logger.get_session_summary() assert summary["entries"] == 4 assert summary["messages"] == 2 assert summary["decisions"] == 1 assert summary["tool_calls"] == 1 assert summary["errors"] == 0 # Verify file content lines = logger.session_file.read_text().strip().split("\n") assert len(lines) == 4 # Load and verify structure entries = [json.loads(line) for line in lines] assert entries[0]["type"] == "message" assert entries[1]["type"] == "decision" assert entries[2]["type"] == "tool_call" assert entries[3]["type"] == "message" def test_logs_dir_created_as_string(self, tmp_path): """Should accept logs_dir as string.""" logs_dir_str = str(tmp_path / "string_logs") logger = SessionLogger(logs_dir=logs_dir_str) assert logger.logs_dir == Path(logs_dir_str) assert logger.logs_dir.exists() def test_logs_dir_created_with_parents(self, tmp_path): """Should create parent directories if needed.""" deep_logs_dir = tmp_path / "very" / "deep" / "logs" logger = SessionLogger(logs_dir=deep_logs_dir) assert logger.logs_dir == deep_logs_dir assert deep_logs_dir.exists() class TestSearch: """Tests for SessionLogger.search() method.""" def _write_session_file(self, logs_dir, filename, entries): """Helper: write JSONL entries to a session file.""" path = logs_dir / filename with open(path, "w") as f: for entry in entries: f.write(json.dumps(entry) + "\n") def test_search_finds_message_by_content(self, tmp_path): """Should find messages matching query in content field.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "message", "role": "user", "content": "Tell me about bitcoin", "timestamp": "2026-03-01T10:00:00", }, { "type": "message", "role": "timmy", "content": "Bitcoin is a cryptocurrency", "timestamp": "2026-03-01T10:01:00", }, { "type": "message", "role": "user", "content": "What about ethereum?", "timestamp": "2026-03-01T10:02:00", }, ], ) results = sl.search("bitcoin") assert len(results) == 2 assert all("bitcoin" in r.get("content", "").lower() for r in results) def test_search_case_insensitive(self, tmp_path): """Should match regardless of case.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "message", "role": "user", "content": "BITCOIN price?", "timestamp": "2026-03-01T10:00:00", }, ], ) results = sl.search("bitcoin") assert len(results) == 1 def test_search_role_filter(self, tmp_path): """Should filter by role when specified.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "message", "role": "user", "content": "Tell me about bitcoin", "timestamp": "2026-03-01T10:00:00", }, { "type": "message", "role": "timmy", "content": "Bitcoin is great", "timestamp": "2026-03-01T10:01:00", }, ], ) results = sl.search("bitcoin", role="user") assert len(results) == 1 assert results[0]["role"] == "user" def test_search_respects_limit(self, tmp_path): """Should return at most 'limit' results.""" sl = SessionLogger(logs_dir=tmp_path) entries = [ { "type": "message", "role": "user", "content": f"bitcoin msg {i}", "timestamp": f"2026-03-01T10:{i:02d}:00", } for i in range(20) ] self._write_session_file(tmp_path, "session_2026-03-01.jsonl", entries) results = sl.search("bitcoin", limit=5) assert len(results) == 5 def test_search_returns_newest_first(self, tmp_path): """Should return most recent matches first.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "message", "role": "user", "content": "old bitcoin", "timestamp": "2026-03-01T08:00:00", }, ], ) self._write_session_file( tmp_path, "session_2026-03-02.jsonl", [ { "type": "message", "role": "user", "content": "new bitcoin", "timestamp": "2026-03-02T10:00:00", }, ], ) results = sl.search("bitcoin") assert len(results) == 2 # Newest file first (03-02 > 03-01) assert "new" in results[0]["content"] assert "old" in results[1]["content"] def test_search_across_multiple_files(self, tmp_path): """Should search across all session files.""" sl = SessionLogger(logs_dir=tmp_path) for day in range(1, 4): self._write_session_file( tmp_path, f"session_2026-03-{day:02d}.jsonl", [ { "type": "message", "role": "user", "content": f"day {day} bitcoin", "timestamp": f"2026-03-{day:02d}T10:00:00", }, ], ) results = sl.search("bitcoin") assert len(results) == 3 def test_search_finds_errors(self, tmp_path): """Should match errors by error field.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "error", "error": "Connection refused to ollama", "timestamp": "2026-03-01T10:00:00", }, { "type": "message", "role": "user", "content": "hello", "timestamp": "2026-03-01T10:01:00", }, ], ) results = sl.search("ollama") assert len(results) == 1 assert results[0]["type"] == "error" def test_search_finds_tool_calls(self, tmp_path): """Should match tool calls by tool name and result.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "tool_call", "tool": "calculator", "args": {"expr": "2+2"}, "result": "4", "timestamp": "2026-03-01T10:00:00", }, ], ) results = sl.search("calculator") assert len(results) == 1 assert results[0]["type"] == "tool_call" def test_search_finds_decisions(self, tmp_path): """Should match decisions by decision and rationale fields.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "decision", "decision": "Use local model", "rationale": "Privacy matters", "timestamp": "2026-03-01T10:00:00", }, ], ) results = sl.search("privacy") assert len(results) == 1 assert results[0]["type"] == "decision" def test_search_empty_query_matches_nothing(self, tmp_path): """Empty string matches everything (substring of any string).""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "message", "role": "user", "content": "hello", "timestamp": "2026-03-01T10:00:00", }, ], ) # Empty string is a substring of everything results = sl.search("") assert len(results) == 1 def test_search_no_results(self, tmp_path): """Should return empty list when nothing matches.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "message", "role": "user", "content": "hello world", "timestamp": "2026-03-01T10:00:00", }, ], ) results = sl.search("xyznonexistent") assert results == [] def test_search_no_session_files(self, tmp_path): """Should return empty list when no session files exist.""" sl = SessionLogger(logs_dir=tmp_path) results = sl.search("anything") assert results == [] def test_search_skips_malformed_json(self, tmp_path): """Should skip lines with bad JSON and continue.""" sl = SessionLogger(logs_dir=tmp_path) path = tmp_path / "session_2026-03-01.jsonl" with open(path, "w") as f: f.write( '{"type": "message", "role": "user", "content": "good line", "timestamp": "2026-03-01T10:00:00"}\n' ) f.write("THIS IS NOT JSON\n") f.write( '{"type": "message", "role": "user", "content": "also good", "timestamp": "2026-03-01T10:01:00"}\n' ) results = sl.search("good") assert len(results) == 2 def test_search_includes_source_file(self, tmp_path): """Should tag each result with _source_file.""" sl = SessionLogger(logs_dir=tmp_path) self._write_session_file( tmp_path, "session_2026-03-01.jsonl", [ { "type": "message", "role": "user", "content": "hello", "timestamp": "2026-03-01T10:00:00", }, ], ) results = sl.search("hello") assert results[0]["_source_file"] == "session_2026-03-01.jsonl" class TestSessionHistoryTool: """Tests for the session_history() tool function.""" def test_session_history_returns_formatted_string(self, tmp_path, mock_settings, monkeypatch): """Should return a formatted string of results.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) # Write a session file path = tmp_path / f"session_{date.today().isoformat()}.jsonl" with open(path, "w") as f: f.write( json.dumps( { "type": "message", "role": "user", "content": "bitcoin price", "timestamp": "2026-03-01T10:00:00", } ) + "\n" ) result = session_history("bitcoin") assert "Found 1 result(s)" in result assert "bitcoin price" in result def test_session_history_no_results(self, tmp_path, mock_settings, monkeypatch): """Should return a 'no results' message.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) result = session_history("xyznonexistent") assert "No session history found" in result def test_session_history_role_filter(self, tmp_path, mock_settings, monkeypatch): """Should respect role filter parameter.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) path = tmp_path / f"session_{date.today().isoformat()}.jsonl" with open(path, "w") as f: f.write( json.dumps( { "type": "message", "role": "user", "content": "hello bitcoin", "timestamp": "2026-03-01T10:00:00", } ) + "\n" ) f.write( json.dumps( { "type": "message", "role": "timmy", "content": "bitcoin info", "timestamp": "2026-03-01T10:01:00", } ) + "\n" ) result = session_history("bitcoin", role="timmy") assert "Found 1 result(s)" in result assert "timmy:" in result def test_session_history_formats_errors(self, tmp_path, mock_settings, monkeypatch): """Should format error entries correctly.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) path = tmp_path / f"session_{date.today().isoformat()}.jsonl" with open(path, "w") as f: f.write( json.dumps( {"type": "error", "error": "Ollama timeout", "timestamp": "2026-03-01T10:00:00"} ) + "\n" ) result = session_history("ollama") assert "ERROR:" in result def test_session_history_formats_decisions(self, tmp_path, mock_settings, monkeypatch): """Should format decision entries correctly.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) path = tmp_path / f"session_{date.today().isoformat()}.jsonl" with open(path, "w") as f: f.write( json.dumps( { "type": "decision", "decision": "Use local model", "timestamp": "2026-03-01T10:00:00", } ) + "\n" ) result = session_history("local model") assert "DECIDED:" in result def test_session_history_formats_tool_calls(self, tmp_path, mock_settings, monkeypatch): """Should format tool call entries correctly.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) path = tmp_path / f"session_{date.today().isoformat()}.jsonl" with open(path, "w") as f: f.write( json.dumps( { "type": "tool_call", "tool": "calculator", "result": "42", "timestamp": "2026-03-01T10:00:00", } ) + "\n" ) result = session_history("calculator") assert "tool:calculator" in result class TestSelfReflect: """Tests for the self_reflect() tool function.""" def _write_entries(self, logs_dir, entries): """Helper: write entries to today's session file.""" path = logs_dir / f"session_{date.today().isoformat()}.jsonl" with open(path, "w") as f: for entry in entries: f.write(json.dumps(entry) + "\n") def test_self_reflect_no_history(self, tmp_path, mock_settings, monkeypatch): """Should return a message when no history exists.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) result = self_reflect() assert "No conversation history" in result def test_self_reflect_detects_low_confidence(self, tmp_path, mock_settings, monkeypatch): """Should flag low-confidence responses.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) self._write_entries( tmp_path, [ { "type": "message", "role": "timmy", "content": "I think maybe it could be X", "confidence": 0.3, "timestamp": "2026-03-01T10:00:00", }, { "type": "message", "role": "timmy", "content": "The answer is Y", "confidence": 0.9, "timestamp": "2026-03-01T10:01:00", }, ], ) result = self_reflect() assert "Low-Confidence Responses (1)" in result assert "confidence=30%" in result def test_self_reflect_detects_errors(self, tmp_path, mock_settings, monkeypatch): """Should report errors in reflection.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) self._write_entries( tmp_path, [ { "type": "error", "error": "Ollama connection refused", "timestamp": "2026-03-01T10:00:00", }, { "type": "message", "role": "user", "content": "hello", "timestamp": "2026-03-01T10:01:00", }, ], ) result = self_reflect() assert "Errors (1)" in result assert "Ollama connection refused" in result def test_self_reflect_detects_repeated_topics(self, tmp_path, mock_settings, monkeypatch): """Should identify recurring user topics.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) self._write_entries( tmp_path, [ { "type": "message", "role": "user", "content": "tell me about bitcoin", "timestamp": "2026-03-01T10:00:00", }, { "type": "message", "role": "user", "content": "more about bitcoin please", "timestamp": "2026-03-01T10:01:00", }, { "type": "message", "role": "user", "content": "bitcoin price today", "timestamp": "2026-03-01T10:02:00", }, ], ) result = self_reflect() assert "Recurring Topics" in result assert "bitcoin" in result def test_self_reflect_healthy_session(self, tmp_path, mock_settings, monkeypatch): """Should report healthy when no issues found.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) self._write_entries( tmp_path, [ { "type": "message", "role": "user", "content": "hi", "timestamp": "2026-03-01T10:00:00", }, { "type": "message", "role": "timmy", "content": "Hello!", "confidence": 0.9, "timestamp": "2026-03-01T10:01:00", }, ], ) result = self_reflect() assert "Self-Reflection Report" in result assert "Keep up the good work" in result def test_self_reflect_includes_insights(self, tmp_path, mock_settings, monkeypatch): """Should include actionable insights section.""" sl = SessionLogger(logs_dir=tmp_path) monkeypatch.setattr("timmy.session_logger._session_logger", sl) self._write_entries( tmp_path, [ { "type": "message", "role": "timmy", "content": "I'm not sure about this", "confidence": 0.2, "timestamp": "2026-03-01T10:00:00", }, ], ) result = self_reflect() assert "Insights" in result assert "confidence was low" in result