Compare commits

..

2 Commits

Author SHA1 Message Date
39d28e81d4 test: Add session analytics tests (#753)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 35s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 39s
Tests / e2e (pull_request) Successful in 3m59s
Tests / test (pull_request) Failing after 45m10s
2026-04-15 03:48:41 +00:00
7bdbbb726b feat: Add session analytics module (#753) 2026-04-15 03:48:19 +00:00
4 changed files with 300 additions and 288 deletions

189
agent/session_analytics.py Normal file
View File

@@ -0,0 +1,189 @@
"""
Session Analytics — Per-session token/cost/time tracking
Tracks resource consumption per session for transparency.
Issue: #753
"""
import json
import time
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# Root of Hermes user data; analytics JSONL files live in a subdirectory.
HERMES_HOME = Path.home() / ".hermes"
ANALYTICS_DIR = HERMES_HOME / "analytics"
# Cost per 1K tokens by provider (input/output)
# Keys are lowercase provider names (SessionTracker lowercases before lookup).
# Local/self-hosted providers are free; unknown providers fall back to a
# conservative default in record_tokens().
COST_TABLE = {
    "anthropic": {"input": 0.015, "output": 0.075},
    "openai": {"input": 0.005, "output": 0.015},
    "nous": {"input": 0.002, "output": 0.006},
    "openrouter": {"input": 0.005, "output": 0.015},
    "ollama": {"input": 0.0, "output": 0.0},
    "local": {"input": 0.0, "output": 0.0},
}
@dataclass
class SessionStats:
    """Statistics for a single session.

    Serialized to JSONL by save_session_stats() and aggregated per day
    by get_daily_stats().
    """
    session_id: str                  # unique identifier for the session
    start_time: str                  # ISO-8601 UTC timestamp
    end_time: Optional[str] = None   # ISO-8601 UTC timestamp; set by SessionTracker.finish()
    # Token counts
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0            # input + output, kept in sync by record_tokens()
    # Tool usage
    tool_calls: int = 0
    tool_errors: int = 0             # subset of tool_calls that failed
    # Timing
    wall_time_seconds: float = 0.0   # elapsed wall-clock time for the session
    api_calls: int = 0
    # Cost
    estimated_cost_usd: float = 0.0  # estimated from COST_TABLE rates
    provider: str = ""               # e.g. "anthropic", "openai"; may be empty
    model: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the stats as a plain (JSON-serializable) dict."""
        return asdict(self)
class SessionTracker:
    """Accumulates analytics (tokens, cost, tool usage, timing) for one session."""

    def __init__(self, session_id: str, provider: str = "", model: str = ""):
        self.session_id = session_id
        # Lowercased provider is the lookup key into COST_TABLE.
        self.provider = provider.lower() if provider else ""
        self.model = model
        self.start_time = time.time()
        self.stats = SessionStats(
            session_id=session_id,
            start_time=datetime.now(timezone.utc).isoformat(),
            provider=provider,
            model=model,
        )

    def record_tokens(self, input_tokens: int, output_tokens: int):
        """Add token counts and update the running cost estimate."""
        stats = self.stats
        stats.input_tokens += input_tokens
        stats.output_tokens += output_tokens
        stats.total_tokens = stats.input_tokens + stats.output_tokens
        # Unknown providers fall back to a conservative default rate.
        rates = COST_TABLE.get(self.provider, {"input": 0.01, "output": 0.03})
        stats.estimated_cost_usd += (input_tokens / 1000) * rates["input"] + (output_tokens / 1000) * rates["output"]

    def record_tool_call(self, success: bool = True):
        """Record a tool call (and count it as an error when it failed)."""
        self.stats.tool_calls += 1
        if not success:
            self.stats.tool_errors += 1

    def record_api_call(self):
        """Record one API round-trip."""
        self.stats.api_calls += 1

    def finish(self) -> SessionStats:
        """Stamp the end time, compute wall time, and return the stats."""
        self.stats.end_time = datetime.now(timezone.utc).isoformat()
        self.stats.wall_time_seconds = time.time() - self.start_time
        return self.stats

    def get_current_stats(self) -> SessionStats:
        """Return a live snapshot of the stats without ending the session."""
        self.stats.wall_time_seconds = time.time() - self.start_time
        return self.stats
def format_stats(stats: SessionStats) -> str:
    """Format a session's stats as a human-readable multi-line string.

    Args:
        stats: The session stats to render.

    Returns:
        A newline-joined summary: session id, provider/model, token
        counts, estimated cost, API/tool call counts, and wall time.
    """
    # Truncate only genuinely long session ids, and show the ellipsis only
    # when truncation actually happened (an unconditional "..." is
    # misleading for short ids).
    session_id = stats.session_id
    if len(session_id) > 20:
        session_id = session_id[:20] + "..."
    lines = [
        f"Session: {session_id}",
        f"Provider: {stats.provider or 'unknown'}",
        f"Model: {stats.model or 'unknown'}",
        "",
        f"Tokens: {stats.input_tokens:,} in / {stats.output_tokens:,} out ({stats.total_tokens:,} total)",
        f"Cost: ${stats.estimated_cost_usd:.4f}",
        f"API calls: {stats.api_calls}",
        f"Tool calls: {stats.tool_calls} ({stats.tool_errors} errors)",
        f"Wall time: {stats.wall_time_seconds:.1f}s",
    ]
    return "\n".join(lines)
def save_session_stats(stats: SessionStats):
    """Append the session's stats to today's JSONL analytics file."""
    ANALYTICS_DIR.mkdir(parents=True, exist_ok=True)
    # One file per calendar day, one JSON object per line.
    today = datetime.now().strftime("%Y-%m-%d")
    target = ANALYTICS_DIR / f"sessions_{today}.jsonl"
    with open(target, "a") as out:
        out.write(json.dumps(stats.to_dict()) + "\n")
def get_daily_stats(date_str: Optional[str] = None) -> Dict[str, Any]:
    """Aggregate all saved session stats for one day (defaults to today)."""
    if date_str is None:
        date_str = datetime.now().strftime("%Y-%m-%d")
    path = ANALYTICS_DIR / f"sessions_{date_str}.jsonl"
    records: List[Dict[str, Any]] = []
    if path.exists():
        with open(path) as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                # Skip corrupt lines rather than failing the whole report.
                try:
                    records.append(json.loads(raw))
                except json.JSONDecodeError:
                    continue
    if not records:
        return {"date": date_str, "sessions": 0}
    count = len(records)
    total_tokens = sum(r.get("total_tokens", 0) for r in records)
    total_cost = sum(r.get("estimated_cost_usd", 0) for r in records)
    return {
        "date": date_str,
        "sessions": count,
        "total_tokens": total_tokens,
        "total_cost_usd": round(total_cost, 4),
        "total_wall_time_seconds": round(sum(r.get("wall_time_seconds", 0) for r in records), 1),
        "total_tool_calls": sum(r.get("tool_calls", 0) for r in records),
        "total_tool_errors": sum(r.get("tool_errors", 0) for r in records),
        # count > 0 here (the empty case returned above).
        "avg_tokens_per_session": total_tokens // count,
        "avg_cost_per_session": round(total_cost / count, 4),
    }
def format_daily_report(stats: Dict[str, Any]) -> str:
    """Render a daily aggregate stats dict as a markdown-style report."""
    value = stats.get
    body = [
        f"# Session Analytics — {stats['date']}",
        "",
        f"Sessions: {stats['sessions']}",
        f"Total tokens: {value('total_tokens', 0):,}",
        f"Total cost: ${value('total_cost_usd', 0):.4f}",
        f"Total wall time: {value('total_wall_time_seconds', 0):.1f}s",
        f"Tool calls: {value('total_tool_calls', 0)} ({value('total_tool_errors', 0)} errors)",
        "",
        f"Avg tokens/session: {value('avg_tokens_per_session', 0):,}",
        f"Avg cost/session: ${value('avg_cost_per_session', 0):.4f}",
    ]
    return "\n".join(body)

View File

@@ -1,55 +0,0 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
    """Verify classify_error's retry decisions for common failure modes."""

    def test_timeout_is_retryable(self):
        outcome = classify_error(Exception("Connection timed out"))
        assert outcome.category == ErrorCategory.RETRYABLE
        assert outcome.should_retry is True

    def test_429_is_retryable(self):
        outcome = classify_error(Exception("Rate limit exceeded"), response_code=429)
        assert outcome.category == ErrorCategory.RETRYABLE
        assert outcome.should_retry is True

    def test_404_is_permanent(self):
        outcome = classify_error(Exception("Not found"), response_code=404)
        assert outcome.category == ErrorCategory.PERMANENT
        assert outcome.should_retry is False

    def test_403_is_permanent(self):
        outcome = classify_error(Exception("Forbidden"), response_code=403)
        assert outcome.category == ErrorCategory.PERMANENT
        assert outcome.should_retry is False

    def test_500_is_retryable(self):
        outcome = classify_error(Exception("Internal server error"), response_code=500)
        assert outcome.category == ErrorCategory.RETRYABLE
        assert outcome.should_retry is True

    def test_schema_error_is_permanent(self):
        outcome = classify_error(Exception("Schema validation failed"))
        assert outcome.category == ErrorCategory.PERMANENT
        assert outcome.should_retry is False

    def test_unknown_is_retryable_with_caution(self):
        # Unclassifiable errors get exactly one cautious retry.
        outcome = classify_error(Exception("Some unknown error"))
        assert outcome.category == ErrorCategory.UNKNOWN
        assert outcome.should_retry is True
        assert outcome.max_retries == 1
# Allow running this test module directly: `python <this file>` invokes pytest.
if __name__ == "__main__":
    pytest.main([__file__])

View File

@@ -0,0 +1,111 @@
"""
Tests for session analytics
Issue: #753
"""
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from agent.session_analytics import (
SessionTracker,
SessionStats,
format_stats,
get_daily_stats,
format_daily_report,
)
class TestSessionStats(unittest.TestCase):
    """Unit tests for the SessionStats dataclass."""

    def test_defaults(self):
        # All counters should start at zero.
        sample = SessionStats(session_id="test", start_time="2026-01-01")
        self.assertEqual(sample.input_tokens, 0)
        self.assertEqual(sample.output_tokens, 0)
        self.assertEqual(sample.tool_calls, 0)

    def test_to_dict(self):
        sample = SessionStats(session_id="test", start_time="2026-01-01")
        payload = sample.to_dict()
        self.assertEqual(payload["session_id"], "test")
        self.assertIn("input_tokens", payload)
class TestSessionTracker(unittest.TestCase):
    """Behavioral tests for SessionTracker."""

    def test_record_tokens(self):
        # Paid provider: tokens accumulate and the cost estimate is non-zero.
        tracker = SessionTracker("test", provider="openai")
        tracker.record_tokens(100, 50)
        snapshot = tracker.get_current_stats()
        self.assertEqual(snapshot.input_tokens, 100)
        self.assertEqual(snapshot.output_tokens, 50)
        self.assertGreater(snapshot.estimated_cost_usd, 0)

    def test_record_tool_call(self):
        tracker = SessionTracker("test")
        tracker.record_tool_call(success=True)
        tracker.record_tool_call(success=False)
        snapshot = tracker.get_current_stats()
        self.assertEqual(snapshot.tool_calls, 2)
        self.assertEqual(snapshot.tool_errors, 1)

    def test_free_provider(self):
        # Local/self-hosted providers are costed at $0.
        tracker = SessionTracker("test", provider="ollama")
        tracker.record_tokens(1000, 500)
        self.assertEqual(tracker.get_current_stats().estimated_cost_usd, 0.0)

    def test_finish(self):
        tracker = SessionTracker("test")
        result = tracker.finish()
        self.assertIsNotNone(result.end_time)
        self.assertGreater(result.wall_time_seconds, 0)
class TestFormatStats(unittest.TestCase):
    """Tests for the human-readable stats formatter."""

    def test_format(self):
        sample = SessionStats(
            session_id="test123",
            start_time="2026-01-01",
            input_tokens=1000,
            output_tokens=500,
            total_tokens=1500,
            tool_calls=5,
            tool_errors=1,
            wall_time_seconds=30.5,
            api_calls=3,
        )
        rendered = format_stats(sample)
        # Token counts are rendered with thousands separators.
        self.assertIn("1,000", rendered)
        self.assertIn("500", rendered)
class TestDailyStats(unittest.TestCase):
    """Tests for daily aggregation and report formatting."""

    def test_empty(self):
        # Point ANALYTICS_DIR at a fresh temp dir: no files -> zero sessions.
        with patch("agent.session_analytics.ANALYTICS_DIR", Path(tempfile.mkdtemp())):
            result = get_daily_stats("2020-01-01")
            self.assertEqual(result["sessions"], 0)

    def test_format_report(self):
        summary = {
            "date": "2026-04-14",
            "sessions": 10,
            "total_tokens": 50000,
            "total_cost_usd": 0.50,
            "total_wall_time_seconds": 300,
            "total_tool_calls": 100,
            "total_tool_errors": 5,
            "avg_tokens_per_session": 5000,
            "avg_cost_per_session": 0.05,
        }
        rendered = format_daily_report(summary)
        self.assertIn("10", rendered)
        self.assertIn("50,000", rendered)
# Allow running this test module directly with `python <this file>`.
if __name__ == "__main__":
    unittest.main()

View File

@@ -1,233 +0,0 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
    """Error category classification (retry policy buckets)."""
    # Transient failure — worth retrying with backoff.
    RETRYABLE = "retryable"
    # Definitive failure — retrying will not help.
    PERMANENT = "permanent"
    # Could not classify — retried once, cautiously (see classify_error).
    UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
    """Result of error classification.

    Produced by classify_error() and consumed by execute_with_retry()
    to decide whether and how to retry a failed call.
    """
    category: ErrorCategory           # RETRYABLE / PERMANENT / UNKNOWN
    reason: str                       # short human-readable explanation
    should_retry: bool                # True when a retry may succeed
    max_retries: int                  # suggested retry budget (0 for permanent)
    backoff_seconds: float            # suggested initial backoff delay
    error_code: Optional[int] = None  # HTTP status code, when one was supplied
    error_type: Optional[str] = None  # exception class name (type(e).__name__)
# Retryable error patterns
# Each entry: (regex, reason, max_retries, backoff_seconds). Matched
# case-insensitively against str(error) by classify_error(); first match wins.
_RETRYABLE_PATTERNS = [
    # HTTP status codes
    (r"\b429\b", "rate limit", 3, 5.0),
    (r"\b500\b", "server error", 3, 2.0),
    (r"\b502\b", "bad gateway", 3, 2.0),
    (r"\b503\b", "service unavailable", 3, 5.0),
    (r"\b504\b", "gateway timeout", 3, 5.0),
    # Timeout patterns
    (r"timeout", "timeout", 3, 2.0),
    (r"timed out", "timeout", 3, 2.0),
    (r"TimeoutExpired", "timeout", 3, 2.0),
    # Connection errors
    (r"connection refused", "connection refused", 2, 5.0),
    (r"connection reset", "connection reset", 2, 2.0),
    (r"network unreachable", "network unreachable", 2, 10.0),
    (r"DNS", "DNS error", 2, 5.0),
    # Transient errors
    (r"temporary", "temporary error", 2, 2.0),
    (r"transient", "transient error", 2, 2.0),
    (r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns
# Each entry: (regex, short label, reason). NOTE: the middle element is a
# short label, not an HTTP status code; classify_error() only uses the
# regex and the reason. Checked only after _RETRYABLE_PATTERNS.
_PERMANENT_PATTERNS = [
    # HTTP status codes
    (r"\b400\b", "bad request", "Invalid request parameters"),
    (r"\b401\b", "unauthorized", "Authentication failed"),
    (r"\b403\b", "forbidden", "Access denied"),
    (r"\b404\b", "not found", "Resource not found"),
    (r"\b405\b", "method not allowed", "HTTP method not supported"),
    (r"\b409\b", "conflict", "Resource conflict"),
    (r"\b422\b", "unprocessable", "Validation error"),
    # Schema/validation errors
    (r"schema", "schema error", "Invalid data schema"),
    (r"validation", "validation error", "Input validation failed"),
    (r"invalid.*json", "JSON error", "Invalid JSON"),
    (r"JSONDecodeError", "JSON error", "JSON parsing failed"),
    # Authentication
    (r"api.?key", "API key error", "Invalid or missing API key"),
    (r"token.*expir", "token expired", "Authentication token expired"),
    (r"permission", "permission error", "Insufficient permissions"),
    # Not found patterns
    (r"not found", "not found", "Resource does not exist"),
    (r"does not exist", "not found", "Resource does not exist"),
    (r"no such file", "file not found", "File does not exist"),
    # Quota/billing
    (r"quota", "quota exceeded", "Usage quota exceeded"),
    (r"billing", "billing error", "Billing issue"),
    (r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
    """
    Classify an error as retryable or permanent.

    Classification precedence: an explicit HTTP response code wins, then
    retryable message patterns, then permanent message patterns; anything
    unmatched is treated as cautiously retryable (one retry).

    Args:
        error: The exception that occurred
        response_code: HTTP response code if available

    Returns:
        ErrorClassification with retry guidance
    """
    error_str = str(error).lower()
    error_type = type(error).__name__

    # An explicit status code is the most reliable signal, so check it first.
    if response_code:
        if response_code in (429, 500, 502, 503, 504):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=f"HTTP {response_code} - transient server error",
                should_retry=True,
                max_retries=3,
                # Rate limits (429) get a longer initial backoff.
                backoff_seconds=5.0 if response_code == 429 else 2.0,
                error_code=response_code,
                error_type=error_type,
            )
        elif response_code in (400, 401, 403, 404, 405, 409, 422):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=f"HTTP {response_code} - client error",
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_code=response_code,
                error_type=error_type,
            )

    # Retryable patterns are checked before permanent ones, so transient
    # wording (e.g. "retry", "temporary") wins over e.g. "not found".
    for pattern, reason, max_retries, backoff in _RETRYABLE_PATTERNS:
        if re.search(pattern, error_str, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=reason,
                should_retry=True,
                max_retries=max_retries,
                backoff_seconds=backoff,
                error_type=error_type,
            )

    # Permanent patterns. The tuple's middle element is a short label
    # (unused here), NOT an HTTP status code — bind it as _label so it
    # cannot be mistaken for the ErrorClassification.error_code field.
    for pattern, _label, reason in _PERMANENT_PATTERNS:
        if re.search(pattern, error_str, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=reason,
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_type=error_type,
            )

    # Default: unknown error — allow a single cautious retry.
    return ErrorClassification(
        category=ErrorCategory.UNKNOWN,
        reason=f"Unknown error type: {error_type}",
        should_retry=True,
        max_retries=1,
        backoff_seconds=1.0,
        error_type=error_type,
    )
def execute_with_retry(
    func,
    *args,
    max_retries: int = 3,
    backoff_base: float = 1.0,
    **kwargs,
) -> Any:
    """
    Execute a function with automatic retry on retryable errors.

    Args:
        func: Function to execute
        *args: Function arguments
        max_retries: Maximum retry attempts
        backoff_base: Base backoff time in seconds
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        Exception: If permanent error or max retries exceeded
    """
    final_exc = None
    total_attempts = max_retries + 1
    for attempt in range(total_attempts):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            final_exc = exc
            verdict = classify_error(exc)
            logger.info(
                "Attempt %d/%d failed: %s (%s, retryable: %s)",
                attempt + 1, max_retries + 1,
                verdict.reason,
                verdict.category.value,
                verdict.should_retry,
            )
            # Permanent errors fail immediately — no backoff, no retry.
            if not verdict.should_retry:
                logger.error("Permanent error: %s", verdict.reason)
                raise
            # Out of attempts: propagate the last failure.
            if attempt >= max_retries:
                logger.error("Max retries (%d) exceeded", max_retries)
                raise
            # Exponential backoff before the next attempt.
            delay = backoff_base * (2 ** attempt)
            logger.info("Retrying in %.1fs...", delay)
            time.sleep(delay)
    # Unreachable in practice; defensive re-raise of the last error.
    raise final_exc
def format_error_report(classification: ErrorClassification) -> str:
    """Format an error classification as a one-line report string.

    Retryable errors are prefixed with a retry icon; permanent ones get
    no prefix (and, fixed here, no stray leading space either).
    """
    prefix = "🔄 " if classification.should_retry else ""
    return f"{prefix}{classification.category.value}: {classification.reason}"