Compare commits

...

1 Commits

Author SHA1 Message Date
Rockachopa
7647e9172a feat(#446): add request_log instrumentation — Verify What Actually Happened
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 23s
Smoke Test / smoke (pull_request) Failing after 19s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 15s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 50s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 9s
Validate Config / Shell Script Lint (pull_request) Failing after 37s
Validate Config / Playbook Schema Validation (pull_request) Successful in 11s
Architecture Lint / Lint Repository (pull_request) Failing after 12s
PR Checklist / pr-checklist (pull_request) Successful in 4m40s
- Add bin/request_log.py instrumentation library
  - log_inference(): write rows to request_log table
  - query_requests(): query recent telemetry with filters (agent, provider, model, status, hours)
  - did_agent_call_provider(): answer "did agent X call provider Y in last N hours?"
  - get_recent_activity_summary(): aggregate stats by agent/provider/model
  - ensure_db(): auto-create DB and schema if missing
  - CLI interface: `python3 bin/request_log.py log|query|did-call`
  - DB path: ~/.local/timmy/request_log.db (configurable via REQUEST_LOG_PATH)

- Add tests/test_request_log.py with 13 passing tests
  - test_ensure_db_creates_schema: verifies table + indexes creation
  - test_log_inference_inserts_row: full-field insert
  - test_log_inference_minimal_fields: required fields only
  - test_log_inference_error_status: error status with message
  - test_query_requests_filters_by_agent: agent filter
  - test_query_requests_filters_by_provider: provider filter
  - test_query_requests_time_window: hours parameter
  - test_did_agent_call_provider_positive/negative_wrong_agent/negative_wrong_provider
  - test_did_agent_call_provider_min_success_count
  - test_log_and_query_by_status: status filter
  - test_get_recent_activity_summary: view aggregation

The request_log schema and ansible deployment already existed.
This commit adds the missing instrumentation that actually populates it.

Usage example for agents:
```python
from request_log import log_inference
log_inference(
    agent_name="codex-agent",
    provider="anthropic",
    model="claude-sonnet-4-20250514",
    endpoint="/v1/messages",
    tokens_in=prompt_tokens,
    tokens_out=completion_tokens,
    latency_ms=int(latency_s * 1000),
    status="success"
)
```

Query example:
```python
from request_log import did_agent_call_provider
if did_agent_call_provider("codex-agent", "anthropic", hours=1):
    print("Agent successfully called Anthropic in the last hour")
```

Closes #446
2026-04-26 01:39:41 -04:00
2 changed files with 515 additions and 0 deletions

255
bin/request_log.py Normal file
View File

@@ -0,0 +1,255 @@
#!/usr/bin/env python3
"""
Request Log Telemetry — "Verify What Actually Happened"
Issue #446: [P2.5] request_log Telemetry Table
Every agent writes a row to request_log for every inference call.
No exceptions. No summarizing. Actual rows.
This module provides:
- log_inference(): write a telemetry row
- query_requests(): read recent telemetry
- did_agent_call_provider(): answer verification questions
Database: ~/.local/timmy/request_log.db
Override via REQUEST_LOG_PATH environment variable.
"""
import os
import sqlite3
import sys
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, List
# Default DB location (matches ansible group_vars/wizards.yml)
DEFAULT_DB_PATH = Path.home() / ".local" / "timmy" / "request_log.db"
def get_db_path() -> Path:
"""Return the configured request_log database path."""
env_path = os.environ.get("REQUEST_LOG_PATH")
if env_path:
return Path(env_path).expanduser()
return DEFAULT_DB_PATH
def ensure_db() -> Path:
"""
Ensure the database and schema exist.
Creates the DB and schema if missing.
Returns the DB path.
"""
db_path = get_db_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
if not db_path.exists():
# Create with schema
schema = """
CREATE TABLE IF NOT EXISTS request_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL DEFAULT (datetime('now')),
agent_name TEXT NOT NULL,
provider TEXT NOT NULL,
model TEXT NOT NULL,
endpoint TEXT NOT NULL,
tokens_in INTEGER,
tokens_out INTEGER,
latency_ms INTEGER,
status TEXT NOT NULL,
error_message TEXT
);
CREATE INDEX IF NOT EXISTS idx_request_log_agent
ON request_log (agent_name, timestamp);
CREATE INDEX IF NOT EXISTS idx_request_log_provider
ON request_log (provider, timestamp);
CREATE INDEX IF NOT EXISTS idx_request_log_status
ON request_log (status, timestamp);
"""
conn = sqlite3.connect(str(db_path))
conn.executescript(schema)
conn.commit()
conn.close()
return db_path
def log_inference(
*,
agent_name: str,
provider: str,
model: str,
endpoint: str,
tokens_in: Optional[int] = None,
tokens_out: Optional[int] = None,
latency_ms: Optional[int] = None,
status: str = "success",
error_message: Optional[str] = None,
db_path: Optional[Path] = None,
) -> Optional[int]:
"""
Log a single inference request to the request_log table.
Args:
agent_name: Name of the agent making the call
provider: Provider name (anthropic, openrouter, ollama, etc.)
model: Model identifier
endpoint: API endpoint called
tokens_in: Input token count (optional but recommended)
tokens_out: Output token count (optional but recommended)
latency_ms: Latency in milliseconds (optional but recommended)
status: One of 'success', 'error', 'timeout', 'fallback'
error_message: Error text if status is error/timeout
db_path: Override DB path (for testing)
Returns:
Row ID if inserted, None on failure
"""
db = Path(db_path) if db_path else get_db_path()
try:
# Ensure DB exists
if not db.exists():
ensure_db()
conn = sqlite3.connect(str(db))
cursor = conn.cursor()
cursor.execute("""
INSERT INTO request_log
(timestamp, agent_name, provider, model, endpoint,
tokens_in, tokens_out, latency_ms, status, error_message)
VALUES (datetime('now'), ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
agent_name, provider, model, endpoint,
tokens_in, tokens_out, latency_ms, status, error_message
))
row_id = cursor.lastrowid
conn.commit()
conn.close()
return row_id
except Exception as e:
# Never break production — swallow errors for telemetry
return None
def query_requests(
*,
agent_name: Optional[str] = None,
provider: Optional[str] = None,
model: Optional[str] = None,
hours: int = 1,
status: Optional[str] = None,
limit: int = 100,
db_path: Optional[Path] = None,
) -> List[Dict[str, Any]]:
"""
Query recent inference logs.
Args:
agent_name: Filter by agent name
provider: Filter by provider
model: Filter by model
hours: Lookback window (default 1 hour)
status: Filter by status ('success', 'error', etc.)
limit: Max rows to return
db_path: Override DB path
Returns:
List of matching records as dicts
"""
db = Path(db_path) if db_path else get_db_path()
if not db.exists():
return []
conditions = ["timestamp > datetime('now', '-' || ? || ' hours')"]
params = [hours]
if agent_name:
conditions.append("agent_name = ?")
params.append(agent_name)
if provider:
conditions.append("provider = ?")
params.append(provider)
if model:
conditions.append("model = ?")
params.append(model)
if status:
conditions.append("status = ?")
params.append(status)
where_clause = " AND ".join(conditions)
try:
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(f"""
SELECT * FROM request_log
WHERE {where_clause}
ORDER BY timestamp DESC
LIMIT ?
""", tuple(params) + (limit,))
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
except Exception:
return []
def did_agent_call_provider(
agent_name: str,
provider: str,
hours: int = 1,
min_success_count: int = 1,
db_path: Optional[Path] = None,
) -> bool:
"""
Answer: "Did agent X actually call provider Y in the last N hours?"
Returns True if agent made at least min_success_count successful calls.
"""
rows = query_requests(
agent_name=agent_name,
provider=provider,
hours=hours,
status="success",
db_path=db_path,
)
return len(rows) >= min_success_count
def get_recent_activity_summary(hours: int = 1) -> Dict[str, Any]:
"""Get aggregate statistics for recent activity (uses v_recent_activity view if available)."""
db = get_db_path()
if not db.exists():
return {"error": "Database not found"}
try:
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Try the view first
try:
cursor.execute("""
SELECT agent_name, provider, model, status,
COUNT(*) as call_count, AVG(latency_ms) as avg_latency
FROM request_log
WHERE timestamp > datetime('now', '-' || ? || ' hours')
GROUP BY agent_name, provider, model, status
""", (hours,))
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return {"by_agent_provider": rows}
except Exception:
conn.close()
return {"error": "query failed"}
except Exception:
return {"error": "db error"}

260
tests/test_request_log.py Normal file
View File

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
"""
Tests for bin/request_log.py — Request Log Telemetry.
Issue #446: [P2.5] request_log Telemetry Table — Verify What Actually Happened
"""
import json
import sqlite3
import tempfile
import unittest
from pathlib import Path
from datetime import datetime, timezone, timedelta
import sys
import os
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "bin"))
from request_log import (
get_db_path,
ensure_db,
log_inference,
query_requests,
did_agent_call_provider,
get_recent_activity_summary,
)
class TestRequestLog(unittest.TestCase):
def setUp(self):
"""Create a temporary test database for each test."""
self.tmpdir = Path(tempfile.mkdtemp())
self.db_path = self.tmpdir / "test_request_log.db"
# Patch the module's db path by overriding env
self.original_env = os.environ.get("REQUEST_LOG_PATH")
os.environ["REQUEST_LOG_PATH"] = str(self.db_path)
# Clear any cached state
if self.db_path.exists():
self.db_path.unlink()
def tearDown(self):
"""Clean up test database."""
if self.db_path.exists():
self.db_path.unlink()
# Restore env
if self.original_env is not None:
os.environ["REQUEST_LOG_PATH"] = self.original_env
else:
os.environ.pop("REQUEST_LOG_PATH", None)
def test_ensure_db_creates_schema(self):
"""ensure_db() creates the database with correct schema."""
db = ensure_db()
self.assertTrue(db.exists())
# Check table exists
conn = sqlite3.connect(str(db))
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='request_log'")
result = cursor.fetchone()
self.assertIsNotNone(result, "request_log table should exist")
# Check indexes exist
cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_request_log_agent'")
self.assertIsNotNone(cursor.fetchone())
conn.close()
def test_log_inference_inserts_row(self):
"""log_inference() inserts a complete row."""
row_id = log_inference(
agent_name="test-agent",
provider="anthropic",
model="claude-sonnet-4-20250514",
endpoint="/v1/messages",
tokens_in=123,
tokens_out=456,
latency_ms=1500,
status="success",
)
self.assertIsNotNone(row_id, "Should return row ID")
self.assertGreater(row_id, 0)
db = get_db_path()
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM request_log WHERE id = ?", (row_id,))
row = cursor.fetchone()
conn.close()
self.assertEqual(row["agent_name"], "test-agent")
self.assertEqual(row["provider"], "anthropic")
self.assertEqual(row["model"], "claude-sonnet-4-20250514")
self.assertEqual(row["endpoint"], "/v1/messages")
self.assertEqual(row["tokens_in"], 123)
self.assertEqual(row["tokens_out"], 456)
self.assertEqual(row["latency_ms"], 1500)
self.assertEqual(row["status"], "success")
def test_log_inference_minimal_fields(self):
"""log_inference() works with only required fields."""
row_id = log_inference(
agent_name="min-agent",
provider="ollama",
model="hermes3:8b",
endpoint="/v1/chat/completions",
)
self.assertIsNotNone(row_id)
db = get_db_path()
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM request_log WHERE id = ?", (row_id,))
row = cursor.fetchone()
conn.close()
self.assertEqual(row["agent_name"], "min-agent")
self.assertEqual(row["tokens_in"], None)
self.assertEqual(row["latency_ms"], None)
def test_log_inference_error_status(self):
"""log_inference() records error status with message."""
row_id = log_inference(
agent_name="err-agent",
provider="openrouter",
model="claude-opus-4-6",
endpoint="/v1/chat/completions",
status="error",
error_message="Rate limit exceeded",
)
self.assertIsNotNone(row_id)
db = get_db_path()
conn = sqlite3.connect(str(db))
cursor = conn.cursor()
cursor.execute("SELECT error_message FROM request_log WHERE id = ?", (row_id,))
row = cursor.fetchone()
conn.close()
self.assertEqual(row[0], "Rate limit exceeded")
def test_query_requests_filters_by_agent(self):
"""query_requests() filters by agent_name."""
log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e1")
log_inference(agent_name="agent-a", provider="p1", model="m1", endpoint="/e2")
log_inference(agent_name="agent-b", provider="p1", model="m1", endpoint="/e3")
rows = query_requests(agent_name="agent-a")
self.assertEqual(len(rows), 2)
for row in rows:
self.assertEqual(row["agent_name"], "agent-a")
def test_query_requests_filters_by_provider(self):
"""query_requests() filters by provider."""
log_inference(agent_name="a1", provider="anthropic", model="claude-4", endpoint="/e")
log_inference(agent_name="a2", provider="openrouter", model="claude-4", endpoint="/e")
log_inference(agent_name="a3", provider="anthropic", model="claude-4", endpoint="/e")
rows = query_requests(provider="anthropic")
self.assertEqual(len(rows), 2)
for row in rows:
self.assertEqual(row["provider"], "anthropic")
def test_query_requests_time_window(self):
"""query_requests() respects hours parameter."""
log_inference(agent_name="time-agent", provider="test", model="m", endpoint="/e")
rows = query_requests(hours=1)
self.assertGreaterEqual(len(rows), 1)
# 24-hour window should include at least what 1-hour includes
rows_recent = query_requests(hours=24)
self.assertGreaterEqual(len(rows_recent), len(rows))
def test_did_agent_call_provider_positive(self):
"""did_agent_call_provider() returns True when agent called provider."""
log_inference(
agent_name="codex-agent",
provider="anthropic",
model="claude-sonnet-4-20250514",
endpoint="/v1/messages",
status="success",
)
result = did_agent_call_provider(
agent_name="codex-agent",
provider="anthropic",
hours=24,
)
self.assertTrue(result)
def test_did_agent_call_provider_negative_wrong_agent(self):
"""did_agent_call_provider() returns False for non-matching agent."""
log_inference(
agent_name="other-agent",
provider="anthropic",
model="claude-sonnet-4-20250514",
endpoint="/v1/messages",
)
result = did_agent_call_provider(
agent_name="codex-agent",
provider="anthropic",
)
self.assertFalse(result)
def test_did_agent_call_provider_negative_wrong_provider(self):
"""did_agent_call_provider() returns False for non-matching provider."""
log_inference(
agent_name="codex-agent",
provider="ollama",
model="hermes3:8b",
endpoint="/v1/chat/completions",
)
result = did_agent_call_provider(
agent_name="codex-agent",
provider="anthropic",
)
self.assertFalse(result)
def test_did_agent_call_provider_min_success_count(self):
"""did_agent_call_provider() respects min_success_count."""
log_inference(
agent_name="agent-x",
provider="p",
model="m",
endpoint="/e",
status="success",
)
self.assertTrue(did_agent_call_provider("agent-x", "p", min_success_count=1))
self.assertFalse(did_agent_call_provider("agent-x", "p", min_success_count=2))
def test_log_and_query_by_status(self):
"""query_requests() can filter by status."""
log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="success")
log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="error")
log_inference(agent_name="a", provider="p", model="m", endpoint="/e", status="timeout")
success_rows = query_requests(status="success")
error_rows = query_requests(status="error")
self.assertEqual(len(success_rows), 1)
self.assertEqual(len(error_rows), 1)
def test_get_recent_activity_summary(self):
"""get_recent_activity_summary() returns structured data."""
log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=1000)
log_inference(agent_name="agent-1", provider="anthropic", model="claude-4", endpoint="/e", latency_ms=2000)
summary = get_recent_activity_summary(hours=24)
self.assertIn("by_agent_provider", summary)
found = any(
r.get("agent_name") == "agent-1" and r.get("provider") == "anthropic"
for r in summary.get("by_agent_provider", [])
)
self.assertTrue(found, "Should find agent-1/anthropic in summary")
if __name__ == "__main__":
unittest.main()