Files
timmy-config/bin/timmy-dashboard

344 lines
11 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python3
"""Timmy workflow dashboard.
Shows current workflow state from the active local surfaces instead of the
archived dashboard/loop era, while preserving useful local/session metrics.
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import time
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
# The script lives in <repo>/bin/, so the repository root is two levels up.
# Put it on sys.path (once) so repo-level helper modules can be imported when
# this file is executed directly.
REPO_ROOT = Path(__file__).resolve().parent.parent
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
from metrics_helpers import summarize_local_metrics, summarize_session_rows
# Per-user state directories read by the dashboard.
HERMES_HOME = Path.home().joinpath(".hermes")
TIMMY_HOME = Path.home().joinpath(".timmy")
METRICS_DIR = TIMMY_HOME.joinpath("metrics")

# Repositories ("owner/name") whose open PRs and issues are summarized.
CORE_REPOS = [
    "Timmy_Foundation/the-nexus",
    "Timmy_Foundation/timmy-home",
    "Timmy_Foundation/timmy-config",
    "Timmy_Foundation/hermes-agent",
]
def _normalize_gitea_base(raw: str) -> str:
    """Reduce a configured Gitea location to a bare base URL.

    Strips surrounding whitespace, trailing slashes and a trailing
    ``/api/v1`` segment, because callers append ``/api/v1`` themselves.
    Returns ``""`` when nothing usable is left.
    """
    base = raw.strip().rstrip("/")
    if base.endswith("/api/v1"):
        base = base[: -len("/api/v1")].rstrip("/")
    return base


def resolve_gitea_url() -> str:
    """Return the Gitea base URL (no trailing slash, no ``/api/v1`` suffix).

    Sources are tried in order; the first one that yields a non-empty value
    wins:

    1. the ``GITEA_URL`` environment variable,
    2. ``~/.hermes/gitea_api``,
    3. ``~/.config/gitea/base-url``.

    Empty, whitespace-only or unreadable sources are skipped instead of
    producing an empty base URL.

    Raises:
        FileNotFoundError: when no source provides a usable URL.
    """
    env = _normalize_gitea_base(os.environ.get("GITEA_URL", ""))
    if env:
        return env

    hint_files = (
        HERMES_HOME / "gitea_api",
        Path.home() / ".config" / "gitea" / "base-url",
    )
    for hint in hint_files:
        try:
            base = _normalize_gitea_base(hint.read_text())
        except (OSError, ValueError):
            # Missing, unreadable or undecodable hint: try the next source.
            continue
        if base:
            return base

    raise FileNotFoundError("Set GITEA_URL or create ~/.hermes/gitea_api")
# Resolved once, at import time. resolve_gitea_url() raises FileNotFoundError
# when no source is configured, so loading this module fails in that case.
GITEA_URL = resolve_gitea_url()
def read_token() -> str | None:
    """Return the first non-empty Gitea API token found on disk.

    Candidate files are checked in priority order:

    1. ``~/.config/gitea/timmy-token``
    2. ``~/.hermes/gitea_token_vps``
    3. ``~/.hermes/gitea_token_timmy``

    A file that is missing, unreadable or contains only whitespace is
    skipped, so an empty file earlier in the list cannot mask a valid token
    further down.

    Returns:
        The token with surrounding whitespace removed, or ``None`` when no
        candidate holds one.
    """
    home = Path.home()
    candidates = (
        home / ".config" / "gitea" / "timmy-token",
        home / ".hermes" / "gitea_token_vps",
        home / ".hermes" / "gitea_token_timmy",
    )
    for candidate in candidates:
        try:
            token = candidate.read_text().strip()
        except (OSError, ValueError):
            # Missing / unreadable / undecodable file: try the next one.
            continue
        if token:
            return token
    return None
def gitea_get(path: str, token: str | None) -> list | dict:
    """GET ``{GITEA_URL}/api/v1{path}`` and return the decoded JSON body.

    Args:
        path: API path appended after ``/api/v1`` (query string included).
        token: Gitea token sent as ``Authorization: token <token>``; when
            falsy the request carries no Authorization header.

    Returns:
        Whatever JSON value the endpoint responded with.

    Errors from ``urllib`` (5 second timeout) and from JSON decoding
    propagate to the caller.
    """
    request = urllib.request.Request(f"{GITEA_URL}/api/v1{path}")
    if token:
        request.add_header("Authorization", f"token {token}")
    with urllib.request.urlopen(request, timeout=5) as response:
        body = response.read()
    return json.loads(body.decode())
def _read_json_dict(path: Path) -> dict:
    """Return the JSON object stored at *path*, or ``{}`` when unusable.

    "Unusable" covers a missing or unreadable file, invalid JSON, and valid
    JSON whose top-level value is not an object. The last case matters
    because callers use ``.get`` on the result.
    """
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: bad encoding or JSON.
        return {}
    return data if isinstance(data, dict) else {}


def get_model_health() -> dict:
    """Return the contents of ``~/.hermes/model_health.json`` (``{}`` if unavailable)."""
    return _read_json_dict(HERMES_HOME / "model_health.json")


def get_last_tick() -> dict:
    """Return the contents of ``~/.timmy/heartbeat/last_tick.json`` (``{}`` if unavailable)."""
    return _read_json_dict(TIMMY_HOME / "heartbeat" / "last_tick.json")


def get_archive_checkpoint() -> dict:
    """Return the contents of ``~/.timmy/twitter-archive/checkpoint.json`` (``{}`` if unavailable)."""
    return _read_json_dict(TIMMY_HOME / "twitter-archive" / "checkpoint.json")
def _parse_metric_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` rejects it before Python 3.11.

    NOTE(review): timestamps without an offset are assumed to be UTC so they
    can be compared with the UTC cutoff -- confirm against the metrics
    writer. Previously such records were always dropped, because comparing
    a naive value with an aware one raises ``TypeError``.
    """
    text = value.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def get_local_metrics(hours: int = 24) -> list[dict]:
    """Collect local metric records from the last *hours* hours.

    Reads every ``local_*.jsonl`` file in ``METRICS_DIR`` in sorted filename
    order, one JSON record per line, and keeps the records whose
    ``"timestamp"`` is at or after ``now - hours``.

    Blank lines, malformed lines, records without a parseable timestamp and
    unreadable files are skipped rather than aborting the dashboard.

    Args:
        hours: Size of the look-back window in hours.

    Returns:
        Matching records in file order; empty when ``METRICS_DIR`` does not
        exist.
    """
    records: list[dict] = []
    if not METRICS_DIR.exists():
        return records

    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    for path in sorted(METRICS_DIR.glob("local_*.jsonl")):
        try:
            lines = path.read_text().splitlines()
        except (OSError, ValueError):
            # One unreadable/undecodable file must not hide all the others.
            continue
        for line in lines:
            if not line.strip():
                continue
            try:
                record = json.loads(line)
                if _parse_metric_timestamp(record["timestamp"]) >= cutoff:
                    records.append(record)
            except (AttributeError, KeyError, TypeError, ValueError):
                # Invalid JSON, non-object record, missing or bad timestamp.
                continue
    return records
def get_hermes_sessions() -> list[dict]:
    """Return the values of the mapping in ``~/.hermes/sessions/sessions.json``.

    Yields an empty list when the file is absent or when reading, parsing
    or taking ``.values()`` of its content fails for any reason.
    """
    cache_path = HERMES_HOME / "sessions" / "sessions.json"
    if cache_path.exists():
        try:
            return [*json.loads(cache_path.read_text()).values()]
        except Exception:
            pass
    return []
def get_session_rows(hours: int = 24) -> list[tuple]:
    """Aggregate recent sessions from ``~/.hermes/state.db`` by model and source.

    Selects sessions whose ``started_at`` is greater than ``now - hours``
    (compared against ``time.time()``, i.e. epoch seconds) and whose model
    is set, grouped by ``(model, source)``.

    Args:
        hours: Size of the look-back window in hours.

    Returns:
        Rows of ``(model, source, sessions, msgs, tools)``. Empty when the
        database file is missing or cannot be queried.
    """
    state_db = HERMES_HOME / "state.db"
    if not state_db.exists():
        return []

    cutoff = time.time() - (hours * 3600)
    try:
        conn = sqlite3.connect(str(state_db))
    except sqlite3.Error:
        return []

    # try/finally guarantees the connection is closed even when the query
    # fails (for example an unexpected schema); it used to leak in that case.
    try:
        return conn.execute(
            """
            SELECT model, source, COUNT(*) as sessions,
            SUM(message_count) as msgs,
            SUM(tool_call_count) as tools
            FROM sessions
            WHERE started_at > ? AND model IS NOT NULL AND model != ''
            GROUP BY model, source
            """,
            (cutoff,),
        ).fetchall()
    except sqlite3.Error:
        return []
    finally:
        conn.close()
def get_heartbeat_ticks(date_str: str | None = None) -> list[dict]:
    """Load the heartbeat ticks recorded for one day.

    Args:
        date_str: Day as ``YYYYMMDD``; a falsy value selects today according
            to the local clock.

    Returns:
        The parsed JSON value of every non-blank line of
        ``~/.timmy/heartbeat/ticks_<date_str>.jsonl`` that decodes cleanly;
        an empty list when that file does not exist.
    """
    day = date_str or datetime.now().strftime("%Y%m%d")
    tick_path = TIMMY_HOME / "heartbeat" / f"ticks_{day}.jsonl"
    if not tick_path.exists():
        return []

    parsed = []
    for raw in tick_path.read_text().splitlines():
        if raw.strip():
            try:
                parsed.append(json.loads(raw))
            except Exception:
                # Skip lines that are not valid JSON.
                pass
    return parsed
def get_review_and_issue_state(token: str | None) -> dict:
    """Summarize open PRs, the review queue and unassigned issues.

    For each repository in ``CORE_REPOS`` three Gitea queries run in order:
    open pull requests, open pull-type issues (kept when assigned to
    ``Timmy`` or ``allegro``) and open issues (counted when they have no
    assignees). A failure in one query skips the remaining queries for that
    repository; entries collected before the failure are kept.

    Args:
        token: Gitea API token, or ``None`` for unauthenticated requests.

    Returns:
        ``{"prs": [...], "review_queue": [...], "unassigned": int}`` where
        every collected PR / queue item carries its repository under
        ``"_repo"``.
    """
    open_prs = []
    queue = []
    unassigned_total = 0

    for repo in CORE_REPOS:
        # 1. Open pull requests.
        try:
            for pull in gitea_get(f"/repos/{repo}/pulls?state=open&limit=20", token):
                pull["_repo"] = repo
                open_prs.append(pull)
        except Exception:
            continue

        # 2. Pull-type issues assigned to one of the reviewers.
        try:
            for entry in gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=pulls", token):
                logins = [person.get("login", "") for person in (entry.get("assignees") or [])]
                if "Timmy" in logins or "allegro" in logins:
                    entry["_repo"] = repo
                    queue.append(entry)
        except Exception:
            continue

        # 3. Plain issues nobody is assigned to.
        try:
            open_issues = gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=issues", token)
            unassigned_total += sum(1 for issue in open_issues if not issue.get("assignees"))
        except Exception:
            pass

    return {"prs": open_prs, "review_queue": queue, "unassigned": unassigned_total}
# ANSI escape sequences used to style terminal output.
DIM = "\033[2m"  # faint text
BOLD = "\033[1m"  # bold text
GREEN = "\033[32m"  # green foreground
YELLOW = "\033[33m"  # yellow foreground
RED = "\033[31m"  # red foreground
CYAN = "\033[36m"  # cyan foreground
RST = "\033[0m"  # reset all attributes
CLR = "\033[2J\033[H"  # clear the screen, then move the cursor to the top-left
def _section(title: str) -> None:
    """Print a bold section title followed by a dim rule."""
    print(f"\n {BOLD}{title}{RST}")
    print(f" {DIM}{'-' * 58}{RST}")


def _render_heartbeat(last_tick: dict, ticks: list) -> None:
    """Print the last heartbeat tick and today's tick counters."""
    _section("HEARTBEAT")
    if not last_tick:
        print(f" {DIM}(no heartbeat data){RST}")
        return
    # "decision" is not guaranteed to be a mapping (the per-tick count below
    # already guards for that), so guard here as well instead of crashing.
    decision = last_tick.get("decision")
    sev = decision.get("severity", "?") if isinstance(decision, dict) else "?"
    tick_id = last_tick.get("tick_id", "?")
    model_decisions = sum(
        1
        for tick in ticks
        if isinstance(tick, dict)
        and isinstance(tick.get("decision"), dict)
        and tick["decision"].get("severity") != "fallback"
    )
    print(f" last tick: {tick_id}")
    print(f" severity: {sev}")
    print(f" ticks today: {len(ticks)} | model decisions: {model_decisions}")


def _render_model_health(health: dict) -> None:
    """Print provider/inference status and up to four loaded models."""
    _section("MODEL HEALTH")
    if not health:
        print(f" {DIM}(no model_health.json){RST}")
        return
    provider = GREEN if health.get("api_responding") else RED
    inference = GREEN if health.get("inference_ok") else YELLOW
    # `or []` also covers an explicit null in the JSON file.
    models = ", ".join(str(m) for m in (health.get("models_loaded") or [])[:4])
    print(f" provider: {provider}{health.get('api_responding')}{RST}")
    print(f" inference: {inference}{health.get('inference_ok')}{RST}")
    print(f" models: {models or '(none reported)'}")


def _render_archive(checkpoint: dict) -> None:
    """Print the archive pipeline checkpoint."""
    _section("ARCHIVE PIPELINE")
    if not checkpoint:
        print(f" {DIM}(no archive checkpoint yet){RST}")
        return
    print(f" batches completed: {checkpoint.get('batches_completed', '?')}")
    print(f" next offset: {checkpoint.get('next_offset', '?')}")
    print(f" phase: {checkpoint.get('phase', '?')}")


def _render_local_metrics(metrics: list, local_summary: dict, hours: int) -> None:
    """Print token totals, averages and the per-caller breakdown."""
    _section(f"LOCAL METRICS ({len(metrics)} calls, last {hours}h)")
    if not metrics:
        print(f" {DIM}(no local metrics yet){RST}")
        return
    print(
        f" Tokens: {local_summary['input_tokens']} in | "
        f"{local_summary['output_tokens']} out | "
        f"{local_summary['total_tokens']} total"
    )
    if local_summary.get("avg_latency_s") is not None:
        print(f" Avg latency: {local_summary['avg_latency_s']:.2f}s")
    if local_summary.get("avg_tokens_per_second") is not None:
        print(f" Avg throughput: {GREEN}{local_summary['avg_tokens_per_second']:.2f} tok/s{RST}")
    for caller, stats in sorted(local_summary["by_caller"].items()):
        err = f" {RED}err:{stats['failed_calls']}{RST}" if stats["failed_calls"] else ""
        print(
            f" {caller:24s} calls={stats['calls']:3d} "
            f"tok={stats['total_tokens']:5d} {GREEN}ok:{stats['successful_calls']}{RST}{err}"
        )


def _render_sessions(sessions: list, session_rows: list, session_summary: dict) -> None:
    """Print session-cache counts and, when available, session-DB statistics."""
    _section("SESSION LOAD")
    # A session is "local" when its base_url mentions localhost; everything
    # else is remote. Counting in one pass avoids the quadratic
    # `s not in local_sessions` membership scan.
    local_count = sum(
        1
        for s in sessions
        if isinstance(s, dict) and "localhost" in str(s.get("base_url", ""))
    )
    remote_count = len(sessions) - local_count
    print(
        f" Session cache: {len(sessions)} total | "
        f"{GREEN}{local_count} local{RST} | "
        f"{YELLOW}{remote_count} remote{RST}"
    )
    if not session_rows:
        print(f" {DIM}(no session-db stats available){RST}")
        return
    print(
        f" Session DB: {session_summary['total_sessions']} total | "
        f"{GREEN}{session_summary['local_sessions']} local{RST} | "
        f"{YELLOW}{session_summary['cloud_sessions']} remote{RST}"
    )
    print(
        f" Token est: {GREEN}{session_summary['local_est_tokens']} local{RST} | "
        f"{YELLOW}{session_summary['cloud_est_tokens']} remote{RST}"
    )
    print(f" Est remote cost: ${session_summary['cloud_est_cost_usd']:.4f}")


def _render_gitea(gitea: dict) -> None:
    """Print the review queue (first 8) and open PRs (first 6) with counters."""
    _section("REVIEW QUEUE")
    if gitea["review_queue"]:
        for item in gitea["review_queue"][:8]:
            repo = item["_repo"].split("/", 1)[1]
            print(f" {repo:12s} #{item['number']:<4d} {item['title'][:42]}")
    else:
        print(f" {DIM}(clear){RST}")

    _section("OPEN PRS / UNASSIGNED")
    print(f" open PRs: {len(gitea['prs'])}")
    print(f" unassigned issues: {gitea['unassigned']}")
    for pr in gitea["prs"][:6]:
        repo = pr["_repo"].split("/", 1)[1]
        print(f" PR {repo:10s} #{pr['number']:<4d} {pr['title'][:40]}")


def render(hours: int = 24) -> None:
    """Gather every data source, then clear the screen and print the dashboard.

    All data is collected before anything is printed so the screen is not
    left blank while the (potentially slow) Gitea queries run.

    Args:
        hours: Look-back window, in hours, for local metrics and session-DB
            statistics.
    """
    token = read_token()
    metrics = get_local_metrics(hours)
    local_summary = summarize_local_metrics(metrics)
    ticks = get_heartbeat_ticks()
    health = get_model_health()
    last_tick = get_last_tick()
    checkpoint = get_archive_checkpoint()
    sessions = get_hermes_sessions()
    session_rows = get_session_rows(hours)
    session_summary = summarize_session_rows(session_rows)
    gitea = get_review_and_issue_state(token)

    print(CLR, end="")
    print(f"{BOLD}{'=' * 72}")
    print(" TIMMY WORKFLOW DASHBOARD")
    print(f" {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'=' * 72}{RST}")

    _render_heartbeat(last_tick, ticks)
    _render_model_health(health)
    _render_archive(checkpoint)
    _render_local_metrics(metrics, local_summary, hours)
    _render_sessions(sessions, session_rows, session_summary)
    _render_gitea(gitea)

    print(f"\n{BOLD}{'=' * 72}{RST}")
    print(f" {DIM}Refresh: timmy-dashboard --watch | History: --hours=N{RST}")
def _parse_hours(argv: list[str], default: int = 24) -> int:
    """Return the look-back window requested with ``--hours=N``.

    The last ``--hours=`` argument wins; *default* is returned when none is
    given.

    Raises:
        SystemExit: with a one-line message when a value is not a positive
            integer (instead of an uncaught ``ValueError`` traceback).
    """
    hours = default
    for arg in argv:
        if not arg.startswith("--hours="):
            continue
        raw = arg.split("=", 1)[1]
        try:
            hours = int(raw)
        except ValueError:
            raise SystemExit(
                f"timmy-dashboard: invalid --hours value {raw!r} (expected a positive integer)"
            ) from None
        if hours < 1:
            raise SystemExit(
                f"timmy-dashboard: invalid --hours value {raw!r} (expected a positive integer)"
            )
    return hours


if __name__ == "__main__":
    watch = "--watch" in sys.argv
    hours = _parse_hours(sys.argv[1:])
    if watch:
        # Redraw every 30 seconds until interrupted with Ctrl-C.
        try:
            while True:
                render(hours)
                time.sleep(30)
        except KeyboardInterrupt:
            print(f"\n{DIM}Dashboard stopped.{RST}")
    else:
        render(hours)