Compare commits
1 Commits
step35/443
...
step35/446
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7647e9172a |
255
bin/request_log.py
Normal file
255
bin/request_log.py
Normal file
@@ -0,0 +1,255 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Request Log Telemetry — "Verify What Actually Happened"
|
||||
|
||||
Issue #446: [P2.5] request_log Telemetry Table
|
||||
|
||||
Every agent writes a row to request_log for every inference call.
|
||||
No exceptions. No summarizing. Actual rows.
|
||||
|
||||
This module provides:
|
||||
- log_inference(): write a telemetry row
|
||||
- query_requests(): read recent telemetry
|
||||
- did_agent_call_provider(): answer verification questions
|
||||
|
||||
Database: ~/.local/timmy/request_log.db
|
||||
Override via REQUEST_LOG_PATH environment variable.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
import json
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
# Default DB location (matches ansible group_vars/wizards.yml)
DEFAULT_DB_PATH = Path.home() / ".local" / "timmy" / "request_log.db"


def get_db_path() -> Path:
    """Return the configured request_log database path.

    Honors the REQUEST_LOG_PATH environment variable when set
    (with ``~`` expansion); otherwise falls back to DEFAULT_DB_PATH.
    """
    override = os.environ.get("REQUEST_LOG_PATH")
    return Path(override).expanduser() if override else DEFAULT_DB_PATH
|
||||
|
||||
|
||||
def ensure_db(db_path: Optional[Path] = None) -> Path:
    """Ensure the request_log database and schema exist.

    Args:
        db_path: Override DB path (defaults to get_db_path()); new optional
            parameter, backward-compatible with the zero-argument call.

    Returns:
        The database path.

    The DDL uses IF NOT EXISTS throughout, so this is safe to call on an
    existing database. Unlike the previous version, the schema is applied
    even when the file already exists — this repairs an empty file left
    behind by a bare sqlite3.connect() elsewhere, which the old
    "only create schema when the file is missing" check silently skipped.
    """
    path = Path(db_path) if db_path else get_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)

    schema = """
    CREATE TABLE IF NOT EXISTS request_log (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL DEFAULT (datetime('now')),
        agent_name TEXT NOT NULL,
        provider TEXT NOT NULL,
        model TEXT NOT NULL,
        endpoint TEXT NOT NULL,
        tokens_in INTEGER,
        tokens_out INTEGER,
        latency_ms INTEGER,
        status TEXT NOT NULL,
        error_message TEXT
    );

    CREATE INDEX IF NOT EXISTS idx_request_log_agent
        ON request_log (agent_name, timestamp);

    CREATE INDEX IF NOT EXISTS idx_request_log_provider
        ON request_log (provider, timestamp);

    CREATE INDEX IF NOT EXISTS idx_request_log_status
        ON request_log (status, timestamp);
    """
    conn = sqlite3.connect(str(path))
    try:
        conn.executescript(schema)
        conn.commit()
    finally:
        # Always release the handle, even if the DDL fails.
        conn.close()

    return path
|
||||
|
||||
|
||||
def log_inference(
    *,
    agent_name: str,
    provider: str,
    model: str,
    endpoint: str,
    tokens_in: Optional[int] = None,
    tokens_out: Optional[int] = None,
    latency_ms: Optional[int] = None,
    status: str = "success",
    error_message: Optional[str] = None,
    db_path: Optional[Path] = None,
) -> Optional[int]:
    """
    Log a single inference request to the request_log table.

    Args:
        agent_name: Name of the agent making the call
        provider: Provider name (anthropic, openrouter, ollama, etc.)
        model: Model identifier
        endpoint: API endpoint called
        tokens_in: Input token count (optional but recommended)
        tokens_out: Output token count (optional but recommended)
        latency_ms: Latency in milliseconds (optional but recommended)
        status: One of 'success', 'error', 'timeout', 'fallback'
        error_message: Error text if status is error/timeout
        db_path: Override DB path (for testing)

    Returns:
        Row ID if inserted, None on failure
    """
    db = Path(db_path) if db_path else get_db_path()

    try:
        db.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(db))
        try:
            # Guarantee the table exists at *this* path. The previous code
            # called ensure_db(), which always targets get_db_path() and
            # ignores a db_path override — the override DB then had no
            # table, the INSERT raised, and the call silently returned None.
            # Indexes are still ensure_db()'s job; they aren't needed here.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS request_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL DEFAULT (datetime('now')),
                    agent_name TEXT NOT NULL,
                    provider TEXT NOT NULL,
                    model TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    tokens_in INTEGER,
                    tokens_out INTEGER,
                    latency_ms INTEGER,
                    status TEXT NOT NULL,
                    error_message TEXT
                )
            """)
            cursor = conn.execute("""
                INSERT INTO request_log
                    (timestamp, agent_name, provider, model, endpoint,
                     tokens_in, tokens_out, latency_ms, status, error_message)
                VALUES (datetime('now'), ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                agent_name, provider, model, endpoint,
                tokens_in, tokens_out, latency_ms, status, error_message,
            ))
            row_id = cursor.lastrowid
            conn.commit()
            return row_id
        finally:
            # Release the handle on every path (the old code leaked it on error).
            conn.close()
    except Exception:
        # Never break production — telemetry failures are swallowed by design.
        return None
|
||||
|
||||
|
||||
def query_requests(
    *,
    agent_name: Optional[str] = None,
    provider: Optional[str] = None,
    model: Optional[str] = None,
    hours: int = 1,
    status: Optional[str] = None,
    limit: int = 100,
    db_path: Optional[Path] = None,
) -> List[Dict[str, Any]]:
    """
    Query recent inference logs.

    Args:
        agent_name: Filter by agent name
        provider: Filter by provider
        model: Filter by model
        hours: Lookback window (default 1 hour)
        status: Filter by status ('success', 'error', etc.)
        limit: Max rows to return
        db_path: Override DB path

    Returns:
        List of matching records as dicts (newest first); empty list when
        the DB is missing or the query fails (best-effort telemetry read).
    """
    db = Path(db_path) if db_path else get_db_path()

    if not db.exists():
        return []

    # The lookback window is still bound as a parameter — only the fixed
    # '-N hours' modifier string is assembled inside SQLite itself.
    conditions = ["timestamp > datetime('now', '-' || ? || ' hours')"]
    params: List[Any] = [hours]

    # Optional equality filters; clause strings are constants, values are bound.
    for clause, value in (
        ("agent_name = ?", agent_name),
        ("provider = ?", provider),
        ("model = ?", model),
        ("status = ?", status),
    ):
        if value:
            conditions.append(clause)
            params.append(value)

    sql = (
        "SELECT * FROM request_log WHERE "
        + " AND ".join(conditions)
        + " ORDER BY timestamp DESC LIMIT ?"
    )

    try:
        conn = sqlite3.connect(str(db))
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(sql, (*params, limit))
            return [dict(row) for row in cursor.fetchall()]
        finally:
            # The old code only closed on success, leaking the handle when
            # execute() raised; close on every path.
            conn.close()
    except Exception:
        return []
|
||||
|
||||
|
||||
def did_agent_call_provider(
    agent_name: str,
    provider: str,
    hours: int = 1,
    min_success_count: int = 1,
    db_path: Optional[Path] = None,
) -> bool:
    """
    Answer: "Did agent X actually call provider Y in the last N hours?"

    Returns True if agent made at least min_success_count successful calls.
    """
    rows = query_requests(
        agent_name=agent_name,
        provider=provider,
        hours=hours,
        status="success",
        # Explicit limit: query_requests defaults to 100 rows, which made
        # min_success_count > 100 impossible to satisfy. We never need more
        # than min_success_count matches to answer the question.
        limit=max(min_success_count, 1),
        db_path=db_path,
    )
    return len(rows) >= min_success_count
|
||||
|
||||
|
||||
def get_recent_activity_summary(
    hours: int = 1,
    db_path: Optional[Path] = None,
) -> Dict[str, Any]:
    """Aggregate recent activity per (agent, provider, model, status).

    Args:
        hours: Lookback window in hours (default 1).
        db_path: Override DB path (defaults to get_db_path()); new optional
            parameter for consistency with the other helpers.

    Returns:
        {"by_agent_provider": [rows with call_count and avg_latency]} on
        success, or {"error": ...} when the DB is missing or the query fails.

    Note: the old docstring claimed this used a v_recent_activity view; it
    has always queried request_log directly, as it does here.
    """
    db = Path(db_path) if db_path else get_db_path()
    if not db.exists():
        return {"error": "Database not found"}

    try:
        conn = sqlite3.connect(str(db))
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute("""
                SELECT agent_name, provider, model, status,
                       COUNT(*) AS call_count, AVG(latency_ms) AS avg_latency
                FROM request_log
                WHERE timestamp > datetime('now', '-' || ? || ' hours')
                GROUP BY agent_name, provider, model, status
            """, (hours,))
            return {"by_agent_provider": [dict(row) for row in cursor.fetchall()]}
        finally:
            # Close on every path — the old code leaked the handle if
            # cursor creation raised before reaching its inner try.
            conn.close()
    except Exception:
        # Best-effort summary: surface a structured error instead of raising.
        return {"error": "query failed"}
|
||||
260
tests/test_request_log.py
Normal file
260
tests/test_request_log.py
Normal file
@@ -0,0 +1,260 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for bin/request_log.py — Request Log Telemetry.
|
||||
Issue #446: [P2.5] request_log Telemetry Table — Verify What Actually Happened
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bin"))
|
||||
from request_log import (
|
||||
get_db_path,
|
||||
ensure_db,
|
||||
log_inference,
|
||||
query_requests,
|
||||
did_agent_call_provider,
|
||||
get_recent_activity_summary,
|
||||
)
|
||||
|
||||
|
||||
class TestRequestLog(unittest.TestCase):
    """Unit tests for the request_log telemetry helpers.

    Each test gets a private temporary database, selected via the
    REQUEST_LOG_PATH environment variable that request_log honors.
    """

    def setUp(self):
        """Create a temporary test database for each test."""
        self.tmpdir = Path(tempfile.mkdtemp())
        self.db_path = self.tmpdir / "test_request_log.db"
        # Patch the module's db path by overriding env
        self.original_env = os.environ.get("REQUEST_LOG_PATH")
        os.environ["REQUEST_LOG_PATH"] = str(self.db_path)
        # Clear any cached state
        if self.db_path.exists():
            self.db_path.unlink()

    def tearDown(self):
        """Remove the temporary directory and restore the environment."""
        import shutil  # local import: keeps this fix self-contained

        # The old cleanup unlinked only the DB file and leaked the mkdtemp()
        # directory (and any sqlite -journal/-wal files) on every test run.
        shutil.rmtree(self.tmpdir, ignore_errors=True)
        # Restore env
        if self.original_env is not None:
            os.environ["REQUEST_LOG_PATH"] = self.original_env
        else:
            os.environ.pop("REQUEST_LOG_PATH", None)

    def test_ensure_db_creates_schema(self):
        """ensure_db() creates the database with correct schema."""
        db = ensure_db()
        self.assertTrue(db.exists())

        # Check table exists
        conn = sqlite3.connect(str(db))
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='request_log'")
        result = cursor.fetchone()
        self.assertIsNotNone(result, "request_log table should exist")
        # Check indexes exist
        cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_request_log_agent'")
        self.assertIsNotNone(cursor.fetchone())
        conn.close()

    def test_log_inference_inserts_row(self):
        """log_inference() inserts a complete row."""
        row_id = log_inference(
            agent_name="test-agent",
            provider="anthropic",
            model="claude-sonnet-4-20250514",
            endpoint="/v1/messages",
            tokens_in=123,
            tokens_out=456,
            latency_ms=1500,
            status="success",
        )
        self.assertIsNotNone(row_id, "Should return row ID")
        self.assertGreater(row_id, 0)

        db = get_db_path()
        conn = sqlite3.connect(str(db))
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM request_log WHERE id = ?", (row_id,))
        row = cursor.fetchone()
        conn.close()

        self.assertEqual(row["agent_name"], "test-agent")
        self.assertEqual(row["provider"], "anthropic")
        self.assertEqual(row["model"], "claude-sonnet-4-20250514")
        self.assertEqual(row["endpoint"], "/v1/messages")
        self.assertEqual(row["tokens_in"], 123)
        self.assertEqual(row["tokens_out"], 456)
        self.assertEqual(row["latency_ms"], 1500)
        self.assertEqual(row["status"], "success")

    def test_log_inference_minimal_fields(self):
        """log_inference() works with only required fields."""
        row_id = log_inference(
            agent_name="min-agent",
            provider="ollama",
            model="hermes3:8b",
            endpoint="/v1/chat/completions",
        )
        self.assertIsNotNone(row_id)

        db = get_db_path()
        conn = sqlite3.connect(str(db))
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM request_log WHERE id = ?", (row_id,))
        row = cursor.fetchone()
        conn.close()

        self.assertEqual(row["agent_name"], "min-agent")
        self.assertEqual(row["tokens_in"], None)
        self.assertEqual(row["latency_ms"], None)

    def test_log_inference_error_status(self):
        """log_inference() records error status with message."""
        row_id = log_inference(
            agent_name="err-agent",
            provider="openrouter",
            model="claude-opus-4-6",
            endpoint="/v1/chat/completions",
            status="error",
            error_message="Rate limit exceeded",
        )
        self.assertIsNotNone(row_id)

        db = get_db_path()
        conn = sqlite3.connect(str(db))
        cursor = conn.cursor()
        cursor.execute("SELECT error_message FROM request_log WHERE id = ?", (row_id,))
        row = cursor.fetchone()
        conn.close()

        self.assertEqual(row[0], "Rate limit exceeded")

    def test_query_requests_filters_by_agent(self):
        """query_requests() filters by agent_name."""
        log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e1")
        log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e2")
        log_inference(agent_name="agent-b", provider="p1", model="m1", endpoint="/e3")

        rows = query_requests(agent_name="agent-a")
        self.assertEqual(len(rows), 2)
        for row in rows:
            self.assertEqual(row["agent_name"], "agent-a")

    def test_query_requests_filters_by_provider(self):
        """query_requests() filters by provider."""
        log_inference(agent_name="a1", provider="anthropic", model="claude-4", endpoint="/e")
        log_inference(agent_name="a2", provider="openrouter", model="claude-4", endpoint="/e")
        log_inference(agent_name="a3", provider="anthropic", model="claude-4", endpoint="/e")

        rows = query_requests(provider="anthropic")
        self.assertEqual(len(rows), 2)
        for row in rows:
            self.assertEqual(row["provider"], "anthropic")

    def test_query_requests_time_window(self):
        """query_requests() respects hours parameter."""
        log_inference(agent_name="time-agent", provider="test", model="m", endpoint="/e")

        rows = query_requests(hours=1)
        self.assertGreaterEqual(len(rows), 1)

        # 24-hour window should include at least what 1-hour includes
        rows_recent = query_requests(hours=24)
        self.assertGreaterEqual(len(rows_recent), len(rows))

    def test_did_agent_call_provider_positive(self):
        """did_agent_call_provider() returns True when agent called provider."""
        log_inference(
            agent_name="codex-agent",
            provider="anthropic",
            model="claude-sonnet-4-20250514",
            endpoint="/v1/messages",
            status="success",
        )

        result = did_agent_call_provider(
            agent_name="codex-agent",
            provider="anthropic",
            hours=24,
        )
        self.assertTrue(result)

    def test_did_agent_call_provider_negative_wrong_agent(self):
        """did_agent_call_provider() returns False for non-matching agent."""
        log_inference(
            agent_name="other-agent",
            provider="anthropic",
            model="claude-sonnet-4-20250514",
            endpoint="/v1/messages",
        )

        result = did_agent_call_provider(
            agent_name="codex-agent",
            provider="anthropic",
        )
        self.assertFalse(result)

    def test_did_agent_call_provider_negative_wrong_provider(self):
        """did_agent_call_provider() returns False for non-matching provider."""
        log_inference(
            agent_name="codex-agent",
            provider="ollama",
            model="hermes3:8b",
            endpoint="/v1/chat/completions",
        )

        result = did_agent_call_provider(
            agent_name="codex-agent",
            provider="anthropic",
        )
        self.assertFalse(result)

    def test_did_agent_call_provider_min_success_count(self):
        """did_agent_call_provider() respects min_success_count."""
        log_inference(
            agent_name="agent-x",
            provider="p",
            model="m",
            endpoint="/e",
            status="success",
        )

        self.assertTrue(did_agent_call_provider("agent-x", "p", min_success_count=1))
        self.assertFalse(did_agent_call_provider("agent-x", "p", min_success_count=2))

    def test_log_and_query_by_status(self):
        """query_requests() can filter by status."""
        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="success")
        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="error")
        log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="timeout")

        success_rows = query_requests(status="success")
        error_rows = query_requests(status="error")

        self.assertEqual(len(success_rows), 1)
        self.assertEqual(len(error_rows), 1)

    def test_get_recent_activity_summary(self):
        """get_recent_activity_summary() returns structured data."""
        log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=1000)
        log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=2000)

        summary = get_recent_activity_summary(hours=24)
        self.assertIn("by_agent_provider", summary)
        found = any(
            r.get("agent_name") == "agent-1" and r.get("provider") == "anthropic"
            for r in summary.get("by_agent_provider", [])
        )
        self.assertTrue(found, "Should find agent-1/anthropic in summary")
|
||||
|
||||
|
||||
# Allow running this test module directly: `python tests/test_request_log.py`.
if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user