#!/usr/bin/env python3 """Timmy workflow dashboard. Shows current workflow state from the active local surfaces instead of the archived dashboard/loop era, while preserving useful local/session metrics. """ from __future__ import annotations import json import os import sqlite3 import sys import time import urllib.request from datetime import datetime, timedelta, timezone from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent if str(REPO_ROOT) not in sys.path: sys.path.insert(0, str(REPO_ROOT)) from metrics_helpers import summarize_local_metrics, summarize_session_rows HERMES_HOME = Path.home() / ".hermes" TIMMY_HOME = Path.home() / ".timmy" METRICS_DIR = TIMMY_HOME / "metrics" CORE_REPOS = [ "Timmy_Foundation/the-nexus", "Timmy_Foundation/timmy-home", "Timmy_Foundation/timmy-config", "Timmy_Foundation/hermes-agent", ] def resolve_gitea_url() -> str: env = os.environ.get("GITEA_URL") if env: return env.rstrip("/") api_hint = HERMES_HOME / "gitea_api" if api_hint.exists(): raw = api_hint.read_text().strip().rstrip("/") return raw[:-7] if raw.endswith("/api/v1") else raw base_url = Path.home() / ".config" / "gitea" / "base-url" if base_url.exists(): return base_url.read_text().strip().rstrip("/") raise FileNotFoundError("Set GITEA_URL or create ~/.hermes/gitea_api") GITEA_URL = resolve_gitea_url() def read_token() -> str | None: for path in [ Path.home() / ".config" / "gitea" / "timmy-token", Path.home() / ".hermes" / "gitea_token_vps", Path.home() / ".hermes" / "gitea_token_timmy", ]: if path.exists(): return path.read_text().strip() return None def gitea_get(path: str, token: str | None) -> list | dict: headers = {"Authorization": f"token {token}"} if token else {} req = urllib.request.Request(f"{GITEA_URL}/api/v1{path}", headers=headers) with urllib.request.urlopen(req, timeout=5) as resp: return json.loads(resp.read().decode()) def get_model_health() -> dict: path = HERMES_HOME / "model_health.json" if not path.exists(): return {} try: return json.loads(path.read_text()) except Exception: return {} def get_last_tick() -> dict: path = TIMMY_HOME / "heartbeat" / "last_tick.json" if not path.exists(): return {} try: return json.loads(path.read_text()) except Exception: return {} def get_archive_checkpoint() -> dict: path = TIMMY_HOME / "twitter-archive" / "checkpoint.json" if not path.exists(): return {} try: return json.loads(path.read_text()) except Exception: return {} def get_local_metrics(hours: int = 24) -> list[dict]: records = [] cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) if not METRICS_DIR.exists(): return records for path in sorted(METRICS_DIR.glob("local_*.jsonl")): for line in path.read_text().splitlines(): if not line.strip(): continue try: record = json.loads(line) ts = datetime.fromisoformat(record["timestamp"]) if ts >= cutoff: records.append(record) except Exception: continue return records def get_hermes_sessions() -> list[dict]: sessions_file = HERMES_HOME / "sessions" / "sessions.json" if not sessions_file.exists(): return [] try: data = json.loads(sessions_file.read_text()) return list(data.values()) except Exception: return [] def get_session_rows(hours: int = 24): state_db = HERMES_HOME / "state.db" if not state_db.exists(): return [] cutoff = time.time() - (hours * 3600) try: conn = sqlite3.connect(str(state_db)) rows = conn.execute( """ SELECT model, source, COUNT(*) as sessions, SUM(message_count) as msgs, SUM(tool_call_count) as tools FROM sessions WHERE started_at > ? AND model IS NOT NULL AND model != '' GROUP BY model, source """, (cutoff,), ).fetchall() conn.close() return rows except Exception: return [] def get_heartbeat_ticks(date_str: str | None = None) -> list[dict]: if not date_str: date_str = datetime.now().strftime("%Y%m%d") tick_file = TIMMY_HOME / "heartbeat" / f"ticks_{date_str}.jsonl" if not tick_file.exists(): return [] ticks = [] for line in tick_file.read_text().splitlines(): if not line.strip(): continue try: ticks.append(json.loads(line)) except Exception: continue return ticks def get_review_and_issue_state(token: str | None) -> dict: state = {"prs": [], "review_queue": [], "unassigned": 0} for repo in CORE_REPOS: try: prs = gitea_get(f"/repos/{repo}/pulls?state=open&limit=20", token) for pr in prs: pr["_repo"] = repo state["prs"].append(pr) except Exception: continue try: issue_prs = gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=pulls", token) for item in issue_prs: assignees = [a.get("login", "") for a in (item.get("assignees") or [])] if any(name in assignees for name in ("Timmy", "allegro")): item["_repo"] = repo state["review_queue"].append(item) except Exception: continue try: issues = gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=issues", token) state["unassigned"] += sum(1 for issue in issues if not issue.get("assignees")) except Exception: continue return state DIM = "\033[2m" BOLD = "\033[1m" GREEN = "\033[32m" YELLOW = "\033[33m" RED = "\033[31m" CYAN = "\033[36m" RST = "\033[0m" CLR = "\033[2J\033[H" def render(hours: int = 24) -> None: token = read_token() metrics = get_local_metrics(hours) local_summary = summarize_local_metrics(metrics) ticks = get_heartbeat_ticks() health = get_model_health() last_tick = get_last_tick() checkpoint = get_archive_checkpoint() sessions = get_hermes_sessions() session_rows = get_session_rows(hours) session_summary = summarize_session_rows(session_rows) gitea = get_review_and_issue_state(token) print(CLR, end="") print(f"{BOLD}{'=' * 72}") print(" TIMMY WORKFLOW DASHBOARD") print(f" {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"{'=' * 72}{RST}") print(f"\n {BOLD}HEARTBEAT{RST}") print(f" {DIM}{'-' * 58}{RST}") if last_tick: sev = last_tick.get("decision", {}).get("severity", "?") tick_id = last_tick.get("tick_id", "?") model_decisions = sum( 1 for tick in ticks if isinstance(tick.get("decision"), dict) and tick["decision"].get("severity") != "fallback" ) print(f" last tick: {tick_id}") print(f" severity: {sev}") print(f" ticks today: {len(ticks)} | model decisions: {model_decisions}") else: print(f" {DIM}(no heartbeat data){RST}") print(f"\n {BOLD}MODEL HEALTH{RST}") print(f" {DIM}{'-' * 58}{RST}") if health: provider = GREEN if health.get("api_responding") else RED inference = GREEN if health.get("inference_ok") else YELLOW print(f" provider: {provider}{health.get('api_responding')}{RST}") print(f" inference: {inference}{health.get('inference_ok')}{RST}") print(f" models: {', '.join(health.get('models_loaded', [])[:4]) or '(none reported)'}") else: print(f" {DIM}(no model_health.json){RST}") print(f"\n {BOLD}ARCHIVE PIPELINE{RST}") print(f" {DIM}{'-' * 58}{RST}") if checkpoint: print(f" batches completed: {checkpoint.get('batches_completed', '?')}") print(f" next offset: {checkpoint.get('next_offset', '?')}") print(f" phase: {checkpoint.get('phase', '?')}") else: print(f" {DIM}(no archive checkpoint yet){RST}") print(f"\n {BOLD}LOCAL METRICS ({len(metrics)} calls, last {hours}h){RST}") print(f" {DIM}{'-' * 58}{RST}") if metrics: print( f" Tokens: {local_summary['input_tokens']} in | " f"{local_summary['output_tokens']} out | " f"{local_summary['total_tokens']} total" ) if local_summary.get("avg_latency_s") is not None: print(f" Avg latency: {local_summary['avg_latency_s']:.2f}s") if local_summary.get("avg_tokens_per_second") is not None: print(f" Avg throughput: {GREEN}{local_summary['avg_tokens_per_second']:.2f} tok/s{RST}") for caller, stats in sorted(local_summary["by_caller"].items()): err = f" {RED}err:{stats['failed_calls']}{RST}" if stats["failed_calls"] else "" print( f" {caller:24s} calls={stats['calls']:3d} " f"tok={stats['total_tokens']:5d} {GREEN}ok:{stats['successful_calls']}{RST}{err}" ) else: print(f" {DIM}(no local metrics yet){RST}") print(f"\n {BOLD}SESSION LOAD{RST}") print(f" {DIM}{'-' * 58}{RST}") local_sessions = [s for s in sessions if "localhost" in str(s.get("base_url", ""))] cloud_sessions = [s for s in sessions if s not in local_sessions] print( f" Session cache: {len(sessions)} total | " f"{GREEN}{len(local_sessions)} local{RST} | " f"{YELLOW}{len(cloud_sessions)} remote{RST}" ) if session_rows: print( f" Session DB: {session_summary['total_sessions']} total | " f"{GREEN}{session_summary['local_sessions']} local{RST} | " f"{YELLOW}{session_summary['cloud_sessions']} remote{RST}" ) print( f" Token est: {GREEN}{session_summary['local_est_tokens']} local{RST} | " f"{YELLOW}{session_summary['cloud_est_tokens']} remote{RST}" ) print(f" Est remote cost: ${session_summary['cloud_est_cost_usd']:.4f}") else: print(f" {DIM}(no session-db stats available){RST}") print(f"\n {BOLD}REVIEW QUEUE{RST}") print(f" {DIM}{'-' * 58}{RST}") if gitea["review_queue"]: for item in gitea["review_queue"][:8]: repo = item["_repo"].split("/", 1)[1] print(f" {repo:12s} #{item['number']:<4d} {item['title'][:42]}") else: print(f" {DIM}(clear){RST}") print(f"\n {BOLD}OPEN PRS / UNASSIGNED{RST}") print(f" {DIM}{'-' * 58}{RST}") print(f" open PRs: {len(gitea['prs'])}") print(f" unassigned issues: {gitea['unassigned']}") for pr in gitea["prs"][:6]: repo = pr["_repo"].split("/", 1)[1] print(f" PR {repo:10s} #{pr['number']:<4d} {pr['title'][:40]}") print(f"\n{BOLD}{'=' * 72}{RST}") print(f" {DIM}Refresh: timmy-dashboard --watch | History: --hours=N{RST}") if __name__ == "__main__": watch = "--watch" in sys.argv hours = 24 for arg in sys.argv[1:]: if arg.startswith("--hours="): hours = int(arg.split("=", 1)[1]) if watch: try: while True: render(hours) time.sleep(30) except KeyboardInterrupt: print(f"\n{DIM}Dashboard stopped.{RST}") else: render(hours)