From b52b37ae64811c7f9297b86348290b80e1212b11 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Fri, 6 Mar 2026 14:04:59 -0800 Subject: [PATCH] feat: add /insights command with usage analytics and cost estimation Inspired by Claude Code's /insights, adapted for Hermes Agent's multi-platform architecture. Analyzes session history from state.db to produce comprehensive usage insights. Features: - Overview stats: sessions, messages, tokens, estimated cost, active time - Model breakdown: per-model sessions, tokens, and cost estimation - Platform breakdown: CLI vs Telegram vs Discord etc. (unique to Hermes) - Tool usage ranking: most-used tools with percentages - Activity patterns: day-of-week chart, peak hours, streaks - Notable sessions: longest, most messages, most tokens, most tool calls - Cost estimation: real pricing data for 25+ models (OpenAI, Anthropic, DeepSeek, Google, Meta) with fuzzy model name matching - Configurable time window: --days flag (default 30) - Source filtering: --source flag to filter by platform Three entry points: - /insights slash command in CLI (supports --days and --source flags) - /insights slash command in gateway (compact markdown format) - hermes insights CLI subcommand (standalone) Includes 56 tests covering pricing helpers, format helpers, empty DB, populated DB with multi-platform data, filtering, formatting, and edge cases. --- agent/insights.py | 691 +++++++++++++++++++++++++++++++++++++++++ cli.py | 35 +++ gateway/run.py | 53 +++- hermes_cli/commands.py | 1 + hermes_cli/main.py | 26 ++ tests/test_insights.py | 582 ++++++++++++++++++++++++++++++++++ 6 files changed, 1387 insertions(+), 1 deletion(-) create mode 100644 agent/insights.py create mode 100644 tests/test_insights.py diff --git a/agent/insights.py b/agent/insights.py new file mode 100644 index 000000000..44783ce2a --- /dev/null +++ b/agent/insights.py @@ -0,0 +1,691 @@ +""" +Session Insights Engine for Hermes Agent. 
"""
Session Insights Engine for Hermes Agent.

Analyzes historical session data from the SQLite state database to produce
comprehensive usage insights — token consumption, cost estimates, tool usage
patterns, activity trends, model/platform breakdowns, and session metrics.

Inspired by Claude Code's /insights command, adapted for Hermes Agent's
multi-platform architecture with additional cost estimation and platform
breakdown capabilities.

Usage:
    from agent.insights import InsightsEngine

    engine = InsightsEngine(db)
    report = engine.generate(days=30)
    print(engine.format_terminal(report))
"""

import time
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

# =========================================================================
# Model pricing (USD per million tokens) — approximate as of early 2026
# =========================================================================
MODEL_PRICING = {
    # OpenAI
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4.1": {"input": 2.00, "output": 8.00},
    "gpt-4.1-mini": {"input": 0.40, "output": 1.60},
    "gpt-4.1-nano": {"input": 0.10, "output": 0.40},
    "gpt-4.5-preview": {"input": 75.00, "output": 150.00},
    "gpt-5": {"input": 10.00, "output": 30.00},
    "gpt-5.4": {"input": 10.00, "output": 30.00},
    "o3": {"input": 10.00, "output": 40.00},
    "o3-mini": {"input": 1.10, "output": 4.40},
    "o4-mini": {"input": 1.10, "output": 4.40},
    # Anthropic
    "claude-opus-4-20250514": {"input": 15.00, "output": 75.00},
    "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.00},
    "claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
    # DeepSeek
    "deepseek-chat": {"input": 0.14, "output": 0.28},
    "deepseek-reasoner": {"input": 0.55, "output": 2.19},
    # Google
    "gemini-2.5-pro": {"input": 1.25, "output": 10.00},
    "gemini-2.5-flash": {"input": 0.15, "output": 0.60},
    "gemini-2.0-flash": {"input": 0.10, "output": 0.40},
    # Meta (via providers)
    "llama-4-maverick": {"input": 0.50, "output": 0.70},
    "llama-4-scout": {"input": 0.20, "output": 0.30},
}

# Fallback pricing for unknown models
_DEFAULT_PRICING = {"input": 3.00, "output": 12.00}


def _get_pricing(model_name: str) -> Dict[str, float]:
    """Look up pricing for a model. Uses fuzzy matching on model name.

    Resolution order: exact match on the bare (provider-stripped, lowercased)
    name, then a prefix match in either direction (versioned names vs. table
    keys), then family keyword heuristics, then the flat default.
    """
    if not model_name:
        return _DEFAULT_PRICING

    # Strip provider prefix (e.g., "anthropic/claude-..." -> "claude-...")
    bare = model_name.split("/")[-1].lower()

    # Exact match first
    if bare in MODEL_PRICING:
        return MODEL_PRICING[bare]

    # Fuzzy prefix match
    for key, price in MODEL_PRICING.items():
        if bare.startswith(key) or key.startswith(bare):
            return price

    # Keyword heuristics (family-level fallbacks)
    if "opus" in bare:
        return {"input": 15.00, "output": 75.00}
    if "sonnet" in bare:
        return {"input": 3.00, "output": 15.00}
    if "haiku" in bare:
        return {"input": 0.80, "output": 4.00}
    if "gpt-4o-mini" in bare:
        return {"input": 0.15, "output": 0.60}
    if "gpt-4o" in bare:
        return {"input": 2.50, "output": 10.00}
    if "gpt-5" in bare:
        return {"input": 10.00, "output": 30.00}
    if "deepseek" in bare:
        return {"input": 0.14, "output": 0.28}
    if "gemini" in bare:
        return {"input": 0.15, "output": 0.60}

    return _DEFAULT_PRICING


def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate the USD cost for a given model and token counts."""
    pricing = _get_pricing(model)
    return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000


def _format_duration(seconds: float) -> str:
    """Format seconds into a human-readable duration string (s/m/h/d)."""
    if seconds < 60:
        return f"{seconds:.0f}s"
    minutes = seconds / 60
    if minutes < 60:
        return f"{minutes:.0f}m"
    hours = minutes / 60
    if hours < 24:
        remaining_min = int(minutes % 60)
        return f"{int(hours)}h {remaining_min}m" if remaining_min else f"{int(hours)}h"
    days = hours / 24
    return f"{days:.1f}d"


def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
    """Create simple horizontal bar chart strings from values.

    Any non-zero value renders at least one bar cell; zero renders empty.
    """
    peak = max(values) if values else 1
    if peak == 0:
        return ["" for _ in values]
    return ["█" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]


class InsightsEngine:
    """
    Analyzes session history and produces usage insights.

    Works directly with a SessionDB instance (or raw sqlite3 connection)
    to query session and message data.
    """

    def __init__(self, db):
        """
        Initialize with a SessionDB instance.

        Args:
            db: A SessionDB instance (from hermes_state.py)
        """
        self.db = db
        # NOTE: reaches into the SessionDB's underlying sqlite3 connection;
        # queries assume row_factory yields mapping-capable rows (sqlite3.Row).
        self._conn = db._conn

    def generate(self, days: int = 30, source: Optional[str] = None) -> Dict[str, Any]:
        """
        Generate a complete insights report.

        Args:
            days: Number of days to look back (default: 30)
            source: Optional filter by source platform

        Returns:
            Dict with all computed insights
        """
        cutoff = time.time() - (days * 86400)

        # Gather raw data
        sessions = self._get_sessions(cutoff, source)
        tool_usage = self._get_tool_usage(cutoff, source)
        message_stats = self._get_message_stats(cutoff, source)

        if not sessions:
            return {
                "days": days,
                "source_filter": source,
                "empty": True,
                "overview": {},
                "models": [],
                "platforms": [],
                "tools": [],
                "activity": {},
                "top_sessions": [],
            }

        # Compute insights
        overview = self._compute_overview(sessions, message_stats)
        models = self._compute_model_breakdown(sessions)
        platforms = self._compute_platform_breakdown(sessions)
        tools = self._compute_tool_breakdown(tool_usage)
        activity = self._compute_activity_patterns(sessions)
        top_sessions = self._compute_top_sessions(sessions)

        return {
            "days": days,
            "source_filter": source,
            "empty": False,
            "generated_at": time.time(),
            "overview": overview,
            "models": models,
            "platforms": platforms,
            "tools": tools,
            "activity": activity,
            "top_sessions": top_sessions,
        }

    # =========================================================================
    # Data gathering (SQL queries)
    # =========================================================================

    def _get_sessions(self, cutoff: float, source: Optional[str] = None) -> List[Dict]:
        """Fetch sessions within the time window, newest first."""
        if source:
            cursor = self._conn.execute(
                """SELECT * FROM sessions
                   WHERE started_at >= ? AND source = ?
                   ORDER BY started_at DESC""",
                (cutoff, source),
            )
        else:
            cursor = self._conn.execute(
                """SELECT * FROM sessions
                   WHERE started_at >= ?
                   ORDER BY started_at DESC""",
                (cutoff,),
            )
        return [dict(row) for row in cursor.fetchall()]

    def _get_tool_usage(self, cutoff: float, source: Optional[str] = None) -> List[Dict]:
        """Get tool call counts from messages, most-used first."""
        if source:
            cursor = self._conn.execute(
                """SELECT m.tool_name, COUNT(*) as count
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ? AND s.source = ?
                     AND m.role = 'tool' AND m.tool_name IS NOT NULL
                   GROUP BY m.tool_name
                   ORDER BY count DESC""",
                (cutoff, source),
            )
        else:
            cursor = self._conn.execute(
                """SELECT m.tool_name, COUNT(*) as count
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ?
                     AND m.role = 'tool' AND m.tool_name IS NOT NULL
                   GROUP BY m.tool_name
                   ORDER BY count DESC""",
                (cutoff,),
            )
        return [dict(row) for row in cursor.fetchall()]

    def _get_message_stats(self, cutoff: float, source: Optional[str] = None) -> Dict:
        """Get aggregate message statistics (totals by role)."""
        if source:
            cursor = self._conn.execute(
                """SELECT
                       COUNT(*) as total_messages,
                       SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
                       SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
                       SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ? AND s.source = ?""",
                (cutoff, source),
            )
        else:
            cursor = self._conn.execute(
                """SELECT
                       COUNT(*) as total_messages,
                       SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
                       SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
                       SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
                   FROM messages m
                   JOIN sessions s ON s.id = m.session_id
                   WHERE s.started_at >= ?""",
                (cutoff,),
            )
        row = cursor.fetchone()
        return dict(row) if row else {
            "total_messages": 0, "user_messages": 0,
            "assistant_messages": 0, "tool_messages": 0,
        }

    # =========================================================================
    # Computation
    # =========================================================================

    def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
        """Compute high-level overview statistics."""
        total_input = sum(s.get("input_tokens") or 0 for s in sessions)
        total_output = sum(s.get("output_tokens") or 0 for s in sessions)
        total_tokens = total_input + total_output
        total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
        total_messages = sum(s.get("message_count") or 0 for s in sessions)

        # Cost estimation (weighted by model)
        total_cost = sum(
            _estimate_cost(s.get("model", ""), s.get("input_tokens") or 0, s.get("output_tokens") or 0)
            for s in sessions
        )

        # Session duration stats — only sessions with both endpoints recorded
        durations = []
        for s in sessions:
            start = s.get("started_at")
            end = s.get("ended_at")
            if start and end:
                durations.append(end - start)

        total_hours = sum(durations) / 3600 if durations else 0
        avg_duration = sum(durations) / len(durations) if durations else 0

        # Earliest and latest session
        started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")]
        date_range_start = min(started_timestamps) if started_timestamps else None
        date_range_end = max(started_timestamps) if started_timestamps else None

        return {
            "total_sessions": len(sessions),
            "total_messages": total_messages,
            "total_tool_calls": total_tool_calls,
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "total_tokens": total_tokens,
            "estimated_cost": total_cost,
            "total_hours": total_hours,
            "avg_session_duration": avg_duration,
            "avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
            "avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0,
            "user_messages": message_stats.get("user_messages") or 0,
            "assistant_messages": message_stats.get("assistant_messages") or 0,
            "tool_messages": message_stats.get("tool_messages") or 0,
            "date_range_start": date_range_start,
            "date_range_end": date_range_end,
        }

    def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
        """Break down usage by model, sorted by total tokens descending."""
        model_data = defaultdict(lambda: {
            "sessions": 0, "input_tokens": 0, "output_tokens": 0,
            "total_tokens": 0, "tool_calls": 0, "cost": 0.0,
        })

        for s in sessions:
            model = s.get("model") or "unknown"
            # Normalize: strip provider prefix for display
            display_model = model.split("/")[-1] if "/" in model else model
            d = model_data[display_model]
            d["sessions"] += 1
            inp = s.get("input_tokens") or 0
            out = s.get("output_tokens") or 0
            d["input_tokens"] += inp
            d["output_tokens"] += out
            d["total_tokens"] += inp + out
            d["tool_calls"] += s.get("tool_call_count") or 0
            # Cost uses the full model name so provider-prefixed pricing still resolves
            d["cost"] += _estimate_cost(model, inp, out)

        result = [
            {"model": model, **data}
            for model, data in model_data.items()
        ]
        result.sort(key=lambda x: x["total_tokens"], reverse=True)
        return result

    def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
        """Break down usage by platform/source, sorted by session count."""
        platform_data = defaultdict(lambda: {
            "sessions": 0, "messages": 0, "input_tokens": 0,
            "output_tokens": 0, "total_tokens": 0, "tool_calls": 0,
        })

        for s in sessions:
            source = s.get("source") or "unknown"
            d = platform_data[source]
            d["sessions"] += 1
            d["messages"] += s.get("message_count") or 0
            inp = s.get("input_tokens") or 0
            out = s.get("output_tokens") or 0
            d["input_tokens"] += inp
            d["output_tokens"] += out
            d["total_tokens"] += inp + out
            d["tool_calls"] += s.get("tool_call_count") or 0

        result = [
            {"platform": platform, **data}
            for platform, data in platform_data.items()
        ]
        result.sort(key=lambda x: x["sessions"], reverse=True)
        return result

    def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
        """Process tool usage data into a ranked list with percentages."""
        total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
        result = []
        for t in tool_usage:
            pct = (t["count"] / total_calls * 100) if total_calls else 0
            result.append({
                "tool": t["tool_name"],
                "count": t["count"],
                "percentage": pct,
            })
        return result

    def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
        """Analyze activity patterns by day of week and hour (local time)."""
        day_counts = Counter()  # 0=Monday ... 6=Sunday
        hour_counts = Counter()
        daily_counts = Counter()  # date string -> count

        for s in sessions:
            ts = s.get("started_at")
            if not ts:
                continue
            dt = datetime.fromtimestamp(ts)
            day_counts[dt.weekday()] += 1
            hour_counts[dt.hour] += 1
            daily_counts[dt.strftime("%Y-%m-%d")] += 1

        day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
        day_breakdown = [
            {"day": day_names[i], "count": day_counts.get(i, 0)}
            for i in range(7)
        ]

        hour_breakdown = [
            {"hour": i, "count": hour_counts.get(i, 0)}
            for i in range(24)
        ]

        # Busiest day and hour
        busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
        busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None

        # Active days (days with at least one session)
        active_days = len(daily_counts)

        # Streak calculation: longest run of consecutive calendar dates
        if daily_counts:
            all_dates = sorted(daily_counts.keys())
            current_streak = 1
            max_streak = 1
            for i in range(1, len(all_dates)):
                d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d")
                d2 = datetime.strptime(all_dates[i], "%Y-%m-%d")
                if (d2 - d1).days == 1:
                    current_streak += 1
                    max_streak = max(max_streak, current_streak)
                else:
                    current_streak = 1
        else:
            max_streak = 0

        return {
            "by_day": day_breakdown,
            "by_hour": hour_breakdown,
            "busiest_day": busiest_day,
            "busiest_hour": busiest_hour,
            "active_days": active_days,
            "max_streak": max_streak,
        }

    def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
        """Find notable sessions (longest, most messages, most tokens)."""
        top = []

        # Longest by duration
        sessions_with_duration = [
            s for s in sessions
            if s.get("started_at") and s.get("ended_at")
        ]
        if sessions_with_duration:
            longest = max(
                sessions_with_duration,
                key=lambda s: (s["ended_at"] - s["started_at"]),
            )
            dur = longest["ended_at"] - longest["started_at"]
            top.append({
                "label": "Longest session",
                "session_id": longest["id"][:16],
                "value": _format_duration(dur),
                "date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
            })

        # Most messages
        most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
        if (most_msgs.get("message_count") or 0) > 0:
            top.append({
                "label": "Most messages",
                "session_id": most_msgs["id"][:16],
                "value": f"{most_msgs['message_count']} msgs",
                "date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
            })

        # Most tokens
        most_tokens = max(
            sessions,
            key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0),
        )
        token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
        if token_total > 0:
            top.append({
                "label": "Most tokens",
                "session_id": most_tokens["id"][:16],
                "value": f"{token_total:,} tokens",
                "date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
            })

        # Most tool calls
        most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
        if (most_tools.get("tool_call_count") or 0) > 0:
            top.append({
                "label": "Most tool calls",
                "session_id": most_tools["id"][:16],
                "value": f"{most_tools['tool_call_count']} calls",
                "date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
            })

        return top

    # =========================================================================
    # Formatting
    # =========================================================================

    def format_terminal(self, report: Dict) -> str:
        """Format the insights report for terminal display (CLI)."""
        if report.get("empty"):
            days = report.get("days", 30)
            src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
            return f" No sessions found in the last {days} days{src}."

        lines = []
        o = report["overview"]
        days = report["days"]
        src_filter = report.get("source_filter")

        # Header box (58-char interior, matching the padding math below)
        lines.append("")
        lines.append(" ╔" + "═" * 58 + "╗")
        lines.append(" ║" + "📊 Hermes Insights".center(58) + "║")
        period_label = f"Last {days} days"
        if src_filter:
            period_label += f" ({src_filter})"
        padding = 58 - len(period_label) - 2
        left_pad = padding // 2
        right_pad = padding - left_pad
        lines.append(f" ║{' ' * left_pad} {period_label} {' ' * right_pad}║")
        lines.append(" ╚" + "═" * 58 + "╝")
        lines.append("")

        # Date range
        if o.get("date_range_start") and o.get("date_range_end"):
            start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
            end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
            lines.append(f" Period: {start_str} — {end_str}")
            lines.append("")

        # Overview
        lines.append(" 📋 Overview")
        lines.append(" " + "─" * 56)
        lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
        lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
        lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
        lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: ${o['estimated_cost']:.2f}")
        if o["total_hours"] > 0:
            lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
        lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}")
        lines.append("")

        # Model breakdown
        if report["models"]:
            lines.append(" 🤖 Models Used")
            lines.append(" " + "─" * 56)
            lines.append(f" {'Model':<30} {'Sessions':>8} {'Tokens':>12} {'Cost':>8}")
            for m in report["models"]:
                model_name = m["model"][:28]
                lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} ${m['cost']:>6.2f}")
            lines.append("")

        # Platform breakdown (only interesting when not purely CLI)
        if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
            lines.append(" 📱 Platforms")
            lines.append(" " + "─" * 56)
            lines.append(f" {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
            for p in report["platforms"]:
                lines.append(f" {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
            lines.append("")

        # Tool usage
        if report["tools"]:
            lines.append(" 🔧 Top Tools")
            lines.append(" " + "─" * 56)
            lines.append(f" {'Tool':<28} {'Calls':>8} {'%':>8}")
            for t in report["tools"][:15]:  # Top 15
                lines.append(f" {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
            if len(report["tools"]) > 15:
                lines.append(f" ... and {len(report['tools']) - 15} more tools")
            lines.append("")

        # Activity patterns
        act = report.get("activity", {})
        if act.get("by_day"):
            lines.append(" 📅 Activity Patterns")
            lines.append(" " + "─" * 56)

            # Day of week chart
            day_values = [d["count"] for d in act["by_day"]]
            bars = _bar_chart(day_values, max_width=15)
            for i, d in enumerate(act["by_day"]):
                bar = bars[i]
                lines.append(f" {d['day']} {bar:<15} {d['count']}")

            lines.append("")

            # Peak hours (show top 5 busiest hours)
            busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
            busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
            if busy_hours:
                hour_strs = []
                for h in busy_hours:
                    hr = h["hour"]
                    ampm = "AM" if hr < 12 else "PM"
                    display_hr = hr % 12 or 12
                    hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
                lines.append(f" Peak hours: {', '.join(hour_strs)}")

            if act.get("active_days"):
                lines.append(f" Active days: {act['active_days']}")
            if act.get("max_streak") and act["max_streak"] > 1:
                lines.append(f" Best streak: {act['max_streak']} consecutive days")
            lines.append("")

        # Notable sessions
        if report.get("top_sessions"):
            lines.append(" 🏆 Notable Sessions")
            lines.append(" " + "─" * 56)
            for ts in report["top_sessions"]:
                lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
            lines.append("")

        return "\n".join(lines)

    def format_gateway(self, report: Dict) -> str:
        """Format the insights report for gateway/messaging (shorter)."""
        if report.get("empty"):
            days = report.get("days", 30)
            return f"No sessions found in the last {days} days."

        lines = []
        o = report["overview"]
        days = report["days"]

        lines.append(f"📊 **Hermes Insights** — Last {days} days\n")

        # Overview
        lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
        lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
        lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}")
        if o["total_hours"] > 0:
            lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
        lines.append("")

        # Models (top 5)
        if report["models"]:
            lines.append("**🤖 Models:**")
            for m in report["models"][:5]:
                lines.append(f" {m['model'][:25]} — {m['sessions']} sessions, {m['total_tokens']:,} tokens, ${m['cost']:.2f}")
            lines.append("")

        # Platforms (if multi-platform)
        if len(report["platforms"]) > 1:
            lines.append("**📱 Platforms:**")
            for p in report["platforms"]:
                lines.append(f" {p['platform']} — {p['sessions']} sessions, {p['messages']:,} msgs")
            lines.append("")

        # Tools (top 8)
        if report["tools"]:
            lines.append("**🔧 Top Tools:**")
            for t in report["tools"][:8]:
                lines.append(f" {t['tool']} — {t['count']:,} calls ({t['percentage']:.1f}%)")
            lines.append("")

        # Activity summary
        act = report.get("activity", {})
        if act.get("busiest_day") and act.get("busiest_hour"):
            hr = act["busiest_hour"]["hour"]
            ampm = "AM" if hr < 12 else "PM"
            display_hr = hr % 12 or 12
            lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
        if act.get("active_days"):
            lines.append(f"**Active days:** {act['active_days']}")
        if act.get("max_streak", 0) > 1:
            lines.append(f"**Best streak:** {act['max_streak']} consecutive days")

        return "\n".join(lines)
850db4102..98dfcd91f 100755 --- a/cli.py +++ b/cli.py @@ -1858,6 +1858,8 @@ class HermesCLI: self._manual_compress() elif cmd_lower == "/usage": self._show_usage() + elif cmd_lower.startswith("/insights"): + self._show_insights(cmd_original) elif cmd_lower == "/paste": self._handle_paste_command() elif cmd_lower == "/reload-mcp": @@ -1983,6 +1985,39 @@ class HermesCLI: for quiet_logger in ('tools', 'minisweagent', 'run_agent', 'trajectory_compressor', 'cron', 'hermes_cli'): logging.getLogger(quiet_logger).setLevel(logging.ERROR) + def _show_insights(self, command: str = "/insights"): + """Show usage insights and analytics from session history.""" + # Parse optional --days flag + parts = command.split() + days = 30 + source = None + i = 1 + while i < len(parts): + if parts[i] == "--days" and i + 1 < len(parts): + try: + days = int(parts[i + 1]) + except ValueError: + print(f" Invalid --days value: {parts[i + 1]}") + return + i += 2 + elif parts[i] == "--source" and i + 1 < len(parts): + source = parts[i + 1] + i += 2 + else: + i += 1 + + try: + from hermes_state import SessionDB + from agent.insights import InsightsEngine + + db = SessionDB() + engine = InsightsEngine(db) + report = engine.generate(days=days, source=source) + print(engine.format_terminal(report)) + db.close() + except Exception as e: + print(f" Error generating insights: {e}") + def _reload_mcp(self): """Reload MCP servers: disconnect all, re-read config.yaml, reconnect. 
diff --git a/gateway/run.py b/gateway/run.py index 59f74b39b..9e05e9988 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -659,7 +659,7 @@ class GatewayRunner: # Emit command:* hook for any recognized slash command _known_commands = {"new", "reset", "help", "status", "stop", "model", "personality", "retry", "undo", "sethome", "set-home", - "compress", "usage", "reload-mcp", "update"} + "compress", "usage", "insights", "reload-mcp", "update"} if command and command in _known_commands: await self.hooks.emit(f"command:{command}", { "platform": source.platform.value if source.platform else "", @@ -701,6 +701,9 @@ class GatewayRunner: if command == "usage": return await self._handle_usage_command(event) + if command == "insights": + return await self._handle_insights_command(event) + if command == "reload-mcp": return await self._handle_reload_mcp_command(event) @@ -1104,6 +1107,7 @@ class GatewayRunner: "`/sethome` — Set this chat as the home channel", "`/compress` — Compress conversation context", "`/usage` — Show token usage for this session", + "`/insights [days]` — Show usage insights and analytics", "`/reload-mcp` — Reload MCP servers from config", "`/update` — Update Hermes Agent to the latest version", "`/help` — Show this message", @@ -1397,6 +1401,53 @@ class GatewayRunner: ) return "No usage data available for this session." 
+ async def _handle_insights_command(self, event: MessageEvent) -> str: + """Handle /insights command -- show usage insights and analytics.""" + import asyncio as _asyncio + + args = event.get_command_args().strip() + days = 30 + source = None + + # Parse simple args: /insights 7 or /insights --days 7 + if args: + parts = args.split() + i = 0 + while i < len(parts): + if parts[i] == "--days" and i + 1 < len(parts): + try: + days = int(parts[i + 1]) + except ValueError: + return f"Invalid --days value: {parts[i + 1]}" + i += 2 + elif parts[i] == "--source" and i + 1 < len(parts): + source = parts[i + 1] + i += 2 + elif parts[i].isdigit(): + days = int(parts[i]) + i += 1 + else: + i += 1 + + try: + from hermes_state import SessionDB + from agent.insights import InsightsEngine + + loop = _asyncio.get_event_loop() + + def _run_insights(): + db = SessionDB() + engine = InsightsEngine(db) + report = engine.generate(days=days, source=source) + result = engine.format_gateway(report) + db.close() + return result + + return await loop.run_in_executor(None, _run_insights) + except Exception as e: + logger.error("Insights command error: %s", e, exc_info=True) + return f"Error generating insights: {e}" + async def _handle_reload_mcp_command(self, event: MessageEvent) -> str: """Handle /reload-mcp command -- disconnect and reconnect all MCP servers.""" loop = asyncio.get_event_loop() diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index b091a7905..887476339 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -28,6 +28,7 @@ COMMANDS = { "/verbose": "Cycle tool progress display: off → new → all → verbose", "/compress": "Manually compress conversation context (flush memories + summarize)", "/usage": "Show token usage for the current session", + "/insights": "Show usage insights and analytics (last 30 days)", "/quit": "Exit the CLI (also: /exit, /q)", } diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 72a442e04..d868a34d2 100644 --- 
a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -1610,6 +1610,32 @@ For more help on a command: sessions_parser.set_defaults(func=cmd_sessions) + # ========================================================================= + # insights command + # ========================================================================= + insights_parser = subparsers.add_parser( + "insights", + help="Show usage insights and analytics", + description="Analyze session history to show token usage, costs, tool patterns, and activity trends" + ) + insights_parser.add_argument("--days", type=int, default=30, help="Number of days to analyze (default: 30)") + insights_parser.add_argument("--source", help="Filter by platform (cli, telegram, discord, etc.)") + + def cmd_insights(args): + try: + from hermes_state import SessionDB + from agent.insights import InsightsEngine + + db = SessionDB() + engine = InsightsEngine(db) + report = engine.generate(days=args.days, source=args.source) + print(engine.format_terminal(report)) + db.close() + except Exception as e: + print(f"Error generating insights: {e}") + + insights_parser.set_defaults(func=cmd_insights) + # ========================================================================= # version command # ========================================================================= diff --git a/tests/test_insights.py b/tests/test_insights.py new file mode 100644 index 000000000..3cc7c7e80 --- /dev/null +++ b/tests/test_insights.py @@ -0,0 +1,582 @@ +"""Tests for agent/insights.py — InsightsEngine analytics and reporting.""" + +import time +import pytest +from pathlib import Path + +from hermes_state import SessionDB +from agent.insights import ( + InsightsEngine, + _get_pricing, + _estimate_cost, + _format_duration, + _bar_chart, + _DEFAULT_PRICING, +) + + +@pytest.fixture() +def db(tmp_path): + """Create a SessionDB with a temp database file.""" + db_path = tmp_path / "test_insights.db" + session_db = SessionDB(db_path=db_path) + yield session_db + 
@pytest.fixture()
def db(tmp_path):
    """SessionDB backed by a throwaway on-disk database file."""
    session_db = SessionDB(db_path=tmp_path / "test_insights.db")
    yield session_db
    session_db.close()


@pytest.fixture()
def populated_db(db):
    """Seed the DB with realistic multi-platform sessions for insights tests.

    Relative to "now":
      s1     cli       claude-sonnet   2 days ago,  1h,  65k tokens, 3 tools
      s2     telegram  gpt-4o          5 days ago, 30m,  28k tokens, 1 tool
      s3     cli       deepseek-chat  10 days ago,  2h, 140k tokens, 3 tool calls
      s4     discord   claude-sonnet   1 day ago,  15m,  15k tokens, no tools
      s_old  cli       gpt-4o-mini    45 days ago -- outside a 30-day window
    """
    now = time.time()
    day = 86400

    def end_and_backdate(sid, reason, started_at, duration):
        # end_session() stamps "now"; rewrite both timestamps afterwards so
        # the session appears to have run [started_at, started_at + duration].
        db.end_session(sid, end_reason=reason)
        db._conn.execute(
            "UPDATE sessions SET started_at = ?, ended_at = ? WHERE id = ?",
            (started_at, started_at + duration, sid),
        )

    def tool_turn(sid, text, tool, result):
        # One assistant tool-call message followed by its tool result.
        db.append_message(sid, role="assistant", content=text,
                          tool_calls=[{"function": {"name": tool}}])
        db.append_message(sid, role="tool", content=result, tool_name=tool)

    # Session 1: CLI, claude-sonnet, ended, 2 days ago
    db.create_session(session_id="s1", source="cli",
                      model="anthropic/claude-sonnet-4-20250514", user_id="user1")
    end_and_backdate("s1", "user_exit", now - 2 * day, 3600)
    db.update_token_counts("s1", input_tokens=50000, output_tokens=15000)
    db.append_message("s1", role="user", content="Hello, help me fix a bug")
    db.append_message("s1", role="assistant", content="Sure, let me look into that.")
    tool_turn("s1", "Let me search the files.", "search_files", "Found 3 matches")
    tool_turn("s1", "Let me read the file.", "read_file", "file contents...")
    tool_turn("s1", "I found the bug. Let me fix it.", "patch", "patched successfully")
    db.append_message("s1", role="user", content="Thanks!")
    db.append_message("s1", role="assistant", content="You're welcome!")

    # Session 2: Telegram, gpt-4o, ended, 5 days ago
    db.create_session(session_id="s2", source="telegram",
                      model="gpt-4o", user_id="user1")
    end_and_backdate("s2", "timeout", now - 5 * day, 1800)
    db.update_token_counts("s2", input_tokens=20000, output_tokens=8000)
    db.append_message("s2", role="user", content="Search the web for something")
    tool_turn("s2", "Searching...", "web_search", "results...")
    db.append_message("s2", role="assistant", content="Here's what I found")

    # Session 3: CLI, deepseek-chat, ended, 10 days ago
    db.create_session(session_id="s3", source="cli",
                      model="deepseek-chat", user_id="user1")
    end_and_backdate("s3", "user_exit", now - 10 * day, 7200)
    db.update_token_counts("s3", input_tokens=100000, output_tokens=40000)
    db.append_message("s3", role="user", content="Run this terminal command")
    tool_turn("s3", "Running...", "terminal", "output...")
    tool_turn("s3", "Let me run another", "terminal", "more output...")
    tool_turn("s3", "And search files", "search_files", "found stuff")

    # Session 4: Discord, same model as s1, ended, 1 day ago
    db.create_session(session_id="s4", source="discord",
                      model="anthropic/claude-sonnet-4-20250514", user_id="user2")
    end_and_backdate("s4", "user_exit", now - 1 * day, 900)
    db.update_token_counts("s4", input_tokens=10000, output_tokens=5000)
    db.append_message("s4", role="user", content="Quick question")
    db.append_message("s4", role="assistant", content="Sure, go ahead")

    # Session 5: old session, 45 days ago (excluded from a 30-day window)
    db.create_session(session_id="s_old", source="cli",
                      model="gpt-4o-mini", user_id="user1")
    end_and_backdate("s_old", "user_exit", now - 45 * day, 600)
    db.update_token_counts("s_old", input_tokens=5000, output_tokens=2000)
    db.append_message("s_old", role="user", content="old message")
    db.append_message("s_old", role="assistant", content="old reply")

    db._conn.commit()
    return db


# =========================================================================
# Pricing helpers
# =========================================================================

class TestPricing:
    def test_exact_match(self):
        p = _get_pricing("gpt-4o")
        assert (p["input"], p["output"]) == (2.50, 10.00)

    def test_provider_prefix_stripped(self):
        # "anthropic/" provider prefix must be ignored during lookup.
        p = _get_pricing("anthropic/claude-sonnet-4-20250514")
        assert (p["input"], p["output"]) == (3.00, 15.00)

    def test_prefix_match(self):
        assert _get_pricing("claude-3-5-sonnet-20241022")["input"] == 3.00

    def test_keyword_heuristic_opus(self):
        # Unknown model name containing "opus" falls back to opus pricing.
        p = _get_pricing("some-new-opus-model")
        assert (p["input"], p["output"]) == (15.00, 75.00)

    def test_keyword_heuristic_haiku(self):
        assert _get_pricing("anthropic/claude-haiku-future")["input"] == 0.80

    def test_unknown_model_returns_default(self):
        assert _get_pricing("totally-unknown-model-xyz") == _DEFAULT_PRICING

    def test_none_model(self):
        assert _get_pricing(None) == _DEFAULT_PRICING

    def test_empty_model(self):
        assert _get_pricing("") == _DEFAULT_PRICING

    def test_deepseek_heuristic(self):
        assert _get_pricing("deepseek-v3")["input"] == 0.14

    def test_gemini_heuristic(self):
        assert _get_pricing("gemini-3.0-ultra")["input"] == 0.15


class TestEstimateCost:
    def test_basic_cost(self):
        # gpt-4o: $2.50/M input + $10.00/M output, 1M tokens of each.
        cost = _estimate_cost("gpt-4o", 1_000_000, 1_000_000)
        assert cost == pytest.approx(12.50, abs=0.01)

    def test_zero_tokens(self):
        assert _estimate_cost("gpt-4o", 0, 0) == 0.0

    def test_small_usage(self):
        # 1000 * 2.50/1M + 500 * 10.00/1M = 0.0025 + 0.005 = 0.0075
        cost = _estimate_cost("gpt-4o", 1000, 500)
        assert cost == pytest.approx(0.0075, abs=0.0001)


# =========================================================================
# Format helpers
# =========================================================================

class TestFormatDuration:
    def test_seconds(self):
        assert _format_duration(45) == "45s"

    def test_minutes(self):
        assert _format_duration(300) == "5m"

    def test_hours_with_minutes(self):
        assert _format_duration(5400) == "1h 30m"  # 1.5 hours

    def test_exact_hours(self):
        assert _format_duration(7200) == "2h"

    def test_days(self):
        assert _format_duration(172800) == "2.0d"  # 2 days


class TestBarChart:
    def test_basic_bars(self):
        bars = _bar_chart([10, 5, 0, 20], max_width=10)
        assert len(bars) == 4
        assert len(bars[3]) == 10   # max value fills the full width
        assert len(bars[0]) == 5    # half of max -> half width
        assert bars[2] == ""        # zero renders as an empty bar

    def test_empty_values(self):
        assert _bar_chart([], max_width=10) == []

    def test_all_zeros(self):
        assert all(b == "" for b in _bar_chart([0, 0, 0], max_width=10))

    def test_single_value(self):
        bars = _bar_chart([5], max_width=10)
        assert len(bars) == 1
        assert len(bars[0]) == 10
# =========================================================================
# InsightsEngine -- empty DB
# =========================================================================

def _report(db, **kwargs):
    """Build an engine for *db* and return (engine, generated report)."""
    engine = InsightsEngine(db)
    return engine, engine.generate(**kwargs)


class TestInsightsEmpty:
    def test_empty_db_returns_empty_report(self, db):
        _, report = _report(db, days=30)
        assert report["empty"] is True
        assert report["overview"] == {}

    def test_empty_db_terminal_format(self, db):
        engine, report = _report(db, days=30)
        assert "No sessions found" in engine.format_terminal(report)

    def test_empty_db_gateway_format(self, db):
        engine, report = _report(db, days=30)
        assert "No sessions found" in engine.format_gateway(report)


# =========================================================================
# InsightsEngine -- populated DB
# =========================================================================

class TestInsightsPopulated:
    def test_generate_returns_all_sections(self, populated_db):
        _, report = _report(populated_db, days=30)
        assert report["empty"] is False
        for section in ("overview", "models", "platforms",
                        "tools", "activity", "top_sessions"):
            assert section in report

    def test_overview_session_count(self, populated_db):
        _, report = _report(populated_db, days=30)
        # s1..s4 fall inside 30 days; s_old (45 days ago) does not.
        assert report["overview"]["total_sessions"] == 4

    def test_overview_token_totals(self, populated_db):
        _, report = _report(populated_db, days=30)
        overview = report["overview"]
        want_in = 50000 + 20000 + 100000 + 10000
        want_out = 15000 + 8000 + 40000 + 5000
        assert overview["total_input_tokens"] == want_in
        assert overview["total_output_tokens"] == want_out
        assert overview["total_tokens"] == want_in + want_out

    def test_overview_cost_positive(self, populated_db):
        _, report = _report(populated_db, days=30)
        assert report["overview"]["estimated_cost"] > 0

    def test_overview_duration_stats(self, populated_db):
        _, report = _report(populated_db, days=30)
        overview = report["overview"]
        # All four in-window sessions were ended, so durations exist.
        assert overview["total_hours"] > 0
        assert overview["avg_session_duration"] > 0

    def test_model_breakdown(self, populated_db):
        _, report = _report(populated_db, days=30)
        models = report["models"]
        names = [m["model"] for m in models]
        # Three distinct models: claude-sonnet (x2), gpt-4o, deepseek-chat.
        assert "claude-sonnet-4-20250514" in names
        assert "gpt-4o" in names
        assert "deepseek-chat" in names
        claude = next(m for m in models if "claude-sonnet" in m["model"])
        assert claude["sessions"] == 2  # s1 + s4

    def test_platform_breakdown(self, populated_db):
        _, report = _report(populated_db, days=30)
        platforms = report["platforms"]
        names = [p["platform"] for p in platforms]
        assert "cli" in names
        assert "telegram" in names
        assert "discord" in names
        cli = next(p for p in platforms if p["platform"] == "cli")
        assert cli["sessions"] == 2  # s1 + s3

    def test_tool_breakdown(self, populated_db):
        _, report = _report(populated_db, days=30)
        tools = report["tools"]
        names = [t["tool"] for t in tools]
        for expected in ("terminal", "search_files", "read_file",
                         "patch", "web_search"):
            assert expected in names
        # terminal was used twice in s3
        terminal = next(t for t in tools if t["tool"] == "terminal")
        assert terminal["count"] == 2
        # Percentages should sum to ~100%
        assert sum(t["percentage"] for t in tools) == pytest.approx(100.0, abs=0.1)

    def test_activity_patterns(self, populated_db):
        _, report = _report(populated_db, days=30)
        activity = report["activity"]
        assert len(activity["by_day"]) == 7
        assert len(activity["by_hour"]) == 24
        assert activity["active_days"] >= 1
        assert activity["busiest_day"] is not None
        assert activity["busiest_hour"] is not None

    def test_top_sessions(self, populated_db):
        _, report = _report(populated_db, days=30)
        labels = [t["label"] for t in report["top_sessions"]]
        for label in ("Longest session", "Most messages",
                      "Most tokens", "Most tool calls"):
            assert label in labels

    def test_source_filter_cli(self, populated_db):
        _, report = _report(populated_db, days=30, source="cli")
        assert report["overview"]["total_sessions"] == 2  # s1, s3

    def test_source_filter_telegram(self, populated_db):
        _, report = _report(populated_db, days=30, source="telegram")
        assert report["overview"]["total_sessions"] == 1  # s2

    def test_source_filter_nonexistent(self, populated_db):
        _, report = _report(populated_db, days=30, source="slack")
        assert report["empty"] is True

    def test_days_filter_short(self, populated_db):
        _, report = _report(populated_db, days=3)
        # Only s4 (1 day ago) and s1 (2 days ago) qualify.
        assert report["overview"]["total_sessions"] == 2

    def test_days_filter_long(self, populated_db):
        _, report = _report(populated_db, days=60)
        # A 60-day window also picks up s_old.
        assert report["overview"]["total_sessions"] == 5


# =========================================================================
# Formatting
# =========================================================================

class TestTerminalFormatting:
    def test_terminal_format_has_sections(self, populated_db):
        engine, report = _report(populated_db, days=30)
        text = engine.format_terminal(report)
        for heading in ("Hermes Insights", "Overview", "Models Used",
                        "Top Tools", "Activity Patterns", "Notable Sessions"):
            assert heading in text

    def test_terminal_format_shows_tokens(self, populated_db):
        engine, report = _report(populated_db, days=30)
        text = engine.format_terminal(report)
        assert "Input tokens" in text
        assert "Output tokens" in text
        assert "Est. cost" in text
        assert "$" in text

    def test_terminal_format_shows_platforms(self, populated_db):
        engine, report = _report(populated_db, days=30)
        text = engine.format_terminal(report)
        # Multi-platform data, so the Platforms section must appear.
        assert "Platforms" in text
        assert "cli" in text
        assert "telegram" in text

    def test_terminal_format_shows_bar_chart(self, populated_db):
        engine, report = _report(populated_db, days=30)
        assert "█" in engine.format_terminal(report)  # bar-chart glyphs
class TestGatewayFormatting:
    def test_gateway_format_is_shorter(self, populated_db):
        engine = InsightsEngine(populated_db)
        report = engine.generate(days=30)
        # Gateway output is the compact variant of the terminal report.
        assert len(engine.format_gateway(report)) < len(engine.format_terminal(report))

    def test_gateway_format_has_bold(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_gateway(engine.generate(days=30))
        assert "**" in text  # Markdown bold

    def test_gateway_format_shows_cost(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_gateway(engine.generate(days=30))
        assert "$" in text
        assert "Est. cost" in text

    def test_gateway_format_shows_models(self, populated_db):
        engine = InsightsEngine(populated_db)
        text = engine.format_gateway(engine.generate(days=30))
        assert "Models" in text
        assert "sessions" in text


# =========================================================================
# Edge cases
# =========================================================================

class TestEdgeCases:
    def test_session_with_no_tokens(self, db):
        """Sessions with zero tokens should not crash."""
        db.create_session(session_id="s1", source="cli", model="test-model")
        db._conn.commit()

        report = InsightsEngine(db).generate(days=30)
        assert report["empty"] is False
        assert report["overview"]["total_tokens"] == 0
        assert report["overview"]["estimated_cost"] == 0.0

    def test_session_with_no_end_time(self, db):
        """Active (non-ended) sessions are counted but contribute no duration."""
        db.create_session(session_id="s1", source="cli", model="test-model")
        db.update_token_counts("s1", input_tokens=1000, output_tokens=500)
        db._conn.commit()

        report = InsightsEngine(db).generate(days=30)
        # Session included ...
        assert report["overview"]["total_sessions"] == 1
        assert report["overview"]["total_tokens"] == 1500
        # ... but no duration stats (session not ended).
        assert report["overview"]["total_hours"] == 0

    def test_session_with_no_model(self, db):
        """Sessions with NULL model should not crash and report as 'unknown'."""
        db.create_session(session_id="s1", source="cli")
        db.update_token_counts("s1", input_tokens=1000, output_tokens=500)
        db._conn.commit()

        report = InsightsEngine(db).generate(days=30)
        assert report["empty"] is False
        models = report["models"]
        assert len(models) == 1
        assert models[0]["model"] == "unknown"

    def test_single_session_streak(self, db):
        """A single session can produce at most a streak of 1."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        report = InsightsEngine(db).generate(days=30)
        assert report["activity"]["max_streak"] <= 1

    def test_no_tool_calls(self, db):
        """Sessions with no tool calls should produce an empty tools list."""
        db.create_session(session_id="s1", source="cli", model="test")
        db.append_message("s1", role="user", content="hello")
        db.append_message("s1", role="assistant", content="hi there")
        db._conn.commit()

        report = InsightsEngine(db).generate(days=30)
        assert report["tools"] == []

    def test_only_one_platform(self, db):
        """Single-platform usage should still produce a valid report."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        engine = InsightsEngine(db)
        report = engine.generate(days=30)
        assert len(report["platforms"]) == 1
        assert report["platforms"][0]["platform"] == "cli"
        # Smoke check only: whether the Platforms section renders for
        # cli-only data is an engine implementation detail we deliberately
        # do not pin here -- just verify formatting does not crash.
        engine.format_terminal(report)

    def test_large_days_value(self, db):
        """Very large days value should not crash."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        report = InsightsEngine(db).generate(days=365)
        assert report["empty"] is False

    def test_zero_days(self, db):
        """days=0 must not crash; inclusion depends on sub-second timing."""
        db.create_session(session_id="s1", source="cli", model="test")
        db._conn.commit()

        report = InsightsEngine(db).generate(days=0)
        # The just-created session may or may not fall inside a 0-day
        # window depending on timing, so only assert the report shape.
        assert "empty" in report