1
0

[kimi] Implement quick health snapshot before coding (#710) (#828)

This commit is contained in:
2026-03-21 21:53:40 +00:00
parent dfe40f5fe6
commit 2f15435fed
6 changed files with 1136 additions and 0 deletions

View File

@@ -275,3 +275,54 @@ async def component_status():
},
"timestamp": datetime.now(UTC).isoformat(),
}
@router.get("/health/snapshot")
async def health_snapshot():
"""Quick health snapshot before coding.
Returns a concise status summary including:
- CI pipeline status (pass/fail/unknown)
- Critical issues count (P0/P1)
- Test flakiness rate
- Token economy temperature
Fast execution (< 5 seconds) for pre-work checks.
Refs: #710
"""
import sys
from pathlib import Path
# Import the health snapshot module
snapshot_path = Path(settings.repo_root) / "timmy_automations" / "daily_run"
if str(snapshot_path) not in sys.path:
sys.path.insert(0, str(snapshot_path))
try:
from health_snapshot import generate_snapshot, get_token, load_config
config = load_config()
token = get_token(config)
# Run the health snapshot (in thread to avoid blocking)
snapshot = await asyncio.to_thread(generate_snapshot, config, token)
return snapshot.to_dict()
except Exception as exc:
logger.warning("Health snapshot failed: %s", exc)
# Return graceful fallback
return {
"timestamp": datetime.now(UTC).isoformat(),
"overall_status": "unknown",
"error": str(exc),
"ci": {"status": "unknown", "message": "Snapshot failed"},
"issues": {"count": 0, "p0_count": 0, "p1_count": 0, "issues": []},
"flakiness": {
"status": "unknown",
"recent_failures": 0,
"recent_cycles": 0,
"failure_rate": 0.0,
"message": "Snapshot failed",
},
"tokens": {"status": "unknown", "message": "Snapshot failed"},
}

View File

@@ -489,5 +489,43 @@ def focus(
typer.echo("No active focus (broad mode).")
@app.command(name="healthcheck")
def healthcheck(
json_output: bool = typer.Option(False, "--json", "-j", help="Output as JSON"),
verbose: bool = typer.Option(
False, "--verbose", "-v", help="Show verbose output including issue details"
),
quiet: bool = typer.Option(False, "--quiet", "-q", help="Only show status line (no details)"),
):
"""Quick health snapshot before coding.
Shows CI status, critical issues (P0/P1), test flakiness, and token economy.
Fast execution (< 5 seconds) for pre-work checks.
Refs: #710
"""
import subprocess
import sys
from pathlib import Path
script_path = (
Path(__file__).resolve().parent.parent.parent
/ "timmy_automations"
/ "daily_run"
/ "health_snapshot.py"
)
cmd = [sys.executable, str(script_path)]
if json_output:
cmd.append("--json")
if verbose:
cmd.append("--verbose")
if quiet:
cmd.append("--quiet")
result = subprocess.run(cmd)
raise typer.Exit(result.returncode)
def main():
app()

View File

@@ -130,6 +130,13 @@ class TestAPIEndpoints:
r = client.get("/health/sovereignty")
assert r.status_code == 200
def test_health_snapshot(self, client):
r = client.get("/health/snapshot")
assert r.status_code == 200
data = r.json()
assert "overall_status" in data
assert data["overall_status"] in ["green", "yellow", "red", "unknown"]
def test_queue_status(self, client):
r = client.get("/api/queue/status")
assert r.status_code == 200
@@ -186,6 +193,7 @@ class TestNo500:
"/health",
"/health/status",
"/health/sovereignty",
"/health/snapshot",
"/health/components",
"/agents/default/panel",
"/agents/default/history",

View File

@@ -0,0 +1,401 @@
"""Tests for health_snapshot module."""
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import patch
# Add timmy_automations to path for imports
sys.path.insert(
0, str(Path(__file__).resolve().parent.parent.parent / "timmy_automations" / "daily_run")
)
from datetime import UTC
import health_snapshot as hs
class TestLoadConfig:
"""Test configuration loading."""
def test_loads_default_config(self):
"""Load default configuration."""
config = hs.load_config()
assert "gitea_api" in config
assert "repo_slug" in config
assert "critical_labels" in config
assert "flakiness_lookback_cycles" in config
def test_environment_overrides(self, monkeypatch):
"""Environment variables override defaults."""
monkeypatch.setenv("TIMMY_GITEA_API", "http://test:3000/api/v1")
monkeypatch.setenv("TIMMY_REPO_SLUG", "test/repo")
config = hs.load_config()
assert config["gitea_api"] == "http://test:3000/api/v1"
assert config["repo_slug"] == "test/repo"
class TestGetToken:
"""Test token retrieval."""
def test_returns_config_token(self):
"""Return token from config if present."""
config = {"token": "test-token-123"}
token = hs.get_token(config)
assert token == "test-token-123"
def test_reads_from_file(self, tmp_path, monkeypatch):
"""Read token from file if no config token."""
token_file = tmp_path / "gitea_token"
token_file.write_text("file-token-456")
config = {"token_file": str(token_file)}
token = hs.get_token(config)
assert token == "file-token-456"
def test_returns_none_when_no_token(self):
"""Return None when no token available."""
config = {"token_file": "/nonexistent/path"}
token = hs.get_token(config)
assert token is None
class TestCISignal:
"""Test CISignal dataclass."""
def test_default_details(self):
"""Details defaults to empty dict."""
signal = hs.CISignal(status="pass", message="CI passing")
assert signal.details == {}
def test_with_details(self):
"""Can include details."""
signal = hs.CISignal(status="pass", message="CI passing", details={"sha": "abc123"})
assert signal.details["sha"] == "abc123"
class TestIssueSignal:
"""Test IssueSignal dataclass."""
def test_default_issues_list(self):
"""Issues defaults to empty list."""
signal = hs.IssueSignal(count=0, p0_count=0, p1_count=0)
assert signal.issues == []
def test_with_issues(self):
"""Can include issues."""
issues = [{"number": 1, "title": "Test"}]
signal = hs.IssueSignal(count=1, p0_count=1, p1_count=0, issues=issues)
assert len(signal.issues) == 1
class TestFlakinessSignal:
"""Test FlakinessSignal dataclass."""
def test_calculated_fields(self):
"""All fields set correctly."""
signal = hs.FlakinessSignal(
status="healthy",
recent_failures=2,
recent_cycles=20,
failure_rate=0.1,
message="Low flakiness",
)
assert signal.status == "healthy"
assert signal.recent_failures == 2
assert signal.failure_rate == 0.1
class TestHealthSnapshot:
"""Test HealthSnapshot dataclass."""
def test_to_dict_structure(self):
"""to_dict produces expected structure."""
snapshot = hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status="green",
ci=hs.CISignal(status="pass", message="CI passing"),
issues=hs.IssueSignal(count=0, p0_count=0, p1_count=0),
flakiness=hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
),
tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
)
data = snapshot.to_dict()
assert data["timestamp"] == "2026-01-01T00:00:00+00:00"
assert data["overall_status"] == "green"
assert "ci" in data
assert "issues" in data
assert "flakiness" in data
assert "tokens" in data
def test_to_dict_limits_issues(self):
"""to_dict limits issues to 5."""
many_issues = [{"number": i, "title": f"Issue {i}"} for i in range(10)]
snapshot = hs.HealthSnapshot(
timestamp="2026-01-01T00:00:00+00:00",
overall_status="green",
ci=hs.CISignal(status="pass", message="CI passing"),
issues=hs.IssueSignal(count=10, p0_count=5, p1_count=5, issues=many_issues),
flakiness=hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
),
tokens=hs.TokenEconomySignal(status="balanced", message="Balanced"),
)
data = snapshot.to_dict()
assert len(data["issues"]["issues"]) == 5
class TestCalculateOverallStatus:
"""Test overall status calculation."""
def test_green_when_all_healthy(self):
"""Status is green when all signals healthy."""
ci = hs.CISignal(status="pass", message="CI passing")
issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0)
flakiness = hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
)
status = hs.calculate_overall_status(ci, issues, flakiness)
assert status == "green"
def test_red_when_ci_fails(self):
"""Status is red when CI fails."""
ci = hs.CISignal(status="fail", message="CI failed")
issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0)
flakiness = hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
)
status = hs.calculate_overall_status(ci, issues, flakiness)
assert status == "red"
def test_red_when_p0_issues(self):
"""Status is red when P0 issues exist."""
ci = hs.CISignal(status="pass", message="CI passing")
issues = hs.IssueSignal(count=1, p0_count=1, p1_count=0)
flakiness = hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
)
status = hs.calculate_overall_status(ci, issues, flakiness)
assert status == "red"
def test_yellow_when_p1_issues(self):
"""Status is yellow when P1 issues exist."""
ci = hs.CISignal(status="pass", message="CI passing")
issues = hs.IssueSignal(count=1, p0_count=0, p1_count=1)
flakiness = hs.FlakinessSignal(
status="healthy",
recent_failures=0,
recent_cycles=10,
failure_rate=0.0,
message="All good",
)
status = hs.calculate_overall_status(ci, issues, flakiness)
assert status == "yellow"
def test_yellow_when_flakiness_degraded(self):
"""Status is yellow when flakiness degraded."""
ci = hs.CISignal(status="pass", message="CI passing")
issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0)
flakiness = hs.FlakinessSignal(
status="degraded",
recent_failures=5,
recent_cycles=20,
failure_rate=0.25,
message="Moderate flakiness",
)
status = hs.calculate_overall_status(ci, issues, flakiness)
assert status == "yellow"
def test_red_when_flakiness_critical(self):
"""Status is red when flakiness critical."""
ci = hs.CISignal(status="pass", message="CI passing")
issues = hs.IssueSignal(count=0, p0_count=0, p1_count=0)
flakiness = hs.FlakinessSignal(
status="critical",
recent_failures=10,
recent_cycles=20,
failure_rate=0.5,
message="High flakiness",
)
status = hs.calculate_overall_status(ci, issues, flakiness)
assert status == "red"
class TestCheckFlakiness:
"""Test flakiness checking."""
def test_no_data_returns_unknown(self, tmp_path, monkeypatch):
"""Return unknown when no cycle data exists."""
monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
config = {"flakiness_lookback_cycles": 20}
signal = hs.check_flakiness(config)
assert signal.status == "unknown"
assert signal.message == "No cycle data available"
def test_calculates_failure_rate(self, tmp_path, monkeypatch):
"""Calculate failure rate from cycle data."""
monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
retro_dir = tmp_path / ".loop" / "retro"
retro_dir.mkdir(parents=True)
cycles = [
json.dumps({"success": True, "cycle": 1}),
json.dumps({"success": True, "cycle": 2}),
json.dumps({"success": False, "cycle": 3}),
json.dumps({"success": True, "cycle": 4}),
json.dumps({"success": False, "cycle": 5}),
]
retro_file = retro_dir / "cycles.jsonl"
retro_file.write_text("\n".join(cycles))
config = {"flakiness_lookback_cycles": 20}
signal = hs.check_flakiness(config)
assert signal.recent_cycles == 5
assert signal.recent_failures == 2
assert signal.failure_rate == 0.4
assert signal.status == "critical" # 40% > 30%
class TestCheckTokenEconomy:
"""Test token economy checking."""
def test_no_data_returns_unknown(self, tmp_path, monkeypatch):
"""Return unknown when no token data exists."""
monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
config = {}
signal = hs.check_token_economy(config)
assert signal.status == "unknown"
def test_calculates_balanced(self, tmp_path, monkeypatch):
"""Detect balanced token economy."""
monkeypatch.setattr(hs, "REPO_ROOT", tmp_path)
loop_dir = tmp_path / ".loop"
loop_dir.mkdir(parents=True)
from datetime import datetime
now = datetime.now(UTC).isoformat()
transactions = [
json.dumps({"timestamp": now, "delta": 10}),
json.dumps({"timestamp": now, "delta": -5}),
]
ledger_file = loop_dir / "token_economy.jsonl"
ledger_file.write_text("\n".join(transactions))
config = {}
signal = hs.check_token_economy(config)
assert signal.status == "balanced"
assert signal.recent_mint == 10
assert signal.recent_burn == 5
class TestGiteaClient:
"""Test Gitea API client."""
def test_initialization(self):
"""Initialize with config and token."""
config = {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"}
client = hs.GiteaClient(config, "token123")
assert client.api_base == "http://test:3000/api/v1"
assert client.repo_slug == "test/repo"
assert client.token == "token123"
def test_headers_with_token(self):
"""Include authorization header with token."""
config = {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"}
client = hs.GiteaClient(config, "token123")
headers = client._headers()
assert headers["Authorization"] == "token token123"
assert headers["Accept"] == "application/json"
def test_headers_without_token(self):
"""No authorization header without token."""
config = {"gitea_api": "http://test:3000/api/v1", "repo_slug": "test/repo"}
client = hs.GiteaClient(config, None)
headers = client._headers()
assert "Authorization" not in headers
assert headers["Accept"] == "application/json"
class TestGenerateSnapshot:
"""Test snapshot generation."""
def test_returns_snapshot(self):
"""Generate a complete snapshot."""
config = hs.load_config()
with (
patch.object(hs.GiteaClient, "is_available", return_value=False),
patch.object(hs.GiteaClient, "__init__", return_value=None),
):
snapshot = hs.generate_snapshot(config, None)
assert isinstance(snapshot, hs.HealthSnapshot)
assert snapshot.overall_status in ["green", "yellow", "red", "unknown"]
assert snapshot.ci is not None
assert snapshot.issues is not None
assert snapshot.flakiness is not None
assert snapshot.tokens is not None

View File

@@ -1,6 +1,9 @@
{
"version": "1.0.0",
"description": "Master manifest of all Timmy automations",
"_health_snapshot": {
"note": "Quick health check before coding — CI, P0/P1 issues, flakiness"
},
"last_updated": "2026-03-21",
"automations": [
{
@@ -249,6 +252,22 @@
".loop/weekly_narrative.json",
".loop/weekly_narrative.md"
]
},
{
"id": "health_snapshot",
"name": "Health Snapshot",
"description": "Quick health check before coding — CI status, P0/P1 issues, test flakiness, token economy",
"script": "timmy_automations/daily_run/health_snapshot.py",
"category": "daily_run",
"enabled": true,
"trigger": "pre_cycle",
"executable": "python3",
"config": {
"critical_labels": ["P0", "P1", "priority/critical", "priority/high"],
"flakiness_lookback_cycles": 20,
"ci_timeout_seconds": 5
},
"outputs": []
}
]
}

View File

@@ -0,0 +1,619 @@
#!/usr/bin/env python3
"""Quick health snapshot before coding — checks CI, issues, flakiness.
A fast status check that shows major red/green signals before deeper work.
Runs in a few seconds and produces a concise summary.
Run: python3 timmy_automations/daily_run/health_snapshot.py
Env: GITEA_API, GITEA_TOKEN, REPO_SLUG
Refs: #710
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# ── Configuration ─────────────────────────────────────────────────────────
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
DEFAULT_CONFIG = {
"gitea_api": "http://localhost:3000/api/v1",
"repo_slug": "rockachopa/Timmy-time-dashboard",
"token_file": "~/.hermes/gitea_token",
"critical_labels": ["P0", "P1", "priority/critical", "priority/high"],
"flakiness_lookback_cycles": 20,
"ci_timeout_seconds": 5,
}
def load_config() -> dict:
"""Load configuration with fallback to defaults."""
config = DEFAULT_CONFIG.copy()
# Environment variable overrides
if os.environ.get("TIMMY_GITEA_API"):
config["gitea_api"] = os.environ["TIMMY_GITEA_API"]
if os.environ.get("TIMMY_REPO_SLUG"):
config["repo_slug"] = os.environ["TIMMY_REPO_SLUG"]
if os.environ.get("TIMMY_GITEA_TOKEN"):
config["token"] = os.environ["TIMMY_GITEA_TOKEN"]
return config
def get_token(config: dict) -> str | None:
"""Get Gitea token from environment or file."""
if "token" in config:
return config["token"]
# Try timmy's token file
repo_root = Path(__file__).resolve().parent.parent.parent
timmy_token_path = repo_root / ".timmy_gitea_token"
if timmy_token_path.exists():
return timmy_token_path.read_text().strip()
# Fallback to legacy token file
token_file = Path(config["token_file"]).expanduser()
if token_file.exists():
return token_file.read_text().strip()
return None
# ── Gitea API Client ──────────────────────────────────────────────────────
class GiteaClient:
"""Simple Gitea API client with graceful degradation."""
def __init__(self, config: dict, token: str | None):
self.api_base = config["gitea_api"].rstrip("/")
self.repo_slug = config["repo_slug"]
self.token = token
self._available: bool | None = None
def _headers(self) -> dict:
headers = {"Accept": "application/json"}
if self.token:
headers["Authorization"] = f"token {self.token}"
return headers
def _api_url(self, path: str) -> str:
return f"{self.api_base}/repos/{self.repo_slug}/{path}"
def is_available(self) -> bool:
"""Check if Gitea API is reachable."""
if self._available is not None:
return self._available
try:
req = Request(
f"{self.api_base}/version",
headers=self._headers(),
method="GET",
)
with urlopen(req, timeout=3) as resp:
self._available = resp.status == 200
return self._available
except (HTTPError, URLError, TimeoutError):
self._available = False
return False
def get(self, path: str, params: dict | None = None) -> list | dict:
"""Make a GET request to the Gitea API."""
url = self._api_url(path)
if params:
query = "&".join(f"{k}={v}" for k, v in params.items())
url = f"{url}?{query}"
req = Request(url, headers=self._headers(), method="GET")
with urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
def get_paginated(self, path: str, params: dict | None = None) -> list:
"""Fetch all pages of a paginated endpoint."""
all_items = []
page = 1
limit = 50
while True:
page_params = {"limit": limit, "page": page}
if params:
page_params.update(params)
batch = self.get(path, page_params)
if not batch:
break
all_items.extend(batch)
if len(batch) < limit:
break
page += 1
return all_items
# ── Data Models ───────────────────────────────────────────────────────────
@dataclass
class CISignal:
"""CI pipeline status signal."""
status: str # "pass", "fail", "unknown", "unavailable"
message: str
details: dict[str, Any] = field(default_factory=dict)
@dataclass
class IssueSignal:
"""Critical issues signal."""
count: int
p0_count: int
p1_count: int
issues: list[dict[str, Any]] = field(default_factory=list)
@dataclass
class FlakinessSignal:
"""Test flakiness/error rate signal."""
status: str # "healthy", "degraded", "critical", "unknown"
recent_failures: int
recent_cycles: int
failure_rate: float
message: str
@dataclass
class TokenEconomySignal:
"""Token economy temperature indicator."""
status: str # "balanced", "inflationary", "deflationary", "unknown"
message: str
recent_mint: int = 0
recent_burn: int = 0
@dataclass
class HealthSnapshot:
"""Complete health snapshot."""
timestamp: str
overall_status: str # "green", "yellow", "red"
ci: CISignal
issues: IssueSignal
flakiness: FlakinessSignal
tokens: TokenEconomySignal
def to_dict(self) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"overall_status": self.overall_status,
"ci": {
"status": self.ci.status,
"message": self.ci.message,
"details": self.ci.details,
},
"issues": {
"count": self.issues.count,
"p0_count": self.issues.p0_count,
"p1_count": self.issues.p1_count,
"issues": self.issues.issues[:5], # Limit to 5
},
"flakiness": {
"status": self.flakiness.status,
"recent_failures": self.flakiness.recent_failures,
"recent_cycles": self.flakiness.recent_cycles,
"failure_rate": round(self.flakiness.failure_rate, 2),
"message": self.flakiness.message,
},
"tokens": {
"status": self.tokens.status,
"message": self.tokens.message,
"recent_mint": self.tokens.recent_mint,
"recent_burn": self.tokens.recent_burn,
},
}
# ── Health Check Functions ────────────────────────────────────────────────
def check_ci_status(client: GiteaClient, config: dict) -> CISignal:
"""Check CI pipeline status from recent commits."""
try:
# Get recent commits with status
commits = client.get_paginated("commits", {"limit": 5})
if not commits:
return CISignal(
status="unknown",
message="No recent commits found",
)
# Check status for most recent commit
latest = commits[0]
sha = latest.get("sha", "")
try:
statuses = client.get(f"commits/{sha}/status")
state = statuses.get("state", "unknown")
if state == "success":
return CISignal(
status="pass",
message="CI passing",
details={"sha": sha[:8], "state": state},
)
elif state in ("failure", "error"):
return CISignal(
status="fail",
message=f"CI failed ({state})",
details={"sha": sha[:8], "state": state},
)
elif state == "pending":
return CISignal(
status="unknown",
message="CI pending",
details={"sha": sha[:8], "state": state},
)
else:
return CISignal(
status="unknown",
message=f"CI status: {state}",
details={"sha": sha[:8], "state": state},
)
except (HTTPError, URLError) as exc:
return CISignal(
status="unknown",
message=f"Could not fetch CI status: {exc}",
)
except (HTTPError, URLError) as exc:
return CISignal(
status="unavailable",
message=f"CI check failed: {exc}",
)
def check_critical_issues(client: GiteaClient, config: dict) -> IssueSignal:
"""Check for open P0/P1 issues."""
critical_labels = config.get("critical_labels", ["P0", "P1"])
try:
# Fetch open issues
issues = client.get_paginated("issues", {"state": "open", "limit": 100})
p0_issues = []
p1_issues = []
other_critical = []
for issue in issues:
labels = [l.get("name", "").lower() for l in issue.get("labels", [])]
# Check for P0/P1 labels
is_p0 = any("p0" in l or "critical" in l for l in labels)
is_p1 = any("p1" in l or "high" in l for l in labels)
issue_summary = {
"number": issue.get("number"),
"title": issue.get("title", "Untitled")[:60],
"url": issue.get("html_url", ""),
}
if is_p0:
p0_issues.append(issue_summary)
elif is_p1:
p1_issues.append(issue_summary)
elif any(cl.lower() in labels for cl in critical_labels):
other_critical.append(issue_summary)
all_critical = p0_issues + p1_issues + other_critical
return IssueSignal(
count=len(all_critical),
p0_count=len(p0_issues),
p1_count=len(p1_issues),
issues=all_critical[:10], # Limit stored issues
)
except (HTTPError, URLError) as exc:
return IssueSignal(
count=0,
p0_count=0,
p1_count=0,
issues=[],
)
def check_flakiness(config: dict) -> FlakinessSignal:
"""Check test flakiness from cycle retrospective data."""
retro_file = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
lookback = config.get("flakiness_lookback_cycles", 20)
if not retro_file.exists():
return FlakinessSignal(
status="unknown",
recent_failures=0,
recent_cycles=0,
failure_rate=0.0,
message="No cycle data available",
)
try:
entries = []
for line in retro_file.read_text().strip().splitlines():
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
# Get recent entries
recent = entries[-lookback:] if len(entries) > lookback else entries
failures = [e for e in recent if not e.get("success", True)]
failure_count = len(failures)
total_count = len(recent)
if total_count == 0:
return FlakinessSignal(
status="unknown",
recent_failures=0,
recent_cycles=0,
failure_rate=0.0,
message="No recent cycle data",
)
failure_rate = failure_count / total_count
# Determine status based on failure rate
if failure_rate < 0.1:
status = "healthy"
message = f"Low flakiness ({failure_rate:.0%})"
elif failure_rate < 0.3:
status = "degraded"
message = f"Moderate flakiness ({failure_rate:.0%})"
else:
status = "critical"
message = f"High flakiness ({failure_rate:.0%})"
return FlakinessSignal(
status=status,
recent_failures=failure_count,
recent_cycles=total_count,
failure_rate=failure_rate,
message=message,
)
except (OSError, ValueError) as exc:
return FlakinessSignal(
status="unknown",
recent_failures=0,
recent_cycles=0,
failure_rate=0.0,
message=f"Could not read cycle data: {exc}",
)
def check_token_economy(config: dict) -> TokenEconomySignal:
"""Check token economy temperature from recent transactions."""
# This is a simplified check - in a full implementation,
# this would query the token ledger
ledger_file = REPO_ROOT / ".loop" / "token_economy.jsonl"
if not ledger_file.exists():
return TokenEconomySignal(
status="unknown",
message="No token economy data",
)
try:
# Read last 24 hours of transactions
since = datetime.now(timezone.utc) - timedelta(hours=24)
recent_mint = 0
recent_burn = 0
for line in ledger_file.read_text().strip().splitlines():
try:
tx = json.loads(line)
tx_time = datetime.fromisoformat(tx.get("timestamp", "1970-01-01").replace("Z", "+00:00"))
if tx_time >= since:
delta = tx.get("delta", 0)
if delta > 0:
recent_mint += delta
else:
recent_burn += abs(delta)
except (json.JSONDecodeError, ValueError):
continue
# Simple temperature check
if recent_mint > recent_burn * 2:
status = "inflationary"
message = f"High mint activity (+{recent_mint}/-{recent_burn})"
elif recent_burn > recent_mint * 2:
status = "deflationary"
message = f"High burn activity (+{recent_mint}/-{recent_burn})"
else:
status = "balanced"
message = f"Balanced flow (+{recent_mint}/-{recent_burn})"
return TokenEconomySignal(
status=status,
message=message,
recent_mint=recent_mint,
recent_burn=recent_burn,
)
except (OSError, ValueError) as exc:
return TokenEconomySignal(
status="unknown",
message=f"Could not read token data: {exc}",
)
def calculate_overall_status(
ci: CISignal,
issues: IssueSignal,
flakiness: FlakinessSignal,
) -> str:
"""Calculate overall status from individual signals."""
# Red conditions
if ci.status == "fail":
return "red"
if issues.p0_count > 0:
return "red"
if flakiness.status == "critical":
return "red"
# Yellow conditions
if ci.status == "unknown":
return "yellow"
if issues.p1_count > 0:
return "yellow"
if flakiness.status == "degraded":
return "yellow"
# Green
return "green"
# ── Main Functions ────────────────────────────────────────────────────────
def generate_snapshot(config: dict, token: str | None) -> HealthSnapshot:
"""Generate a complete health snapshot."""
client = GiteaClient(config, token)
# Always run all checks (don't short-circuit)
if client.is_available():
ci = check_ci_status(client, config)
issues = check_critical_issues(client, config)
else:
ci = CISignal(
status="unavailable",
message="Gitea unavailable",
)
issues = IssueSignal(count=0, p0_count=0, p1_count=0, issues=[])
flakiness = check_flakiness(config)
tokens = check_token_economy(config)
overall = calculate_overall_status(ci, issues, flakiness)
return HealthSnapshot(
timestamp=datetime.now(timezone.utc).isoformat(),
overall_status=overall,
ci=ci,
issues=issues,
flakiness=flakiness,
tokens=tokens,
)
def print_snapshot(snapshot: HealthSnapshot, verbose: bool = False) -> None:
"""Print a formatted health snapshot."""
# Status emoji
status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(
snapshot.overall_status, ""
)
print("=" * 60)
print(f"{status_emoji} HEALTH SNAPSHOT")
print("=" * 60)
print(f"Generated: {snapshot.timestamp}")
print(f"Overall: {snapshot.overall_status.upper()}")
print()
# CI Status
ci_emoji = {"pass": "", "fail": "", "unknown": "⚠️", "unavailable": ""}.get(
snapshot.ci.status, ""
)
print(f"{ci_emoji} CI: {snapshot.ci.message}")
# Issues
if snapshot.issues.p0_count > 0:
issue_emoji = "🔴"
elif snapshot.issues.p1_count > 0:
issue_emoji = "🟡"
else:
issue_emoji = ""
print(f"{issue_emoji} Issues: {snapshot.issues.count} critical")
if snapshot.issues.p0_count > 0:
print(f" 🔴 P0: {snapshot.issues.p0_count}")
if snapshot.issues.p1_count > 0:
print(f" 🟡 P1: {snapshot.issues.p1_count}")
# Flakiness
flak_emoji = {"healthy": "", "degraded": "🟡", "critical": "🔴", "unknown": ""}.get(
snapshot.flakiness.status, ""
)
print(f"{flak_emoji} Flakiness: {snapshot.flakiness.message}")
# Token Economy
token_emoji = {"balanced": "", "inflationary": "🟡", "deflationary": "🔵", "unknown": ""}.get(
snapshot.tokens.status, ""
)
print(f"{token_emoji} Tokens: {snapshot.tokens.message}")
# Verbose: show issue details
if verbose and snapshot.issues.issues:
print()
print("Critical Issues:")
for issue in snapshot.issues.issues[:5]:
print(f" #{issue['number']}: {issue['title'][:50]}")
print()
print("" * 60)
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="Quick health snapshot before coding",
)
p.add_argument(
"--json", "-j",
action="store_true",
help="Output as JSON",
)
p.add_argument(
"--verbose", "-v",
action="store_true",
help="Show verbose output including issue details",
)
p.add_argument(
"--quiet", "-q",
action="store_true",
help="Only show status line (no details)",
)
return p.parse_args()
def main() -> int:
"""Main entry point for CLI."""
args = parse_args()
config = load_config()
token = get_token(config)
snapshot = generate_snapshot(config, token)
if args.json:
print(json.dumps(snapshot.to_dict(), indent=2))
elif args.quiet:
status_emoji = {"green": "🟢", "yellow": "🟡", "red": "🔴"}.get(
snapshot.overall_status, ""
)
print(f"{status_emoji} {snapshot.overall_status.upper()}")
else:
print_snapshot(snapshot, verbose=args.verbose)
# Exit with non-zero if red status
return 0 if snapshot.overall_status != "red" else 1
if __name__ == "__main__":
sys.exit(main())