Co-authored-by: Codex Agent <codex@hermes.local> Co-committed-by: Codex Agent <codex@hermes.local>
344 lines
11 KiB
Python
344 lines
11 KiB
Python
#!/usr/bin/env python3
"""Timmy workflow dashboard.

Shows current workflow state from the active local surfaces instead of the
archived dashboard/loop era, while preserving useful local/session metrics.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
if str(REPO_ROOT) not in sys.path:
|
|
sys.path.insert(0, str(REPO_ROOT))
|
|
|
|
from metrics_helpers import summarize_local_metrics, summarize_session_rows
|
|
|
|
# Per-user state directories read by the dashboard.
HERMES_HOME = Path.home().joinpath(".hermes")
TIMMY_HOME = Path.home().joinpath(".timmy")
METRICS_DIR = TIMMY_HOME.joinpath("metrics")

# Repositories ("owner/name") polled for open PRs, review items and issues.
CORE_REPOS = [
    "Timmy_Foundation/the-nexus",
    "Timmy_Foundation/timmy-home",
    "Timmy_Foundation/timmy-config",
    "Timmy_Foundation/hermes-agent",
]
|
|
def resolve_gitea_url() -> str:
    """Return the Gitea base URL without a trailing slash.

    Sources are tried in order; the first one that yields a non-blank value
    wins:

    1. the ``GITEA_URL`` environment variable,
    2. ``~/.hermes/gitea_api`` (a trailing ``/api/v1`` is removed, since
       callers append the API prefix themselves),
    3. ``~/.config/gitea/base-url``.

    Blank or whitespace-only values are treated as "not configured" so that an
    empty hint file cannot produce an empty base URL.

    Raises:
        FileNotFoundError: if none of the sources provides a URL.
    """
    api_suffix = "/api/v1"

    env = os.environ.get("GITEA_URL", "").strip().rstrip("/")
    if env:
        return env

    api_hint = HERMES_HOME / "gitea_api"
    if api_hint.exists():
        raw = api_hint.read_text().strip().rstrip("/")
        if raw.endswith(api_suffix):
            raw = raw[: -len(api_suffix)]
        if raw:
            return raw

    base_url = Path.home() / ".config" / "gitea" / "base-url"
    if base_url.exists():
        raw = base_url.read_text().strip().rstrip("/")
        if raw:
            return raw

    raise FileNotFoundError("Set GITEA_URL or create ~/.hermes/gitea_api")
|
|
|
|
|
|
# Resolved once at import time; resolve_gitea_url() raises FileNotFoundError
# when no Gitea endpoint is configured, so importing this module can fail.
GITEA_URL = resolve_gitea_url()
|
|
|
|
|
|
def read_token() -> str | None:
    """Return the first non-empty Gitea API token found on disk, else ``None``.

    Candidate files are checked in priority order. A candidate that is
    missing, is not a regular file, or contains only whitespace is skipped so
    that an empty placeholder cannot hide a valid token further down the list.
    """
    candidates = (
        Path.home() / ".config" / "gitea" / "timmy-token",
        Path.home() / ".hermes" / "gitea_token_vps",
        Path.home() / ".hermes" / "gitea_token_timmy",
    )
    for path in candidates:
        if not path.is_file():
            continue
        token = path.read_text().strip()
        if token:
            return token
    return None
|
|
|
|
|
|
def gitea_get(path: str, token: str | None) -> list | dict:
    """GET ``{GITEA_URL}/api/v1{path}`` and return the decoded JSON body.

    When *token* is truthy it is sent as an ``Authorization: token ...``
    header; otherwise the request carries no extra headers. The request uses
    a 5-second timeout. Network and JSON errors propagate to the caller.
    """
    auth_headers = {}
    if token:
        auth_headers["Authorization"] = f"token {token}"

    request = urllib.request.Request(f"{GITEA_URL}/api/v1{path}", headers=auth_headers)
    with urllib.request.urlopen(request, timeout=5) as response:
        body = response.read()
    return json.loads(body.decode())
|
|
|
|
|
|
def _read_json_dict(path: Path) -> dict:
    """Best-effort read of a JSON object from *path*.

    Returns ``{}`` when the file is missing or unreadable, is not valid JSON,
    or holds a JSON value other than an object. The last case matters because
    callers use ``.get`` on the result.
    """
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: bad JSON or bad encoding.
        return {}
    return data if isinstance(data, dict) else {}


def get_model_health() -> dict:
    """Return the contents of ``~/.hermes/model_health.json``, or ``{}``."""
    return _read_json_dict(HERMES_HOME / "model_health.json")


def get_last_tick() -> dict:
    """Return the contents of ``~/.timmy/heartbeat/last_tick.json``, or ``{}``."""
    return _read_json_dict(TIMMY_HOME / "heartbeat" / "last_tick.json")


def get_archive_checkpoint() -> dict:
    """Return the contents of ``~/.timmy/twitter-archive/checkpoint.json``, or ``{}``."""
    return _read_json_dict(TIMMY_HOME / "twitter-archive" / "checkpoint.json")
|
|
|
|
|
|
def get_local_metrics(hours: int = 24) -> list[dict]:
    """Collect local-inference metric records from the last *hours* hours.

    Reads every ``local_*.jsonl`` file in ``METRICS_DIR`` (in sorted order)
    and keeps the records whose ``"timestamp"`` field is an ISO-8601 string
    at or after the cutoff. Blank lines, malformed lines, and records without
    a usable timestamp are skipped. A file that cannot be read is skipped
    rather than aborting the whole scan.

    Returns:
        The matching records in file order; an empty list if the metrics
        directory does not exist.
    """
    records: list[dict] = []
    if not METRICS_DIR.exists():
        return records

    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    for path in sorted(METRICS_DIR.glob("local_*.jsonl")):
        try:
            lines = path.read_text().splitlines()
        except (OSError, UnicodeDecodeError):
            continue
        for line in lines:
            if not line.strip():
                continue
            try:
                record = json.loads(line)
                stamp = record["timestamp"]
                if not isinstance(stamp, str):
                    continue
                # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
                if stamp.endswith(("Z", "z")):
                    stamp = stamp[:-1] + "+00:00"
                ts = datetime.fromisoformat(stamp)
            except (ValueError, KeyError, TypeError):
                continue
            if ts.tzinfo is None:
                # A naive timestamp cannot be compared with the aware cutoff
                # (TypeError), which used to drop the record silently.
                # NOTE(review): naive timestamps are assumed to be UTC here;
                # confirm against whatever writes these files.
                ts = ts.replace(tzinfo=timezone.utc)
            if ts >= cutoff:
                records.append(record)
    return records
|
|
|
|
|
|
def get_hermes_sessions() -> list[dict]:
    """Return the session entries stored in ``~/.hermes/sessions/sessions.json``.

    The file is expected to hold a JSON object; its values are returned.
    Entries that are not themselves JSON objects are dropped, because callers
    use ``.get`` on each entry. A missing, unreadable, or malformed file, or a
    top-level value that is not an object, yields an empty list.
    """
    sessions_file = HERMES_HOME / "sessions" / "sessions.json"
    if not sessions_file.exists():
        return []
    try:
        data = json.loads(sessions_file.read_text())
    except (OSError, ValueError):
        return []
    if not isinstance(data, dict):
        return []
    return [entry for entry in data.values() if isinstance(entry, dict)]
|
|
|
|
|
|
def get_session_rows(hours: int = 24) -> list[tuple]:
    """Aggregate recent sessions from ``~/.hermes/state.db`` by model and source.

    Selects sessions whose ``started_at`` is later than ``now - hours`` (as a
    Unix timestamp) and whose ``model`` is set.

    Returns:
        One ``(model, source, sessions, msgs, tools)`` row per model/source
        pair. An empty list is returned if the database is missing or the
        query fails.
    """
    state_db = HERMES_HOME / "state.db"
    if not state_db.exists():
        return []

    cutoff = time.time() - (hours * 3600)
    query = """
        SELECT model, source, COUNT(*) as sessions,
               SUM(message_count) as msgs,
               SUM(tool_call_count) as tools
        FROM sessions
        WHERE started_at > ? AND model IS NOT NULL AND model != ''
        GROUP BY model, source
    """
    try:
        conn = sqlite3.connect(str(state_db))
    except sqlite3.Error:
        return []
    try:
        return conn.execute(query, (cutoff,)).fetchall()
    except sqlite3.Error:
        return []
    finally:
        # Close on the error path too; previously a failing query leaked the
        # connection (relevant in --watch mode, which re-queries every cycle).
        conn.close()
|
|
|
|
|
|
def get_heartbeat_ticks(date_str: str | None = None) -> list[dict]:
    """Load the heartbeat ticks recorded for one day.

    Args:
        date_str: Day to load, formatted ``YYYYMMDD``. Defaults to today's
            date according to the local clock.

    Returns:
        The JSON objects found in ``~/.timmy/heartbeat/ticks_<date>.jsonl``,
        in file order. Blank lines, lines that are not valid JSON, and lines
        whose JSON value is not an object are skipped, because callers use
        ``.get`` on each tick. A missing or unreadable file yields an empty
        list.
    """
    if not date_str:
        date_str = datetime.now().strftime("%Y%m%d")
    tick_file = TIMMY_HOME / "heartbeat" / f"ticks_{date_str}.jsonl"
    if not tick_file.exists():
        return []
    try:
        lines = tick_file.read_text().splitlines()
    except (OSError, UnicodeDecodeError):
        return []

    ticks: list[dict] = []
    for line in lines:
        if not line.strip():
            continue
        try:
            tick = json.loads(line)
        except ValueError:
            continue
        if isinstance(tick, dict):
            ticks.append(tick)
    return ticks
|
|
|
|
|
|
def get_review_and_issue_state(token: str | None) -> dict:
    """Gather open PRs, the review queue and the unassigned-issue count.

    For each repository in ``CORE_REPOS`` three Gitea queries run in order:
    open pull requests, open PR-type issues (kept when assigned to ``Timmy``
    or ``allegro``), and open plain issues (counted when they have no
    assignees). Collected PRs and review items are tagged with the repository
    under the ``"_repo"`` key.

    Errors are swallowed per repository: as soon as any step raises, the
    remaining steps for that repository are skipped and the loop moves on.
    Whatever was collected before the failure is kept.

    Returns:
        ``{"prs": [...], "review_queue": [...], "unassigned": int}``.
    """
    state = {"prs": [], "review_queue": [], "unassigned": 0}
    reviewers = ("Timmy", "allegro")

    for repo in CORE_REPOS:
        try:
            # Step 1: open pull requests.
            for pull in gitea_get(f"/repos/{repo}/pulls?state=open&limit=20", token):
                pull["_repo"] = repo
                state["prs"].append(pull)

            # Step 2: PR-type issues assigned to one of the reviewers.
            for entry in gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=pulls", token):
                logins = [person.get("login", "") for person in (entry.get("assignees") or [])]
                if reviewers[0] in logins or reviewers[1] in logins:
                    entry["_repo"] = repo
                    state["review_queue"].append(entry)

            # Step 3: plain issues nobody is assigned to.
            open_issues = gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=issues", token)
            state["unassigned"] += len([one for one in open_issues if not one.get("assignees")])
        except Exception:
            continue

    return state
|
|
|
|
|
|
# ANSI escape sequences used to style the terminal output.
DIM, BOLD = "\033[2m", "\033[1m"
GREEN, YELLOW, RED, CYAN = "\033[32m", "\033[33m", "\033[31m", "\033[36m"
RST = "\033[0m"  # reset all attributes
CLR = "\033[2J\033[H"  # clear the screen, then move the cursor to the top-left
|
|
|
|
|
|
def _print_section(title: str) -> None:
    """Print a bold section title followed by a dim rule."""
    print(f"\n {BOLD}{title}{RST}")
    print(f" {DIM}{'-' * 58}{RST}")


def _render_heartbeat(last_tick: dict, ticks: list[dict]) -> None:
    """Print the last tick id/severity and today's tick counts."""
    _print_section("HEARTBEAT")
    if not last_tick:
        print(f" {DIM}(no heartbeat data){RST}")
        return

    # "decision" is not guaranteed to be a dict (the per-tick count below
    # guards against that too), so do not chain .get() on it blindly.
    decision = last_tick.get("decision")
    sev = decision.get("severity", "?") if isinstance(decision, dict) else "?"
    tick_id = last_tick.get("tick_id", "?")
    model_decisions = sum(
        1
        for tick in ticks
        if isinstance(tick, dict)
        and isinstance(tick.get("decision"), dict)
        and tick["decision"].get("severity") != "fallback"
    )
    print(f" last tick: {tick_id}")
    print(f" severity: {sev}")
    print(f" ticks today: {len(ticks)} | model decisions: {model_decisions}")


def _render_model_health(health: dict) -> None:
    """Print provider/inference status and up to four loaded model names."""
    _print_section("MODEL HEALTH")
    if not health:
        print(f" {DIM}(no model_health.json){RST}")
        return

    provider_color = GREEN if health.get("api_responding") else RED
    inference_color = GREEN if health.get("inference_ok") else YELLOW
    # Tolerate a null or otherwise malformed "models_loaded" value.
    models = health.get("models_loaded")
    if not isinstance(models, (list, tuple)):
        models = []
    shown = ", ".join(str(name) for name in models[:4]) or "(none reported)"
    print(f" provider: {provider_color}{health.get('api_responding')}{RST}")
    print(f" inference: {inference_color}{health.get('inference_ok')}{RST}")
    print(f" models: {shown}")


def _render_archive(checkpoint: dict) -> None:
    """Print the archive pipeline checkpoint fields."""
    _print_section("ARCHIVE PIPELINE")
    if not checkpoint:
        print(f" {DIM}(no archive checkpoint yet){RST}")
        return
    print(f" batches completed: {checkpoint.get('batches_completed', '?')}")
    print(f" next offset: {checkpoint.get('next_offset', '?')}")
    print(f" phase: {checkpoint.get('phase', '?')}")


def _render_local_metrics(metrics: list[dict], local_summary: dict, hours: int) -> None:
    """Print token totals, averages and the per-caller breakdown."""
    _print_section(f"LOCAL METRICS ({len(metrics)} calls, last {hours}h)")
    if not metrics:
        print(f" {DIM}(no local metrics yet){RST}")
        return

    print(
        f" Tokens: {local_summary['input_tokens']} in | "
        f"{local_summary['output_tokens']} out | "
        f"{local_summary['total_tokens']} total"
    )
    if local_summary.get("avg_latency_s") is not None:
        print(f" Avg latency: {local_summary['avg_latency_s']:.2f}s")
    if local_summary.get("avg_tokens_per_second") is not None:
        print(f" Avg throughput: {GREEN}{local_summary['avg_tokens_per_second']:.2f} tok/s{RST}")
    for caller, stats in sorted(local_summary["by_caller"].items()):
        err = f" {RED}err:{stats['failed_calls']}{RST}" if stats["failed_calls"] else ""
        print(
            f" {caller:24s} calls={stats['calls']:3d} "
            f"tok={stats['total_tokens']:5d} {GREEN}ok:{stats['successful_calls']}{RST}{err}"
        )


def _render_sessions(sessions: list[dict], session_rows: list, session_summary: dict) -> None:
    """Print session-cache counts and, when available, session-DB statistics."""
    _print_section("SESSION LOAD")
    # A session is "local" when its base_url mentions localhost; everything
    # else counts as remote. Counting in a single pass avoids the quadratic
    # `s not in local_sessions` membership scan.
    local_count = sum(
        1
        for session in sessions
        if isinstance(session, dict) and "localhost" in str(session.get("base_url", ""))
    )
    remote_count = len(sessions) - local_count
    print(
        f" Session cache: {len(sessions)} total | "
        f"{GREEN}{local_count} local{RST} | "
        f"{YELLOW}{remote_count} remote{RST}"
    )
    if not session_rows:
        print(f" {DIM}(no session-db stats available){RST}")
        return

    print(
        f" Session DB: {session_summary['total_sessions']} total | "
        f"{GREEN}{session_summary['local_sessions']} local{RST} | "
        f"{YELLOW}{session_summary['cloud_sessions']} remote{RST}"
    )
    print(
        f" Token est: {GREEN}{session_summary['local_est_tokens']} local{RST} | "
        f"{YELLOW}{session_summary['cloud_est_tokens']} remote{RST}"
    )
    print(f" Est remote cost: ${session_summary['cloud_est_cost_usd']:.4f}")


def _render_gitea(gitea: dict) -> None:
    """Print the review queue (max 8) and open PRs (max 6) with issue counts."""
    _print_section("REVIEW QUEUE")
    if gitea["review_queue"]:
        for item in gitea["review_queue"][:8]:
            repo = item["_repo"].split("/", 1)[1]
            print(f" {repo:12s} #{item['number']:<4d} {item['title'][:42]}")
    else:
        print(f" {DIM}(clear){RST}")

    _print_section("OPEN PRS / UNASSIGNED")
    print(f" open PRs: {len(gitea['prs'])}")
    print(f" unassigned issues: {gitea['unassigned']}")
    for pr in gitea["prs"][:6]:
        repo = pr["_repo"].split("/", 1)[1]
        print(f" PR {repo:10s} #{pr['number']:<4d} {pr['title'][:40]}")


def render(hours: int = 24) -> None:
    """Collect all dashboard data and print one full frame to stdout.

    Args:
        hours: Look-back window, in hours, for local metrics and session-DB
            statistics.

    All data is gathered before the screen is cleared, so the previous frame
    stays visible while the file reads and Gitea requests are in flight.
    """
    token = read_token()
    metrics = get_local_metrics(hours)
    local_summary = summarize_local_metrics(metrics)
    ticks = get_heartbeat_ticks()
    health = get_model_health()
    last_tick = get_last_tick()
    checkpoint = get_archive_checkpoint()
    sessions = get_hermes_sessions()
    session_rows = get_session_rows(hours)
    session_summary = summarize_session_rows(session_rows)
    gitea = get_review_and_issue_state(token)

    print(CLR, end="")
    print(f"{BOLD}{'=' * 72}")
    print(" TIMMY WORKFLOW DASHBOARD")
    print(f" {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'=' * 72}{RST}")

    _render_heartbeat(last_tick, ticks)
    _render_model_health(health)
    _render_archive(checkpoint)
    _render_local_metrics(metrics, local_summary, hours)
    _render_sessions(sessions, session_rows, session_summary)
    _render_gitea(gitea)

    print(f"\n{BOLD}{'=' * 72}{RST}")
    print(f" {DIM}Refresh: timmy-dashboard --watch | History: --hours=N{RST}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Usage: [--watch] [--hours=N]
    #   --watch    redraw every 30 seconds until interrupted with Ctrl-C
    #   --hours=N  look-back window in hours (default 24); the last one wins
    watch = "--watch" in sys.argv
    hours = 24
    for arg in sys.argv[1:]:
        if arg.startswith("--hours="):
            raw_hours = arg.split("=", 1)[1]
            try:
                hours = int(raw_hours)
            except ValueError:
                # Exit with a readable message instead of a raw traceback.
                sys.exit(f"--hours expects an integer, got {raw_hours!r}")

    if watch:
        try:
            while True:
                render(hours)
                time.sleep(30)
        except KeyboardInterrupt:
            print(f"\n{DIM}Dashboard stopped.{RST}")
    else:
        render(hours)
|