Three categories of cleanup, all zero-behavioral-change:
1. F-strings without placeholders (154 fixes across 29 files)
- Converted f'...' to '...' where no {expression} was present
- Heaviest files: run_agent.py (24), cli.py (20), honcho_integration/cli.py (34)
2. Simplify defensive patterns in run_agent.py
- Added explicit self._is_anthropic_oauth = False in __init__ (before
the api_mode branch that conditionally sets it)
- Replaced 7x getattr(self, '_is_anthropic_oauth', False) with direct
self._is_anthropic_oauth (attribute always initialized now)
- Added _is_openrouter_url() and _is_anthropic_url() helper methods
- Replaced 3 inline 'openrouter' in self._base_url_lower checks
3. Remove dead code in small files
- hermes_cli/claw.py: removed unused 'total' computation
- tools/fuzzy_match.py: removed unused strip_indent() function and
pattern_stripped variable
Full test suite: 6184 passed, 0 failures
E2E PTY: banner clean, tool calls work, zero garbled ANSI
793 lines
33 KiB
Python
793 lines
33 KiB
Python
"""
|
|
Session Insights Engine for Hermes Agent.
|
|
|
|
Analyzes historical session data from the SQLite state database to produce
|
|
comprehensive usage insights — token consumption, cost estimates, tool usage
|
|
patterns, activity trends, model/platform breakdowns, and session metrics.
|
|
|
|
Inspired by Claude Code's /insights command, adapted for Hermes Agent's
|
|
multi-platform architecture with additional cost estimation and platform
|
|
breakdown capabilities.
|
|
|
|
Usage:
|
|
from agent.insights import InsightsEngine
|
|
engine = InsightsEngine(db)
|
|
report = engine.generate(days=30)
|
|
print(engine.format_terminal(report))
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List
|
|
|
|
from agent.usage_pricing import (
|
|
CanonicalUsage,
|
|
DEFAULT_PRICING,
|
|
estimate_usage_cost,
|
|
format_duration_compact,
|
|
get_pricing,
|
|
has_known_pricing,
|
|
)
|
|
|
|
_DEFAULT_PRICING = DEFAULT_PRICING
|
|
|
|
|
|
def _has_known_pricing(model_name: str, provider: str = None, base_url: str = None) -> bool:
|
|
"""Check if a model has known pricing (vs unknown/custom endpoint)."""
|
|
return has_known_pricing(model_name, provider=provider, base_url=base_url)
|
|
|
|
|
|
def _get_pricing(model_name: str) -> Dict[str, float]:
|
|
"""Look up pricing for a model. Uses fuzzy matching on model name.
|
|
|
|
Returns _DEFAULT_PRICING (zero cost) for unknown/custom models —
|
|
we can't assume costs for self-hosted endpoints, local inference, etc.
|
|
"""
|
|
return get_pricing(model_name)
|
|
|
|
|
|
def _estimate_cost(
|
|
session_or_model: Dict[str, Any] | str,
|
|
input_tokens: int = 0,
|
|
output_tokens: int = 0,
|
|
*,
|
|
cache_read_tokens: int = 0,
|
|
cache_write_tokens: int = 0,
|
|
provider: str = None,
|
|
base_url: str = None,
|
|
) -> tuple[float, str]:
|
|
"""Estimate the USD cost for a session row or a model/token tuple."""
|
|
if isinstance(session_or_model, dict):
|
|
session = session_or_model
|
|
model = session.get("model") or ""
|
|
usage = CanonicalUsage(
|
|
input_tokens=session.get("input_tokens") or 0,
|
|
output_tokens=session.get("output_tokens") or 0,
|
|
cache_read_tokens=session.get("cache_read_tokens") or 0,
|
|
cache_write_tokens=session.get("cache_write_tokens") or 0,
|
|
)
|
|
provider = session.get("billing_provider")
|
|
base_url = session.get("billing_base_url")
|
|
else:
|
|
model = session_or_model or ""
|
|
usage = CanonicalUsage(
|
|
input_tokens=input_tokens,
|
|
output_tokens=output_tokens,
|
|
cache_read_tokens=cache_read_tokens,
|
|
cache_write_tokens=cache_write_tokens,
|
|
)
|
|
result = estimate_usage_cost(
|
|
model,
|
|
usage,
|
|
provider=provider,
|
|
base_url=base_url,
|
|
)
|
|
return float(result.amount_usd or 0.0), result.status
|
|
|
|
|
|
def _format_duration(seconds: float) -> str:
|
|
"""Format seconds into a human-readable duration string."""
|
|
return format_duration_compact(seconds)
|
|
|
|
|
|
def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
|
|
"""Create simple horizontal bar chart strings from values."""
|
|
peak = max(values) if values else 1
|
|
if peak == 0:
|
|
return ["" for _ in values]
|
|
return ["█" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]
|
|
|
|
|
|
class InsightsEngine:
|
|
"""
|
|
Analyzes session history and produces usage insights.
|
|
|
|
Works directly with a SessionDB instance (or raw sqlite3 connection)
|
|
to query session and message data.
|
|
"""
|
|
|
|
def __init__(self, db):
|
|
"""
|
|
Initialize with a SessionDB instance.
|
|
|
|
Args:
|
|
db: A SessionDB instance (from hermes_state.py)
|
|
"""
|
|
self.db = db
|
|
self._conn = db._conn
|
|
|
|
def generate(self, days: int = 30, source: str = None) -> Dict[str, Any]:
|
|
"""
|
|
Generate a complete insights report.
|
|
|
|
Args:
|
|
days: Number of days to look back (default: 30)
|
|
source: Optional filter by source platform
|
|
|
|
Returns:
|
|
Dict with all computed insights
|
|
"""
|
|
cutoff = time.time() - (days * 86400)
|
|
|
|
# Gather raw data
|
|
sessions = self._get_sessions(cutoff, source)
|
|
tool_usage = self._get_tool_usage(cutoff, source)
|
|
message_stats = self._get_message_stats(cutoff, source)
|
|
|
|
if not sessions:
|
|
return {
|
|
"days": days,
|
|
"source_filter": source,
|
|
"empty": True,
|
|
"overview": {},
|
|
"models": [],
|
|
"platforms": [],
|
|
"tools": [],
|
|
"activity": {},
|
|
"top_sessions": [],
|
|
}
|
|
|
|
# Compute insights
|
|
overview = self._compute_overview(sessions, message_stats)
|
|
models = self._compute_model_breakdown(sessions)
|
|
platforms = self._compute_platform_breakdown(sessions)
|
|
tools = self._compute_tool_breakdown(tool_usage)
|
|
activity = self._compute_activity_patterns(sessions)
|
|
top_sessions = self._compute_top_sessions(sessions)
|
|
|
|
return {
|
|
"days": days,
|
|
"source_filter": source,
|
|
"empty": False,
|
|
"generated_at": time.time(),
|
|
"overview": overview,
|
|
"models": models,
|
|
"platforms": platforms,
|
|
"tools": tools,
|
|
"activity": activity,
|
|
"top_sessions": top_sessions,
|
|
}
|
|
|
|
# =========================================================================
|
|
# Data gathering (SQL queries)
|
|
# =========================================================================
|
|
|
|
# Columns we actually need (skip system_prompt, model_config blobs)
|
|
_SESSION_COLS = ("id, source, model, started_at, ended_at, "
|
|
"message_count, tool_call_count, input_tokens, output_tokens, "
|
|
"cache_read_tokens, cache_write_tokens, billing_provider, "
|
|
"billing_base_url, billing_mode, estimated_cost_usd, "
|
|
"actual_cost_usd, cost_status, cost_source")
|
|
|
|
# Pre-computed query strings — f-string evaluated once at class definition,
|
|
# not at runtime, so no user-controlled value can alter the query structure.
|
|
_GET_SESSIONS_WITH_SOURCE = (
|
|
f"SELECT {_SESSION_COLS} FROM sessions"
|
|
" WHERE started_at >= ? AND source = ?"
|
|
" ORDER BY started_at DESC"
|
|
)
|
|
_GET_SESSIONS_ALL = (
|
|
f"SELECT {_SESSION_COLS} FROM sessions"
|
|
" WHERE started_at >= ?"
|
|
" ORDER BY started_at DESC"
|
|
)
|
|
|
|
def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
|
|
"""Fetch sessions within the time window."""
|
|
if source:
|
|
cursor = self._conn.execute(self._GET_SESSIONS_WITH_SOURCE, (cutoff, source))
|
|
else:
|
|
cursor = self._conn.execute(self._GET_SESSIONS_ALL, (cutoff,))
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]:
|
|
"""Get tool call counts from messages.
|
|
|
|
Uses two sources:
|
|
1. tool_name column on 'tool' role messages (set by gateway)
|
|
2. tool_calls JSON on 'assistant' role messages (covers CLI where
|
|
tool_name is not populated on tool responses)
|
|
"""
|
|
tool_counts = Counter()
|
|
|
|
# Source 1: explicit tool_name on tool response messages
|
|
if source:
|
|
cursor = self._conn.execute(
|
|
"""SELECT m.tool_name, COUNT(*) as count
|
|
FROM messages m
|
|
JOIN sessions s ON s.id = m.session_id
|
|
WHERE s.started_at >= ? AND s.source = ?
|
|
AND m.role = 'tool' AND m.tool_name IS NOT NULL
|
|
GROUP BY m.tool_name
|
|
ORDER BY count DESC""",
|
|
(cutoff, source),
|
|
)
|
|
else:
|
|
cursor = self._conn.execute(
|
|
"""SELECT m.tool_name, COUNT(*) as count
|
|
FROM messages m
|
|
JOIN sessions s ON s.id = m.session_id
|
|
WHERE s.started_at >= ?
|
|
AND m.role = 'tool' AND m.tool_name IS NOT NULL
|
|
GROUP BY m.tool_name
|
|
ORDER BY count DESC""",
|
|
(cutoff,),
|
|
)
|
|
for row in cursor.fetchall():
|
|
tool_counts[row["tool_name"]] += row["count"]
|
|
|
|
# Source 2: extract from tool_calls JSON on assistant messages
|
|
# (covers CLI sessions where tool_name is NULL on tool responses)
|
|
if source:
|
|
cursor2 = self._conn.execute(
|
|
"""SELECT m.tool_calls
|
|
FROM messages m
|
|
JOIN sessions s ON s.id = m.session_id
|
|
WHERE s.started_at >= ? AND s.source = ?
|
|
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
|
|
(cutoff, source),
|
|
)
|
|
else:
|
|
cursor2 = self._conn.execute(
|
|
"""SELECT m.tool_calls
|
|
FROM messages m
|
|
JOIN sessions s ON s.id = m.session_id
|
|
WHERE s.started_at >= ?
|
|
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
|
|
(cutoff,),
|
|
)
|
|
|
|
tool_calls_counts = Counter()
|
|
for row in cursor2.fetchall():
|
|
try:
|
|
calls = row["tool_calls"]
|
|
if isinstance(calls, str):
|
|
calls = json.loads(calls)
|
|
if isinstance(calls, list):
|
|
for call in calls:
|
|
func = call.get("function", {}) if isinstance(call, dict) else {}
|
|
name = func.get("name")
|
|
if name:
|
|
tool_calls_counts[name] += 1
|
|
except (json.JSONDecodeError, TypeError, AttributeError):
|
|
continue
|
|
|
|
# Merge: prefer tool_name source, supplement with tool_calls source
|
|
# for tools not already counted
|
|
if not tool_counts and tool_calls_counts:
|
|
# No tool_name data at all — use tool_calls exclusively
|
|
tool_counts = tool_calls_counts
|
|
elif tool_counts and tool_calls_counts:
|
|
# Both sources have data — use whichever has the higher count per tool
|
|
# (they may overlap, so take the max to avoid double-counting)
|
|
all_tools = set(tool_counts) | set(tool_calls_counts)
|
|
merged = Counter()
|
|
for tool in all_tools:
|
|
merged[tool] = max(tool_counts.get(tool, 0), tool_calls_counts.get(tool, 0))
|
|
tool_counts = merged
|
|
|
|
# Convert to the expected format
|
|
return [
|
|
{"tool_name": name, "count": count}
|
|
for name, count in tool_counts.most_common()
|
|
]
|
|
|
|
def _get_message_stats(self, cutoff: float, source: str = None) -> Dict:
|
|
"""Get aggregate message statistics."""
|
|
if source:
|
|
cursor = self._conn.execute(
|
|
"""SELECT
|
|
COUNT(*) as total_messages,
|
|
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
|
|
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
|
|
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
|
|
FROM messages m
|
|
JOIN sessions s ON s.id = m.session_id
|
|
WHERE s.started_at >= ? AND s.source = ?""",
|
|
(cutoff, source),
|
|
)
|
|
else:
|
|
cursor = self._conn.execute(
|
|
"""SELECT
|
|
COUNT(*) as total_messages,
|
|
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
|
|
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
|
|
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
|
|
FROM messages m
|
|
JOIN sessions s ON s.id = m.session_id
|
|
WHERE s.started_at >= ?""",
|
|
(cutoff,),
|
|
)
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else {
|
|
"total_messages": 0, "user_messages": 0,
|
|
"assistant_messages": 0, "tool_messages": 0,
|
|
}
|
|
|
|
# =========================================================================
|
|
# Computation
|
|
# =========================================================================
|
|
|
|
def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
|
|
"""Compute high-level overview statistics."""
|
|
total_input = sum(s.get("input_tokens") or 0 for s in sessions)
|
|
total_output = sum(s.get("output_tokens") or 0 for s in sessions)
|
|
total_cache_read = sum(s.get("cache_read_tokens") or 0 for s in sessions)
|
|
total_cache_write = sum(s.get("cache_write_tokens") or 0 for s in sessions)
|
|
total_tokens = total_input + total_output + total_cache_read + total_cache_write
|
|
total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
|
|
total_messages = sum(s.get("message_count") or 0 for s in sessions)
|
|
|
|
# Cost estimation (weighted by model)
|
|
total_cost = 0.0
|
|
actual_cost = 0.0
|
|
models_with_pricing = set()
|
|
models_without_pricing = set()
|
|
unknown_cost_sessions = 0
|
|
included_cost_sessions = 0
|
|
for s in sessions:
|
|
model = s.get("model") or ""
|
|
estimated, status = _estimate_cost(s)
|
|
total_cost += estimated
|
|
actual_cost += s.get("actual_cost_usd") or 0.0
|
|
display = model.split("/")[-1] if "/" in model else (model or "unknown")
|
|
if status == "included":
|
|
included_cost_sessions += 1
|
|
elif status == "unknown":
|
|
unknown_cost_sessions += 1
|
|
if _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url")):
|
|
models_with_pricing.add(display)
|
|
else:
|
|
models_without_pricing.add(display)
|
|
|
|
# Session duration stats (guard against negative durations from clock drift)
|
|
durations = []
|
|
for s in sessions:
|
|
start = s.get("started_at")
|
|
end = s.get("ended_at")
|
|
if start and end and end > start:
|
|
durations.append(end - start)
|
|
|
|
total_hours = sum(durations) / 3600 if durations else 0
|
|
avg_duration = sum(durations) / len(durations) if durations else 0
|
|
|
|
# Earliest and latest session
|
|
started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")]
|
|
date_range_start = min(started_timestamps) if started_timestamps else None
|
|
date_range_end = max(started_timestamps) if started_timestamps else None
|
|
|
|
return {
|
|
"total_sessions": len(sessions),
|
|
"total_messages": total_messages,
|
|
"total_tool_calls": total_tool_calls,
|
|
"total_input_tokens": total_input,
|
|
"total_output_tokens": total_output,
|
|
"total_cache_read_tokens": total_cache_read,
|
|
"total_cache_write_tokens": total_cache_write,
|
|
"total_tokens": total_tokens,
|
|
"estimated_cost": total_cost,
|
|
"actual_cost": actual_cost,
|
|
"total_hours": total_hours,
|
|
"avg_session_duration": avg_duration,
|
|
"avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
|
|
"avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0,
|
|
"user_messages": message_stats.get("user_messages") or 0,
|
|
"assistant_messages": message_stats.get("assistant_messages") or 0,
|
|
"tool_messages": message_stats.get("tool_messages") or 0,
|
|
"date_range_start": date_range_start,
|
|
"date_range_end": date_range_end,
|
|
"models_with_pricing": sorted(models_with_pricing),
|
|
"models_without_pricing": sorted(models_without_pricing),
|
|
"unknown_cost_sessions": unknown_cost_sessions,
|
|
"included_cost_sessions": included_cost_sessions,
|
|
}
|
|
|
|
def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
|
|
"""Break down usage by model."""
|
|
model_data = defaultdict(lambda: {
|
|
"sessions": 0, "input_tokens": 0, "output_tokens": 0,
|
|
"cache_read_tokens": 0, "cache_write_tokens": 0,
|
|
"total_tokens": 0, "tool_calls": 0, "cost": 0.0,
|
|
})
|
|
|
|
for s in sessions:
|
|
model = s.get("model") or "unknown"
|
|
# Normalize: strip provider prefix for display
|
|
display_model = model.split("/")[-1] if "/" in model else model
|
|
d = model_data[display_model]
|
|
d["sessions"] += 1
|
|
inp = s.get("input_tokens") or 0
|
|
out = s.get("output_tokens") or 0
|
|
cache_read = s.get("cache_read_tokens") or 0
|
|
cache_write = s.get("cache_write_tokens") or 0
|
|
d["input_tokens"] += inp
|
|
d["output_tokens"] += out
|
|
d["cache_read_tokens"] += cache_read
|
|
d["cache_write_tokens"] += cache_write
|
|
d["total_tokens"] += inp + out + cache_read + cache_write
|
|
d["tool_calls"] += s.get("tool_call_count") or 0
|
|
estimate, status = _estimate_cost(s)
|
|
d["cost"] += estimate
|
|
d["has_pricing"] = _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url"))
|
|
d["cost_status"] = status
|
|
|
|
result = [
|
|
{"model": model, **data}
|
|
for model, data in model_data.items()
|
|
]
|
|
# Sort by tokens first, fall back to session count when tokens are 0
|
|
result.sort(key=lambda x: (x["total_tokens"], x["sessions"]), reverse=True)
|
|
return result
|
|
|
|
def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
|
|
"""Break down usage by platform/source."""
|
|
platform_data = defaultdict(lambda: {
|
|
"sessions": 0, "messages": 0, "input_tokens": 0,
|
|
"output_tokens": 0, "cache_read_tokens": 0,
|
|
"cache_write_tokens": 0, "total_tokens": 0, "tool_calls": 0,
|
|
})
|
|
|
|
for s in sessions:
|
|
source = s.get("source") or "unknown"
|
|
d = platform_data[source]
|
|
d["sessions"] += 1
|
|
d["messages"] += s.get("message_count") or 0
|
|
inp = s.get("input_tokens") or 0
|
|
out = s.get("output_tokens") or 0
|
|
cache_read = s.get("cache_read_tokens") or 0
|
|
cache_write = s.get("cache_write_tokens") or 0
|
|
d["input_tokens"] += inp
|
|
d["output_tokens"] += out
|
|
d["cache_read_tokens"] += cache_read
|
|
d["cache_write_tokens"] += cache_write
|
|
d["total_tokens"] += inp + out + cache_read + cache_write
|
|
d["tool_calls"] += s.get("tool_call_count") or 0
|
|
|
|
result = [
|
|
{"platform": platform, **data}
|
|
for platform, data in platform_data.items()
|
|
]
|
|
result.sort(key=lambda x: x["sessions"], reverse=True)
|
|
return result
|
|
|
|
def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
|
|
"""Process tool usage data into a ranked list with percentages."""
|
|
total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
|
|
result = []
|
|
for t in tool_usage:
|
|
pct = (t["count"] / total_calls * 100) if total_calls else 0
|
|
result.append({
|
|
"tool": t["tool_name"],
|
|
"count": t["count"],
|
|
"percentage": pct,
|
|
})
|
|
return result
|
|
|
|
def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
|
|
"""Analyze activity patterns by day of week and hour."""
|
|
day_counts = Counter() # 0=Monday ... 6=Sunday
|
|
hour_counts = Counter()
|
|
daily_counts = Counter() # date string -> count
|
|
|
|
for s in sessions:
|
|
ts = s.get("started_at")
|
|
if not ts:
|
|
continue
|
|
dt = datetime.fromtimestamp(ts)
|
|
day_counts[dt.weekday()] += 1
|
|
hour_counts[dt.hour] += 1
|
|
daily_counts[dt.strftime("%Y-%m-%d")] += 1
|
|
|
|
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
|
|
day_breakdown = [
|
|
{"day": day_names[i], "count": day_counts.get(i, 0)}
|
|
for i in range(7)
|
|
]
|
|
|
|
hour_breakdown = [
|
|
{"hour": i, "count": hour_counts.get(i, 0)}
|
|
for i in range(24)
|
|
]
|
|
|
|
# Busiest day and hour
|
|
busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
|
|
busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None
|
|
|
|
# Active days (days with at least one session)
|
|
active_days = len(daily_counts)
|
|
|
|
# Streak calculation
|
|
if daily_counts:
|
|
all_dates = sorted(daily_counts.keys())
|
|
current_streak = 1
|
|
max_streak = 1
|
|
for i in range(1, len(all_dates)):
|
|
d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d")
|
|
d2 = datetime.strptime(all_dates[i], "%Y-%m-%d")
|
|
if (d2 - d1).days == 1:
|
|
current_streak += 1
|
|
max_streak = max(max_streak, current_streak)
|
|
else:
|
|
current_streak = 1
|
|
else:
|
|
max_streak = 0
|
|
|
|
return {
|
|
"by_day": day_breakdown,
|
|
"by_hour": hour_breakdown,
|
|
"busiest_day": busiest_day,
|
|
"busiest_hour": busiest_hour,
|
|
"active_days": active_days,
|
|
"max_streak": max_streak,
|
|
}
|
|
|
|
def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
|
|
"""Find notable sessions (longest, most messages, most tokens)."""
|
|
top = []
|
|
|
|
# Longest by duration
|
|
sessions_with_duration = [
|
|
s for s in sessions
|
|
if s.get("started_at") and s.get("ended_at")
|
|
]
|
|
if sessions_with_duration:
|
|
longest = max(
|
|
sessions_with_duration,
|
|
key=lambda s: (s["ended_at"] - s["started_at"]),
|
|
)
|
|
dur = longest["ended_at"] - longest["started_at"]
|
|
top.append({
|
|
"label": "Longest session",
|
|
"session_id": longest["id"][:16],
|
|
"value": _format_duration(dur),
|
|
"date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
|
|
})
|
|
|
|
# Most messages
|
|
most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
|
|
if (most_msgs.get("message_count") or 0) > 0:
|
|
top.append({
|
|
"label": "Most messages",
|
|
"session_id": most_msgs["id"][:16],
|
|
"value": f"{most_msgs['message_count']} msgs",
|
|
"date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
|
|
})
|
|
|
|
# Most tokens
|
|
most_tokens = max(
|
|
sessions,
|
|
key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0),
|
|
)
|
|
token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
|
|
if token_total > 0:
|
|
top.append({
|
|
"label": "Most tokens",
|
|
"session_id": most_tokens["id"][:16],
|
|
"value": f"{token_total:,} tokens",
|
|
"date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
|
|
})
|
|
|
|
# Most tool calls
|
|
most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
|
|
if (most_tools.get("tool_call_count") or 0) > 0:
|
|
top.append({
|
|
"label": "Most tool calls",
|
|
"session_id": most_tools["id"][:16],
|
|
"value": f"{most_tools['tool_call_count']} calls",
|
|
"date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
|
|
})
|
|
|
|
return top
|
|
|
|
# =========================================================================
|
|
# Formatting
|
|
# =========================================================================
|
|
|
|
def format_terminal(self, report: Dict) -> str:
|
|
"""Format the insights report for terminal display (CLI)."""
|
|
if report.get("empty"):
|
|
days = report.get("days", 30)
|
|
src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
|
|
return f" No sessions found in the last {days} days{src}."
|
|
|
|
lines = []
|
|
o = report["overview"]
|
|
days = report["days"]
|
|
src_filter = report.get("source_filter")
|
|
|
|
# Header
|
|
lines.append("")
|
|
lines.append(" ╔══════════════════════════════════════════════════════════╗")
|
|
lines.append(" ║ 📊 Hermes Insights ║")
|
|
period_label = f"Last {days} days"
|
|
if src_filter:
|
|
period_label += f" ({src_filter})"
|
|
padding = 58 - len(period_label) - 2
|
|
left_pad = padding // 2
|
|
right_pad = padding - left_pad
|
|
lines.append(f" ║{' ' * left_pad} {period_label} {' ' * right_pad}║")
|
|
lines.append(" ╚══════════════════════════════════════════════════════════╝")
|
|
lines.append("")
|
|
|
|
# Date range
|
|
if o.get("date_range_start") and o.get("date_range_end"):
|
|
start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
|
|
end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
|
|
lines.append(f" Period: {start_str} — {end_str}")
|
|
lines.append("")
|
|
|
|
# Overview
|
|
lines.append(" 📋 Overview")
|
|
lines.append(" " + "─" * 56)
|
|
lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
|
|
lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
|
|
lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
|
|
cost_str = f"${o['estimated_cost']:.2f}"
|
|
if o.get("models_without_pricing"):
|
|
cost_str += " *"
|
|
lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: {cost_str}")
|
|
if o["total_hours"] > 0:
|
|
lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
|
|
lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}")
|
|
lines.append("")
|
|
|
|
# Model breakdown
|
|
if report["models"]:
|
|
lines.append(" 🤖 Models Used")
|
|
lines.append(" " + "─" * 56)
|
|
lines.append(f" {'Model':<30} {'Sessions':>8} {'Tokens':>12} {'Cost':>8}")
|
|
for m in report["models"]:
|
|
model_name = m["model"][:28]
|
|
if m.get("has_pricing"):
|
|
cost_cell = f"${m['cost']:>6.2f}"
|
|
else:
|
|
cost_cell = " N/A"
|
|
lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} {cost_cell}")
|
|
if o.get("models_without_pricing"):
|
|
lines.append(" * Cost N/A for custom/self-hosted models")
|
|
lines.append("")
|
|
|
|
# Platform breakdown
|
|
if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
|
|
lines.append(" 📱 Platforms")
|
|
lines.append(" " + "─" * 56)
|
|
lines.append(f" {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
|
|
for p in report["platforms"]:
|
|
lines.append(f" {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
|
|
lines.append("")
|
|
|
|
# Tool usage
|
|
if report["tools"]:
|
|
lines.append(" 🔧 Top Tools")
|
|
lines.append(" " + "─" * 56)
|
|
lines.append(f" {'Tool':<28} {'Calls':>8} {'%':>8}")
|
|
for t in report["tools"][:15]: # Top 15
|
|
lines.append(f" {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
|
|
if len(report["tools"]) > 15:
|
|
lines.append(f" ... and {len(report['tools']) - 15} more tools")
|
|
lines.append("")
|
|
|
|
# Activity patterns
|
|
act = report.get("activity", {})
|
|
if act.get("by_day"):
|
|
lines.append(" 📅 Activity Patterns")
|
|
lines.append(" " + "─" * 56)
|
|
|
|
# Day of week chart
|
|
day_values = [d["count"] for d in act["by_day"]]
|
|
bars = _bar_chart(day_values, max_width=15)
|
|
for i, d in enumerate(act["by_day"]):
|
|
bar = bars[i]
|
|
lines.append(f" {d['day']} {bar:<15} {d['count']}")
|
|
|
|
lines.append("")
|
|
|
|
# Peak hours (show top 5 busiest hours)
|
|
busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
|
|
busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
|
|
if busy_hours:
|
|
hour_strs = []
|
|
for h in busy_hours:
|
|
hr = h["hour"]
|
|
ampm = "AM" if hr < 12 else "PM"
|
|
display_hr = hr % 12 or 12
|
|
hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
|
|
lines.append(f" Peak hours: {', '.join(hour_strs)}")
|
|
|
|
if act.get("active_days"):
|
|
lines.append(f" Active days: {act['active_days']}")
|
|
if act.get("max_streak") and act["max_streak"] > 1:
|
|
lines.append(f" Best streak: {act['max_streak']} consecutive days")
|
|
lines.append("")
|
|
|
|
# Notable sessions
|
|
if report.get("top_sessions"):
|
|
lines.append(" 🏆 Notable Sessions")
|
|
lines.append(" " + "─" * 56)
|
|
for ts in report["top_sessions"]:
|
|
lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
def format_gateway(self, report: Dict) -> str:
|
|
"""Format the insights report for gateway/messaging (shorter)."""
|
|
if report.get("empty"):
|
|
days = report.get("days", 30)
|
|
return f"No sessions found in the last {days} days."
|
|
|
|
lines = []
|
|
o = report["overview"]
|
|
days = report["days"]
|
|
|
|
lines.append(f"📊 **Hermes Insights** — Last {days} days\n")
|
|
|
|
# Overview
|
|
lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
|
|
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
|
|
cost_note = ""
|
|
if o.get("models_without_pricing"):
|
|
cost_note = " _(excludes custom/self-hosted models)_"
|
|
lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}{cost_note}")
|
|
if o["total_hours"] > 0:
|
|
lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
|
|
lines.append("")
|
|
|
|
# Models (top 5)
|
|
if report["models"]:
|
|
lines.append("**🤖 Models:**")
|
|
for m in report["models"][:5]:
|
|
cost_str = f"${m['cost']:.2f}" if m.get("has_pricing") else "N/A"
|
|
lines.append(f" {m['model'][:25]} — {m['sessions']} sessions, {m['total_tokens']:,} tokens, {cost_str}")
|
|
lines.append("")
|
|
|
|
# Platforms (if multi-platform)
|
|
if len(report["platforms"]) > 1:
|
|
lines.append("**📱 Platforms:**")
|
|
for p in report["platforms"]:
|
|
lines.append(f" {p['platform']} — {p['sessions']} sessions, {p['messages']:,} msgs")
|
|
lines.append("")
|
|
|
|
# Tools (top 8)
|
|
if report["tools"]:
|
|
lines.append("**🔧 Top Tools:**")
|
|
for t in report["tools"][:8]:
|
|
lines.append(f" {t['tool']} — {t['count']:,} calls ({t['percentage']:.1f}%)")
|
|
lines.append("")
|
|
|
|
# Activity summary
|
|
act = report.get("activity", {})
|
|
if act.get("busiest_day") and act.get("busiest_hour"):
|
|
hr = act["busiest_hour"]["hour"]
|
|
ampm = "AM" if hr < 12 else "PM"
|
|
display_hr = hr % 12 or 12
|
|
lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
|
|
if act.get("active_days"):
|
|
lines.append(f"**Active days:** {act['active_days']}", )
|
|
if act.get("max_streak", 0) > 1:
|
|
lines.append(f"**Best streak:** {act['max_streak']} consecutive days")
|
|
|
|
return "\n".join(lines)
|