#!/usr/bin/env python3
"""
Tests for bin/request_log.py — Request Log Telemetry.

Issue #446: [P2.5] request_log Telemetry Table — Verify What Actually Happened
"""

import json
import os
import shutil
import sqlite3
import sys
import tempfile
import unittest
from datetime import datetime, timezone, timedelta
from pathlib import Path

# Make bin/ importable so the module under test can be loaded directly.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bin"))

from request_log import (
    get_db_path,
    ensure_db,
    log_inference,
    query_requests,
    did_agent_call_provider,
    get_recent_activity_summary,
)


class TestRequestLog(unittest.TestCase):
    """Exercise the request_log helpers against a throwaway SQLite database.

    Every test gets its own temporary directory, and the REQUEST_LOG_PATH
    environment variable is pointed at a database file inside it.
    """

    def setUp(self):
        """Create a temporary test database location for each test."""
        # mkdtemp() always returns a brand-new directory, so no stale database
        # file can exist at db_path when a test starts.
        self.tmpdir = Path(tempfile.mkdtemp())
        self.db_path = self.tmpdir / "test_request_log.db"

        # Patch the module's db path by overriding env.
        # NOTE(review): this presumes request_log reads REQUEST_LOG_PATH at
        # call time rather than at import time — confirm against the module.
        self.original_env = os.environ.get("REQUEST_LOG_PATH")
        os.environ["REQUEST_LOG_PATH"] = str(self.db_path)

    def tearDown(self):
        """Restore the environment and remove the temporary directory."""
        if self.original_env is not None:
            os.environ["REQUEST_LOG_PATH"] = self.original_env
        else:
            os.environ.pop("REQUEST_LOG_PATH", None)

        # Remove the whole directory, not just the .db file, so neither the
        # directory nor any sibling files are left behind after the run.
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def _fetch_one(self, db_path, sql, params=()):
        """Run *sql* against *db_path* and return the first row (or None).

        The connection is closed in a ``finally`` clause so it is released
        even when the query raises. Rows are ``sqlite3.Row`` objects, which
        support both positional and by-name access.
        """
        conn = sqlite3.connect(str(db_path))
        try:
            conn.row_factory = sqlite3.Row
            return conn.execute(sql, params).fetchone()
        finally:
            conn.close()

    def test_ensure_db_creates_schema(self):
        """ensure_db() creates the database with correct schema."""
        db = ensure_db()
        self.assertTrue(db.exists())

        # Check table exists
        table = self._fetch_one(
            db,
            "SELECT name FROM sqlite_master WHERE type='table' AND name='request_log'",
        )
        self.assertIsNotNone(table, "request_log table should exist")

        # Check indexes exist
        index = self._fetch_one(
            db,
            "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_request_log_agent'",
        )
        self.assertIsNotNone(index)

    def test_log_inference_inserts_row(self):
        """log_inference() inserts a complete row."""
        row_id = log_inference(
            agent_name="test-agent",
            provider="anthropic",
            model="claude-sonnet-4-20250514",
            endpoint="/v1/messages",
            tokens_in=123,
            tokens_out=456,
            latency_ms=1500,
            status="success",
        )
        self.assertIsNotNone(row_id, "Should return row ID")
        self.assertGreater(row_id, 0)

        row = self._fetch_one(
            get_db_path(), "SELECT * FROM request_log WHERE id = ?", (row_id,)
        )
        # Fail with a clear assertion instead of a TypeError on subscripting.
        self.assertIsNotNone(row)

        expected = {
            "agent_name": "test-agent",
            "provider": "anthropic",
            "model": "claude-sonnet-4-20250514",
            "endpoint": "/v1/messages",
            "tokens_in": 123,
            "tokens_out": 456,
            "latency_ms": 1500,
            "status": "success",
        }
        for column, value in expected.items():
            with self.subTest(column=column):
                self.assertEqual(row[column], value)

    def test_log_inference_minimal_fields(self):
        """log_inference() works with only required fields."""
        row_id = log_inference(
            agent_name="min-agent",
            provider="ollama",
            model="hermes3:8b",
            endpoint="/v1/chat/completions",
        )
        self.assertIsNotNone(row_id)

        row = self._fetch_one(
            get_db_path(), "SELECT * FROM request_log WHERE id = ?", (row_id,)
        )
        self.assertIsNotNone(row)
        self.assertEqual(row["agent_name"], "min-agent")
        self.assertIsNone(row["tokens_in"])
        self.assertIsNone(row["latency_ms"])

    def test_log_inference_error_status(self):
        """log_inference() records error status with message."""
        row_id = log_inference(
            agent_name="err-agent",
            provider="openrouter",
            model="claude-opus-4-6",
            endpoint="/v1/chat/completions",
            status="error",
            error_message="Rate limit exceeded",
        )
        self.assertIsNotNone(row_id)

        row = self._fetch_one(
            get_db_path(),
            "SELECT error_message FROM request_log WHERE id = ?",
            (row_id,),
        )
        self.assertIsNotNone(row)
        self.assertEqual(row[0], "Rate limit exceeded")

    def test_query_requests_filters_by_agent(self):
        """query_requests() filters by agent_name."""
        log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e1")
        log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e2")
        log_inference(agent_name="agent-b", provider="p1", model="m1", endpoint="/e3")

        rows = query_requests(agent_name="agent-a")
        self.assertEqual(len(rows), 2)
        for row in rows:
            self.assertEqual(row["agent_name"], "agent-a")

    def test_query_requests_filters_by_provider(self):
        """query_requests() filters by provider."""
        log_inference(agent_name="a1", provider="anthropic", model="claude-4", endpoint="/e")
        log_inference(agent_name="a2", provider="openrouter", model="claude-4", endpoint="/e")
        log_inference(agent_name="a3", provider="anthropic", model="claude-4", endpoint="/e")

        rows = query_requests(provider="anthropic")
        self.assertEqual(len(rows), 2)
        for row in rows:
            self.assertEqual(row["provider"], "anthropic")

    def test_query_requests_time_window(self):
        """query_requests() respects hours parameter."""
        log_inference(agent_name="time-agent", provider="test", model="m", endpoint="/e")

        rows = query_requests(hours=1)
        self.assertGreaterEqual(len(rows), 1)

        # 24-hour window should include at least what 1-hour includes
        rows_recent = query_requests(hours=24)
        self.assertGreaterEqual(len(rows_recent), len(rows))

    def test_did_agent_call_provider_positive(self):
        """did_agent_call_provider() returns True when agent called provider."""
        log_inference(
            agent_name="codex-agent",
            provider="anthropic",
            model="claude-sonnet-4-20250514",
            endpoint="/v1/messages",
            status="success",
        )

        result = did_agent_call_provider(
            agent_name="codex-agent",
            provider="anthropic",
            hours=24,
        )
        self.assertTrue(result)

    def test_did_agent_call_provider_negative_wrong_agent(self):
        """did_agent_call_provider() returns False for non-matching agent."""
        log_inference(
            agent_name="other-agent",
            provider="anthropic",
            model="claude-sonnet-4-20250514",
            endpoint="/v1/messages",
        )

        result = did_agent_call_provider(
            agent_name="codex-agent",
            provider="anthropic",
        )
        self.assertFalse(result)

    def test_did_agent_call_provider_negative_wrong_provider(self):
        """did_agent_call_provider() returns False for non-matching provider."""
        log_inference(
            agent_name="codex-agent",
            provider="ollama",
            model="hermes3:8b",
            endpoint="/v1/chat/completions",
        )

        result = did_agent_call_provider(
            agent_name="codex-agent",
            provider="anthropic",
        )
        self.assertFalse(result)

    def test_did_agent_call_provider_min_success_count(self):
        """did_agent_call_provider() respects min_success_count."""
        log_inference(
            agent_name="agent-x",
            provider="p",
            model="m",
            endpoint="/e",
            status="success",
        )

        # Exactly one successful call was logged above.
        self.assertTrue(did_agent_call_provider("agent-x", "p", min_success_count=1))
        self.assertFalse(did_agent_call_provider("agent-x", "p", min_success_count=2))

    def test_log_and_query_by_status(self):
        """query_requests() can filter by status."""
        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="success")
        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="error")
        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="timeout")

        success_rows = query_requests(status="success")
        error_rows = query_requests(status="error")

        self.assertEqual(len(success_rows), 1)
        self.assertEqual(len(error_rows), 1)

    def test_get_recent_activity_summary(self):
        """get_recent_activity_summary() returns structured data."""
        log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=1000)
        log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=2000)

        summary = get_recent_activity_summary(hours=24)

        self.assertIn("by_agent_provider", summary)
        found = any(
            r.get("agent_name") == "agent-1" and r.get("provider") == "anthropic"
            for r in summary.get("by_agent_provider", [])
        )
        self.assertTrue(found, "Should find agent-1/anthropic in summary")


if __name__ == "__main__":
    unittest.main()