625 lines
20 KiB
Python
Executable File
625 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Quick health snapshot before coding — checks CI, issues, flakiness.
|
|
|
|
A fast status check that shows major red/green signals before deeper work.
|
|
Runs in a few seconds and produces a concise summary.
|
|
|
|
Run: python3 timmy_automations/daily_run/health_snapshot.py
|
|
Env: TIMMY_GITEA_API, TIMMY_GITEA_TOKEN, TIMMY_REPO_SLUG
|
|
|
|
Refs: #710
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.request import Request, urlopen
|
|
from urllib.error import HTTPError, URLError
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────
|
|
|
|
# Repository root: three directory levels above this script's resolved path.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent

# Baseline settings used when no environment override is present.
DEFAULT_CONFIG: dict[str, Any] = {
    # Base URL of the Gitea REST API.
    "gitea_api": "http://localhost:3000/api/v1",
    # Repository to inspect, as "owner/name".
    "repo_slug": "rockachopa/Timmy-time-dashboard",
    # File consulted for the API token when none is configured directly.
    "token_file": "~/.hermes/gitea_token",
    # Label names that mark an issue as critical.
    "critical_labels": ["P0", "P1", "priority/critical", "priority/high"],
    # Number of most recent cycle records considered by the flakiness check.
    "flakiness_lookback_cycles": 20,
    "ci_timeout_seconds": 5,
}
|
|
|
|
|
|
def load_config() -> dict:
    """Build the runtime configuration.

    Starts from a shallow copy of ``DEFAULT_CONFIG`` and applies any non-empty
    ``TIMMY_GITEA_API``, ``TIMMY_REPO_SLUG`` and ``TIMMY_GITEA_TOKEN``
    environment variables on top of it.

    Returns:
        The merged configuration mapping.  The ``"token"`` key is only
        present when ``TIMMY_GITEA_TOKEN`` is set to a non-empty value.
    """
    settings = DEFAULT_CONFIG.copy()

    # (environment variable, config key) pairs; empty values count as unset.
    overrides = (
        ("TIMMY_GITEA_API", "gitea_api"),
        ("TIMMY_REPO_SLUG", "repo_slug"),
        ("TIMMY_GITEA_TOKEN", "token"),
    )
    for env_name, key in overrides:
        value = os.environ.get(env_name)
        if value:
            settings[key] = value

    return settings
|
|
|
|
|
|
def get_token(config: dict) -> str | None:
    """Resolve the Gitea API token.

    Lookup order:
      1. ``config["token"]``,
      2. the file named by ``config["token_file"]`` (a leading ``~`` is
         expanded to the user's home directory),
      3. ``.timmy_gitea_token`` in the repository root.

    Args:
        config: Configuration mapping; ``"token"`` and ``"token_file"`` are
            both optional.

    Returns:
        The token (file contents are stripped of surrounding whitespace), or
        ``None`` when no source provides one.
    """
    if "token" in config:
        return config["token"]

    token_file_str = config.get("token_file", "")
    if token_file_str:
        # expanduser() is required: the default value starts with "~", which
        # pathlib does not expand on its own, so the file was never found.
        token_file = Path(token_file_str).expanduser()
        # is_file() rather than exists(): a directory here must not be read.
        if token_file.is_file():
            return token_file.read_text().strip()

    # Fallback: .timmy_gitea_token three directory levels above this script.
    repo_root = Path(__file__).resolve().parent.parent.parent
    timmy_token_path = repo_root / ".timmy_gitea_token"
    if timmy_token_path.is_file():
        return timmy_token_path.read_text().strip()

    return None
|
|
|
|
|
|
# ── Gitea API Client ──────────────────────────────────────────────────────
|
|
|
|
class GiteaClient:
    """Simple Gitea API client with graceful degradation.

    Wraps ``urllib`` for read-only access to a single repository.
    ``is_available`` reports connection problems as ``False``; ``get`` and
    ``get_paginated`` let errors propagate so callers decide how to degrade.
    """

    def __init__(self, config: dict, token: str | None):
        """Store connection settings.

        Args:
            config: Mapping providing ``"gitea_api"`` (API base URL) and
                ``"repo_slug"``.
            token: API token, or ``None``/empty for unauthenticated requests.
        """
        self.api_base = config["gitea_api"].rstrip("/")
        self.repo_slug = config["repo_slug"]
        self.token = token
        # Tri-state cache for is_available(): None means "not probed yet".
        self._available: bool | None = None

    def _headers(self) -> dict:
        """Return request headers, adding ``Authorization`` when a token is set."""
        headers = {"Accept": "application/json"}
        if self.token:
            headers["Authorization"] = f"token {self.token}"
        return headers

    def _api_url(self, path: str) -> str:
        """Return the URL of *path* below this repository's API root."""
        return f"{self.api_base}/repos/{self.repo_slug}/{path}"

    def _build_url(self, path: str, params: dict | None = None) -> str:
        """Return the repository URL for *path* with *params* as a query string.

        Parameter names and values are percent-encoded, so values containing
        spaces, ``&`` or ``=`` cannot corrupt the query.
        """
        from urllib.parse import urlencode

        url = self._api_url(path)
        if params:
            url = f"{url}?{urlencode(params)}"
        return url

    def is_available(self) -> bool:
        """Check if the Gitea API is reachable.

        Probes ``<api_base>/version`` once with a 3 second timeout and caches
        the answer for the lifetime of the client.

        Returns:
            ``True`` when the probe answered with HTTP 200, ``False`` when it
            answered otherwise or failed.
        """
        if self._available is None:
            try:
                req = Request(
                    f"{self.api_base}/version",
                    headers=self._headers(),
                    method="GET",
                )
                with urlopen(req, timeout=3) as resp:
                    self._available = resp.status == 200
            except (OSError, ValueError):
                # OSError covers URLError/HTTPError as well as raw socket
                # timeouts and resets; ValueError covers a malformed base URL.
                self._available = False
        return self._available

    def get(self, path: str, params: dict | None = None) -> list | dict:
        """Make a GET request to the Gitea API and decode the JSON response.

        Args:
            path: Endpoint path relative to the repository API root.
            params: Optional query parameters.

        Returns:
            The decoded JSON document.

        Raises:
            urllib.error.HTTPError, urllib.error.URLError: On request failure.
            ValueError: When the response body is not valid JSON.
        """
        req = Request(self._build_url(path, params), headers=self._headers(), method="GET")
        with urlopen(req, timeout=10) as resp:
            return json.loads(resp.read())

    def get_paginated(self, path: str, params: dict | None = None) -> list:
        """Fetch all pages of a paginated endpoint.

        Pages are requested with ``limit=50`` starting at ``page=1`` until a
        page is empty or holds fewer than 50 items.

        Note: entries in *params* override the generated ``limit``/``page``
        values in the request, but the last-page test always compares against
        50.  A caller passing a smaller ``limit`` therefore receives only the
        first page.

        Args:
            path: Endpoint path relative to the repository API root.
            params: Optional extra query parameters.

        Returns:
            The concatenated items of every fetched page.
        """
        all_items = []
        page = 1
        limit = 50

        while True:
            page_params = {"limit": limit, "page": page}
            if params:
                page_params.update(params)

            batch = self.get(path, page_params)
            if not batch:
                break

            all_items.extend(batch)
            if len(batch) < limit:
                break
            page += 1

        return all_items
|
|
|
|
|
|
# ── Data Models ───────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class CISignal:
    """Result of the CI pipeline check."""

    # One of "pass", "fail", "unknown", "unavailable".
    status: str
    # Human-readable one-line summary.
    message: str
    # Optional extra context; each instance gets its own empty dict by default.
    details: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
class IssueSignal:
    """Summary of open critical issues."""

    # Total number of critical issues found.
    count: int
    # How many of them are classified P0.
    p0_count: int
    # How many of them are classified P1.
    p1_count: int
    # Per-issue summary dicts; each instance gets its own empty list by default.
    issues: list[dict[str, Any]] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class FlakinessSignal:
    """Test flakiness / failure-rate summary."""

    # One of "healthy", "degraded", "critical", "unknown".
    status: str
    # Number of failed cycles among the inspected ones.
    recent_failures: int
    # Number of cycles inspected.
    recent_cycles: int
    # Failure ratio for the inspected cycles.
    failure_rate: float
    # Human-readable one-line summary.
    message: str
|
|
|
|
|
|
@dataclass
class TokenEconomySignal:
    """Token economy temperature summary."""

    # One of "balanced", "inflationary", "deflationary", "unknown".
    status: str
    # Human-readable one-line summary.
    message: str
    # Amount minted in the inspected window.
    recent_mint: int = 0
    # Amount burned in the inspected window.
    recent_burn: int = 0
|
|
|
|
|
|
@dataclass
class HealthSnapshot:
    """Aggregate of every health signal plus the overall verdict."""

    # Time the snapshot was generated, as a string.
    timestamp: str
    # One of "green", "yellow", "red".
    overall_status: str
    ci: CISignal
    issues: IssueSignal
    flakiness: FlakinessSignal
    tokens: TokenEconomySignal

    def to_dict(self) -> dict[str, Any]:
        """Return a nested plain-dict view of the snapshot.

        At most five issue summaries are included and the failure rate is
        rounded to two decimal places.
        """
        ci_part = {
            "status": self.ci.status,
            "message": self.ci.message,
            "details": self.ci.details,
        }

        found = self.issues
        issues_part = {
            "count": found.count,
            "p0_count": found.p0_count,
            "p1_count": found.p1_count,
            "issues": found.issues[:5],  # Limit to 5
        }

        flaky = self.flakiness
        flakiness_part = {
            "status": flaky.status,
            "recent_failures": flaky.recent_failures,
            "recent_cycles": flaky.recent_cycles,
            "failure_rate": round(flaky.failure_rate, 2),
            "message": flaky.message,
        }

        economy = self.tokens
        tokens_part = {
            "status": economy.status,
            "message": economy.message,
            "recent_mint": economy.recent_mint,
            "recent_burn": economy.recent_burn,
        }

        return {
            "timestamp": self.timestamp,
            "overall_status": self.overall_status,
            "ci": ci_part,
            "issues": issues_part,
            "flakiness": flakiness_part,
            "tokens": tokens_part,
        }
|
|
|
|
|
|
# ── Health Check Functions ────────────────────────────────────────────────
|
|
|
|
def check_ci_status(client: GiteaClient, config: dict) -> CISignal:
    """Check CI pipeline status for the most recent commit.

    Args:
        client: Gitea client used for the API calls.
        config: Not used by this check.

    Returns:
        A ``CISignal`` with status ``"pass"`` or ``"fail"`` when the commit
        has a conclusive state, ``"unknown"`` when the state is pending,
        unrecognised, or could not be fetched, and ``"unavailable"`` when the
        commit listing itself could not be fetched.
    """
    try:
        # Only the newest commit is inspected, so request it directly rather
        # than walking the paginated listing.
        commits = client.get("commits", {"limit": 1, "page": 1})
    except (OSError, ValueError) as exc:
        # OSError covers HTTPError/URLError plus raw socket timeouts;
        # ValueError covers a response body that is not valid JSON.
        return CISignal(
            status="unavailable",
            message=f"CI check failed: {exc}",
        )

    if not isinstance(commits, list) or not commits:
        return CISignal(
            status="unknown",
            message="No recent commits found",
        )

    latest = commits[0]
    sha = latest.get("sha", "") if isinstance(latest, dict) else ""
    if not sha:
        # Without a SHA the status URL would be malformed.
        return CISignal(
            status="unknown",
            message="Latest commit has no SHA",
        )

    try:
        statuses = client.get(f"commits/{sha}/status")
    except (OSError, ValueError) as exc:
        return CISignal(
            status="unknown",
            message=f"Could not fetch CI status: {exc}",
        )

    state = statuses.get("state", "unknown") if isinstance(statuses, dict) else "unknown"
    details = {"sha": sha[:8], "state": state}

    if state == "success":
        return CISignal(status="pass", message="CI passing", details=details)
    if state in ("failure", "error"):
        return CISignal(status="fail", message=f"CI failed ({state})", details=details)
    if state == "pending":
        return CISignal(status="unknown", message="CI pending", details=details)
    return CISignal(status="unknown", message=f"CI status: {state}", details=details)
|
|
|
|
|
|
def check_critical_issues(client: GiteaClient, config: dict) -> IssueSignal:
    """Check for open P0/P1 (and otherwise critical) issues.

    An issue is P0 when any of its label names contains ``"p0"`` or
    ``"critical"``, P1 when any contains ``"p1"`` or ``"high"`` (both tests
    are case-insensitive substring matches), and otherwise critical when a
    label equals one of ``config["critical_labels"]`` ignoring case.

    Args:
        client: Gitea client used to list open issues.
        config: Configuration; ``"critical_labels"`` defaults to P0 and P1.

    Returns:
        An ``IssueSignal`` with the counts and up to ten issue summaries.
        When the listing cannot be fetched, a warning is written to stderr
        and an all-zero signal is returned.
    """
    critical_labels = {
        str(name).lower() for name in config.get("critical_labels", ["P0", "P1"])
    }

    try:
        issues = client.get_paginated("issues", {"state": "open", "limit": 100})
    except (OSError, ValueError) as exc:
        # IssueSignal has no status field, so a failed fetch is otherwise
        # indistinguishable from "no critical issues"; say so on stderr,
        # which keeps --json output on stdout clean.
        print(f"warning: could not fetch issues: {exc}", file=sys.stderr)
        return IssueSignal(
            count=0,
            p0_count=0,
            p1_count=0,
            issues=[],
        )

    p0_issues = []
    p1_issues = []
    other_critical = []

    for issue in issues:
        # "labels" and a label's "name" may be missing or null in the payload.
        labels = [
            (label.get("name") or "").lower() for label in issue.get("labels") or []
        ]

        is_p0 = any("p0" in name or "critical" in name for name in labels)
        is_p1 = any("p1" in name or "high" in name for name in labels)

        issue_summary = {
            "number": issue.get("number"),
            "title": (issue.get("title") or "Untitled")[:60],
            "url": issue.get("html_url", ""),
        }

        if is_p0:
            p0_issues.append(issue_summary)
        elif is_p1:
            p1_issues.append(issue_summary)
        elif critical_labels.intersection(labels):
            other_critical.append(issue_summary)

    all_critical = p0_issues + p1_issues + other_critical

    return IssueSignal(
        count=len(all_critical),
        p0_count=len(p0_issues),
        p1_count=len(p1_issues),
        issues=all_critical[:10],  # Limit stored issues
    )
|
|
|
|
|
|
def check_flakiness(config: dict) -> FlakinessSignal:
    """Check test flakiness from cycle retrospective data.

    Reads ``.loop/retro/cycles.jsonl`` under the repository root, keeps the
    last ``config["flakiness_lookback_cycles"]`` records (default 20) and
    counts those whose ``"success"`` value is falsy; records without the key
    count as successes.  Lines that are not JSON objects are skipped.

    Returns:
        A ``FlakinessSignal`` that is ``"healthy"`` below a 10% failure rate,
        ``"degraded"`` below 30%, ``"critical"`` otherwise, or ``"unknown"``
        when no data is available or the file cannot be read.
    """
    retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
    lookback = config.get("flakiness_lookback_cycles", 20)

    def unknown(message: str) -> FlakinessSignal:
        # Shared shape for every "no usable data" outcome.
        return FlakinessSignal(
            status="unknown",
            recent_failures=0,
            recent_cycles=0,
            failure_rate=0.0,
            message=message,
        )

    if not retro_file.exists():
        return unknown("No cycle data available")

    try:
        raw_lines = retro_file.read_text(encoding="utf-8").splitlines()
    except (OSError, ValueError) as exc:
        # ValueError covers undecodable bytes (UnicodeDecodeError).
        return unknown(f"Could not read cycle data: {exc}")

    entries = []
    for line in raw_lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A valid JSON line that is not an object has no "success" field to
        # read; skip it instead of failing on .get().
        if isinstance(entry, dict):
            entries.append(entry)

    # A non-positive or non-integer lookback means "use everything".
    if isinstance(lookback, int) and lookback > 0:
        recent = entries[-lookback:]
    else:
        recent = entries

    total_count = len(recent)
    if total_count == 0:
        return unknown("No recent cycle data")

    failure_count = sum(1 for entry in recent if not entry.get("success", True))
    failure_rate = failure_count / total_count

    if failure_rate < 0.1:
        status = "healthy"
        message = f"Low flakiness ({failure_rate:.0%})"
    elif failure_rate < 0.3:
        status = "degraded"
        message = f"Moderate flakiness ({failure_rate:.0%})"
    else:
        status = "critical"
        message = f"High flakiness ({failure_rate:.0%})"

    return FlakinessSignal(
        status=status,
        recent_failures=failure_count,
        recent_cycles=total_count,
        failure_rate=failure_rate,
        message=message,
    )
|
|
|
|
|
|
def check_token_economy(config: dict) -> TokenEconomySignal:
    """Check token economy temperature from recent transactions.

    Reads ``.loop/token_economy.jsonl`` under the repository root and sums
    the ``"delta"`` of every transaction from the last 24 hours: positive
    deltas count as mint, the absolute value of the rest as burn.  Rows that
    are not JSON objects, or that carry an unparsable timestamp or a
    non-numeric delta, are skipped.

    Args:
        config: Not used by this check.

    Returns:
        ``"inflationary"`` when mint exceeds twice the burn,
        ``"deflationary"`` when burn exceeds twice the mint, ``"balanced"``
        otherwise, or ``"unknown"`` when the ledger is missing or unreadable.
    """
    ledger_file = REPO_ROOT / ".loop" / "token_economy.jsonl"

    if not ledger_file.exists():
        return TokenEconomySignal(
            status="unknown",
            message="No token economy data",
        )

    try:
        raw_lines = ledger_file.read_text(encoding="utf-8").splitlines()
    except (OSError, ValueError) as exc:
        # ValueError covers undecodable bytes (UnicodeDecodeError).
        return TokenEconomySignal(
            status="unknown",
            message=f"Could not read token data: {exc}",
        )

    since = datetime.now(timezone.utc) - timedelta(hours=24)
    recent_mint = 0
    recent_burn = 0

    for line in raw_lines:
        try:
            tx = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(tx, dict):
            continue

        raw_time = tx.get("timestamp", "1970-01-01")
        if not isinstance(raw_time, str):
            continue
        try:
            tx_time = datetime.fromisoformat(raw_time.replace("Z", "+00:00"))
        except ValueError:
            continue
        if tx_time.tzinfo is None:
            # Comparing a naive datetime with the aware cutoff raises
            # TypeError, so treat offset-less timestamps as UTC.
            tx_time = tx_time.replace(tzinfo=timezone.utc)
        if tx_time < since:
            continue

        delta = tx.get("delta", 0)
        if not isinstance(delta, (int, float)):
            continue
        if delta > 0:
            recent_mint += delta
        else:
            recent_burn += abs(delta)

    # Simple temperature check
    if recent_mint > recent_burn * 2:
        status = "inflationary"
        message = f"High mint activity (+{recent_mint}/-{recent_burn})"
    elif recent_burn > recent_mint * 2:
        status = "deflationary"
        message = f"High burn activity (+{recent_mint}/-{recent_burn})"
    else:
        status = "balanced"
        message = f"Balanced flow (+{recent_mint}/-{recent_burn})"

    return TokenEconomySignal(
        status=status,
        message=message,
        recent_mint=recent_mint,
        recent_burn=recent_burn,
    )
|
|
|
|
|
|
def calculate_overall_status(
    ci: CISignal,
    issues: IssueSignal,
    flakiness: FlakinessSignal,
) -> str:
    """Fold the individual signals into "red", "yellow" or "green".

    Red wins over yellow: a failing CI, any P0 issue or critical flakiness
    is red; otherwise an unknown CI state, any P1 issue or degraded
    flakiness is yellow; everything else is green.
    """
    is_red = (
        ci.status == "fail"
        or issues.p0_count > 0
        or flakiness.status == "critical"
    )
    if is_red:
        return "red"

    is_yellow = (
        ci.status == "unknown"
        or issues.p1_count > 0
        or flakiness.status == "degraded"
    )
    return "yellow" if is_yellow else "green"
|
|
|
|
|
|
# ── Main Functions ────────────────────────────────────────────────────────
|
|
|
|
def generate_snapshot(config: dict, token: str | None) -> HealthSnapshot:
    """Run every health check and bundle the results.

    The Gitea-backed checks (CI and issues) are replaced by placeholder
    signals when the API is unreachable; the file-based checks (flakiness and
    token economy) always run.
    """
    client = GiteaClient(config, token)

    if not client.is_available():
        ci = CISignal(status="unavailable", message="Gitea unavailable")
        issues = IssueSignal(count=0, p0_count=0, p1_count=0, issues=[])
    else:
        ci = check_ci_status(client, config)
        issues = check_critical_issues(client, config)

    # These do not depend on Gitea, so they run regardless of its state.
    flakiness = check_flakiness(config)
    tokens = check_token_economy(config)

    verdict = calculate_overall_status(ci, issues, flakiness)
    generated_at = datetime.now(timezone.utc).isoformat()

    return HealthSnapshot(
        timestamp=generated_at,
        overall_status=verdict,
        ci=ci,
        issues=issues,
        flakiness=flakiness,
        tokens=tokens,
    )
|
|
|
|
|
|
def print_snapshot(snapshot: HealthSnapshot, verbose: bool = False) -> None:
    """Write a human-readable report of *snapshot* to stdout.

    Args:
        snapshot: The snapshot to render.
        verbose: When true, also list up to five critical issues.
    """
    fallback = "⚪"
    overall_icons = {"green": "🟢", "yellow": "🟡", "red": "🔴"}
    ci_icons = {"pass": "✅", "fail": "❌", "unknown": "⚠️", "unavailable": "⚪"}
    flakiness_icons = {"healthy": "✅", "degraded": "🟡", "critical": "🔴", "unknown": "⚪"}
    token_icons = {"balanced": "✅", "inflationary": "🟡", "deflationary": "🔵", "unknown": "⚪"}

    banner = "=" * 60
    headline_icon = overall_icons.get(snapshot.overall_status, fallback)

    # Header
    print(banner)
    print(f"{headline_icon} HEALTH SNAPSHOT")
    print(banner)
    print(f"Generated: {snapshot.timestamp}")
    print(f"Overall: {snapshot.overall_status.upper()}")
    print()

    # CI line
    ci = snapshot.ci
    print(f"{ci_icons.get(ci.status, fallback)} CI: {ci.message}")

    # Issues line, followed by per-priority counts when non-zero
    found = snapshot.issues
    has_p0 = found.p0_count > 0
    has_p1 = found.p1_count > 0
    if has_p0:
        issue_icon = "🔴"
    elif has_p1:
        issue_icon = "🟡"
    else:
        issue_icon = "✅"
    print(f"{issue_icon} Issues: {found.count} critical")
    if has_p0:
        print(f" 🔴 P0: {found.p0_count}")
    if has_p1:
        print(f" 🟡 P1: {found.p1_count}")

    # Flakiness line
    flaky = snapshot.flakiness
    print(f"{flakiness_icons.get(flaky.status, fallback)} Flakiness: {flaky.message}")

    # Token economy line
    economy = snapshot.tokens
    print(f"{token_icons.get(economy.status, fallback)} Tokens: {economy.message}")

    # Optional issue listing
    if verbose and found.issues:
        print()
        print("Critical Issues:")
        for entry in found.issues[:5]:
            print(f" #{entry['number']}: {entry['title'][:50]}")

    print()
    print("─" * 60)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line flags ``--json``, ``--verbose`` and ``--quiet``."""
    parser = argparse.ArgumentParser(
        description="Quick health snapshot before coding",
    )

    # Every option is a plain boolean switch with a long and a short name.
    switches = (
        ("--json", "-j", "Output as JSON"),
        ("--verbose", "-v", "Show verbose output including issue details"),
        ("--quiet", "-q", "Only show status line (no details)"),
    )
    for long_name, short_name, help_text in switches:
        parser.add_argument(long_name, short_name, action="store_true", help=help_text)

    return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the CLI: build a snapshot, print it, and return the exit code.

    Returns:
        ``1`` when the overall status is "red", ``0`` otherwise.
    """
    args = parse_args()
    config = load_config()
    snapshot = generate_snapshot(config, get_token(config))
    overall = snapshot.overall_status

    if args.json:
        print(json.dumps(snapshot.to_dict(), indent=2))
    elif args.quiet:
        icons = {"green": "🟢", "yellow": "🟡", "red": "🔴"}
        print(f"{icons.get(overall, '⚪')} {overall.upper()}")
    else:
        print_snapshot(snapshot, verbose=args.verbose)

    # Only a red snapshot is reported as failure to the calling shell.
    return 1 if overall == "red" else 0
|
|
|
|
|
|
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|