257 lines
8.2 KiB
Python
Executable File
257 lines
8.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Timmy workflow dashboard.
|
|
|
|
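Shows the current workflow state (heartbeat ticks, model health, the
Twitter-archive checkpoint, local call metrics, and the Gitea review queue)
read from the live local files and the Gitea API, instead of from the
archived dashboard/loop-era data.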
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
import urllib.request
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
# Per-user state directories read by the dashboard.
HERMES_HOME = Path.home().joinpath(".hermes")
TIMMY_HOME = Path.home().joinpath(".timmy")
METRICS_DIR = TIMMY_HOME.joinpath("metrics")

# Repositories ("owner/name") whose pull requests and issues are queried.
CORE_REPOS = [
    "Timmy_Foundation/the-nexus",
    "Timmy_Foundation/timmy-home",
    "Timmy_Foundation/timmy-config",
    "Timmy_Foundation/hermes-agent",
]

# Base URL of the Gitea server; the GITEA_URL environment variable overrides
# the default. Trailing slashes are stripped so API paths can be appended.
GITEA_URL = os.getenv("GITEA_URL", "http://143.198.27.163:3000").rstrip("/")
|
|
|
|
|
|
def read_token() -> str | None:
    """Return the first usable Gitea API token found on disk, or ``None``.

    Candidate files under the user's home directory are tried in priority
    order. A candidate that is missing, unreadable, or blank is skipped, so
    an empty higher-priority file cannot mask a valid token further down the
    list.

    Returns:
        The token with surrounding whitespace stripped, or ``None`` when no
        candidate yields a non-empty token.
    """
    home = Path.home()
    candidates = (
        home / ".hermes" / "gitea_token_vps",
        home / ".config" / "gitea" / "token",
        home / ".config" / "gitea" / "codex-token",
    )
    for candidate in candidates:
        try:
            # Read directly instead of checking exists() first: avoids the
            # check-then-read race and covers permission errors too.
            token = candidate.read_text().strip()
        except (OSError, UnicodeDecodeError):
            continue
        if token:
            return token
    return None
|
|
|
|
|
|
def gitea_get(path: str, token: str | None) -> list | dict:
    """Issue a GET against the Gitea v1 REST API and return the decoded JSON.

    Args:
        path: API path appended after ``/api/v1`` (including any query string).
        token: API token sent in the ``Authorization`` header; when falsy the
            request is made without that header.

    Returns:
        Whatever JSON document the server responded with.

    Errors raised by ``urlopen`` or by JSON decoding are not handled here and
    propagate to the caller. The request uses a 5 second timeout.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    headers = {}
    if token:
        headers["Authorization"] = f"token {token}"
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=5) as response:
        payload = response.read()
    return json.loads(payload.decode())
|
|
|
|
|
|
def _read_json_object(path: Path) -> dict:
    """Load ``path`` as a JSON object, or return ``{}`` if that is not possible.

    A missing or unreadable file, undecodable text, and malformed JSON all
    yield ``{}``. So does valid JSON whose top-level value is not an object
    (for example a list or a string), because callers use the result with
    ``.get()`` and anything but a dict would crash them.
    """
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def get_model_health() -> dict:
    """Return the contents of ``HERMES_HOME/model_health.json`` (``{}`` if unusable)."""
    return _read_json_object(HERMES_HOME / "model_health.json")


def get_last_tick() -> dict:
    """Return the contents of ``TIMMY_HOME/heartbeat/last_tick.json`` (``{}`` if unusable)."""
    return _read_json_object(TIMMY_HOME / "heartbeat" / "last_tick.json")


def get_archive_checkpoint() -> dict:
    """Return the contents of ``TIMMY_HOME/twitter-archive/checkpoint.json`` (``{}`` if unusable)."""
    return _read_json_object(TIMMY_HOME / "twitter-archive" / "checkpoint.json")
|
|
|
|
|
|
def _parse_metric_timestamp(raw: object) -> datetime | None:
    """Parse an ISO-8601 timestamp into an aware datetime, or return ``None``."""
    if not isinstance(raw, str):
        return None
    text = raw.strip()
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        stamp = datetime.fromisoformat(text)
    except ValueError:
        return None
    if stamp.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC because the
        # cutoff they are compared against is computed in UTC. Confirm
        # against whatever writes the metrics files.
        stamp = stamp.replace(tzinfo=timezone.utc)
    return stamp


def get_local_metrics(hours: int = 24) -> list[dict]:
    """Return metric records from the last ``hours`` hours.

    Reads every ``local_*.jsonl`` file in ``METRICS_DIR`` in sorted filename
    order, one JSON object per line. A record is kept when its ``"timestamp"``
    field parses as an ISO-8601 time at or after the cutoff.

    Blank lines, malformed JSON, non-object lines, and records with a missing
    or unparsable timestamp are skipped. A file that cannot be read is skipped
    without aborting the scan.

    Args:
        hours: Size of the look-back window, in hours.

    Returns:
        Matching records in file order, then line order.
    """
    records: list[dict] = []
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    if not METRICS_DIR.exists():
        return records
    for metrics_file in sorted(METRICS_DIR.glob("local_*.jsonl")):
        try:
            lines = metrics_file.read_text().splitlines()
        except (OSError, UnicodeDecodeError):
            continue
        for line in lines:
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except ValueError:
                continue
            if not isinstance(record, dict):
                continue
            stamp = _parse_metric_timestamp(record.get("timestamp"))
            if stamp is not None and stamp >= cutoff:
                records.append(record)
    return records
|
|
|
|
|
|
def get_heartbeat_ticks(date_str: str | None = None) -> list[dict]:
    """Return the heartbeat ticks recorded for one day.

    Reads ``TIMMY_HOME/heartbeat/ticks_<date_str>.jsonl``, one JSON object per
    line. Blank lines, malformed JSON, and lines whose JSON value is not an
    object are skipped, so every returned element is a dict. A missing or
    unreadable file yields an empty list.

    Args:
        date_str: Day to load, formatted ``YYYYMMDD``. Defaults to today's
            date in local time.

    Returns:
        The tick objects in file order.
    """
    if not date_str:
        date_str = datetime.now().strftime("%Y%m%d")
    tick_file = TIMMY_HOME / "heartbeat" / f"ticks_{date_str}.jsonl"
    try:
        # Read directly rather than checking exists() first, so a file that
        # vanishes between the check and the read cannot crash the dashboard.
        lines = tick_file.read_text().splitlines()
    except (OSError, UnicodeDecodeError):
        return []
    ticks: list[dict] = []
    for line in lines:
        if not line.strip():
            continue
        try:
            tick = json.loads(line)
        except ValueError:
            continue
        if isinstance(tick, dict):
            ticks.append(tick)
    return ticks
|
|
|
|
|
|
def get_review_and_issue_state(token: str | None) -> dict:
    """Gather pull-request and issue state for every repo in ``CORE_REPOS``.

    Args:
        token: Gitea API token passed through to ``gitea_get``.

    Returns:
        A dict with three keys:

        * ``"prs"``: open pull requests, each tagged with ``"_repo"``.
        * ``"review_queue"``: open pull-type issues that have ``Timmy`` or
          ``allegro`` among their assignees, each tagged with ``"_repo"``.
        * ``"unassigned"``: number of open issues without assignees, summed
          over the repositories.

    The collection is best effort. When a query for a repository fails, the
    remaining queries for that repository are skipped and the loop moves on
    to the next repository.
    """
    summary = {"prs": [], "review_queue": [], "unassigned": 0}
    for repo in CORE_REPOS:
        # Query 1: open pull requests.
        try:
            open_pulls = gitea_get(f"/repos/{repo}/pulls?state=open&limit=20", token)
            for pull in open_pulls:
                pull["_repo"] = repo
                summary["prs"].append(pull)
        except Exception:
            continue

        # Query 2: pull-type issues, filtered by assignee.
        try:
            pull_issues = gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=pulls", token)
            for entry in pull_issues:
                logins = [who.get("login", "") for who in (entry.get("assignees") or [])]
                if "Timmy" in logins or "allegro" in logins:
                    entry["_repo"] = repo
                    summary["review_queue"].append(entry)
        except Exception:
            continue

        # Query 3: plain issues; only the unassigned ones are counted.
        try:
            open_issues = gitea_get(f"/repos/{repo}/issues?state=open&limit=50&type=issues", token)
            unowned = [issue for issue in open_issues if not issue.get("assignees")]
            summary["unassigned"] += len(unowned)
        except Exception:
            # Last query for this repo, so there is nothing left to skip.
            pass
    return summary
|
|
|
|
|
|
# ANSI escape sequences used to style the terminal output.

# Text attributes.
DIM = "\033[2m"
BOLD = "\033[1m"

# Foreground colours.
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"
CYAN = "\033[36m"

# Reset all attributes.
RST = "\033[0m"

# Clear the screen and move the cursor to the top-left corner.
CLR = "\033[2J\033[H"
|
|
|
|
|
|
def _print_section(title: str) -> None:
    """Print a bold section heading followed by a dim divider rule."""
    print(f"\n {BOLD}{title}{RST}")
    print(f" {DIM}{'-' * 58}{RST}")


def _print_heartbeat(last_tick: dict, ticks: list[dict]) -> None:
    """Print the last tick's id and severity plus today's tick counts."""
    _print_section("HEARTBEAT")
    if not last_tick:
        print(f" {DIM}(no heartbeat data){RST}")
        return
    # "decision" may be absent or not an object; fall back to "?" rather
    # than crashing on .get().
    decision = last_tick.get("decision")
    sev = decision.get("severity", "?") if isinstance(decision, dict) else "?"
    tick_id = last_tick.get("tick_id", "?")
    # Count ticks whose decision severity is anything other than "fallback".
    model_decisions = sum(
        1
        for tick in ticks
        if isinstance(tick, dict)
        and isinstance(tick.get("decision"), dict)
        and tick["decision"].get("severity") != "fallback"
    )
    print(f" last tick: {tick_id}")
    print(f" severity: {sev}")
    print(f" ticks today: {len(ticks)} | model decisions: {model_decisions}")


def _print_model_health(health: dict) -> None:
    """Print provider and inference status and up to four loaded model names."""
    _print_section("MODEL HEALTH")
    if not health:
        print(f" {DIM}(no model_health.json){RST}")
        return
    provider = GREEN if health.get("api_responding") else RED
    inference = GREEN if health.get("inference_ok") else YELLOW
    # Tolerate a null or non-list "models_loaded" and non-string entries.
    loaded = health.get("models_loaded")
    names = [str(name) for name in loaded[:4]] if isinstance(loaded, list) else []
    print(f" provider: {provider}{health.get('api_responding')}{RST}")
    print(f" inference: {inference}{health.get('inference_ok')}{RST}")
    print(f" models: {', '.join(names) or '(none reported)'}")


def _print_archive(checkpoint: dict) -> None:
    """Print the archive pipeline checkpoint fields."""
    _print_section("ARCHIVE PIPELINE")
    if not checkpoint:
        print(f" {DIM}(no archive checkpoint yet){RST}")
        return
    print(f" batches completed: {checkpoint.get('batches_completed', '?')}")
    print(f" next offset: {checkpoint.get('next_offset', '?')}")
    print(f" phase: {checkpoint.get('phase', '?')}")


def _print_local_metrics(metrics: list[dict], hours: int) -> None:
    """Print per-caller call, success, and error counts for the metric records."""
    _print_section(f"LOCAL METRICS ({len(metrics)} calls, last {hours}h)")
    if not metrics:
        print(f" {DIM}(no local metrics yet){RST}")
        return
    by_caller: dict[str, dict[str, int]] = {}
    for record in metrics:
        caller = record.get("caller", "unknown")
        # A null or non-string caller would break both sorting and the
        # ":24s" format spec below, so normalise it to text here.
        caller = "unknown" if caller is None else str(caller)
        stats = by_caller.setdefault(caller, {"count": 0, "success": 0, "errors": 0})
        stats["count"] += 1
        stats["success" if record.get("success") else "errors"] += 1
    for caller, stats in sorted(by_caller.items()):
        print(f" {caller:24s} calls={stats['count']:3d} ok={stats['success']:3d} err={stats['errors']:3d}")


def _print_review_queue(review_queue: list[dict]) -> None:
    """Print up to eight review-queue entries."""
    _print_section("REVIEW QUEUE")
    if not review_queue:
        print(f" {DIM}(clear){RST}")
        return
    for item in review_queue[:8]:
        # "_repo" is "owner/name"; only the name part is shown.
        repo = item["_repo"].split("/", 1)[1]
        print(f" {repo:12s} #{item['number']:<4d} {item['title'][:42]}")


def _print_open_work(gitea: dict) -> None:
    """Print open-PR and unassigned-issue totals plus up to six open PRs."""
    _print_section("OPEN PRS / UNASSIGNED")
    print(f" open PRs: {len(gitea['prs'])}")
    print(f" unassigned issues: {gitea['unassigned']}")
    for pr in gitea["prs"][:6]:
        repo = pr["_repo"].split("/", 1)[1]
        print(f" PR {repo:10s} #{pr['number']:<4d} {pr['title'][:40]}")


def render(hours: int = 24) -> None:
    """Clear the terminal and print one full dashboard frame.

    All data (token, local metrics, heartbeat ticks, model health, last tick,
    archive checkpoint, Gitea state) is collected before the screen is
    cleared, so slow lookups do not leave a blank terminal.

    Args:
        hours: Look-back window, in hours, for the local metrics section.
    """
    token = read_token()
    metrics = get_local_metrics(hours)
    ticks = get_heartbeat_ticks()
    health = get_model_health()
    last_tick = get_last_tick()
    checkpoint = get_archive_checkpoint()
    gitea = get_review_and_issue_state(token)

    print(CLR, end="")
    print(f"{BOLD}{'=' * 72}")
    print(" TIMMY WORKFLOW DASHBOARD")
    print(f" {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'=' * 72}{RST}")

    _print_heartbeat(last_tick, ticks)
    _print_model_health(health)
    _print_archive(checkpoint)
    _print_local_metrics(metrics, hours)
    _print_review_queue(gitea["review_queue"])
    _print_open_work(gitea)

    print(f"\n{BOLD}{'=' * 72}{RST}")
    print(f" {DIM}Refresh: timmy-dashboard --watch | History: --hours=N{RST}")
|
|
|
|
|
|
def _parse_cli_args(argv: list[str]) -> tuple[bool, int]:
    """Extract ``(watch, hours)`` from the command-line arguments.

    ``--watch`` enables the refresh loop. ``--hours=N`` sets the metrics
    look-back window (default 24); when given several times the last one
    wins. Other arguments are ignored.

    Raises:
        ValueError: If an ``--hours=`` value is not a positive integer.
    """
    watch = "--watch" in argv
    hours = 24
    for arg in argv:
        if not arg.startswith("--hours="):
            continue
        value = arg.split("=", 1)[1]
        try:
            hours = int(value)
        except ValueError:
            raise ValueError(f"--hours expects a whole number, got {value!r}") from None
        if hours < 1:
            raise ValueError(f"--hours must be at least 1, got {hours}")
    return watch, hours


def main(argv: list[str] | None = None) -> int:
    """Run the dashboard once, or every 30 seconds with ``--watch``.

    Args:
        argv: Arguments to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 2 when the arguments are invalid.
    """
    import sys

    args = sys.argv[1:] if argv is None else argv
    try:
        watch, hours = _parse_cli_args(args)
    except ValueError as err:
        # Report bad input as a one-line message instead of a traceback.
        print(f"timmy-dashboard: {err}", file=sys.stderr)
        return 2

    if not watch:
        render(hours)
        return 0

    try:
        while True:
            render(hours)
            time.sleep(30)
    except KeyboardInterrupt:
        print(f"\n{DIM}Dashboard stopped.{RST}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|