Files
hermes-agent/agent/insights.py
Teknium cbf195e806 chore: fix 154 f-strings, simplify getattr/URL patterns, remove dead code (#3119)
Three categories of cleanup, all zero-behavioral-change:

1. F-strings without placeholders (154 fixes across 29 files)
   - Converted f'...' to '...' where no {expression} was present
   - Heaviest files: run_agent.py (24), cli.py (20), honcho_integration/cli.py (34)

2. Simplify defensive patterns in run_agent.py
   - Added explicit self._is_anthropic_oauth = False in __init__ (before
     the api_mode branch that conditionally sets it)
   - Replaced 7x getattr(self, '_is_anthropic_oauth', False) with direct
     self._is_anthropic_oauth (attribute always initialized now)
   - Added _is_openrouter_url() and _is_anthropic_url() helper methods
   - Replaced 3 inline 'openrouter' in self._base_url_lower checks

3. Remove dead code in small files
   - hermes_cli/claw.py: removed unused 'total' computation
   - tools/fuzzy_match.py: removed unused strip_indent() function and
     pattern_stripped variable

Full test suite: 6184 passed, 0 failures
E2E PTY: banner clean, tool calls work, zero garbled ANSI
2026-03-25 19:47:58 -07:00

793 lines
33 KiB
Python

"""
Session Insights Engine for Hermes Agent.
Analyzes historical session data from the SQLite state database to produce
comprehensive usage insights — token consumption, cost estimates, tool usage
patterns, activity trends, model/platform breakdowns, and session metrics.
Inspired by Claude Code's /insights command, adapted for Hermes Agent's
multi-platform architecture with additional cost estimation and platform
breakdown capabilities.
Usage:
from agent.insights import InsightsEngine
engine = InsightsEngine(db)
report = engine.generate(days=30)
print(engine.format_terminal(report))
"""
import json
import time
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List
from agent.usage_pricing import (
CanonicalUsage,
DEFAULT_PRICING,
estimate_usage_cost,
format_duration_compact,
get_pricing,
has_known_pricing,
)
_DEFAULT_PRICING = DEFAULT_PRICING
def _has_known_pricing(model_name: str, provider: str = None, base_url: str = None) -> bool:
"""Check if a model has known pricing (vs unknown/custom endpoint)."""
return has_known_pricing(model_name, provider=provider, base_url=base_url)
def _get_pricing(model_name: str) -> Dict[str, float]:
"""Look up pricing for a model. Uses fuzzy matching on model name.
Returns _DEFAULT_PRICING (zero cost) for unknown/custom models —
we can't assume costs for self-hosted endpoints, local inference, etc.
"""
return get_pricing(model_name)
def _estimate_cost(
session_or_model: Dict[str, Any] | str,
input_tokens: int = 0,
output_tokens: int = 0,
*,
cache_read_tokens: int = 0,
cache_write_tokens: int = 0,
provider: str = None,
base_url: str = None,
) -> tuple[float, str]:
"""Estimate the USD cost for a session row or a model/token tuple."""
if isinstance(session_or_model, dict):
session = session_or_model
model = session.get("model") or ""
usage = CanonicalUsage(
input_tokens=session.get("input_tokens") or 0,
output_tokens=session.get("output_tokens") or 0,
cache_read_tokens=session.get("cache_read_tokens") or 0,
cache_write_tokens=session.get("cache_write_tokens") or 0,
)
provider = session.get("billing_provider")
base_url = session.get("billing_base_url")
else:
model = session_or_model or ""
usage = CanonicalUsage(
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_write_tokens=cache_write_tokens,
)
result = estimate_usage_cost(
model,
usage,
provider=provider,
base_url=base_url,
)
return float(result.amount_usd or 0.0), result.status
def _format_duration(seconds: float) -> str:
"""Format seconds into a human-readable duration string."""
return format_duration_compact(seconds)
def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
"""Create simple horizontal bar chart strings from values."""
peak = max(values) if values else 1
if peak == 0:
return ["" for _ in values]
return ["" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]
class InsightsEngine:
"""
Analyzes session history and produces usage insights.
Works directly with a SessionDB instance (or raw sqlite3 connection)
to query session and message data.
"""
def __init__(self, db):
"""
Initialize with a SessionDB instance.
Args:
db: A SessionDB instance (from hermes_state.py)
"""
self.db = db
self._conn = db._conn
def generate(self, days: int = 30, source: str = None) -> Dict[str, Any]:
"""
Generate a complete insights report.
Args:
days: Number of days to look back (default: 30)
source: Optional filter by source platform
Returns:
Dict with all computed insights
"""
cutoff = time.time() - (days * 86400)
# Gather raw data
sessions = self._get_sessions(cutoff, source)
tool_usage = self._get_tool_usage(cutoff, source)
message_stats = self._get_message_stats(cutoff, source)
if not sessions:
return {
"days": days,
"source_filter": source,
"empty": True,
"overview": {},
"models": [],
"platforms": [],
"tools": [],
"activity": {},
"top_sessions": [],
}
# Compute insights
overview = self._compute_overview(sessions, message_stats)
models = self._compute_model_breakdown(sessions)
platforms = self._compute_platform_breakdown(sessions)
tools = self._compute_tool_breakdown(tool_usage)
activity = self._compute_activity_patterns(sessions)
top_sessions = self._compute_top_sessions(sessions)
return {
"days": days,
"source_filter": source,
"empty": False,
"generated_at": time.time(),
"overview": overview,
"models": models,
"platforms": platforms,
"tools": tools,
"activity": activity,
"top_sessions": top_sessions,
}
# =========================================================================
# Data gathering (SQL queries)
# =========================================================================
# Columns we actually need (skip system_prompt, model_config blobs)
_SESSION_COLS = ("id, source, model, started_at, ended_at, "
"message_count, tool_call_count, input_tokens, output_tokens, "
"cache_read_tokens, cache_write_tokens, billing_provider, "
"billing_base_url, billing_mode, estimated_cost_usd, "
"actual_cost_usd, cost_status, cost_source")
# Pre-computed query strings — f-string evaluated once at class definition,
# not at runtime, so no user-controlled value can alter the query structure.
_GET_SESSIONS_WITH_SOURCE = (
f"SELECT {_SESSION_COLS} FROM sessions"
" WHERE started_at >= ? AND source = ?"
" ORDER BY started_at DESC"
)
_GET_SESSIONS_ALL = (
f"SELECT {_SESSION_COLS} FROM sessions"
" WHERE started_at >= ?"
" ORDER BY started_at DESC"
)
def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
"""Fetch sessions within the time window."""
if source:
cursor = self._conn.execute(self._GET_SESSIONS_WITH_SOURCE, (cutoff, source))
else:
cursor = self._conn.execute(self._GET_SESSIONS_ALL, (cutoff,))
return [dict(row) for row in cursor.fetchall()]
def _get_tool_usage(self, cutoff: float, source: str = None) -> List[Dict]:
"""Get tool call counts from messages.
Uses two sources:
1. tool_name column on 'tool' role messages (set by gateway)
2. tool_calls JSON on 'assistant' role messages (covers CLI where
tool_name is not populated on tool responses)
"""
tool_counts = Counter()
# Source 1: explicit tool_name on tool response messages
if source:
cursor = self._conn.execute(
"""SELECT m.tool_name, COUNT(*) as count
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ? AND s.source = ?
AND m.role = 'tool' AND m.tool_name IS NOT NULL
GROUP BY m.tool_name
ORDER BY count DESC""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
"""SELECT m.tool_name, COUNT(*) as count
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ?
AND m.role = 'tool' AND m.tool_name IS NOT NULL
GROUP BY m.tool_name
ORDER BY count DESC""",
(cutoff,),
)
for row in cursor.fetchall():
tool_counts[row["tool_name"]] += row["count"]
# Source 2: extract from tool_calls JSON on assistant messages
# (covers CLI sessions where tool_name is NULL on tool responses)
if source:
cursor2 = self._conn.execute(
"""SELECT m.tool_calls
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ? AND s.source = ?
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
(cutoff, source),
)
else:
cursor2 = self._conn.execute(
"""SELECT m.tool_calls
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ?
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL""",
(cutoff,),
)
tool_calls_counts = Counter()
for row in cursor2.fetchall():
try:
calls = row["tool_calls"]
if isinstance(calls, str):
calls = json.loads(calls)
if isinstance(calls, list):
for call in calls:
func = call.get("function", {}) if isinstance(call, dict) else {}
name = func.get("name")
if name:
tool_calls_counts[name] += 1
except (json.JSONDecodeError, TypeError, AttributeError):
continue
# Merge: prefer tool_name source, supplement with tool_calls source
# for tools not already counted
if not tool_counts and tool_calls_counts:
# No tool_name data at all — use tool_calls exclusively
tool_counts = tool_calls_counts
elif tool_counts and tool_calls_counts:
# Both sources have data — use whichever has the higher count per tool
# (they may overlap, so take the max to avoid double-counting)
all_tools = set(tool_counts) | set(tool_calls_counts)
merged = Counter()
for tool in all_tools:
merged[tool] = max(tool_counts.get(tool, 0), tool_calls_counts.get(tool, 0))
tool_counts = merged
# Convert to the expected format
return [
{"tool_name": name, "count": count}
for name, count in tool_counts.most_common()
]
def _get_message_stats(self, cutoff: float, source: str = None) -> Dict:
"""Get aggregate message statistics."""
if source:
cursor = self._conn.execute(
"""SELECT
COUNT(*) as total_messages,
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ? AND s.source = ?""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
"""SELECT
COUNT(*) as total_messages,
SUM(CASE WHEN m.role = 'user' THEN 1 ELSE 0 END) as user_messages,
SUM(CASE WHEN m.role = 'assistant' THEN 1 ELSE 0 END) as assistant_messages,
SUM(CASE WHEN m.role = 'tool' THEN 1 ELSE 0 END) as tool_messages
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ?""",
(cutoff,),
)
row = cursor.fetchone()
return dict(row) if row else {
"total_messages": 0, "user_messages": 0,
"assistant_messages": 0, "tool_messages": 0,
}
# =========================================================================
# Computation
# =========================================================================
def _compute_overview(self, sessions: List[Dict], message_stats: Dict) -> Dict:
"""Compute high-level overview statistics."""
total_input = sum(s.get("input_tokens") or 0 for s in sessions)
total_output = sum(s.get("output_tokens") or 0 for s in sessions)
total_cache_read = sum(s.get("cache_read_tokens") or 0 for s in sessions)
total_cache_write = sum(s.get("cache_write_tokens") or 0 for s in sessions)
total_tokens = total_input + total_output + total_cache_read + total_cache_write
total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
total_messages = sum(s.get("message_count") or 0 for s in sessions)
# Cost estimation (weighted by model)
total_cost = 0.0
actual_cost = 0.0
models_with_pricing = set()
models_without_pricing = set()
unknown_cost_sessions = 0
included_cost_sessions = 0
for s in sessions:
model = s.get("model") or ""
estimated, status = _estimate_cost(s)
total_cost += estimated
actual_cost += s.get("actual_cost_usd") or 0.0
display = model.split("/")[-1] if "/" in model else (model or "unknown")
if status == "included":
included_cost_sessions += 1
elif status == "unknown":
unknown_cost_sessions += 1
if _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url")):
models_with_pricing.add(display)
else:
models_without_pricing.add(display)
# Session duration stats (guard against negative durations from clock drift)
durations = []
for s in sessions:
start = s.get("started_at")
end = s.get("ended_at")
if start and end and end > start:
durations.append(end - start)
total_hours = sum(durations) / 3600 if durations else 0
avg_duration = sum(durations) / len(durations) if durations else 0
# Earliest and latest session
started_timestamps = [s["started_at"] for s in sessions if s.get("started_at")]
date_range_start = min(started_timestamps) if started_timestamps else None
date_range_end = max(started_timestamps) if started_timestamps else None
return {
"total_sessions": len(sessions),
"total_messages": total_messages,
"total_tool_calls": total_tool_calls,
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"total_cache_read_tokens": total_cache_read,
"total_cache_write_tokens": total_cache_write,
"total_tokens": total_tokens,
"estimated_cost": total_cost,
"actual_cost": actual_cost,
"total_hours": total_hours,
"avg_session_duration": avg_duration,
"avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
"avg_tokens_per_session": total_tokens / len(sessions) if sessions else 0,
"user_messages": message_stats.get("user_messages") or 0,
"assistant_messages": message_stats.get("assistant_messages") or 0,
"tool_messages": message_stats.get("tool_messages") or 0,
"date_range_start": date_range_start,
"date_range_end": date_range_end,
"models_with_pricing": sorted(models_with_pricing),
"models_without_pricing": sorted(models_without_pricing),
"unknown_cost_sessions": unknown_cost_sessions,
"included_cost_sessions": included_cost_sessions,
}
def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
"""Break down usage by model."""
model_data = defaultdict(lambda: {
"sessions": 0, "input_tokens": 0, "output_tokens": 0,
"cache_read_tokens": 0, "cache_write_tokens": 0,
"total_tokens": 0, "tool_calls": 0, "cost": 0.0,
})
for s in sessions:
model = s.get("model") or "unknown"
# Normalize: strip provider prefix for display
display_model = model.split("/")[-1] if "/" in model else model
d = model_data[display_model]
d["sessions"] += 1
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
cache_read = s.get("cache_read_tokens") or 0
cache_write = s.get("cache_write_tokens") or 0
d["input_tokens"] += inp
d["output_tokens"] += out
d["cache_read_tokens"] += cache_read
d["cache_write_tokens"] += cache_write
d["total_tokens"] += inp + out + cache_read + cache_write
d["tool_calls"] += s.get("tool_call_count") or 0
estimate, status = _estimate_cost(s)
d["cost"] += estimate
d["has_pricing"] = _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url"))
d["cost_status"] = status
result = [
{"model": model, **data}
for model, data in model_data.items()
]
# Sort by tokens first, fall back to session count when tokens are 0
result.sort(key=lambda x: (x["total_tokens"], x["sessions"]), reverse=True)
return result
def _compute_platform_breakdown(self, sessions: List[Dict]) -> List[Dict]:
"""Break down usage by platform/source."""
platform_data = defaultdict(lambda: {
"sessions": 0, "messages": 0, "input_tokens": 0,
"output_tokens": 0, "cache_read_tokens": 0,
"cache_write_tokens": 0, "total_tokens": 0, "tool_calls": 0,
})
for s in sessions:
source = s.get("source") or "unknown"
d = platform_data[source]
d["sessions"] += 1
d["messages"] += s.get("message_count") or 0
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
cache_read = s.get("cache_read_tokens") or 0
cache_write = s.get("cache_write_tokens") or 0
d["input_tokens"] += inp
d["output_tokens"] += out
d["cache_read_tokens"] += cache_read
d["cache_write_tokens"] += cache_write
d["total_tokens"] += inp + out + cache_read + cache_write
d["tool_calls"] += s.get("tool_call_count") or 0
result = [
{"platform": platform, **data}
for platform, data in platform_data.items()
]
result.sort(key=lambda x: x["sessions"], reverse=True)
return result
def _compute_tool_breakdown(self, tool_usage: List[Dict]) -> List[Dict]:
"""Process tool usage data into a ranked list with percentages."""
total_calls = sum(t["count"] for t in tool_usage) if tool_usage else 0
result = []
for t in tool_usage:
pct = (t["count"] / total_calls * 100) if total_calls else 0
result.append({
"tool": t["tool_name"],
"count": t["count"],
"percentage": pct,
})
return result
def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
"""Analyze activity patterns by day of week and hour."""
day_counts = Counter() # 0=Monday ... 6=Sunday
hour_counts = Counter()
daily_counts = Counter() # date string -> count
for s in sessions:
ts = s.get("started_at")
if not ts:
continue
dt = datetime.fromtimestamp(ts)
day_counts[dt.weekday()] += 1
hour_counts[dt.hour] += 1
daily_counts[dt.strftime("%Y-%m-%d")] += 1
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
day_breakdown = [
{"day": day_names[i], "count": day_counts.get(i, 0)}
for i in range(7)
]
hour_breakdown = [
{"hour": i, "count": hour_counts.get(i, 0)}
for i in range(24)
]
# Busiest day and hour
busiest_day = max(day_breakdown, key=lambda x: x["count"]) if day_breakdown else None
busiest_hour = max(hour_breakdown, key=lambda x: x["count"]) if hour_breakdown else None
# Active days (days with at least one session)
active_days = len(daily_counts)
# Streak calculation
if daily_counts:
all_dates = sorted(daily_counts.keys())
current_streak = 1
max_streak = 1
for i in range(1, len(all_dates)):
d1 = datetime.strptime(all_dates[i - 1], "%Y-%m-%d")
d2 = datetime.strptime(all_dates[i], "%Y-%m-%d")
if (d2 - d1).days == 1:
current_streak += 1
max_streak = max(max_streak, current_streak)
else:
current_streak = 1
else:
max_streak = 0
return {
"by_day": day_breakdown,
"by_hour": hour_breakdown,
"busiest_day": busiest_day,
"busiest_hour": busiest_hour,
"active_days": active_days,
"max_streak": max_streak,
}
def _compute_top_sessions(self, sessions: List[Dict]) -> List[Dict]:
"""Find notable sessions (longest, most messages, most tokens)."""
top = []
# Longest by duration
sessions_with_duration = [
s for s in sessions
if s.get("started_at") and s.get("ended_at")
]
if sessions_with_duration:
longest = max(
sessions_with_duration,
key=lambda s: (s["ended_at"] - s["started_at"]),
)
dur = longest["ended_at"] - longest["started_at"]
top.append({
"label": "Longest session",
"session_id": longest["id"][:16],
"value": _format_duration(dur),
"date": datetime.fromtimestamp(longest["started_at"]).strftime("%b %d"),
})
# Most messages
most_msgs = max(sessions, key=lambda s: s.get("message_count") or 0)
if (most_msgs.get("message_count") or 0) > 0:
top.append({
"label": "Most messages",
"session_id": most_msgs["id"][:16],
"value": f"{most_msgs['message_count']} msgs",
"date": datetime.fromtimestamp(most_msgs["started_at"]).strftime("%b %d") if most_msgs.get("started_at") else "?",
})
# Most tokens
most_tokens = max(
sessions,
key=lambda s: (s.get("input_tokens") or 0) + (s.get("output_tokens") or 0),
)
token_total = (most_tokens.get("input_tokens") or 0) + (most_tokens.get("output_tokens") or 0)
if token_total > 0:
top.append({
"label": "Most tokens",
"session_id": most_tokens["id"][:16],
"value": f"{token_total:,} tokens",
"date": datetime.fromtimestamp(most_tokens["started_at"]).strftime("%b %d") if most_tokens.get("started_at") else "?",
})
# Most tool calls
most_tools = max(sessions, key=lambda s: s.get("tool_call_count") or 0)
if (most_tools.get("tool_call_count") or 0) > 0:
top.append({
"label": "Most tool calls",
"session_id": most_tools["id"][:16],
"value": f"{most_tools['tool_call_count']} calls",
"date": datetime.fromtimestamp(most_tools["started_at"]).strftime("%b %d") if most_tools.get("started_at") else "?",
})
return top
# =========================================================================
# Formatting
# =========================================================================
def format_terminal(self, report: Dict) -> str:
"""Format the insights report for terminal display (CLI)."""
if report.get("empty"):
days = report.get("days", 30)
src = f" (source: {report['source_filter']})" if report.get("source_filter") else ""
return f" No sessions found in the last {days} days{src}."
lines = []
o = report["overview"]
days = report["days"]
src_filter = report.get("source_filter")
# Header
lines.append("")
lines.append(" ╔══════════════════════════════════════════════════════════╗")
lines.append(" ║ 📊 Hermes Insights ║")
period_label = f"Last {days} days"
if src_filter:
period_label += f" ({src_filter})"
padding = 58 - len(period_label) - 2
left_pad = padding // 2
right_pad = padding - left_pad
lines.append(f"{' ' * left_pad} {period_label} {' ' * right_pad}")
lines.append(" ╚══════════════════════════════════════════════════════════╝")
lines.append("")
# Date range
if o.get("date_range_start") and o.get("date_range_end"):
start_str = datetime.fromtimestamp(o["date_range_start"]).strftime("%b %d, %Y")
end_str = datetime.fromtimestamp(o["date_range_end"]).strftime("%b %d, %Y")
lines.append(f" Period: {start_str}{end_str}")
lines.append("")
# Overview
lines.append(" 📋 Overview")
lines.append(" " + "" * 56)
lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
cost_str = f"${o['estimated_cost']:.2f}"
if o.get("models_without_pricing"):
cost_str += " *"
lines.append(f" Total tokens: {o['total_tokens']:<12,} Est. cost: {cost_str}")
if o["total_hours"] > 0:
lines.append(f" Active time: ~{_format_duration(o['total_hours'] * 3600):<11} Avg session: ~{_format_duration(o['avg_session_duration'])}")
lines.append(f" Avg msgs/session: {o['avg_messages_per_session']:.1f}")
lines.append("")
# Model breakdown
if report["models"]:
lines.append(" 🤖 Models Used")
lines.append(" " + "" * 56)
lines.append(f" {'Model':<30} {'Sessions':>8} {'Tokens':>12} {'Cost':>8}")
for m in report["models"]:
model_name = m["model"][:28]
if m.get("has_pricing"):
cost_cell = f"${m['cost']:>6.2f}"
else:
cost_cell = " N/A"
lines.append(f" {model_name:<30} {m['sessions']:>8} {m['total_tokens']:>12,} {cost_cell}")
if o.get("models_without_pricing"):
lines.append(" * Cost N/A for custom/self-hosted models")
lines.append("")
# Platform breakdown
if len(report["platforms"]) > 1 or (report["platforms"] and report["platforms"][0]["platform"] != "cli"):
lines.append(" 📱 Platforms")
lines.append(" " + "" * 56)
lines.append(f" {'Platform':<14} {'Sessions':>8} {'Messages':>10} {'Tokens':>14}")
for p in report["platforms"]:
lines.append(f" {p['platform']:<14} {p['sessions']:>8} {p['messages']:>10,} {p['total_tokens']:>14,}")
lines.append("")
# Tool usage
if report["tools"]:
lines.append(" 🔧 Top Tools")
lines.append(" " + "" * 56)
lines.append(f" {'Tool':<28} {'Calls':>8} {'%':>8}")
for t in report["tools"][:15]: # Top 15
lines.append(f" {t['tool']:<28} {t['count']:>8,} {t['percentage']:>7.1f}%")
if len(report["tools"]) > 15:
lines.append(f" ... and {len(report['tools']) - 15} more tools")
lines.append("")
# Activity patterns
act = report.get("activity", {})
if act.get("by_day"):
lines.append(" 📅 Activity Patterns")
lines.append(" " + "" * 56)
# Day of week chart
day_values = [d["count"] for d in act["by_day"]]
bars = _bar_chart(day_values, max_width=15)
for i, d in enumerate(act["by_day"]):
bar = bars[i]
lines.append(f" {d['day']} {bar:<15} {d['count']}")
lines.append("")
# Peak hours (show top 5 busiest hours)
busy_hours = sorted(act["by_hour"], key=lambda x: x["count"], reverse=True)
busy_hours = [h for h in busy_hours if h["count"] > 0][:5]
if busy_hours:
hour_strs = []
for h in busy_hours:
hr = h["hour"]
ampm = "AM" if hr < 12 else "PM"
display_hr = hr % 12 or 12
hour_strs.append(f"{display_hr}{ampm} ({h['count']})")
lines.append(f" Peak hours: {', '.join(hour_strs)}")
if act.get("active_days"):
lines.append(f" Active days: {act['active_days']}")
if act.get("max_streak") and act["max_streak"] > 1:
lines.append(f" Best streak: {act['max_streak']} consecutive days")
lines.append("")
# Notable sessions
if report.get("top_sessions"):
lines.append(" 🏆 Notable Sessions")
lines.append(" " + "" * 56)
for ts in report["top_sessions"]:
lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
lines.append("")
return "\n".join(lines)
def format_gateway(self, report: Dict) -> str:
"""Format the insights report for gateway/messaging (shorter)."""
if report.get("empty"):
days = report.get("days", 30)
return f"No sessions found in the last {days} days."
lines = []
o = report["overview"]
days = report["days"]
lines.append(f"📊 **Hermes Insights** — Last {days} days\n")
# Overview
lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
cost_note = ""
if o.get("models_without_pricing"):
cost_note = " _(excludes custom/self-hosted models)_"
lines.append(f"**Est. cost:** ${o['estimated_cost']:.2f}{cost_note}")
if o["total_hours"] > 0:
lines.append(f"**Active time:** ~{_format_duration(o['total_hours'] * 3600)} | **Avg session:** ~{_format_duration(o['avg_session_duration'])}")
lines.append("")
# Models (top 5)
if report["models"]:
lines.append("**🤖 Models:**")
for m in report["models"][:5]:
cost_str = f"${m['cost']:.2f}" if m.get("has_pricing") else "N/A"
lines.append(f" {m['model'][:25]}{m['sessions']} sessions, {m['total_tokens']:,} tokens, {cost_str}")
lines.append("")
# Platforms (if multi-platform)
if len(report["platforms"]) > 1:
lines.append("**📱 Platforms:**")
for p in report["platforms"]:
lines.append(f" {p['platform']}{p['sessions']} sessions, {p['messages']:,} msgs")
lines.append("")
# Tools (top 8)
if report["tools"]:
lines.append("**🔧 Top Tools:**")
for t in report["tools"][:8]:
lines.append(f" {t['tool']}{t['count']:,} calls ({t['percentage']:.1f}%)")
lines.append("")
# Activity summary
act = report.get("activity", {})
if act.get("busiest_day") and act.get("busiest_hour"):
hr = act["busiest_hour"]["hour"]
ampm = "AM" if hr < 12 else "PM"
display_hr = hr % 12 or 12
lines.append(f"**📅 Busiest:** {act['busiest_day']['day']}s ({act['busiest_day']['count']} sessions), {display_hr}{ampm} ({act['busiest_hour']['count']} sessions)")
if act.get("active_days"):
lines.append(f"**Active days:** {act['active_days']}", )
if act.get("max_streak", 0) > 1:
lines.append(f"**Best streak:** {act['max_streak']} consecutive days")
return "\n".join(lines)