diff --git a/bin/request_log.py b/bin/request_log.py
new file mode 100644
index 00000000..e410d455
--- /dev/null
+++ b/bin/request_log.py
@@ -0,0 +1,255 @@
+#!/usr/bin/env python3
+"""
+Request Log Telemetry — "Verify What Actually Happened"
+
+Issue #446: [P2.5] request_log Telemetry Table
+
+Every agent writes a row to request_log for every inference call.
+No exceptions. No summarizing. Actual rows.
+
+This module provides:
+  - log_inference(): write a telemetry row
+  - query_requests(): read recent telemetry
+  - did_agent_call_provider(): answer verification questions
+
+Database: ~/.local/timmy/request_log.db
+Override via REQUEST_LOG_PATH environment variable.
+"""
+
+import os
+import sqlite3
+import sys
+import json
+from datetime import datetime, timezone, timedelta
+from pathlib import Path
+from typing import Optional, Dict, Any, List
+
+# Default DB location (matches ansible group_vars/wizards.yml)
+DEFAULT_DB_PATH = Path.home() / ".local" / "timmy" / "request_log.db"
+
+
+def get_db_path() -> Path:
+    """Return the configured request_log database path."""
+    env_path = os.environ.get("REQUEST_LOG_PATH")
+    if env_path:
+        return Path(env_path).expanduser()
+    return DEFAULT_DB_PATH
+
+
+def ensure_db(db_path: Optional[Path] = None) -> Path:
+    """
+    Ensure the database and schema exist.
+    Creates the DB and schema if missing; db_path overrides the default.
+    Returns the DB path.
+    """
+    db_path = Path(db_path) if db_path else get_db_path()
+    db_path.parent.mkdir(parents=True, exist_ok=True)
+
+    if not db_path.exists():
+        # Create with schema
+        schema = """
+CREATE TABLE IF NOT EXISTS request_log (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
+    agent_name TEXT NOT NULL,
+    provider TEXT NOT NULL,
+    model TEXT NOT NULL,
+    endpoint TEXT NOT NULL,
+    tokens_in INTEGER,
+    tokens_out INTEGER,
+    latency_ms INTEGER,
+    status TEXT NOT NULL,
+    error_message TEXT
+);
+
+CREATE INDEX IF NOT EXISTS idx_request_log_agent
+    ON request_log (agent_name, timestamp);
+
+CREATE INDEX IF NOT EXISTS idx_request_log_provider
+    ON request_log (provider, timestamp);
+
+CREATE INDEX IF NOT EXISTS idx_request_log_status
+    ON request_log (status, timestamp);
+"""
+        conn = sqlite3.connect(str(db_path))
+        conn.executescript(schema)
+        conn.commit()
+        conn.close()
+
+    return db_path
+
+
+def log_inference(
+    *,
+    agent_name: str,
+    provider: str,
+    model: str,
+    endpoint: str,
+    tokens_in: Optional[int] = None,
+    tokens_out: Optional[int] = None,
+    latency_ms: Optional[int] = None,
+    status: str = "success",
+    error_message: Optional[str] = None,
+    db_path: Optional[Path] = None,
+) -> Optional[int]:
+    """
+    Log a single inference request to the request_log table.
+
+    Args:
+        agent_name: Name of the agent making the call
+        provider: Provider name (anthropic, openrouter, ollama, etc.)
+        model: Model identifier
+        endpoint: API endpoint called
+        tokens_in: Input token count (optional but recommended)
+        tokens_out: Output token count (optional but recommended)
+        latency_ms: Latency in milliseconds (optional but recommended)
+        status: One of 'success', 'error', 'timeout', 'fallback'
+        error_message: Error text if status is error/timeout
+        db_path: Override DB path (for testing)
+
+    Returns:
+        Row ID if inserted, None on failure
+    """
+    db = Path(db_path) if db_path else get_db_path()
+
+    try:
+        # Ensure DB exists (at the resolved path, honoring any override)
+        if not db.exists():
+            ensure_db(db)
+
+        conn = sqlite3.connect(str(db))
+        cursor = conn.cursor()
+        cursor.execute("""
+            INSERT INTO request_log
+            (timestamp, agent_name, provider, model, endpoint,
+             tokens_in, tokens_out, latency_ms, status, error_message)
+            VALUES (datetime('now'), ?, ?, ?, ?, ?, ?, ?, ?, ?)
+        """, (
+            agent_name, provider, model, endpoint,
+            tokens_in, tokens_out, latency_ms, status, error_message
+        ))
+        row_id = cursor.lastrowid
+        conn.commit()
+        conn.close()
+        return row_id
+    except Exception:
+        # Never break production — swallow errors for telemetry
+        return None
+
+
+def query_requests(
+    *,
+    agent_name: Optional[str] = None,
+    provider: Optional[str] = None,
+    model: Optional[str] = None,
+    hours: int = 1,
+    status: Optional[str] = None,
+    limit: int = 100,
+    db_path: Optional[Path] = None,
+) -> List[Dict[str, Any]]:
+    """
+    Query recent inference logs.
+
+    Args:
+        agent_name: Filter by agent name
+        provider: Filter by provider
+        model: Filter by model
+        hours: Lookback window (default 1 hour)
+        status: Filter by status ('success', 'error', etc.)
+        limit: Max rows to return
+        db_path: Override DB path
+
+    Returns:
+        List of matching records as dicts
+    """
+    db = Path(db_path) if db_path else get_db_path()
+
+    if not db.exists():
+        return []
+
+    conditions = ["timestamp > datetime('now', '-' || ? || ' hours')"]
+    params = [hours]
+
+    if agent_name:
+        conditions.append("agent_name = ?")
+        params.append(agent_name)
+    if provider:
+        conditions.append("provider = ?")
+        params.append(provider)
+    if model:
+        conditions.append("model = ?")
+        params.append(model)
+    if status:
+        conditions.append("status = ?")
+        params.append(status)
+
+    where_clause = " AND ".join(conditions)
+
+    try:
+        conn = sqlite3.connect(str(db))
+        conn.row_factory = sqlite3.Row
+        cursor = conn.cursor()
+        cursor.execute(f"""
+            SELECT * FROM request_log
+            WHERE {where_clause}
+            ORDER BY timestamp DESC
+            LIMIT ?
+        """, tuple(params) + (limit,))
+
+        rows = [dict(row) for row in cursor.fetchall()]
+        conn.close()
+        return rows
+    except Exception:
+        return []
+
+
+def did_agent_call_provider(
+    agent_name: str,
+    provider: str,
+    hours: int = 1,
+    min_success_count: int = 1,
+    db_path: Optional[Path] = None,
+) -> bool:
+    """
+    Answer: "Did agent X actually call provider Y in the last N hours?"
+
+    Returns True if agent made at least min_success_count successful calls.
+    """
+    rows = query_requests(
+        agent_name=agent_name,
+        provider=provider,
+        hours=hours,
+        status="success",
+        db_path=db_path,
+    )
+    return len(rows) >= min_success_count
+
+
+def get_recent_activity_summary(hours: int = 1) -> Dict[str, Any]:
+    """Get aggregate statistics for recent activity, grouped by agent/provider/model/status."""
+    db = get_db_path()
+    if not db.exists():
+        return {"error": "Database not found"}
+
+    try:
+        conn = sqlite3.connect(str(db))
+        conn.row_factory = sqlite3.Row
+        cursor = conn.cursor()
+
+        # Aggregate directly from request_log (no view dependency)
+        try:
+            cursor.execute("""
+                SELECT agent_name, provider, model, status,
+                       COUNT(*) as call_count, AVG(latency_ms) as avg_latency
+                FROM request_log
+                WHERE timestamp > datetime('now', '-' || ? || ' hours')
+                GROUP BY agent_name, provider, model, status
+            """, (hours,))
+            rows = [dict(row) for row in cursor.fetchall()]
+            conn.close()
+            return {"by_agent_provider": rows}
+        except Exception:
+            conn.close()
+            return {"error": "query failed"}
+    except Exception:
+        return {"error": "db error"}
diff --git a/tests/test_request_log.py b/tests/test_request_log.py
new file mode 100644
index 00000000..f09e34a1
--- /dev/null
+++ b/tests/test_request_log.py
@@ -0,0 +1,260 @@
+#!/usr/bin/env python3
+"""
+Tests for bin/request_log.py — Request Log Telemetry.
+Issue #446: [P2.5] request_log Telemetry Table — Verify What Actually Happened
+"""
+
+import json
+import sqlite3
+import tempfile
+import unittest
+from pathlib import Path
+from datetime import datetime, timezone, timedelta
+
+import sys
+import os
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bin"))
+from request_log import (
+    get_db_path,
+    ensure_db,
+    log_inference,
+    query_requests,
+    did_agent_call_provider,
+    get_recent_activity_summary,
+)
+
+
+class TestRequestLog(unittest.TestCase):
+    def setUp(self):
+        """Create a temporary test database for each test."""
+        self.tmpdir = Path(tempfile.mkdtemp())
+        self.db_path = self.tmpdir / "test_request_log.db"
+        # Patch the module's db path by overriding env
+        self.original_env = os.environ.get("REQUEST_LOG_PATH")
+        os.environ["REQUEST_LOG_PATH"] = str(self.db_path)
+        # Clear any cached state
+        if self.db_path.exists():
+            self.db_path.unlink()
+
+    def tearDown(self):
+        """Clean up test database."""
+        if self.db_path.exists():
+            self.db_path.unlink()
+        # Restore env
+        if self.original_env is not None:
+            os.environ["REQUEST_LOG_PATH"] = self.original_env
+        else:
+            os.environ.pop("REQUEST_LOG_PATH", None)
+
+    def test_ensure_db_creates_schema(self):
+        """ensure_db() creates the database with correct schema."""
+        db = ensure_db()
+        self.assertTrue(db.exists())
+
+        # Check table exists
+        conn = sqlite3.connect(str(db))
+        cursor = conn.cursor()
+        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='request_log'")
+        result = cursor.fetchone()
+        self.assertIsNotNone(result, "request_log table should exist")
+        # Check indexes exist
+        cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_request_log_agent'")
+        self.assertIsNotNone(cursor.fetchone())
+        conn.close()
+
+    def test_log_inference_inserts_row(self):
+        """log_inference() inserts a complete row."""
+        row_id = log_inference(
+            agent_name="test-agent",
+            provider="anthropic",
+            model="claude-sonnet-4-20250514",
+            endpoint="/v1/messages",
+            tokens_in=123,
+            tokens_out=456,
+            latency_ms=1500,
+            status="success",
+        )
+        self.assertIsNotNone(row_id, "Should return row ID")
+        self.assertGreater(row_id, 0)
+
+        db = get_db_path()
+        conn = sqlite3.connect(str(db))
+        conn.row_factory = sqlite3.Row
+        cursor = conn.cursor()
+        cursor.execute("SELECT * FROM request_log WHERE id = ?", (row_id,))
+        row = cursor.fetchone()
+        conn.close()
+
+        self.assertEqual(row["agent_name"], "test-agent")
+        self.assertEqual(row["provider"], "anthropic")
+        self.assertEqual(row["model"], "claude-sonnet-4-20250514")
+        self.assertEqual(row["endpoint"], "/v1/messages")
+        self.assertEqual(row["tokens_in"], 123)
+        self.assertEqual(row["tokens_out"], 456)
+        self.assertEqual(row["latency_ms"], 1500)
+        self.assertEqual(row["status"], "success")
+
+    def test_log_inference_minimal_fields(self):
+        """log_inference() works with only required fields."""
+        row_id = log_inference(
+            agent_name="min-agent",
+            provider="ollama",
+            model="hermes3:8b",
+            endpoint="/v1/chat/completions",
+        )
+        self.assertIsNotNone(row_id)
+
+        db = get_db_path()
+        conn = sqlite3.connect(str(db))
+        conn.row_factory = sqlite3.Row
+        cursor = conn.cursor()
+        cursor.execute("SELECT * FROM request_log WHERE id = ?", (row_id,))
+        row = cursor.fetchone()
+        conn.close()
+
+        self.assertEqual(row["agent_name"], "min-agent")
+        self.assertEqual(row["tokens_in"], None)
+        self.assertEqual(row["latency_ms"], None)
+
+    def test_log_inference_error_status(self):
+        """log_inference() records error status with message."""
+        row_id = log_inference(
+            agent_name="err-agent",
+            provider="openrouter",
+            model="claude-opus-4-6",
+            endpoint="/v1/chat/completions",
+            status="error",
+            error_message="Rate limit exceeded",
+        )
+        self.assertIsNotNone(row_id)
+
+        db = get_db_path()
+        conn = sqlite3.connect(str(db))
+        cursor = conn.cursor()
+        cursor.execute("SELECT error_message FROM request_log WHERE id = ?", (row_id,))
+        row = cursor.fetchone()
+        conn.close()
+
+        self.assertEqual(row[0], "Rate limit exceeded")
+
+    def test_query_requests_filters_by_agent(self):
+        """query_requests() filters by agent_name."""
+        log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e1")
+        log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e2")
+        log_inference(agent_name="agent-b", provider="p1", model="m1", endpoint="/e3")
+
+        rows = query_requests(agent_name="agent-a")
+        self.assertEqual(len(rows), 2)
+        for row in rows:
+            self.assertEqual(row["agent_name"], "agent-a")
+
+    def test_query_requests_filters_by_provider(self):
+        """query_requests() filters by provider."""
+        log_inference(agent_name="a1", provider="anthropic", model="claude-4", endpoint="/e")
+        log_inference(agent_name="a2", provider="openrouter", model="claude-4", endpoint="/e")
+        log_inference(agent_name="a3", provider="anthropic", model="claude-4", endpoint="/e")
+
+        rows = query_requests(provider="anthropic")
+        self.assertEqual(len(rows), 2)
+        for row in rows:
+            self.assertEqual(row["provider"], "anthropic")
+
+    def test_query_requests_time_window(self):
+        """query_requests() respects hours parameter."""
+        log_inference(agent_name="time-agent", provider="test", model="m", endpoint="/e")
+
+        rows = query_requests(hours=1)
+        self.assertGreaterEqual(len(rows), 1)
+
+        # 24-hour window should include at least what 1-hour includes
+        rows_recent = query_requests(hours=24)
+        self.assertGreaterEqual(len(rows_recent), len(rows))
+
+    def test_did_agent_call_provider_positive(self):
+        """did_agent_call_provider() returns True when agent called provider."""
+        log_inference(
+            agent_name="codex-agent",
+            provider="anthropic",
+            model="claude-sonnet-4-20250514",
+            endpoint="/v1/messages",
+            status="success",
+        )
+
+        result = did_agent_call_provider(
+            agent_name="codex-agent",
+            provider="anthropic",
+            hours=24,
+        )
+        self.assertTrue(result)
+
+    def test_did_agent_call_provider_negative_wrong_agent(self):
+        """did_agent_call_provider() returns False for non-matching agent."""
+        log_inference(
+            agent_name="other-agent",
+            provider="anthropic",
+            model="claude-sonnet-4-20250514",
+            endpoint="/v1/messages",
+        )
+
+        result = did_agent_call_provider(
+            agent_name="codex-agent",
+            provider="anthropic",
+        )
+        self.assertFalse(result)
+
+    def test_did_agent_call_provider_negative_wrong_provider(self):
+        """did_agent_call_provider() returns False for non-matching provider."""
+        log_inference(
+            agent_name="codex-agent",
+            provider="ollama",
+            model="hermes3:8b",
+            endpoint="/v1/chat/completions",
+        )
+
+        result = did_agent_call_provider(
+            agent_name="codex-agent",
+            provider="anthropic",
+        )
+        self.assertFalse(result)
+
+    def test_did_agent_call_provider_min_success_count(self):
+        """did_agent_call_provider() respects min_success_count."""
+        log_inference(
+            agent_name="agent-x",
+            provider="p",
+            model="m",
+            endpoint="/e",
+            status="success",
+        )
+
+        self.assertTrue(did_agent_call_provider("agent-x", "p", min_success_count=1))
+        self.assertFalse(did_agent_call_provider("agent-x", "p", min_success_count=2))
+
+    def test_log_and_query_by_status(self):
+        """query_requests() can filter by status."""
+        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="success")
+        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="error")
+        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="timeout")
+
+        success_rows = query_requests(status="success")
+        error_rows = query_requests(status="error")
+
+        self.assertEqual(len(success_rows), 1)
+        self.assertEqual(len(error_rows), 1)
+
+    def test_get_recent_activity_summary(self):
+        """get_recent_activity_summary() returns structured data."""
+        log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=1000)
+        log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=2000)
+
+        summary = get_recent_activity_summary(hours=24)
+        self.assertIn("by_agent_provider", summary)
+        found = any(
+            r.get("agent_name") == "agent-1" and r.get("provider") == "anthropic"
+            for r in summary.get("by_agent_provider", [])
+        )
+        self.assertTrue(found, "Should find agent-1/anthropic in summary")
+
+
+if __name__ == "__main__":
+    unittest.main()