feat: add /insights command with usage analytics and cost estimation
Inspired by Claude Code's /insights, adapted for Hermes Agent's multi-platform architecture. Analyzes session history from state.db to produce comprehensive usage insights. Features: - Overview stats: sessions, messages, tokens, estimated cost, active time - Model breakdown: per-model sessions, tokens, and cost estimation - Platform breakdown: CLI vs Telegram vs Discord etc. (unique to Hermes) - Tool usage ranking: most-used tools with percentages - Activity patterns: day-of-week chart, peak hours, streaks - Notable sessions: longest, most messages, most tokens, most tool calls - Cost estimation: real pricing data for 25+ models (OpenAI, Anthropic, DeepSeek, Google, Meta) with fuzzy model name matching - Configurable time window: --days flag (default 30) - Source filtering: --source flag to filter by platform Three entry points: - /insights slash command in CLI (supports --days and --source flags) - /insights slash command in gateway (compact markdown format) - hermes insights CLI subcommand (standalone) Includes 56 tests covering pricing helpers, format helpers, empty DB, populated DB with multi-platform data, filtering, formatting, and edge cases.
This commit is contained in:
691
agent/insights.py
Normal file
691
agent/insights.py
Normal file
@@ -0,0 +1,691 @@
|
||||
"""
|
||||
Session Insights Engine for Hermes Agent.
|
||||
|
||||
Analyzes historical session data from the SQLite state database to produce
|
||||
comprehensive usage insights — token consumption, cost estimates, tool usage
|
||||
patterns, activity trends, model/platform breakdowns, and session metrics.
|
||||
|
||||
Inspired by Claude Code's /insights command, adapted for Hermes Agent's
|
||||
multi-platform architecture with additional cost estimation and platform
|
||||
breakdown capabilities.
|
||||
|
||||
Usage:
|
||||
from agent.insights import InsightsEngine
|
||||
engine = InsightsEngine(db)
|
||||
report = engine.generate(days=30)
|
||||
print(engine.format_terminal(report))
|
||||
"""
|
||||
|
||||
import time
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# =========================================================================
|
||||
# Model pricing (USD per million tokens) — approximate as of early 2026
|
||||
# =========================================================================
|
||||
MODEL_PRICING = {
|
||||
# OpenAI
|
||||
"gpt-4o": {"input": 2.50, "output": 10.00},
|
||||
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
|
||||
"gpt-4.1": {"input": 2.00, "output": 8.00},
|
||||
"gpt-4.1-mini": {"input": 0.40, "output": 1.60},
|
||||
"gpt-4.1-nano": {"input": 0.10, "output": 0.40},
|
||||
"gpt-4.5-preview": {"input": 75.00, "output": 150.00},
|
||||
"gpt-5": {"input": 10.00, "output": 30.00},
|
||||
"gpt-5.4": {"input": 10.00, "output": 30.00},
|
||||
"o3": {"input": 10.00, "output": 40.00},
|
||||
"o3-mini": {"input": 1.10, "output": 4.40},
|
||||
"o4-mini": {"input": 1.10, "output": 4.40},
|
||||
# Anthropic
|
||||
"claude-opus-4-20250514": {"input": 15.00, "output": 75.00},
|
||||
"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
|
||||
"claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
|
||||
"claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.00},
|
||||
"claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
|
||||
"claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
|
||||
# DeepSeek
|
||||
"deepseek-chat": {"input": 0.14, "output": 0.28},
|
||||
"deepseek-reasoner": {"input": 0.55, "output": 2.19},
|
||||
# Google
|
||||
"gemini-2.5-pro": {"input": 1.25, "output": 10.00},
|
||||
"gemini-2.5-flash": {"input": 0.15, "output": 0.60},
|
||||
"gemini-2.0-flash": {"input": 0.10, "output": 0.40},
|
||||
# Meta (via providers)
|
||||
"llama-4-maverick": {"input": 0.50, "output": 0.70},
|
||||
"llama-4-scout": {"input": 0.20, "output": 0.30},
|
||||
}
|
||||
|
||||
# Fallback pricing for unknown models
|
||||
_DEFAULT_PRICING = {"input": 3.00, "output": 12.00}
|
||||
|
||||
|
||||
def _get_pricing(model_name: str) -> Dict[str, float]:
|
||||
"""Look up pricing for a model. Uses fuzzy matching on model name."""
|
||||
if not model_name:
|
||||
return _DEFAULT_PRICING
|
||||
|
||||
# Strip provider prefix (e.g., "anthropic/claude-..." -> "claude-...")
|
||||
bare = model_name.split("/")[-1].lower()
|
||||
|
||||
# Exact match first
|
||||
if bare in MODEL_PRICING:
|
||||
return MODEL_PRICING[bare]
|
||||
|
||||
# Fuzzy prefix match
|
||||
for key, price in MODEL_PRICING.items():
|
||||
if bare.startswith(key) or key.startswith(bare):
|
||||
return price
|
||||
|
||||
# Keyword heuristics
|
||||
if "opus" in bare:
|
||||
return {"input": 15.00, "output": 75.00}
|
||||
if "sonnet" in bare:
|
||||
return {"input": 3.00, "output": 15.00}
|
||||
if "haiku" in bare:
|
||||
return {"input": 0.80, "output": 4.00}
|
||||
if "gpt-4o-mini" in bare:
|
||||
return {"input": 0.15, "output": 0.60}
|
||||
if "gpt-4o" in bare:
|
||||
return {"input": 2.50, "output": 10.00}
|
||||
if "gpt-5" in bare:
|
||||
return {"input": 10.00, "output": 30.00}
|
||||
if "deepseek" in bare:
|
||||
return {"input": 0.14, "output": 0.28}
|
||||
if "gemini" in bare:
|
||||
return {"input": 0.15, "output": 0.60}
|
||||
|
||||
return _DEFAULT_PRICING
|
||||
|
||||
|
||||
def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
|
||||
"""Estimate the USD cost for a given model and token counts."""
|
||||
pricing = _get_pricing(model)
|
||||
return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
|
||||
|
||||
|
||||
def _format_duration(seconds: float) -> str:
|
||||
"""Format seconds into a human-readable duration string."""
|
||||
if seconds < 60:
|
||||
return f"{seconds:.0f}s"
|
||||
minutes = seconds / 60
|
||||
if minutes < 60:
|
||||
return f"{minutes:.0f}m"
|
||||
hours = minutes / 60
|
||||
if hours < 24:
|
||||
remaining_min = int(minutes % 60)
|
||||
return f"{int(hours)}h {remaining_min}m" if remaining_min else f"{int(hours)}h"
|
||||
days = hours / 24
|
||||
return f"{days:.1f}d"
|
||||
|
||||
|
||||
def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
|
||||
"""Create simple horizontal bar chart strings from values."""
|
||||
peak = max(values) if values else 1
|
||||
if peak == 0:
|
||||
return ["" for _ in values]
|
||||
return ["█" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]
|
||||
|
||||
|
||||
class InsightsEngine:
    """
    Analyzes session history and produces usage insights.

    Works directly with a SessionDB instance (or raw sqlite3 connection)
    to query session and message data.
    """

    def __init__(self, db):
        """
        Initialize with a SessionDB instance.

        Args:
            db: A SessionDB instance (from hermes_state.py)
        """
        self.db = db
        # NOTE(review): reaches into SessionDB's private connection; assumes
        # the connection's row factory yields rows that dict() accepts
        # (e.g. sqlite3.Row) — confirm against hermes_state.py.
        self._conn = db._conn
|
||||
|
||||
def generate(self, days: int = 30, source: str = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate a complete insights report.
|
||||
|
||||
Args:
|
||||
days: Number of days to look back (default: 30)
|
||||
source: Optional filter by source platform
|
||||
|
||||
Returns:
|
||||
Dict with all computed insights
|
||||
"""
|
||||
cutoff = time.time() - (days * 86400)
|
||||
|
||||
# Gather raw data
|
||||
sessions = self._get_sessions(cutoff, source)
|
||||
tool_usage = self._get_tool_usage(cutoff, source)
|
||||
message_stats = self._get_message_stats(cutoff, source)
|
||||
|
||||
if not sessions:
|
||||
return {
|
||||
"days": days,
|
||||
"source_filter": source,
|
||||
"empty": True,
|
||||
"overview": {},
|
||||
"models": [],
|
||||
"platforms": [],
|
||||
"tools": [],
|
||||
"activity": {},
|
||||
"top_sessions": [],
|
||||
}
|
||||
|
||||
# Compute insights
|
||||
overview = self._compute_overview(sessions, message_stats)
|
||||
models = self._compute_model_breakdown(sessions)
|
||||
platforms = self._compute_platform_breakdown(sessions)
|
||||
tools = self._compute_tool_breakdown(tool_usage)
|
||||
activity = self._compute_activity_patterns(sessions)
|
||||
top_sessions = self._compute_top_sessions(sessions)
|
||||
|
||||
return {
|
||||
"days": days,
|
||||
"source_filter": source,
|
||||
"empty": False,
|
||||
"generated_at": time.time(),
|
||||
"overview": overview,
|
||||
"models": models,
|
||||
"platforms": platforms,
|
||||
"tools": tools,
|
||||
"activity": activity,
|
||||
"top_sessions": top_sessions,
|
||||
}
|
||||
|
||||
# =========================================================================
|
||||
# Data gathering (SQL queries)
|
||||
# =========================================================================
|
||||
|
||||
def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
|
||||
"""Fetch sessions within the time window."""
|
||||
if source:
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT * FROM sessions
|
||||
WHERE started_at >= ? AND source = ?
|
||||
ORDER BY started_at DESC""",
|
||||
(cutoff, source),
|
||||
)
|
||||
else:
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT * FROM sessions
|
||||
WHERE started_at >= ?
|
||||
ORDER BY started_at DESC""",
|
||||
(cutoff,),
|
||||
)
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]:
|
||||
"""Get tool call counts from messages."""
|
||||
if source:
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT m.tool_name, COUNT(*) as count
|
||||
FROM messages m
|
||||
JOIN sessions s ON s.id = m.session_id
|
||||
WHERE s.started_at >= ? AND s.source = ?
|
||||
AND m.role = 'tool' AND m.tool_name IS NOT NULL
|
||||
GROUP BY m.tool_name
|
||||
ORDER BY count DESC""",
|
||||
(cutoff, source),
|
||||
)
|
||||
else:
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT m.tool_name, COUNT(*) as count
|
||||
FROM messages m
|
||||
JOIN sessions s ON s.id = m.session_id
|
||||
WHERE s.started_at >= ?
|
||||
AND m.role = 'tool' AND m.tool_name IS NOT NULL
|
||||
GROUP BY m.tool_name
|
||||
ORDER BY count DESC""",
|
||||
(cutoff,),
|
||||
)
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
def _get_message_stats(self, cutoff: float, source: str = None) -> Dict:
|
||||
"""Get aggregate message statistics."""
|
||||
if source:
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT
|
||||
COUNT(*) as total_messages,
|
||||
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
|
||||
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
|
||||
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
|
||||
FROM messages m
|
||||
JOIN sessions s ON s.id = m.session_id
|
||||
WHERE s.started_at >= ? AND s.source = ?""",
|
||||
(cutoff, source),
|
||||
)
|
||||
else:
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT
|
||||
COUNT(*) as total_messages,
|
||||
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
|
||||
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
|
||||
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
|
||||
FROM messages m
|
||||
JOIN sessions s ON s.id = m.session_id
|
||||
WHERE s.started_at >= ?""",
|
||||
(cutoff,),
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
return dict(row) if row else {
|
||||
"total_messages": 0, "user_messages": 0,
|
||||
"assistant_messages": 0, "tool_messages": 0,
|
||||
}
|
||||
|
||||
# =========================================================================
|
||||
# Computation
|
||||
# =========================================================================
|
||||
|
||||
def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
|
||||
"""Compute high-level overview statistics."""
|
||||
total_input = sum(s.get("input_tokens") or 0 for s in sessions)
|
||||
total_output = sum(s.get("output_tokens") or 0 for s in sessions)
|
||||
total_tokens = total_input + total_output
|
||||
total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
|
||||
total_messages = sum(s.get("message_count") or 0 for s in sessions)
|
||||
|
||||
# Cost estimation (weighted by model)
|
||||
total_cost = sum(
|
||||
_estimate_cost(s.get("model", ""), s.get("input_tokens") or 0, s.get("output_tokens") or 0)
|
||||
for s in sessions
|
||||
)
|
||||
|
||||
# Session duration stats
|
||||
durations = []
|
||||
for s in sessions:
|
||||
start = s.get("started_at")
|
||||
end = s.get("ended_at")
|
||||
if start and end:
|
||||
durations.append(end - start)
|
||||
|
||||
total_hours = sum(durations) / 3600 if durations else 0
|
||||
avg_duration = sum(durations) / len(durations) if durations else 0
|
||||
|
||||
# Earliest and latest session
|
||||
started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")]
|
||||
date_range_start = min(started_timestamps) if started_timestamps else None
|
||||
date_range_end = max(started_timestamps) if started_timestamps else None
|
||||
|
||||
return {
|
||||
"total_sessions": len(sessions),
|
||||
"total_messages": total_messages,
|
||||
"total_tool_calls": total_tool_calls,
|
||||
"total_input_tokens": total_input,
|
||||
"total_output_tokens": total_output,
|
||||
"total_tokens": total_tokens,
|
||||
"estimated_cost": total_cost,
|
||||
"total_hours": total_hours,
|
||||
"avg_session_duration": avg_duration,
|
||||
"avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
|
||||
"avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0,
|
||||
"user_messages": message_stats.get("user_messages") or 0,
|
||||
"assistant_messages": message_stats.get("assistant_messages") or 0,
|
||||
"tool_messages": message_stats.get("tool_messages") or 0,
|
||||
"date_range_start": date_range_start,
|
||||
"date_range_end": date_range_end,
|
||||
}
|
||||
|
||||
def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
|
||||
"""Break down usage by model."""
|
||||
model_data = defaultdict(lambda: {
|
||||
"sessions": 0, "input_tokens": 0, "output_tokens": 0,
|
||||
"total_tokens": 0, "tool_calls": 0, "cost": 0.0,
|
||||
})
|
||||
|
||||
for s in sessions:
|
||||
model = s.get("model") or "unknown"
|
||||
# Normalize: strip provider prefix for display
|
||||
display_model = model.split("/")[-1] if "/" in model else model
|
||||
d = model_data[display_model]
|
||||
d["sessions"] += 1
|
||||
inp = s.get("input_tokens") or 0
|
||||
out = s.get("output_tokens") or 0
|
||||
d["input_tokens"] += inp
|
||||
d["output_tokens"] += out
|
||||
d["total_tokens"] += inp + out
|
||||
d["tool_calls"] += s.get("tool_call_count") or 0
|
||||
d["cost"] += _estimate_cost(model, inp, out)
|
||||
|
||||
result = [
|
||||
{"model": model, **data}
|
||||
for model, data in model_data.items()
|
||||
]
|
||||
result.sort(key=lambda x: x["total_tokens"], reverse=True)
|
||||
return result
|
||||
|
||||
def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
|
||||
"""Break down usage by platform/source."""
|
||||
platform_data = defaultdict(lambda: {
|
||||
"sessions": 0, "messages": 0, "input_tokens": 0,
|
||||
"output_tokens": 0, "total_tokens": 0, "tool_calls": 0,
|
||||
})
|
||||
|
||||
for s in sessions:
|
||||
source = s.get("source") or "unknown"
|
||||
d = platform_data[source]
|
||||
d["sessions"] += 1
|
||||
d["messages"] += s.get("message_count") or 0
|
||||
inp = s.get("input_tokens") or 0
|
||||
out = s.get("output_tokens") or 0
|
||||
d["input_tokens"] += inp
|
||||
d["output_tokens"] += out
|
||||
d["total_tokens"] += inp + out
|
||||
d["tool_calls"] += s.get("tool_call_count") or 0
|
||||
|
||||
result = [
|
||||
{"platform": platform, **data}
|
||||
for platform, data in platform_data.items()
|
||||
]
|
||||
result.sort(key=lambda x: x["sessions"], reverse=True)
|
||||
return result
|
||||
|
||||
def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
|
||||
"""Process tool usage data into a ranked list with percentages."""
|
||||
total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
|
||||
result = []
|
||||
for t in tool_usage:
|
||||
pct = (t["count"] / total_calls * 100) if total_calls else 0
|
||||
result.append({
|
||||
"tool": t["tool_name"],
|
||||
"count": t["count"],
|
||||
"percentage": pct,
|
||||
})
|
||||
return result
|
||||
|
||||
def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
|
||||
"""Analyze activity patterns by day of week and hour."""
|
||||
day_counts = Counter() # 0=Monday ... 6=Sunday
|
||||
hour_counts = Counter()
|
||||
daily_counts = Counter() # date string -> count
|
||||
|
||||
for s in sessions:
|
||||
ts = s.get("started_at")
|
||||
if not ts:
|
||||
continue
|
||||
dt = datetime.fromtimestamp(ts)
|
||||
day_counts[dt.weekday()] += 1
|
||||
hour_counts[dt.hour] += 1
|
||||
daily_counts[dt.strftime("%Y-%m-%d")] += 1
|
||||
|
||||
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
|
||||
day_breakdown = [
|
||||
{"day": day_names[i], "count": day_counts.get(i, 0)}
|
||||
for i in range(7)
|
||||
]
|
||||
|
||||
hour_breakdown = [
|
||||
{"hour": i, "count": hour_counts.get(i, 0)}
|
||||
for i in range(24)
|
||||
]
|
||||
|
||||
# Busiest day and hour
|
||||
busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
|
||||
busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None
|
||||
|
||||
# Active days (days with at least one session)
|
||||
active_days = len(daily_counts)
|
||||
|
||||
# Streak calculation
|
||||
if daily_counts:
|
||||
all_dates = sorted(daily_counts.keys())
|
||||
current_streak = 1
|
||||
max_streak = 1
|
||||
for i in range(1, len(all_dates)):
|
||||
d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d")
|
||||
d2 = datetime.strptime(all_dates[i], "%Y-%m-%d")
|
||||
if (d2 - d1).days == 1:
|
||||
current_streak += 1
|
||||
max_streak = max(max_streak, current_streak)
|
||||
else:
|
||||
current_streak = 1
|
||||
else:
|
||||
max_streak = 0
|
||||
|
||||
return {
|
||||
"by_day": day_breakdown,
|
||||
"by_hour": hour_breakdown,
|
||||
"busiest_day": busiest_day,
|
||||
"busiest_hour": busiest_hour,
|
||||
"active_days": active_days,
|
||||
"max_streak": max_streak,
|
||||
}
|
||||
|
||||
def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
|
||||
"""Find notable sessions (longest, most messages, most tokens)."""
|
||||
top = []
|
||||
|
||||
# Longest by duration
|
||||
sessions_with_duration = [
|
||||
s for s in sessions
|
||||
if s.get("started_at") and s.get("ended_at")
|
||||
]
|
||||
if sessions_with_duration:
|
||||
longest = max(
|
||||
sessions_with_duration,
|
||||
key=lambda s: (s["ended_at"] - s["started_at"]),
|
||||
)
|
||||
dur = longest["ended_at"] - longest["started_at"]
|
||||
top.append({
|
||||
"label": "Longest session",
|
||||
"session_id": longest["id"][:16],
|
||||
"value": _format_duration(dur),
|
||||
"date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
|
||||
})
|
||||
|
||||
# Most messages
|
||||
most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
|
||||
if (most_msgs.get("message_count") or 0) > 0:
|
||||
top.append({
|
||||
"label": "Most messages",
|
||||
"session_id": most_msgs["id"][:16],
|
||||
"value": f"{most_msgs['message_count']} msgs",
|
||||
"date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
|
||||
})
|
||||
|
||||
# Most tokens
|
||||
most_tokens = max(
|
||||
sessions,
|
||||
key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0),
|
||||
)
|
||||
token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
|
||||
if token_total > 0:
|
||||
top.append({
|
||||
"label": "Most tokens",
|
||||
"session_id": most_tokens["id"][:16],
|
||||
"value": f"{token_total:,} tokens",
|
||||
"date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
|
||||
})
|
||||
|
||||
# Most tool calls
|
||||
most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
|
||||
if (most_tools.get("tool_call_count") or 0) > 0:
|
||||
top.append({
|
||||
"label": "Most tool calls",
|
||||
"session_id": most_tools["id"][:16],
|
||||
"value": f"{most_tools['tool_call_count']} calls",
|
||||
"date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
|
||||
})
|
||||
|
||||
return top
|
||||
|
||||
# =========================================================================
|
||||
# Formatting
|
||||
# =========================================================================
|
||||
|
||||
    def format_terminal(self, report: Dict) -> str:
        """
        Format the insights report for terminal display (CLI).

        Renders a boxed header followed by sections — overview, models,
        platforms, tools, activity, notable sessions — omitting any section
        with no data. The header box interior is 58 characters wide (see
        the `padding` arithmetic below).

        Args:
            report: The dict produced by generate().

        Returns:
            A multi-line string ready to print.
        """
        if report.get("empty"):
            days = report.get("days", 30)
            src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
            return f" No sessions found in the last {days} days{src}."

        lines = []
        o = report["overview"]
        days = report["days"]
        src_filter = report.get("source_filter")

        # Header — period label centered within the 58-char box interior.
        lines.append("")
        lines.append(" ╔══════════════════════════════════════════════════════════╗")
        lines.append(" ║ 📊 Hermes Insights ║")
        period_label = f"Last {days} days"
        if src_filter:
            period_label += f" ({src_filter})"
        padding = 58 - len(period_label) - 2
        left_pad = padding // 2
        right_pad = padding - left_pad
        lines.append(f" ║{' ' * left_pad} {period_label} {' ' * right_pad}║")
        lines.append(" ╚══════════════════════════════════════════════════════════╝")
        lines.append("")

        # Date range (earliest/latest session start)
        if o.get("date_range_start") and o.get("date_range_end"):
            start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
            end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
            lines.append(f" Period: {start_str} — {end_str}")
            lines.append("")

        # Overview
        lines.append(" 📋 Overview")
        lines.append(" " + "─" * 56)
        lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
        lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
        lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
        lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: ${o['estimated_cost']:.2f}")
        # Active time only makes sense when at least one session had both
        # start and end timestamps (total_hours > 0).
        if o["total_hours"] > 0:
            lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
        lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}")
        lines.append("")

        # Model breakdown
        if report["models"]:
            lines.append(" 🤖 Models Used")
            lines.append(" " + "─" * 56)
            lines.append(f" {'Model':<30} {'Sessions':>8} {'Tokens':>12} {'Cost':>8}")
            for m in report["models"]:
                model_name = m["model"][:28]
                lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} ${m['cost']:>6.2f}")
            lines.append("")

        # Platform breakdown — skipped when everything is plain CLI usage.
        if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
            lines.append(" 📱 Platforms")
            lines.append(" " + "─" * 56)
            lines.append(f" {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
            for p in report["platforms"]:
                lines.append(f" {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
            lines.append("")

        # Tool usage
        if report["tools"]:
            lines.append(" 🔧 Top Tools")
            lines.append(" " + "─" * 56)
            lines.append(f" {'Tool':<28} {'Calls':>8} {'%':>8}")
            for t in report["tools"][:15]:  # Top 15
                lines.append(f" {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
            if len(report["tools"]) > 15:
                lines.append(f" ... and {len(report['tools']) - 15} more tools")
            lines.append("")

        # Activity patterns
        act = report.get("activity", {})
        if act.get("by_day"):
            lines.append(" 📅 Activity Patterns")
            lines.append(" " + "─" * 56)

            # Day-of-week bar chart (counts scaled to a 15-char bar)
            day_values = [d["count"] for d in act["by_day"]]
            bars = _bar_chart(day_values, max_width=15)
            for i, d in enumerate(act["by_day"]):
                bar = bars[i]
                lines.append(f" {d['day']} {bar:<15} {d['count']}")

            lines.append("")

            # Peak hours (show top 5 busiest hours)
            busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
            busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
            if busy_hours:
                hour_strs = []
                for h in busy_hours:
                    hr = h["hour"]
                    ampm = "AM" if hr < 12 else "PM"
                    # 12-hour clock: 0 -> 12AM, 13 -> 1PM, etc.
                    display_hr = hr % 12 or 12
                    hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
                lines.append(f" Peak hours: {', '.join(hour_strs)}")

            if act.get("active_days"):
                lines.append(f" Active days: {act['active_days']}")
            # A 1-day "streak" is not worth reporting.
            if act.get("max_streak") and act["max_streak"] > 1:
                lines.append(f" Best streak: {act['max_streak']} consecutive days")
            lines.append("")

        # Notable sessions
        if report.get("top_sessions"):
            lines.append(" 🏆 Notable Sessions")
            lines.append(" " + "─" * 56)
            for ts in report["top_sessions"]:
                lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
            lines.append("")

        return "\n".join(lines)
|
||||
|
||||
def format_gateway(self, report: Dict) -> str:
|
||||
"""Format the insights report for gateway/messaging (shorter)."""
|
||||
if report.get("empty"):
|
||||
days = report.get("days", 30)
|
||||
return f"No sessions found in the last {days} days."
|
||||
|
||||
lines = []
|
||||
o = report["overview"]
|
||||
days = report["days"]
|
||||
|
||||
lines.append(f"📊 **Hermes Insights** — Last {days} days\n")
|
||||
|
||||
# Overview
|
||||
lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
|
||||
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
|
||||
lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}")
|
||||
if o["total_hours"] > 0:
|
||||
lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
|
||||
lines.append("")
|
||||
|
||||
# Models (top 5)
|
||||
if report["models"]:
|
||||
lines.append("**🤖 Models:**")
|
||||
for m in report["models"][:5]:
|
||||
lines.append(f" {m['model'][:25]} — {m['sessions']} sessions, {m['total_tokens']:,} tokens, ${m['cost']:.2f}")
|
||||
lines.append("")
|
||||
|
||||
# Platforms (if multi-platform)
|
||||
if len(report["platforms"]) > 1:
|
||||
lines.append("**📱 Platforms:**")
|
||||
for p in report["platforms"]:
|
||||
lines.append(f" {p['platform']} — {p['sessions']} sessions, {p['messages']:,} msgs")
|
||||
lines.append("")
|
||||
|
||||
# Tools (top 8)
|
||||
if report["tools"]:
|
||||
lines.append("**🔧 Top Tools:**")
|
||||
for t in report["tools"][:8]:
|
||||
lines.append(f" {t['tool']} — {t['count']:,} calls ({t['percentage']:.1f}%)")
|
||||
lines.append("")
|
||||
|
||||
# Activity summary
|
||||
act = report.get("activity", {})
|
||||
if act.get("busiest_day") and act.get("busiest_hour"):
|
||||
hr = act["busiest_hour"]["hour"]
|
||||
ampm = "AM" if hr < 12 else "PM"
|
||||
display_hr = hr % 12 or 12
|
||||
lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
|
||||
if act.get("active_days"):
|
||||
lines.append(f"**Active days:** {act['active_days']}", )
|
||||
if act.get("max_streak", 0) > 1:
|
||||
lines.append(f"**Best streak:** {act['max_streak']} consecutive days")
|
||||
|
||||
return "\n".join(lines)
|
||||
35
cli.py
35
cli.py
@@ -1858,6 +1858,8 @@ class HermesCLI:
|
||||
self._manual_compress()
|
||||
elif cmd_lower == "/usage":
|
||||
self._show_usage()
|
||||
elif cmd_lower.startswith("/insights"):
|
||||
self._show_insights(cmd_original)
|
||||
elif cmd_lower == "/paste":
|
||||
self._handle_paste_command()
|
||||
elif cmd_lower == "/reload-mcp":
|
||||
@@ -1983,6 +1985,39 @@ class HermesCLI:
|
||||
for quiet_logger in ('tools', 'minisweagent', 'run_agent', 'trajectory_compressor', 'cron', 'hermes_cli'):
|
||||
logging.getLogger(quiet_logger).setLevel(logging.ERROR)
|
||||
|
||||
def _show_insights(self, command: str = "/insights"):
|
||||
"""Show usage insights and analytics from session history."""
|
||||
# Parse optional --days flag
|
||||
parts = command.split()
|
||||
days = 30
|
||||
source = None
|
||||
i = 1
|
||||
while i < len(parts):
|
||||
if parts[i] == "--days" and i + 1 < len(parts):
|
||||
try:
|
||||
days = int(parts[i + 1])
|
||||
except ValueError:
|
||||
print(f" Invalid --days value: {parts[i + 1]}")
|
||||
return
|
||||
i += 2
|
||||
elif parts[i] == "--source" and i + 1 < len(parts):
|
||||
source = parts[i + 1]
|
||||
i += 2
|
||||
else:
|
||||
i += 1
|
||||
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
from agent.insights import InsightsEngine
|
||||
|
||||
db = SessionDB()
|
||||
engine = InsightsEngine(db)
|
||||
report = engine.generate(days=days, source=source)
|
||||
print(engine.format_terminal(report))
|
||||
db.close()
|
||||
except Exception as e:
|
||||
print(f" Error generating insights: {e}")
|
||||
|
||||
def _reload_mcp(self):
|
||||
"""Reload MCP servers: disconnect all, re-read config.yaml, reconnect.
|
||||
|
||||
|
||||
@@ -659,7 +659,7 @@ class GatewayRunner:
|
||||
# Emit command:* hook for any recognized slash command
|
||||
_known_commands = {"new", "reset", "help", "status", "stop", "model",
|
||||
"personality", "retry", "undo", "sethome", "set-home",
|
||||
"compress", "usage", "reload-mcp", "update"}
|
||||
"compress", "usage", "insights", "reload-mcp", "update"}
|
||||
if command and command in _known_commands:
|
||||
await self.hooks.emit(f"command:{command}", {
|
||||
"platform": source.platform.value if source.platform else "",
|
||||
@@ -701,6 +701,9 @@ class GatewayRunner:
|
||||
if command == "usage":
|
||||
return await self._handle_usage_command(event)
|
||||
|
||||
if command == "insights":
|
||||
return await self._handle_insights_command(event)
|
||||
|
||||
if command == "reload-mcp":
|
||||
return await self._handle_reload_mcp_command(event)
|
||||
|
||||
@@ -1104,6 +1107,7 @@ class GatewayRunner:
|
||||
"`/sethome` — Set this chat as the home channel",
|
||||
"`/compress` — Compress conversation context",
|
||||
"`/usage` — Show token usage for this session",
|
||||
"`/insights [days]` — Show usage insights and analytics",
|
||||
"`/reload-mcp` — Reload MCP servers from config",
|
||||
"`/update` — Update Hermes Agent to the latest version",
|
||||
"`/help` — Show this message",
|
||||
@@ -1397,6 +1401,53 @@ class GatewayRunner:
|
||||
)
|
||||
return "No usage data available for this session."
|
||||
|
||||
async def _handle_insights_command(self, event: MessageEvent) -> str:
|
||||
"""Handle /insights command -- show usage insights and analytics."""
|
||||
import asyncio as _asyncio
|
||||
|
||||
args = event.get_command_args().strip()
|
||||
days = 30
|
||||
source = None
|
||||
|
||||
# Parse simple args: /insights 7 or /insights --days 7
|
||||
if args:
|
||||
parts = args.split()
|
||||
i = 0
|
||||
while i < len(parts):
|
||||
if parts[i] == "--days" and i + 1 < len(parts):
|
||||
try:
|
||||
days = int(parts[i + 1])
|
||||
except ValueError:
|
||||
return f"Invalid --days value: {parts[i + 1]}"
|
||||
i += 2
|
||||
elif parts[i] == "--source" and i + 1 < len(parts):
|
||||
source = parts[i + 1]
|
||||
i += 2
|
||||
elif parts[i].isdigit():
|
||||
days = int(parts[i])
|
||||
i += 1
|
||||
else:
|
||||
i += 1
|
||||
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
from agent.insights import InsightsEngine
|
||||
|
||||
loop = _asyncio.get_event_loop()
|
||||
|
||||
def _run_insights():
|
||||
db = SessionDB()
|
||||
engine = InsightsEngine(db)
|
||||
report = engine.generate(days=days, source=source)
|
||||
result = engine.format_gateway(report)
|
||||
db.close()
|
||||
return result
|
||||
|
||||
return await loop.run_in_executor(None, _run_insights)
|
||||
except Exception as e:
|
||||
logger.error("Insights command error: %s", e, exc_info=True)
|
||||
return f"Error generating insights: {e}"
|
||||
|
||||
async def _handle_reload_mcp_command(self, event: MessageEvent) -> str:
|
||||
"""Handle /reload-mcp command -- disconnect and reconnect all MCP servers."""
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
@@ -28,6 +28,7 @@ COMMANDS = {
|
||||
"/verbose": "Cycle tool progress display: off → new → all → verbose",
|
||||
"/compress": "Manually compress conversation context (flush memories + summarize)",
|
||||
"/usage": "Show token usage for the current session",
|
||||
"/insights": "Show usage insights and analytics (last 30 days)",
|
||||
"/quit": "Exit the CLI (also: /exit, /q)",
|
||||
}
|
||||
|
||||
|
||||
@@ -1610,6 +1610,32 @@ For more help on a command:
|
||||
|
||||
sessions_parser.set_defaults(func=cmd_sessions)
|
||||
|
||||
    # =========================================================================
    # insights command
    # =========================================================================
    # Standalone `hermes insights` subcommand; mirrors the /insights slash
    # command's --days / --source options.
    insights_parser = subparsers.add_parser(
        "insights",
        help="Show usage insights and analytics",
        description="Analyze session history to show token usage, costs, tool patterns, and activity trends"
    )
    insights_parser.add_argument("--days", type=int, default=30, help="Number of days to analyze (default: 30)")
    insights_parser.add_argument("--source", help="Filter by platform (cli, telegram, discord, etc.)")
||||
|
||||
def cmd_insights(args):
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
from agent.insights import InsightsEngine
|
||||
|
||||
db = SessionDB()
|
||||
engine = InsightsEngine(db)
|
||||
report = engine.generate(days=args.days, source=args.source)
|
||||
print(engine.format_terminal(report))
|
||||
db.close()
|
||||
except Exception as e:
|
||||
print(f"Error generating insights: {e}")
|
||||
|
||||
insights_parser.set_defaults(func=cmd_insights)
|
||||
|
||||
# =========================================================================
|
||||
# version command
|
||||
# =========================================================================
|
||||
|
||||
582
tests/test_insights.py
Normal file
582
tests/test_insights.py
Normal file
@@ -0,0 +1,582 @@
|
||||
"""Tests for agent/insights.py — InsightsEngine analytics and reporting."""
|
||||
|
||||
import time
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
|
||||
from hermes_state import SessionDB
|
||||
from agent.insights import (
|
||||
InsightsEngine,
|
||||
_get_pricing,
|
||||
_estimate_cost,
|
||||
_format_duration,
|
||||
_bar_chart,
|
||||
_DEFAULT_PRICING,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
def db(tmp_path):
    """Yield a SessionDB backed by a throwaway database file; close on teardown."""
    store = SessionDB(db_path=tmp_path / "test_insights.db")
    yield store
    store.close()
|
||||
|
||||
|
||||
@pytest.fixture()
def populated_db(db):
    """Create a DB with realistic session data for insights testing.

    Fixture layout (timestamps relative to "now"):
        s1     cli       claude-sonnet   2 days ago, 1h,  65k tokens, 3 tool calls
        s2     telegram  gpt-4o          5 days ago, 30m, 28k tokens, 1 tool call
        s3     cli       deepseek-chat  10 days ago, 2h, 140k tokens, 3 tool calls
        s4     discord   claude-sonnet   1 day ago, 15m,  15k tokens, no tools
        s_old  cli       gpt-4o-mini    45 days ago (outside the 30-day window)
    """
    now = time.time()
    day = 86400  # seconds per day

    # Session 1: CLI, claude-sonnet, ended, 2 days ago
    db.create_session(
        session_id="s1", source="cli",
        model="anthropic/claude-sonnet-4-20250514", user_id="user1",
    )
    # Backdate the started_at
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's1'", (now - 2 * day,))
    db.end_session("s1", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's1'", (now - 2 * day + 3600,))
    db.update_token_counts("s1", input_tokens=50000, output_tokens=15000)
    db.append_message("s1", role="user", content="Hello, help me fix a bug")
    db.append_message("s1", role="assistant", content="Sure, let me look into that.")
    db.append_message("s1", role="assistant", content="Let me search the files.",
                      tool_calls=[{"function": {"name": "search_files"}}])
    db.append_message("s1", role="tool", content="Found 3 matches", tool_name="search_files")
    db.append_message("s1", role="assistant", content="Let me read the file.",
                      tool_calls=[{"function": {"name": "read_file"}}])
    db.append_message("s1", role="tool", content="file contents...", tool_name="read_file")
    db.append_message("s1", role="assistant", content="I found the bug. Let me fix it.",
                      tool_calls=[{"function": {"name": "patch"}}])
    db.append_message("s1", role="tool", content="patched successfully", tool_name="patch")
    db.append_message("s1", role="user", content="Thanks!")
    db.append_message("s1", role="assistant", content="You're welcome!")

    # Session 2: Telegram, gpt-4o, ended, 5 days ago
    db.create_session(
        session_id="s2", source="telegram",
        model="gpt-4o", user_id="user1",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's2'", (now - 5 * day,))
    db.end_session("s2", end_reason="timeout")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's2'", (now - 5 * day + 1800,))
    db.update_token_counts("s2", input_tokens=20000, output_tokens=8000)
    db.append_message("s2", role="user", content="Search the web for something")
    db.append_message("s2", role="assistant", content="Searching...",
                      tool_calls=[{"function": {"name": "web_search"}}])
    db.append_message("s2", role="tool", content="results...", tool_name="web_search")
    db.append_message("s2", role="assistant", content="Here's what I found")

    # Session 3: CLI, deepseek-chat, ended, 10 days ago
    db.create_session(
        session_id="s3", source="cli",
        model="deepseek-chat", user_id="user1",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's3'", (now - 10 * day,))
    db.end_session("s3", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's3'", (now - 10 * day + 7200,))
    db.update_token_counts("s3", input_tokens=100000, output_tokens=40000)
    db.append_message("s3", role="user", content="Run this terminal command")
    db.append_message("s3", role="assistant", content="Running...",
                      tool_calls=[{"function": {"name": "terminal"}}])
    db.append_message("s3", role="tool", content="output...", tool_name="terminal")
    db.append_message("s3", role="assistant", content="Let me run another",
                      tool_calls=[{"function": {"name": "terminal"}}])
    db.append_message("s3", role="tool", content="more output...", tool_name="terminal")
    db.append_message("s3", role="assistant", content="And search files",
                      tool_calls=[{"function": {"name": "search_files"}}])
    db.append_message("s3", role="tool", content="found stuff", tool_name="search_files")

    # Session 4: Discord, same model as s1, ended, 1 day ago
    db.create_session(
        session_id="s4", source="discord",
        model="anthropic/claude-sonnet-4-20250514", user_id="user2",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's4'", (now - 1 * day,))
    db.end_session("s4", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's4'", (now - 1 * day + 900,))
    db.update_token_counts("s4", input_tokens=10000, output_tokens=5000)
    db.append_message("s4", role="user", content="Quick question")
    db.append_message("s4", role="assistant", content="Sure, go ahead")

    # Session 5: Old session, 45 days ago (should be excluded from 30-day window)
    db.create_session(
        session_id="s_old", source="cli",
        model="gpt-4o-mini", user_id="user1",
    )
    db._conn.execute("UPDATE sessions SET started_at = ? WHERE id = 's_old'", (now - 45 * day,))
    db.end_session("s_old", end_reason="user_exit")
    db._conn.execute("UPDATE sessions SET ended_at = ? WHERE id = 's_old'", (now - 45 * day + 600,))
    db.update_token_counts("s_old", input_tokens=5000, output_tokens=2000)
    db.append_message("s_old", role="user", content="old message")
    db.append_message("s_old", role="assistant", content="old reply")

    # Commit once so the engine sees all rows in a consistent snapshot.
    db._conn.commit()
    return db
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Pricing helpers
|
||||
# =========================================================================
|
||||
|
||||
class TestPricing:
    """_get_pricing resolves exact names, provider prefixes, keyword heuristics,
    and falls back to _DEFAULT_PRICING for anything unrecognized."""

    def test_exact_match(self):
        p = _get_pricing("gpt-4o")
        assert (p["input"], p["output"]) == (2.50, 10.00)

    def test_provider_prefix_stripped(self):
        p = _get_pricing("anthropic/claude-sonnet-4-20250514")
        assert (p["input"], p["output"]) == (3.00, 15.00)

    def test_prefix_match(self):
        assert _get_pricing("claude-3-5-sonnet-20241022")["input"] == 3.00

    def test_keyword_heuristic_opus(self):
        p = _get_pricing("some-new-opus-model")
        assert (p["input"], p["output"]) == (15.00, 75.00)

    def test_keyword_heuristic_haiku(self):
        assert _get_pricing("anthropic/claude-haiku-future")["input"] == 0.80

    def test_unknown_model_returns_default(self):
        assert _get_pricing("totally-unknown-model-xyz") == _DEFAULT_PRICING

    def test_none_model(self):
        assert _get_pricing(None) == _DEFAULT_PRICING

    def test_empty_model(self):
        assert _get_pricing("") == _DEFAULT_PRICING

    def test_deepseek_heuristic(self):
        assert _get_pricing("deepseek-v3")["input"] == 0.14

    def test_gemini_heuristic(self):
        assert _get_pricing("gemini-3.0-ultra")["input"] == 0.15
|
||||
|
||||
class TestEstimateCost:
    """_estimate_cost converts token counts to dollars via per-million rates."""

    def test_basic_cost(self):
        # gpt-4o: $2.50/M input + $10.00/M output -> $12.50 at 1M tokens each.
        assert _estimate_cost("gpt-4o", 1_000_000, 1_000_000) == pytest.approx(12.50, abs=0.01)

    def test_zero_tokens(self):
        assert _estimate_cost("gpt-4o", 0, 0) == 0.0

    def test_small_usage(self):
        # 1000 * 2.50/1M + 500 * 10.00/1M = 0.0025 + 0.005 = 0.0075
        assert _estimate_cost("gpt-4o", 1000, 500) == pytest.approx(0.0075, abs=0.0001)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Format helpers
|
||||
# =========================================================================
|
||||
|
||||
class TestFormatDuration:
    """_format_duration picks the coarsest readable unit for a second count."""

    def test_seconds(self):
        assert _format_duration(45) == "45s"

    def test_minutes(self):
        assert _format_duration(300) == "5m"

    def test_hours_with_minutes(self):
        # 5400 s = 1.5 hours -> mixed hours+minutes form
        assert _format_duration(5400) == "1h 30m"

    def test_exact_hours(self):
        # Whole hours omit the minutes component.
        assert _format_duration(7200) == "2h"

    def test_days(self):
        # 172800 s = 2 days -> fractional-day form
        assert _format_duration(172800) == "2.0d"
|
||||
|
||||
|
||||
class TestBarChart:
    """_bar_chart scales values linearly so the maximum gets full width."""

    def test_basic_bars(self):
        bars = _bar_chart([10, 5, 0, 20], max_width=10)
        assert len(bars) == 4
        assert len(bars[3]) == 10  # max value (20) fills the width
        assert len(bars[0]) == 5   # half of max -> half width
        assert bars[2] == ""       # zero renders as an empty bar

    def test_empty_values(self):
        assert _bar_chart([], max_width=10) == []

    def test_all_zeros(self):
        assert all(bar == "" for bar in _bar_chart([0, 0, 0], max_width=10))

    def test_single_value(self):
        bars = _bar_chart([5], max_width=10)
        assert [len(bar) for bar in bars] == [10]
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# InsightsEngine — empty DB
|
||||
# =========================================================================
|
||||
|
||||
class TestInsightsEmpty:
    """A database with no sessions yields an empty report in every format."""

    def test_empty_db_returns_empty_report(self, db):
        report = InsightsEngine(db).generate(days=30)
        assert report["empty"] is True
        assert report["overview"] == {}

    def test_empty_db_terminal_format(self, db):
        engine = InsightsEngine(db)
        assert "No sessions found" in engine.format_terminal(engine.generate(days=30))

    def test_empty_db_gateway_format(self, db):
        engine = InsightsEngine(db)
        assert "No sessions found" in engine.format_gateway(engine.generate(days=30))
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# InsightsEngine — populated DB
|
||||
# =========================================================================
|
||||
|
||||
class TestInsightsPopulated:
    """End-to-end checks against the multi-platform fixture data.

    Expectations here track the populated_db fixture exactly: four sessions
    (s1, s2, s3, s4) within the 30-day window plus s_old at 45 days.
    """

    def test_generate_returns_all_sections(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)

        assert report["empty"] is False
        assert "overview" in report
        assert "models" in report
        assert "platforms" in report
        assert "tools" in report
        assert "activity" in report
        assert "top_sessions" in report

    def test_overview_session_count(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        overview = report["overview"]

        # s1, s2, s3, s4 are within 30 days; s_old is 45 days ago
        assert overview["total_sessions"] == 4

    def test_overview_token_totals(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        overview = report["overview"]

        # Sums over s1..s4 only (s_old excluded by the 30-day window).
        expected_input = 50000 + 20000 + 100000 + 10000
        expected_output = 15000 + 8000 + 40000 + 5000
        assert overview["total_input_tokens"] == expected_input
        assert overview["total_output_tokens"] == expected_output
        assert overview["total_tokens"] == expected_input + expected_output

    def test_overview_cost_positive(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        assert report["overview"]["estimated_cost"] > 0

    def test_overview_duration_stats(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        overview = report["overview"]

        # All 4 sessions have durations
        assert overview["total_hours"] > 0
        assert overview["avg_session_duration"] > 0

    def test_model_breakdown(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        models = report["models"]

        # Should have 3 distinct models (claude-sonnet x2, gpt-4o, deepseek-chat)
        # NOTE: names appear with the provider prefix stripped.
        model_names = [m["model"] for m in models]
        assert "claude-sonnet-4-20250514" in model_names
        assert "gpt-4o" in model_names
        assert "deepseek-chat" in model_names

        # Claude-sonnet has 2 sessions (s1 + s4)
        claude = next(m for m in models if "claude-sonnet" in m["model"])
        assert claude["sessions"] == 2

    def test_platform_breakdown(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        platforms = report["platforms"]

        platform_names = [p["platform"] for p in platforms]
        assert "cli" in platform_names
        assert "telegram" in platform_names
        assert "discord" in platform_names

        cli = next(p for p in platforms if p["platform"] == "cli")
        assert cli["sessions"] == 2  # s1 + s3

    def test_tool_breakdown(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        tools = report["tools"]

        tool_names = [t["tool"] for t in tools]
        assert "terminal" in tool_names
        assert "search_files" in tool_names
        assert "read_file" in tool_names
        assert "patch" in tool_names
        assert "web_search" in tool_names

        # terminal was used 2x in s3
        terminal = next(t for t in tools if t["tool"] == "terminal")
        assert terminal["count"] == 2

        # Percentages should sum to ~100%
        total_pct = sum(t["percentage"] for t in tools)
        assert total_pct == pytest.approx(100.0, abs=0.1)

    def test_activity_patterns(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        activity = report["activity"]

        # Fixed-size histograms: one bucket per weekday / per hour of day.
        assert len(activity["by_day"]) == 7
        assert len(activity["by_hour"]) == 24
        assert activity["active_days"] >= 1
        assert activity["busiest_day"] is not None
        assert activity["busiest_hour"] is not None

    def test_top_sessions(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        top = report["top_sessions"]

        labels = [t["label"] for t in top]
        assert "Longest session" in labels
        assert "Most messages" in labels
        assert "Most tokens" in labels
        assert "Most tool calls" in labels

    def test_source_filter_cli(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30, source="cli")

        assert report["overview"]["total_sessions"] == 2  # s1, s3

    def test_source_filter_telegram(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30, source="telegram")

        assert report["overview"]["total_sessions"] == 1  # s2

    def test_source_filter_nonexistent(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30, source="slack")

        # No matching sessions -> same shape as an empty database.
        assert report["empty"] is True

    def test_days_filter_short(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=3)

        # Only s1 (2 days ago) and s4 (1 day ago) should be included
        assert report["overview"]["total_sessions"] == 2

    def test_days_filter_long(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=60)

        # All 5 sessions should be included
        assert report["overview"]["total_sessions"] == 5
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Formatting
|
||||
# =========================================================================
|
||||
|
||||
class TestTerminalFormatting:
    """format_terminal renders the full plain-text report with all sections."""

    def test_terminal_format_has_sections(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_terminal(engine.generate(days=30))

        for heading in ("Hermes Insights", "Overview", "Models Used",
                        "Top Tools", "Activity Patterns", "Notable Sessions"):
            assert heading in text

    def test_terminal_format_shows_tokens(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_terminal(engine.generate(days=30))

        for marker in ("Input tokens", "Output tokens", "Est. cost", "$"):
            assert marker in text

    def test_terminal_format_shows_platforms(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_terminal(engine.generate(days=30))

        # Fixture data spans cli/telegram/discord, so the section must render.
        assert "Platforms" in text
        assert "cli" in text
        assert "telegram" in text

    def test_terminal_format_shows_bar_chart(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_terminal(engine.generate(days=30))

        # Activity charts are drawn with full-block characters.
        assert "█" in text
|
||||
|
||||
class TestGatewayFormatting:
    """format_gateway is the compact markdown variant for chat platforms."""

    def test_gateway_format_is_shorter(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)

        # Gateway output trims sections relative to the terminal report.
        assert len(engine.format_gateway(report)) < len(engine.format_terminal(report))

    def test_gateway_format_has_bold(self, populated_db):
        engine = InsightsEngine(populated_db)
        assert "**" in engine.format_gateway(engine.generate(days=30))  # Markdown bold

    def test_gateway_format_shows_cost(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_gateway(engine.generate(days=30))

        assert "$" in text
        assert "Est. cost" in text

    def test_gateway_format_shows_models(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_gateway(engine.generate(days=30))

        assert "Models" in text
        assert "sessions" in text
||||
|
||||
# =========================================================================
|
||||
# Edge cases
|
||||
# =========================================================================
|
||||
|
||||
class TestEdgeCases:
    """Degenerate inputs must not crash the engine and keep sensible defaults."""

    def test_session_with_no_tokens(self, db):
        """Sessions with zero tokens should not crash."""
        db.create_session(session_id="s1", source="cli", model="test-model")
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["empty"] is False
        assert report["overview"]["total_tokens"] == 0
        assert report["overview"]["estimated_cost"] == 0.0

    def test_session_with_no_end_time(self, db):
        """Active (non-ended) sessions should be included but duration = 0."""
        db.create_session(session_id="s1", source="cli", model="test-model")
        db.update_token_counts("s1", input_tokens=1000, output_tokens=500)
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        # Session included
        assert report["overview"]["total_sessions"] == 1
        assert report["overview"]["total_tokens"] == 1500
        # But no duration stats (session not ended)
        assert report["overview"]["total_hours"] == 0

    def test_session_with_no_model(self, db):
        """Sessions with NULL model should not crash."""
        db.create_session(session_id="s1", source="cli")
        db.update_token_counts("s1", input_tokens=1000, output_tokens=500)
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["empty"] is False

        models = report["models"]
        assert len(models) == 1
        assert models[0]["model"] == "unknown"

    def test_single_session_streak(self, db):
        """Single session should have streak of 0 or 1."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["activity"]["max_streak"] <= 1

    def test_no_tool_calls(self, db):
        """Sessions with no tool calls should produce empty tools list."""
        db.create_session(session_id="s1", source="cli", model="test")
        db.append_message("s1", role="user", content="hello")
        db.append_message("s1", role="assistant", content="hi there")
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert report["tools"] == []

    def test_only_one_platform(self, db):
        """Single-platform usage should still work."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert len(report["platforms"]) == 1
        assert report["platforms"][0]["platform"] == "cli"

        # Fix: the original computed `text` but asserted nothing. Per the
        # renderer's rule (section shown only for >1 platforms OR a non-cli
        # platform), a lone "cli" session must omit the Platforms section.
        text = engine.format_terminal(report)
        assert "Platforms" not in text

    def test_large_days_value(self, db):
        """Very large days value should not crash."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=365)
        assert report["empty"] is False

    def test_zero_days(self, db):
        """Zero days should return empty (nothing is in the future)."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=0)
        # Depending on timing, might catch the session if created <1s ago
        # Just verify it doesn't crash
        assert "empty" in report
|
||||
Reference in New Issue
Block a user