From 2f15435fed48fe14e671ebad76fa49a25821b8d7 Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Sat, 21 Mar 2026 21:53:40 +0000 Subject: [PATCH] [kimi] Implement quick health snapshot before coding (#710) (#828) --- src/dashboard/routes/health.py | 51 ++ src/timmy/cli.py | 38 ++ tests/test_smoke.py | 8 + .../timmy_automations/test_health_snapshot.py | 401 ++++++++++++ timmy_automations/config/automations.json | 19 + .../daily_run/health_snapshot.py | 619 ++++++++++++++++++ 6 files changed, 1136 insertions(+) create mode 100644 tests/timmy_automations/test_health_snapshot.py create mode 100755 timmy_automations/daily_run/health_snapshot.py diff --git a/src/dashboard/routes/health.py b/src/dashboard/routes/health.py index f9a19614..581e7057 100644 --- a/src/dashboard/routes/health.py +++ b/src/dashboard/routes/health.py @@ -275,3 +275,54 @@ async def component_status(): }, "timestamp": datetime.now(UTC).isoformat(), } + + +@router.get("/health/snapshot") +async def health_snapshot(): + """Quick health snapshot before coding. + + Returns a concise status summary including: + - CI pipeline status (pass/fail/unknown) + - Critical issues count (P0/P1) + - Test flakiness rate + - Token economy temperature + + Fast execution (< 5 seconds) for pre-work checks. + Refs: #710 + """ + import sys + from pathlib import Path + + # Import the health snapshot module + snapshot_path = Path(settings.repo_root) / "timmy_automations" / "daily_run" + if str(snapshot_path) not in sys.path: + sys.path.insert(0, str(snapshot_path)) + + try: + from health_snapshot import generate_snapshot, get_token, load_config + + config = load_config() + token = get_token(config) + + # Run the health snapshot (in thread to avoid blocking) + snapshot = await asyncio.to_thread(generate_snapshot, config, token) + + return snapshot.to_dict() + except Exception as exc: + logger.warning("Health snapshot failed: %s", exc) + # Return graceful fallback + return { + "timestamp": datetime.now(UTC).isoformat(), + "overall_status": "unknown", + "error": str(exc), + "ci": {"status": "unknown", "message": "Snapshot failed"}, + "issues": {"count": 0, "p0_count": 0, "p1_count": 0, "issues": []}, + "flakiness": { + "status": "unknown", + "recent_failures": 0, + "recent_cycles": 0, + "failure_rate": 0.0, + "message": "Snapshot failed", + }, + "tokens": {"status": "unknown", "message": "Snapshot failed"}, + } diff --git a/src/timmy/cli.py b/src/timmy/cli.py index 5ffa82ae..ef65dbfe 100644 --- a/src/timmy/cli.py +++ b/src/timmy/cli.py @@ -489,5 +489,43 @@ def focus( typer.echo("No active focus (broad mode).") +@app.command(name="healthcheck") +def healthcheck( + json_output: bool = typer.Option(False, "--json", "-j", help="Output as JSON"), + verbose: bool = typer.Option( + False, "--verbose", "-v", help="Show verbose output including issue details" + ), + quiet: bool = typer.Option(False, "--quiet", "-q", help="Only show status line (no details)"), +): + """Quick health snapshot before coding. + + Shows CI status, critical issues (P0/P1), test flakiness, and token economy. + Fast execution (< 5 seconds) for pre-work checks. + + Refs: #710 + """ + import subprocess + import sys + from pathlib import Path + + script_path = ( + Path(__file__).resolve().parent.parent.parent + / "timmy_automations" + / "daily_run" + / "health_snapshot.py" + ) + + cmd = [sys.executable, str(script_path)] + if json_output: + cmd.append("--json") + if verbose: + cmd.append("--verbose") + if quiet: + cmd.append("--quiet") + + result = subprocess.run(cmd) + raise typer.Exit(result.returncode) + + def main(): app() diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 4dd25207..c988b948 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -130,6 +130,13 @@ class TestAPIEndpoints: r = client.get("/health/sovereignty") assert r.status_code == 200 + def test_health_snapshot(self, client): + r = client.get("/health/snapshot") + assert r.status_code == 200 + data = r.json() + assert "overall_status" in data + assert data["overall_status"] in ["green", "yellow", "red", "unknown"] + def test_queue_status(self, client): r = client.get("/api/queue/status") assert r.status_code == 200 @@ -186,6 +193,7 @@ class TestNo500: "/health", "/health/status", "/health/sovereignty", + "/health/snapshot", "/health/components", "/agents/default/panel", "/agents/default/history", diff --git a/tests/timmy_automations/test_health_snapshot.py b/tests/timmy_automations/test_health_snapshot.py new file mode 100644 index 00000000..2cc2cb1c --- /dev/null +++ b/tests/timmy_automations/test_health_snapshot.py @@ -0,0 +1,401 @@ +"""Tests for health_snapshot module.""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path +from unittest.mock import patch + +# Add timmy_automations to path for imports +sys.path.insert( + 0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run") +) + +from datetime import UTC + +import health_snapshot as hs + + +class TestLoadConfig: + """Test configuration loading.""" + + def test_loads_default_config(self): + """Load default configuration.""" + config = hs.load_config() + + assert "gitea_api" in config + assert "repo_slug" in config + assert "critical_labels" in config + assert "flakiness_lookback_cycles" in config + + def test_environment_overrides(self, monkeypatch): + """Environment variables override defaults.""" + monkeypatch.setenv("TIMMY_GITEA_API", "http://test:3000/api/v1") + monkeypatch.setenv("TIMMY_REPO_SLUG", "test/repo") + + config = hs.load_config() + + assert config["gitea_api"] == "http://test:3000/api/v1" + assert config["repo_slug"] == "test/repo" + + +class TestGetToken: + """Test token retrieval.""" + + def test_returns_config_token(self): + """Return token from config if present.""" + config = {"token": "test-token-123"} + token = hs.get_token(config) + + assert token == "test-token-123" + + def test_reads_from_file(self, tmp_path, monkeypatch): + """Read token from file if no config token.""" + token_file = tmp_path / "gitea_token" + token_file.write_text("file-token-456") + + config = {"token_file": str(token_file)} + token = hs.get_token(config) + + assert token == "file-token-456" + + def test_returns_none_when_no_token(self): + """Return None when no token available.""" + config = {"token_file": "/nonexistent/path"} + token = hs.get_token(config) + + assert token is None + + +class TestCISignal: + """Test CISignal dataclass.""" + + def test_default_details(self): + """Details defaults to empty dict.""" + signal = hs.CISignal(status="pass", message="CI passing") + + assert signal.details == {} + + def test_with_details(self): + """Can include details.""" + signal = hs.CISignal(status="pass", message="CI passing", details={"sha": "abc123"}) + + assert signal.details["sha"] == "abc123" + + +class TestIssueSignal: + """Test IssueSignal dataclass.""" + + def test_default_issues_list(self): + """Issues defaults to empty list.""" + signal = hs.IssueSignal(count=0, p0_count=0, p1_count=0) + + assert signal.issues == [] + + def test_with_issues(self): + """Can include issues.""" + issues = [{"number": 1, "title": "Test"}] + signal = hs.IssueSignal(count=1, p0_count=1, p1_count=0, issues=issues) + + assert len(signal.issues) == 1 + + +class TestFlakinessSignal: + """Test FlakinessSignal dataclass.""" + + def test_calculated_fields(self): + """All fields set correctly.""" + signal = hs.FlakinessSignal( + status="healthy", + recent_failures=2, + recent_cycles=20, + failure_rate=0.1, + message="Low flakiness", + ) + + assert signal.status == "healthy" + assert signal.recent_failures == 2 + assert signal.failure_rate == 0.1 + + +class TestHealthSnapshot: + """Test HealthSnapshot dataclass.""" + + def test_to_dict_structure(self): + """to_dict produces expected structure.""" + snapshot = hs.HealthSnapshot( + timestamp="2026-01-01T00:00:00+00:00", + overall_status="green", + ci=hs.CISignal(status="pass", message="CI passing"), + issues=hs.IssueSignal(count=0, p0_count=0, p1_count=0), + flakiness=hs.FlakinessSignal( + status="healthy", + recent_failures=0, + recent_cycles=10, + failure_rate=0.0, + message="All good", + ), + tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"), + ) + + data = snapshot.to_dict() + + assert data["timestamp"] == "2026-01-01T00:00:00+00:00" + assert data["overall_status"] == "green" + assert "ci" in data + assert "issues" in data + assert "flakiness" in data + assert "tokens" in data + + def test_to_dict_limits_issues(self): + """to_dict limits issues to 5.""" + many_issues = [{"number": i, "title": f"Issue {i}"} for i in range(10)] + snapshot = hs.HealthSnapshot( + timestamp="2026-01-01T00:00:00+00:00", + overall_status="green", + ci=hs.CISignal(status="pass", message="CI passing"), + issues=hs.IssueSignal(count=10, p0_count=5, p1_count=5, issues=many_issues), + flakiness=hs.FlakinessSignal( + status="healthy", + recent_failures=0, + recent_cycles=10, + failure_rate=0.0, + message="All good", + ), + tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"), + ) + + data = snapshot.to_dict() + + assert len(data["issues"]["issues"]) == 5 + + +class TestCalculateOverallStatus: + """Test overall status calculation.""" + + def test_green_when_all_healthy(self): + """Status is green when all signals healthy.""" + ci = hs.CISignal(status="pass", message="CI passing") + issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0) + flakiness = hs.FlakinessSignal( + status="healthy", + recent_failures=0, + recent_cycles=10, + failure_rate=0.0, + message="All good", + ) + + status = hs.calculate_overall_status(ci, issues, flakiness) + + assert status == "green" + + def test_red_when_ci_fails(self): + """Status is red when CI fails.""" + ci = hs.CISignal(status="fail", message="CI failed") + issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0) + flakiness = hs.FlakinessSignal( + status="healthy", + recent_failures=0, + recent_cycles=10, + failure_rate=0.0, + message="All good", + ) + + status = hs.calculate_overall_status(ci, issues, flakiness) + + assert status == "red" + + def test_red_when_p0_issues(self): + """Status is red when P0 issues exist.""" + ci = hs.CISignal(status="pass", message="CI passing") + issues = hs.IssueSignal(count=1, p0_count=1, p1_count=0) + flakiness = hs.FlakinessSignal( + status="healthy", + recent_failures=0, + recent_cycles=10, + failure_rate=0.0, + message="All good", + ) + + status = hs.calculate_overall_status(ci, issues, flakiness) + + assert status == "red" + + def test_yellow_when_p1_issues(self): + """Status is yellow when P1 issues exist.""" + ci = hs.CISignal(status="pass", message="CI passing") + issues = hs.IssueSignal(count=1, p0_count=0, p1_count=1) + flakiness = hs.FlakinessSignal( + status="healthy", + recent_failures=0, + recent_cycles=10, + failure_rate=0.0, + message="All good", + ) + + status = hs.calculate_overall_status(ci, issues, flakiness) + + assert status == "yellow" + + def test_yellow_when_flakiness_degraded(self): + """Status is yellow when flakiness degraded.""" + ci = hs.CISignal(status="pass", message="CI passing") + issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0) + flakiness = hs.FlakinessSignal( + status="degraded", + recent_failures=5, + recent_cycles=20, + failure_rate=0.25, + message="Moderate flakiness", + ) + + status = hs.calculate_overall_status(ci, issues, flakiness) + + assert status == "yellow" + + def test_red_when_flakiness_critical(self): + """Status is red when flakiness critical.""" + ci = hs.CISignal(status="pass", message="CI passing") + issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0) + flakiness = hs.FlakinessSignal( + status="critical", + recent_failures=10, + recent_cycles=20, + failure_rate=0.5, + message="High flakiness", + ) + + status = hs.calculate_overall_status(ci, issues, flakiness) + + assert status == "red" + + +class TestCheckFlakiness: + """Test flakiness checking.""" + + def test_no_data_returns_unknown(self, tmp_path, monkeypatch): + """Return unknown when no cycle data exists.""" + monkeypatch.setattr(hs, "REPO_ROOT", tmp_path) + config = {"flakiness_lookback_cycles": 20} + + signal = hs.check_flakiness(config) + + assert signal.status == "unknown" + assert signal.message == "No cycle data available" + + def test_calculates_failure_rate(self, tmp_path, monkeypatch): + """Calculate failure rate from cycle data.""" + monkeypatch.setattr(hs, "REPO_ROOT", tmp_path) + + retro_dir = tmp_path / ".loop" / "retro" + retro_dir.mkdir(parents=True) + + cycles = [ + json.dumps({"success": True, "cycle": 1}), + json.dumps({"success": True, "cycle": 2}), + json.dumps({"success": False, "cycle": 3}), + json.dumps({"success": True, "cycle": 4}), + json.dumps({"success": False, "cycle": 5}), + ] + retro_file = retro_dir / "cycles.jsonl" + retro_file.write_text("\n".join(cycles)) + + config = {"flakiness_lookback_cycles": 20} + signal = hs.check_flakiness(config) + + assert signal.recent_cycles == 5 + assert signal.recent_failures == 2 + assert signal.failure_rate == 0.4 + assert signal.status == "critical" # 40% > 30% + + +class TestCheckTokenEconomy: + """Test token economy checking.""" + + def test_no_data_returns_unknown(self, tmp_path, monkeypatch): + """Return unknown when no token data exists.""" + monkeypatch.setattr(hs, "REPO_ROOT", tmp_path) + config = {} + + signal = hs.check_token_economy(config) + + assert signal.status == "unknown" + + def test_calculates_balanced(self, tmp_path, monkeypatch): + """Detect balanced token economy.""" + monkeypatch.setattr(hs, "REPO_ROOT", tmp_path) + + loop_dir = tmp_path / ".loop" + loop_dir.mkdir(parents=True) + + from datetime import datetime + + now = datetime.now(UTC).isoformat() + transactions = [ + json.dumps({"timestamp": now, "delta": 10}), + json.dumps({"timestamp": now, "delta": -5}), + ] + ledger_file = loop_dir / "token_economy.jsonl" + ledger_file.write_text("\n".join(transactions)) + + config = {} + signal = hs.check_token_economy(config) + + assert signal.status == "balanced" + assert signal.recent_mint == 10 + assert signal.recent_burn == 5 + + +class TestGiteaClient: + """Test Gitea API client.""" + + def test_initialization(self): + """Initialize with config and token.""" + config = {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"} + client = hs.GiteaClient(config, "token123") + + assert client.api_base == "http://test:3000/api/v1" + assert client.repo_slug == "test/repo" + assert client.token == "token123" + + def test_headers_with_token(self): + """Include authorization header with token.""" + config = {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"} + client = hs.GiteaClient(config, "token123") + + headers = client._headers() + + assert headers["Authorization"] == "token token123" + assert headers["Accept"] == "application/json" + + def test_headers_without_token(self): + """No authorization header without token.""" + config = {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"} + client = hs.GiteaClient(config, None) + + headers = client._headers() + + assert "Authorization" not in headers + assert headers["Accept"] == "application/json" + + +class TestGenerateSnapshot: + """Test snapshot generation.""" + + def test_returns_snapshot(self): + """Generate a complete snapshot.""" + config = hs.load_config() + + with ( + patch.object(hs.GiteaClient, "is_available", return_value=False), + patch.object(hs.GiteaClient, "__init__", return_value=None), + ): + snapshot = hs.generate_snapshot(config, None) + + assert isinstance(snapshot, hs.HealthSnapshot) + assert snapshot.overall_status in ["green", "yellow", "red", "unknown"] + assert snapshot.ci is not None + assert snapshot.issues is not None + assert snapshot.flakiness is not None + assert snapshot.tokens is not None diff --git a/timmy_automations/config/automations.json b/timmy_automations/config/automations.json index 29ecc529..8478c05b 100644 --- a/timmy_automations/config/automations.json +++ b/timmy_automations/config/automations.json @@ -1,6 +1,9 @@ { "version": "1.0.0", "description": "Master manifest of all Timmy automations", + "_health_snapshot": { + "note": "Quick health check before coding — CI, P0/P1 issues, flakiness" + }, "last_updated": "2026-03-21", "automations": [ { @@ -249,6 +252,22 @@ ".loop/weekly_narrative.json", ".loop/weekly_narrative.md" ] + }, + { + "id": "health_snapshot", + "name": "Health Snapshot", + "description": "Quick health check before coding — CI status, P0/P1 issues, test flakiness, token economy", + "script": "timmy_automations/daily_run/health_snapshot.py", + "category": "daily_run", + "enabled": true, + "trigger": "pre_cycle", + "executable": "python3", + "config": { + "critical_labels": ["P0", "P1", "priority/critical", "priority/high"], + "flakiness_lookback_cycles": 20, + "ci_timeout_seconds": 5 + }, + "outputs": [] } ] } diff --git a/timmy_automations/daily_run/health_snapshot.py b/timmy_automations/daily_run/health_snapshot.py new file mode 100755 index 00000000..dbcc44cb --- /dev/null +++ b/timmy_automations/daily_run/health_snapshot.py @@ -0,0 +1,619 @@ +#!/usr/bin/env python3 +"""Quick health snapshot before coding — checks CI, issues, flakiness. + +A fast status check that shows major red/green signals before deeper work. +Runs in a few seconds and produces a concise summary. + +Run: python3 timmy_automations/daily_run/health_snapshot.py +Env: GITEA_API, GITEA_TOKEN, REPO_SLUG + +Refs: #710 +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any +from urllib.request import Request, urlopen +from urllib.error import HTTPError, URLError + +# ── Configuration ───────────────────────────────────────────────────────── + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent + +DEFAULT_CONFIG = { + "gitea_api": "http://localhost:3000/api/v1", + "repo_slug": "rockachopa/Timmy-time-dashboard", + "token_file": "~/.hermes/gitea_token", + "critical_labels": ["P0", "P1", "priority/critical", "priority/high"], + "flakiness_lookback_cycles": 20, + "ci_timeout_seconds": 5, +} + + +def load_config() -> dict: + """Load configuration with fallback to defaults.""" + config = DEFAULT_CONFIG.copy() + + # Environment variable overrides + if os.environ.get("TIMMY_GITEA_API"): + config["gitea_api"] = os.environ["TIMMY_GITEA_API"] + if os.environ.get("TIMMY_REPO_SLUG"): + config["repo_slug"] = os.environ["TIMMY_REPO_SLUG"] + if os.environ.get("TIMMY_GITEA_TOKEN"): + config["token"] = os.environ["TIMMY_GITEA_TOKEN"] + + return config + + +def get_token(config: dict) -> str | None: + """Get Gitea token from environment or file.""" + if "token" in config: + return config["token"] + + # Try timmy's token file + repo_root = Path(__file__).resolve().parent.parent.parent + timmy_token_path = repo_root / ".timmy_gitea_token" + if timmy_token_path.exists(): + return timmy_token_path.read_text().strip() + + # Fallback to legacy token file + token_file = Path(config["token_file"]).expanduser() + if token_file.exists(): + return token_file.read_text().strip() + + return None + + +# ── Gitea API Client ────────────────────────────────────────────────────── + +class GiteaClient: + """Simple Gitea API client with graceful degradation.""" + + def __init__(self, config: dict, token: str | None): + self.api_base = config["gitea_api"].rstrip("/") + self.repo_slug = config["repo_slug"] + self.token = token + self._available: bool | None = None + + def _headers(self) -> dict: + headers = {"Accept": "application/json"} + if self.token: + headers["Authorization"] = f"token {self.token}" + return headers + + def _api_url(self, path: str) -> str: + return f"{self.api_base}/repos/{self.repo_slug}/{path}" + + def is_available(self) -> bool: + """Check if Gitea API is reachable.""" + if self._available is not None: + return self._available + + try: + req = Request( + f"{self.api_base}/version", + headers=self._headers(), + method="GET", + ) + with urlopen(req, timeout=3) as resp: + self._available = resp.status == 200 + return self._available + except (HTTPError, URLError, TimeoutError): + self._available = False + return False + + def get(self, path: str, params: dict | None = None) -> list | dict: + """Make a GET request to the Gitea API.""" + url = self._api_url(path) + if params: + query = "&".join(f"{k}={v}" for k, v in params.items()) + url = f"{url}?{query}" + + req = Request(url, headers=self._headers(), method="GET") + with urlopen(req, timeout=10) as resp: + return json.loads(resp.read()) + + def get_paginated(self, path: str, params: dict | None = None) -> list: + """Fetch all pages of a paginated endpoint.""" + all_items = [] + page = 1 + limit = 50 + + while True: + page_params = {"limit": limit, "page": page} + if params: + page_params.update(params) + + batch = self.get(path, page_params) + if not batch: + break + + all_items.extend(batch) + if len(batch) < limit: + break + page += 1 + + return all_items + + +# ── Data Models ─────────────────────────────────────────────────────────── + +@dataclass +class CISignal: + """CI pipeline status signal.""" + status: str # "pass", "fail", "unknown", "unavailable" + message: str + details: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class IssueSignal: + """Critical issues signal.""" + count: int + p0_count: int + p1_count: int + issues: list[dict[str, Any]] = field(default_factory=list) + + +@dataclass +class FlakinessSignal: + """Test flakiness/error rate signal.""" + status: str # "healthy", "degraded", "critical", "unknown" + recent_failures: int + recent_cycles: int + failure_rate: float + message: str + + +@dataclass +class TokenEconomySignal: + """Token economy temperature indicator.""" + status: str # "balanced", "inflationary", "deflationary", "unknown" + message: str + recent_mint: int = 0 + recent_burn: int = 0 + + +@dataclass +class HealthSnapshot: + """Complete health snapshot.""" + timestamp: str + overall_status: str # "green", "yellow", "red" + ci: CISignal + issues: IssueSignal + flakiness: FlakinessSignal + tokens: TokenEconomySignal + + def to_dict(self) -> dict[str, Any]: + return { + "timestamp": self.timestamp, + "overall_status": self.overall_status, + "ci": { + "status": self.ci.status, + "message": self.ci.message, + "details": self.ci.details, + }, + "issues": { + "count": self.issues.count, + "p0_count": self.issues.p0_count, + "p1_count": self.issues.p1_count, + "issues": self.issues.issues[:5], # Limit to 5 + }, + "flakiness": { + "status": self.flakiness.status, + "recent_failures": self.flakiness.recent_failures, + "recent_cycles": self.flakiness.recent_cycles, + "failure_rate": round(self.flakiness.failure_rate, 2), + "message": self.flakiness.message, + }, + "tokens": { + "status": self.tokens.status, + "message": self.tokens.message, + "recent_mint": self.tokens.recent_mint, + "recent_burn": self.tokens.recent_burn, + }, + } + + +# ── Health Check Functions ──────────────────────────────────────────────── + +def check_ci_status(client: GiteaClient, config: dict) -> CISignal: + """Check CI pipeline status from recent commits.""" + try: + # Get recent commits with status + commits = client.get_paginated("commits", {"limit": 5}) + + if not commits: + return CISignal( + status="unknown", + message="No recent commits found", + ) + + # Check status for most recent commit + latest = commits[0] + sha = latest.get("sha", "") + + try: + statuses = client.get(f"commits/{sha}/status") + state = statuses.get("state", "unknown") + + if state == "success": + return CISignal( + status="pass", + message="CI passing", + details={"sha": sha[:8], "state": state}, + ) + elif state in ("failure", "error"): + return CISignal( + status="fail", + message=f"CI failed ({state})", + details={"sha": sha[:8], "state": state}, + ) + elif state == "pending": + return CISignal( + status="unknown", + message="CI pending", + details={"sha": sha[:8], "state": state}, + ) + else: + return CISignal( + status="unknown", + message=f"CI status: {state}", + details={"sha": sha[:8], "state": state}, + ) + except (HTTPError, URLError) as exc: + return CISignal( + status="unknown", + message=f"Could not fetch CI status: {exc}", + ) + + except (HTTPError, URLError) as exc: + return CISignal( + status="unavailable", + message=f"CI check failed: {exc}", + ) + + +def check_critical_issues(client: GiteaClient, config: dict) -> IssueSignal: + """Check for open P0/P1 issues.""" + critical_labels = config.get("critical_labels", ["P0", "P1"]) + + try: + # Fetch open issues + issues = client.get_paginated("issues", {"state": "open", "limit": 100}) + + p0_issues = [] + p1_issues = [] + other_critical = [] + + for issue in issues: + labels = [l.get("name", "").lower() for l in issue.get("labels", [])] + + # Check for P0/P1 labels + is_p0 = any("p0" in l or "critical" in l for l in labels) + is_p1 = any("p1" in l or "high" in l for l in labels) + + issue_summary = { + "number": issue.get("number"), + "title": issue.get("title", "Untitled")[:60], + "url": issue.get("html_url", ""), + } + + if is_p0: + p0_issues.append(issue_summary) + elif is_p1: + p1_issues.append(issue_summary) + elif any(cl.lower() in labels for cl in critical_labels): + other_critical.append(issue_summary) + + all_critical = p0_issues + p1_issues + other_critical + + return IssueSignal( + count=len(all_critical), + p0_count=len(p0_issues), + p1_count=len(p1_issues), + issues=all_critical[:10], # Limit stored issues + ) + + except (HTTPError, URLError) as exc: + return IssueSignal( + count=0, + p0_count=0, + p1_count=0, + issues=[], + ) + + +def check_flakiness(config: dict) -> FlakinessSignal: + """Check test flakiness from cycle retrospective data.""" + retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl" + lookback = config.get("flakiness_lookback_cycles", 20) + + if not retro_file.exists(): + return FlakinessSignal( + status="unknown", + recent_failures=0, + recent_cycles=0, + failure_rate=0.0, + message="No cycle data available", + ) + + try: + entries = [] + for line in retro_file.read_text().strip().splitlines(): + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + continue + + # Get recent entries + recent = entries[-lookback:] if len(entries) > lookback else entries + + failures = [e for e in recent if not e.get("success", True)] + failure_count = len(failures) + total_count = len(recent) + + if total_count == 0: + return FlakinessSignal( + status="unknown", + recent_failures=0, + recent_cycles=0, + failure_rate=0.0, + message="No recent cycle data", + ) + + failure_rate = failure_count / total_count + + # Determine status based on failure rate + if failure_rate < 0.1: + status = "healthy" + message = f"Low flakiness ({failure_rate:.0%})" + elif failure_rate < 0.3: + status = "degraded" + message = f"Moderate flakiness ({failure_rate:.0%})" + else: + status = "critical" + message = f"High flakiness ({failure_rate:.0%})" + + return FlakinessSignal( + status=status, + recent_failures=failure_count, + recent_cycles=total_count, + failure_rate=failure_rate, + message=message, + ) + + except (OSError, ValueError) as exc: + return FlakinessSignal( + status="unknown", + recent_failures=0, + recent_cycles=0, + failure_rate=0.0, + message=f"Could not read cycle data: {exc}", + ) + + +def check_token_economy(config: dict) -> TokenEconomySignal: + """Check token economy temperature from recent transactions.""" + # This is a simplified check - in a full implementation, + # this would query the token ledger + ledger_file = REPO_ROOT / ".loop" / "token_economy.jsonl" + + if not ledger_file.exists(): + return TokenEconomySignal( + status="unknown", + message="No token economy data", + ) + + try: + # Read last 24 hours of transactions + since = datetime.now(timezone.utc) - timedelta(hours=24) + + recent_mint = 0 + recent_burn = 0 + + for line in ledger_file.read_text().strip().splitlines(): + try: + tx = json.loads(line) + tx_time = datetime.fromisoformat(tx.get("timestamp", "1970-01-01").replace("Z", "+00:00")) + if tx_time >= since: + delta = tx.get("delta", 0) + if delta > 0: + recent_mint += delta + else: + recent_burn += abs(delta) + except (json.JSONDecodeError, ValueError): + continue + + # Simple temperature check + if recent_mint > recent_burn * 2: + status = "inflationary" + message = f"High mint activity (+{recent_mint}/-{recent_burn})" + elif recent_burn > recent_mint * 2: + status = "deflationary" + message = f"High burn activity (+{recent_mint}/-{recent_burn})" + else: + status = "balanced" + message = f"Balanced flow (+{recent_mint}/-{recent_burn})" + + return TokenEconomySignal( + status=status, + message=message, + recent_mint=recent_mint, + recent_burn=recent_burn, + ) + + except (OSError, ValueError) as exc: + return TokenEconomySignal( + status="unknown", + message=f"Could not read token data: {exc}", + ) + + +def calculate_overall_status( + ci: CISignal, + issues: IssueSignal, + flakiness: FlakinessSignal, +) -> str: + """Calculate overall status from individual signals.""" + # Red conditions + if ci.status == "fail": + return "red" + if issues.p0_count > 0: + return "red" + if flakiness.status == "critical": + return "red" + + # Yellow conditions + if ci.status == "unknown": + return "yellow" + if issues.p1_count > 0: + return "yellow" + if flakiness.status == "degraded": + return "yellow" + + # Green + return "green" + + +# ── Main Functions ──────────────────────────────────────────────────────── + +def generate_snapshot(config: dict, token: str | None) -> HealthSnapshot: + """Generate a complete health snapshot.""" + client = GiteaClient(config, token) + + # Always run all checks (don't short-circuit) + if client.is_available(): + ci = check_ci_status(client, config) + issues = check_critical_issues(client, config) + else: + ci = CISignal( + status="unavailable", + message="Gitea unavailable", + ) + issues = IssueSignal(count=0, p0_count=0, p1_count=0, issues=[]) + + flakiness = check_flakiness(config) + tokens = check_token_economy(config) + + overall = calculate_overall_status(ci, issues, flakiness) + + return HealthSnapshot( + timestamp=datetime.now(timezone.utc).isoformat(), + overall_status=overall, + ci=ci, + issues=issues, + flakiness=flakiness, + tokens=tokens, + ) + + +def print_snapshot(snapshot: HealthSnapshot, verbose: bool = False) -> None: + """Print a formatted health snapshot.""" + # Status emoji + status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get( + snapshot.overall_status, "⚪" + ) + + print("=" * 60) + print(f"{status_emoji} HEALTH SNAPSHOT") + print("=" * 60) + print(f"Generated: {snapshot.timestamp}") + print(f"Overall: {snapshot.overall_status.upper()}") + print() + + # CI Status + ci_emoji = {"pass": "✅", "fail": "❌", "unknown": "⚠️", "unavailable": "⚪"}.get( + snapshot.ci.status, "⚪" + ) + print(f"{ci_emoji} CI: {snapshot.ci.message}") + + # Issues + if snapshot.issues.p0_count > 0: + issue_emoji = "🔴" + elif snapshot.issues.p1_count > 0: + issue_emoji = "🟡" + else: + issue_emoji = "✅" + print(f"{issue_emoji} Issues: {snapshot.issues.count} critical") + if snapshot.issues.p0_count > 0: + print(f" 🔴 P0: {snapshot.issues.p0_count}") + if snapshot.issues.p1_count > 0: + print(f" 🟡 P1: {snapshot.issues.p1_count}") + + # Flakiness + flak_emoji = {"healthy": "✅", "degraded": "🟡", "critical": "🔴", "unknown": "⚪"}.get( + snapshot.flakiness.status, "⚪" + ) + print(f"{flak_emoji} Flakiness: {snapshot.flakiness.message}") + + # Token Economy + token_emoji = {"balanced": "✅", "inflationary": "🟡", "deflationary": "🔵", "unknown": "⚪"}.get( + snapshot.tokens.status, "⚪" + ) + print(f"{token_emoji} Tokens: {snapshot.tokens.message}") + + # Verbose: show issue details + if verbose and snapshot.issues.issues: + print() + print("Critical Issues:") + for issue in snapshot.issues.issues[:5]: + print(f" #{issue['number']}: {issue['title'][:50]}") + + print() + print("─" * 60) + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Quick health snapshot before coding", + ) + p.add_argument( + "--json", "-j", + action="store_true", + help="Output as JSON", + ) + p.add_argument( + "--verbose", "-v", + action="store_true", + help="Show verbose output including issue details", + ) + p.add_argument( + "--quiet", "-q", + action="store_true", + help="Only show status line (no details)", + ) + return p.parse_args() + + +def main() -> int: + """Main entry point for CLI.""" + args = parse_args() + config = load_config() + token = get_token(config) + + snapshot = generate_snapshot(config, token) + + if args.json: + print(json.dumps(snapshot.to_dict(), indent=2)) + elif args.quiet: + status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get( + snapshot.overall_status, "⚪" + ) + print(f"{status_emoji} {snapshot.overall_status.upper()}") + else: + print_snapshot(snapshot, verbose=args.verbose) + + # Exit with non-zero if red status + return 0 if snapshot.overall_status != "red" else 1 + + +if __name__ == "__main__": + sys.exit(main())