#!/usr/bin/env python3
"""
Compounding Intelligence Metrics Engine.

Computes 7 metrics that prove whether the knowledge compounding loop is working:
1. Knowledge velocity — new facts per day
2. Knowledge coverage — % of domains with 10+ facts
3. Hit rate — % of sessions referencing bootstrap knowledge
4. Error recurrence — same errors across sessions (should decrease)
5. Task completion — % of sessions ending successfully
6. First-try success — actions without backtracking
7. Knowledge age — staleness of facts

Usage:
    python3 measurer.py                        # All metrics, all time
    python3 measurer.py --since 2026-04-01     # Time range
    python3 measurer.py --repo the-nexus       # Per-repo metrics
    python3 measurer.py --format json          # JSON output (default)
    python3 measurer.py --format markdown      # Human-readable
    python3 measurer.py --knowledge-dir ./knowledge   # Custom knowledge path
    python3 measurer.py --db ~/.hermes/state.db       # Custom DB path

Data sources:
- knowledge/index.json — fact index
- knowledge/ — YAML fact files for coverage
- ~/.hermes/state.db — session/message metadata
"""

import argparse
import json
import os
import re
import sqlite3
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any


# ─── Defaults ───────────────────────────────────────────────────────────────────

# Knowledge store sits beside this script's package directory by default.
DEFAULT_KNOWLEDGE_DIR = Path(__file__).parent.parent / "knowledge"

# Session/message metadata database (see module docstring "Data sources").
DEFAULT_DB_PATH = Path.home() / ".hermes" / "state.db"

# Trend window length.
# NOTE(review): not referenced anywhere in this file — confirm it is used by
# another module before removing.
SEVEN_DAYS = timedelta(days=7)


# ─── Knowledge Store ────────────────────────────────────────────────────────────


def load_facts(knowledge_dir: Path) -> list[dict]:
    """Read every fact entry from the store's index.json.

    Returns an empty list when the index file is missing or lists no facts.
    """
    index_file = knowledge_dir / "index.json"
    if not index_file.exists():
        return []
    index = json.loads(index_file.read_text())
    return index.get("facts", [])


def count_yaml_facts(knowledge_dir: Path) -> dict[str, int]:
    """Count facts per domain by scanning the YAML files directly.

    A fact is any line matching "- id:". The domain is the YAML file's stem;
    repos/, global/ and agents/ subdirectories are all scanned.
    """
    fact_marker = re.compile(r"^\s*-\s*id:", re.MULTILINE)
    totals: dict[str, int] = defaultdict(int)

    for bucket in ("repos", "global", "agents"):
        bucket_dir = knowledge_dir / bucket
        if not bucket_dir.exists():
            continue
        for yaml_path in bucket_dir.glob("*.yaml"):
            try:
                n = len(fact_marker.findall(yaml_path.read_text()))
            except Exception:
                n = 0  # best-effort: an unreadable file contributes zero facts
            totals[yaml_path.stem] += n

    return dict(totals)


# ─── Session Database ───────────────────────────────────────────────────────────


def open_db(db_path: Path) -> "sqlite3.Connection | None":
    """Open the session database.

    Returns:
        A connection whose rows support name-based access, or None when the
        database file does not exist (a warning is printed to stderr).
        The return annotation previously claimed a connection was always
        returned; it now reflects the None path.
    """
    if not db_path.exists():
        print(f"WARNING: Database not found at {db_path}", file=sys.stderr)
        return None
    conn = sqlite3.connect(str(db_path))
    # Row factory lets callers turn rows into dicts with dict(row).
    conn.row_factory = sqlite3.Row
    return conn


def query_sessions(conn: "sqlite3.Connection | None", since: "str | None" = None,
                   repo: "str | None" = None) -> list[dict]:
    """Query sessions, optionally restricted to those started on/after *since*.

    Args:
        conn: Open database connection, or None (yields an empty list).
        since: ISO date (YYYY-MM-DD), interpreted as UTC midnight.
        repo: Accepted for interface compatibility but currently unused —
            the sessions table exposes no repo column to filter on.
            TODO(review): wire this up once sessions carry a repo tag.

    Returns:
        Session rows as dicts, ordered by start time ascending.
    """
    if conn is None:
        return []

    query = """
        SELECT id, started_at, ended_at, end_reason, message_count,
               tool_call_count, model
        FROM sessions
        WHERE 1=1
    """
    params: list = []

    if since:
        # Naive date -> UTC epoch seconds, matching how started_at is stored.
        since_ts = datetime.fromisoformat(since).replace(tzinfo=timezone.utc).timestamp()
        query += " AND started_at >= ?"
        params.append(since_ts)

    query += " ORDER BY started_at ASC"

    cur = conn.execute(query, params)
    return [dict(row) for row in cur.fetchall()]


def query_messages(conn: "sqlite3.Connection | None",
                   session_ids: "list[str] | None" = None,
                   since_ts: "float | None" = None) -> list[dict]:
    """Query messages, optionally filtered by session ids and/or timestamp.

    Args:
        conn: Open database connection, or None (yields an empty list).
        session_ids: When given, restrict to messages in these sessions.
        since_ts: UNIX timestamp; only messages at/after it are returned.

    Returns:
        Message rows as dicts (session_id, role, content, tool_name, timestamp).
    """
    if conn is None:
        return []

    query = """
        SELECT m.session_id, m.role, m.content, m.tool_name, m.timestamp
        FROM messages m
        WHERE 1=1
    """
    params: list = []

    # "is not None", not truthiness: 0.0 is the valid epoch timestamp and the
    # previous `if since_ts:` check silently dropped that filter.
    if since_ts is not None:
        query += " AND m.timestamp >= ?"
        params.append(since_ts)

    if session_ids:
        placeholders = ",".join("?" for _ in session_ids)
        query += f" AND m.session_id IN ({placeholders})"
        params.extend(session_ids)

    cur = conn.execute(query, params)
    return [dict(row) for row in cur.fetchall()]


# ─── Metric Computations ───────────────────────────────────────────────────────


def compute_knowledge_velocity(facts: list[dict], since: str = None) -> dict:
    """Metric 1: New facts per day. Higher = compounding working."""
    empty = {"value": 0.0, "total_facts": len(facts), "period_days": 0, "new_facts": 0}
    if not facts:
        return empty

    def parse(raw):
        """ISO string -> timezone-aware datetime, or None when unparseable."""
        try:
            stamp = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            return None
        return stamp if stamp.tzinfo is not None else stamp.replace(tzinfo=timezone.utc)

    # Prefer first_seen, falling back to created; skip unparseable entries.
    stamps = [parse(f.get("first_seen") or f.get("created")) for f in facts]
    stamps = [s for s in stamps if s is not None]
    if not stamps:
        return empty

    if since:
        cutoff = datetime.fromisoformat(since).replace(tzinfo=timezone.utc)
        stamps = [s for s in stamps if s >= cutoff]
    if not stamps:
        return empty

    # Clamp to one day so a single-day burst doesn't divide by zero.
    span_days = max((max(stamps) - min(stamps)).days, 1)

    return {
        "value": round(len(stamps) / span_days, 2),
        "total_facts": len(facts),
        "period_days": span_days,
        "new_facts": len(stamps),
    }


def compute_knowledge_coverage(facts: list[dict], yaml_counts: dict[str, int]) -> dict:
    """Metric 2: share of domains holding 10+ facts. Breadth indicator."""
    per_domain: dict[str, int] = defaultdict(int)
    for entry in facts:
        per_domain[entry.get("domain", "unknown")] += 1

    # YAML files may hold facts not yet indexed — keep the larger count.
    for name, n in yaml_counts.items():
        if n > per_domain[name]:
            per_domain[name] = n

    if not per_domain:
        return {"value": 0.0, "covered_domains": 0, "total_domains": 0, "domain_details": {}}

    covered = sum(c >= 10 for c in per_domain.values())
    top20 = sorted(per_domain.items(), key=lambda kv: -kv[1])[:20]

    return {
        "value": round(covered / len(per_domain), 3),
        "covered_domains": covered,
        "total_domains": len(per_domain),
        "domain_details": dict(top20),
    }


def compute_hit_rate(sessions: list[dict], messages: list[dict],
                     facts: list[dict]) -> dict:
    """Metric 3: % of sessions that reference bootstrap knowledge.

    A session counts as a hit when any fact fragment — the full fact text
    (when longer than 10 chars) or any single word of 4+ characters drawn
    from a fact — appears as a substring of the session's combined message
    content (everything lowercased).
    """
    miss = {"value": 0.0, "hit_sessions": 0, "total_sessions": len(sessions)}
    if not sessions or not facts:
        return miss

    fragments: set[str] = set()
    for entry in facts:
        text = entry.get("fact", "").lower().strip()
        if len(text) > 10:
            fragments.add(text)
        fragments.update(re.findall(r'\w{4,}', text))

    if not fragments:
        return miss

    # session id -> list of lowercased message bodies
    bodies: dict[str, list[str]] = defaultdict(list)
    for msg in messages:
        body = (msg.get("content") or "").lower()
        if body:
            bodies[msg["session_id"]].append(body)

    hits = 0
    for sess in sessions:
        blob = " ".join(bodies.get(sess["id"], []))
        hits += any(frag in blob for frag in fragments)

    return {
        "value": round(hits / len(sessions), 3),
        "hit_sessions": hits,
        "total_sessions": len(sessions),
    }


def compute_error_recurrence(messages: list[dict]) -> dict:
    """Metric 4: same error signatures recurring across sessions (lower = better).

    Extracts up-to-80-char snippets following error/failure keywords, then
    counts how many distinct sessions each signature appeared in.
    """
    none_found = {"value": 0.0, "unique_errors": 0, "recurring_errors": 0, "top_errors": []}
    if not messages:
        return none_found

    error_pattern = re.compile(
        r'(?:error|Error|ERROR|failed|FAIL|exception|Exception)[:\s]*(.{10,80})',
        re.IGNORECASE
    )

    sessions_by_error: dict[str, set[str]] = defaultdict(set)
    for msg in messages:
        body = msg.get("content") or ""
        for hit in error_pattern.finditer(body):
            # Lowercase + collapse whitespace so cosmetic differences merge.
            signature = re.sub(r'\s+', ' ', hit.group(1).strip().lower())
            if len(signature) > 5:
                sessions_by_error[signature].add(msg["session_id"])

    if not sessions_by_error:
        return none_found

    recurring = {sig: ids for sig, ids in sessions_by_error.items() if len(ids) > 1}
    worst = sorted(recurring.items(), key=lambda kv: -len(kv[1]))[:10]

    return {
        "value": round(len(recurring) / len(sessions_by_error), 3),
        "unique_errors": len(sessions_by_error),
        "recurring_errors": len(recurring),
        "top_errors": [{"error": sig, "sessions": len(ids)} for sig, ids in worst],
    }


def compute_task_completion(sessions: list[dict]) -> dict:
    """Metric 5: % of sessions ending with a successful end_reason."""
    if not sessions:
        return {"value": 0.0, "completed": 0, "total": 0, "breakdown": {}}

    breakdown = Counter((s.get("end_reason") or "unknown") for s in sessions)

    # Explicitly-successful endings.
    completed = breakdown["cron_complete"] + breakdown["session_reset"]
    # cli_close and compression are routine (non-failure) endings too.
    normal = completed + breakdown["cli_close"] + breakdown["compression"]

    total = len(sessions)
    return {
        "value": round(completed / total, 3),
        "normal_end_rate": round(normal / total, 3),
        "completed": completed,
        "total": total,
        "breakdown": dict(breakdown.most_common()),
    }


def compute_first_try_success(sessions: list[dict]) -> dict:
    """Metric 6: sessions completed without excessive backtracking.

    Proxy: tool calls per message. A low ratio suggests few retries, so a
    session counts as first-try when tool_calls/messages < 0.5. Sessions
    with 2 or fewer messages are skipped as trivial.
    """
    no_data = {"value": 0.0, "avg_tool_msg_ratio": 0.0, "sampled": 0}
    if not sessions:
        return no_data

    ratios = [
        (s.get("tool_call_count", 0) or 0) / n_msgs
        for s in sessions
        if (n_msgs := (s.get("message_count", 0) or 0)) > 2
    ]
    if not ratios:
        return no_data

    first_try = sum(r < 0.5 for r in ratios)

    return {
        "value": round(first_try / len(ratios), 3),
        "avg_tool_msg_ratio": round(sum(ratios) / len(ratios), 3),
        "sampled": len(ratios),
        "interpretation": "Higher value = fewer backtracks = better first-try success",
    }


def compute_knowledge_age(facts: list[dict]) -> dict:
    """Metric 7: staleness of the fact store, scored as freshness in [0, 1].

    Age = days since last_confirmed (falling back to first_seen). Facts older
    than 30 days count as stale; freshness hits 0.0 at a 90-day average.
    """
    if not facts:
        return {"value": 0.0, "avg_age_days": 0, "stale_facts": 0, "total_facts": 0}

    now = datetime.now(timezone.utc)

    def age_of(raw):
        """Days since *raw* (ISO string), or None when missing/unparseable."""
        try:
            stamp = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except (ValueError, AttributeError):
            return None
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return (now - stamp).days

    ages = [age_of(f.get("last_confirmed") or f.get("first_seen")) for f in facts]
    ages = [a for a in ages if a is not None]

    if not ages:
        return {"value": 0.0, "avg_age_days": 0, "stale_facts": 0, "total_facts": len(facts)}

    avg_age = sum(ages) / len(ages)
    # Invert average age into a 0-1 freshness score; 90+ days => 0.0.
    freshness = max(0.0, 1.0 - (avg_age / 90))

    return {
        "value": round(freshness, 3),
        "avg_age_days": round(avg_age, 1),
        "stale_facts": sum(a > 30 for a in ages),
        "total_facts": len(facts),
        "interpretation": "1.0 = all facts fresh. 0.0 = all facts 90+ days old",
    }


# ─── Trend Computation ─────────────────────────────────────────────────────────


def compute_trend(current: dict, previous: dict, metric_key: str = "value",
                  lower_is_better: "bool | None" = None) -> dict:
    """Compute the trend between two metric snapshots.

    Args:
        current: Latest metric dict (as produced by the compute_* functions).
        previous: Prior snapshot of the same metric.
        metric_key: Key to compare within both dicts (callers pass "value").
        lower_is_better: Set True for metrics where a drop is an improvement
            (error_recurrence, knowledge_age). When None, falls back to the
            legacy heuristic of inspecting metric_key — which never fired,
            because every caller passes metric_key="value". Callers should
            migrate to passing lower_is_better explicitly.

    Returns:
        {"delta": "+x.x%", "direction": up/down/flat, "assessment": good/bad/
        neutral}, or an N/A placeholder when there is no usable baseline.
    """
    if not previous:
        return {"delta": "N/A", "direction": "unknown"}

    curr_val = current.get(metric_key, 0)
    prev_val = previous.get(metric_key, 0)

    if prev_val == 0:
        # A zero baseline makes percentage change undefined.
        return {"delta": "N/A (no baseline)", "direction": "unknown"}

    pct = ((curr_val - prev_val) / abs(prev_val)) * 100
    direction = "up" if pct > 0 else "down" if pct < 0 else "flat"

    if lower_is_better is None:
        # Legacy heuristic kept for backward compatibility. BUG(historical):
        # callers always pass metric_key="value", so this never matched.
        lower_is_better = metric_key in ("error_recurrence", "knowledge_age")

    if lower_is_better:
        assessment = "good" if pct < 0 else "bad" if pct > 0 else "neutral"
    else:
        assessment = "good" if pct > 0 else "bad" if pct < 0 else "neutral"

    return {
        "delta": f"{'+' if pct > 0 else ''}{pct:.1f}%",
        "direction": direction,
        "assessment": assessment,
    }


# ─── Output Formatters ─────────────────────────────────────────────────────────


def format_json(metrics: dict) -> str:
    """Serialize the metrics dict as pretty-printed JSON (2-space indent)."""
    return json.dumps(metrics, indent=2)


def format_markdown(metrics: dict) -> str:
    """Render the metrics dict as a human-readable markdown dashboard.

    Each metric gets a section showing its value, 7-day trend (delta + arrow
    from the trend assessment), and the scalar details of its metric dict.
    """
    lines = [
        "# Compounding Intelligence Metrics",
        f"**Generated:** {metrics.get('generated_at', 'unknown')}",
        "",
    ]

    trend = metrics.get("trend_7d", {})

    def metric_block(name: str, data: dict, desc: str):
        """Append one metric section to *lines*."""
        val = data.get("value", 0)
        t = trend.get(name, {})
        delta = t.get("delta", "N/A")
        assessment = t.get("assessment", "unknown")
        arrow = "↑" if assessment == "good" else "↓" if assessment == "bad" else "—"

        lines.extend([
            f"## {name}",
            f"{desc}",
            "",
            f"**Value:** {val} | **7d trend:** {delta} {arrow} ({assessment})",
            "",
        ])

        # Scalar details only; nested dicts/lists are omitted from the dashboard.
        for k, v in data.items():
            if k not in ("value", "interpretation") and isinstance(v, (int, float, str)):
                lines.append(f"- {k}: {v}")
        lines.append("")

    # (name, description) in dashboard order. The former good_direction
    # parameter was removed: it was never read — the arrow already derives
    # from the trend assessment.
    sections = [
        ("knowledge_velocity",
         "New facts extracted per day. Higher = compounding loop working."),
        ("knowledge_coverage",
         "Percentage of domains/repos with 10+ facts. Measures breadth."),
        ("hit_rate",
         "Percentage of sessions referencing bootstrapped knowledge."),
        ("error_recurrence",
         "Ratio of recurring errors. Lower = fleet learning from mistakes."),
        ("task_completion",
         "Percentage of sessions ending with successful completion."),
        ("first_try_success",
         "Percentage of sessions completed without backtracking."),
        ("knowledge_age",
         "Freshness of knowledge store. 1.0 = all fresh, 0.0 = all stale."),
    ]
    for name, desc in sections:
        metric_block(name, metrics.get(name, {}), desc)

    return "\n".join(lines)


# ─── Snapshot Persistence ───────────────────────────────────────────────────────


def load_snapshot(metrics_dir: Path) -> dict:
    """Return the most recent metrics snapshot, or {} when none exists."""
    snapshot = metrics_dir / "latest_snapshot.json"
    if not snapshot.exists():
        return {}
    return json.loads(snapshot.read_text())


def save_snapshot(metrics_dir: Path, metrics: dict):
    """Persist *metrics* as the latest snapshot, creating the directory."""
    metrics_dir.mkdir(parents=True, exist_ok=True)
    target = metrics_dir / "latest_snapshot.json"
    target.write_text(json.dumps(metrics, indent=2))


# ─── Main ───────────────────────────────────────────────────────────────────────


def main():
    """CLI entry point: load data, compute all 7 metrics, emit JSON/markdown."""
    parser = argparse.ArgumentParser(description="Compounding Intelligence Metrics")
    parser.add_argument("--since", help="Start date (YYYY-MM-DD)")
    parser.add_argument("--repo", help="Filter by repo/domain")
    parser.add_argument("--format", choices=["json", "markdown"], default="json")
    parser.add_argument("--knowledge-dir", type=Path, default=DEFAULT_KNOWLEDGE_DIR)
    parser.add_argument("--db", type=Path, default=DEFAULT_DB_PATH)
    parser.add_argument("--save-snapshot", action="store_true",
                        help="Save current metrics as snapshot for trend tracking")
    parser.add_argument("--metrics-dir", type=Path,
                        default=Path(__file__).parent.parent / "metrics",
                        help="Directory for snapshots and dashboard")
    args = parser.parse_args()

    # Validate --since up front so downstream consumers never see a bad date.
    since_ts = None
    if args.since:
        try:
            since_ts = datetime.fromisoformat(args.since).replace(
                tzinfo=timezone.utc).timestamp()
        except ValueError:
            parser.error(f"--since must be an ISO date (YYYY-MM-DD), got {args.since!r}")

    # ── Load data ───────────────────────────────────────────────────────────
    facts = load_facts(args.knowledge_dir)
    yaml_counts = count_yaml_facts(args.knowledge_dir)

    if args.repo:
        facts = [f for f in facts if f.get("domain") == args.repo]

    conn = open_db(args.db)
    sessions = query_sessions(conn, since=args.since)
    # BUG FIX: --since previously filtered sessions but not messages, so the
    # message-based metrics (hit rate, error recurrence) mixed time ranges.
    # query_messages handles conn=None / since_ts=None itself.
    messages = query_messages(conn, since_ts=since_ts)

    if conn:
        conn.close()

    # ── Compute metrics ─────────────────────────────────────────────────────
    velocity = compute_knowledge_velocity(facts, since=args.since)
    coverage = compute_knowledge_coverage(facts, yaml_counts)
    hit_rate = compute_hit_rate(sessions, messages, facts)
    error_recurrence = compute_error_recurrence(messages)
    task_completion = compute_task_completion(sessions)
    first_try = compute_first_try_success(sessions)
    age = compute_knowledge_age(facts)

    # ── Compute trends ──────────────────────────────────────────────────────
    previous = load_snapshot(args.metrics_dir)
    trend = {
        "knowledge_velocity": compute_trend(velocity, previous.get("knowledge_velocity", {})),
        "knowledge_coverage": compute_trend(coverage, previous.get("knowledge_coverage", {})),
        "hit_rate": compute_trend(hit_rate, previous.get("hit_rate", {})),
        "error_recurrence": compute_trend(error_recurrence, previous.get("error_recurrence", {}),
                                          "value"),
        "task_completion": compute_trend(task_completion, previous.get("task_completion", {})),
        "first_try_success": compute_trend(first_try, previous.get("first_try_success", {})),
        "knowledge_age": compute_trend(age, previous.get("knowledge_age", {})),
    }

    # ── Assemble output ─────────────────────────────────────────────────────
    now = datetime.now(timezone.utc).isoformat()
    metrics = {
        "generated_at": now,
        "knowledge_velocity": velocity,
        "knowledge_coverage": coverage,
        "hit_rate": hit_rate,
        "error_recurrence": error_recurrence,
        "task_completion": task_completion,
        "first_try_success": first_try,
        "knowledge_age": age,
        "trend_7d": trend,
    }

    if args.since:
        metrics["since"] = args.since

    # ── Save snapshot if requested ──────────────────────────────────────────
    if args.save_snapshot:
        save_snapshot(args.metrics_dir, metrics)
        # Also write the markdown dashboard alongside the snapshot.
        dashboard_path = args.metrics_dir / "dashboard.md"
        with open(dashboard_path, "w") as f:
            f.write(format_markdown(metrics))

    # ── Output ──────────────────────────────────────────────────────────────
    if args.format == "json":
        print(format_json(metrics))
    else:
        print(format_markdown(metrics))


if __name__ == "__main__":
    main()
|