Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
cb46d56147 feat: add autonomous incident creation scaffold (#553)
Some checks are pending
Smoke Test / smoke (pull_request) Waiting to run
2026-04-15 00:40:14 -04:00
2 changed files with 460 additions and 0 deletions

View File

@@ -0,0 +1,329 @@
#!/usr/bin/env python3
"""Create or refresh fleet incidents on Gitea from local infrastructure signals.
Refs: timmy-home #553
"""
from __future__ import annotations
import argparse
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from urllib import request
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_REPO = "timmy-home"
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
DEFAULT_FAILOVER_STATUS = Path.home() / ".timmy" / "failover_status.json"
DEFAULT_RESTART_STATE_DIR = Path("/var/lib/timmy/restarts")
DEFAULT_HEARTBEAT_FILE = Path("/var/lib/timmy/heartbeats/fleet_health.last")
@dataclass(frozen=True)
class Incident:
fingerprint: str
title: str
body: str
def latest_evidence(self) -> str:
lines = [line for line in self.body.splitlines() if line.strip()]
if lines and lines[0].startswith("Fingerprint: "):
lines = lines[1:]
return "\n".join(lines).strip()
class GiteaClient:
def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
self.token = token
self.owner = owner
self.repo = repo
self.base_url = base_url.rstrip("/")
def _request(self, path: str, *, method: str = "GET", data: dict | None = None):
payload = None if data is None else json.dumps(data).encode()
headers = {"Authorization": f"token {self.token}"}
if payload is not None:
headers["Content-Type"] = "application/json"
req = request.Request(f"{self.base_url}{path}", data=payload, headers=headers, method=method)
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def list_open_issues(self):
issues = self._request(f"/repos/{self.owner}/{self.repo}/issues?state=open&limit=100")
return [issue for issue in issues if not issue.get("pull_request")]
def create_issue(self, title: str, body: str):
return self._request(
f"/repos/{self.owner}/{self.repo}/issues",
method="POST",
data={"title": title, "body": body},
)
def comment_issue(self, issue_number: int, body: str):
return self._request(
f"/repos/{self.owner}/{self.repo}/issues/{issue_number}/comments",
method="POST",
data={"body": body},
)
def load_json(path: Path):
if not path.exists():
return None
return json.loads(path.read_text())
def load_restart_counts(state_dir: Path) -> dict[str, int]:
if not state_dir.exists():
return {}
counts: dict[str, int] = {}
for path in sorted(state_dir.glob("*.count")):
try:
counts[path.stem] = int(path.read_text().strip())
except ValueError:
continue
return counts
def heartbeat_is_stale(path: Path, *, now: datetime | None = None, max_age_seconds: int = 900) -> bool:
if now is None:
now = datetime.now(timezone.utc)
if not path.exists():
return True
age = now.timestamp() - path.stat().st_mtime
return age > max_age_seconds
def _iso(dt: datetime) -> str:
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
def _build_body(fingerprint: str, *details: str) -> str:
detail_lines = [detail for detail in details if detail]
return "\n".join([f"Fingerprint: {fingerprint}", *detail_lines])
def build_incidents(
*,
failover_status: dict | None,
restart_counts: dict[str, int],
heartbeat_stale: bool,
now: datetime | None = None,
restart_escalation_threshold: int = 3,
) -> list[Incident]:
if now is None:
now = datetime.now(timezone.utc)
incidents: list[Incident] = []
failover_timestamp = None
fleet = {}
if failover_status:
failover_timestamp = failover_status.get("timestamp")
fleet = failover_status.get("fleet") or {}
for host, status in sorted(fleet.items()):
if str(status).upper() == "ONLINE":
continue
fingerprint = f"host-offline:{host}"
failover_detail = f"Failover status timestamp: {failover_timestamp}" if failover_timestamp is not None else "Failover status timestamp: unknown"
incidents.append(
Incident(
fingerprint=fingerprint,
title=f"[AUTO] Fleet host offline: {host}",
body=_build_body(
fingerprint,
f"Detected at: {_iso(now)}",
failover_detail,
f"Host `{host}` reported `{status}` by failover monitor.",
),
)
)
for process_name, count in sorted(restart_counts.items()):
if count <= restart_escalation_threshold:
continue
fingerprint = f"restart-escalation:{process_name}"
incidents.append(
Incident(
fingerprint=fingerprint,
title=f"[AUTO] Restart escalation: {process_name}",
body=_build_body(
fingerprint,
f"Detected at: {_iso(now)}",
f"Process `{process_name}` has crossed the restart escalation threshold with count={count}.",
),
)
)
if heartbeat_stale:
fingerprint = "probe-stale:fleet-health"
incidents.append(
Incident(
fingerprint=fingerprint,
title="[AUTO] Fleet health probe stale",
body=_build_body(
fingerprint,
f"Detected at: {_iso(now)}",
"Heartbeat missing or older than the configured fleet health maximum age.",
),
)
)
return incidents
def find_matching_issue(incident: Incident, open_issues: Iterable[dict]) -> dict | None:
for issue in open_issues:
haystack = "\n".join([issue.get("title") or "", issue.get("body") or ""])
if incident.fingerprint in haystack or incident.title == issue.get("title"):
return issue
return None
def build_repeat_comment(incident: Incident) -> str:
return (
"Autonomous infrastructure detector saw the same incident again.\n\n"
f"Fingerprint: {incident.fingerprint}\n\n"
f"Latest evidence:\n{incident.latest_evidence()}"
)
def sync_incidents(
incidents: Iterable[Incident],
client: GiteaClient,
*,
apply: bool = False,
comment_existing: bool = True,
):
open_issues = list(client.list_open_issues())
results = []
for incident in incidents:
existing = find_matching_issue(incident, open_issues)
if existing:
action = "existing"
if apply and comment_existing:
client.comment_issue(existing["number"], build_repeat_comment(incident))
action = "commented"
results.append(
{
"action": action,
"fingerprint": incident.fingerprint,
"issue_number": existing["number"],
"title": existing.get("title"),
}
)
continue
if apply:
created = client.create_issue(incident.title, incident.body)
open_issues.append(created)
results.append(
{
"action": "created",
"fingerprint": incident.fingerprint,
"issue_number": created["number"],
"title": created.get("title"),
}
)
else:
results.append(
{
"action": "would_create",
"fingerprint": incident.fingerprint,
"issue_number": None,
"title": incident.title,
}
)
return results
def parse_args():
parser = argparse.ArgumentParser(description="Create or refresh fleet incidents on Gitea from local infrastructure signals.")
parser.add_argument("--owner", default=DEFAULT_OWNER)
parser.add_argument("--repo", default=DEFAULT_REPO)
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE)
parser.add_argument("--failover-status", type=Path, default=DEFAULT_FAILOVER_STATUS)
parser.add_argument("--restart-state-dir", type=Path, default=DEFAULT_RESTART_STATE_DIR)
parser.add_argument("--heartbeat-file", type=Path, default=DEFAULT_HEARTBEAT_FILE)
parser.add_argument("--heartbeat-max-age-seconds", type=int, default=900)
parser.add_argument("--restart-escalation-threshold", type=int, default=3)
parser.add_argument("--apply", action="store_true", help="Create/comment issues instead of reporting what would happen.")
parser.add_argument("--no-comment-existing", action="store_true", help="Do not comment on existing matching issues.")
parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON output.")
return parser.parse_args()
def main():
args = parse_args()
now = datetime.now(timezone.utc)
failover_status = load_json(args.failover_status)
restart_counts = load_restart_counts(args.restart_state_dir)
heartbeat_stale = heartbeat_is_stale(
args.heartbeat_file,
now=now,
max_age_seconds=args.heartbeat_max_age_seconds,
)
incidents = build_incidents(
failover_status=failover_status,
restart_counts=restart_counts,
heartbeat_stale=heartbeat_stale,
now=now,
restart_escalation_threshold=args.restart_escalation_threshold,
)
payload = {
"generated_at": _iso(now),
"incidents": [incident.__dict__ for incident in incidents],
"results": [],
}
token = None
if args.token_file.exists():
token = args.token_file.read_text().strip()
if args.apply and not token:
raise SystemExit(f"Token file not found: {args.token_file}")
if token:
client = GiteaClient(token=token, owner=args.owner, repo=args.repo, base_url=args.base_url)
payload["results"] = sync_incidents(
incidents,
client,
apply=args.apply,
comment_existing=not args.no_comment_existing,
)
else:
payload["results"] = [
{
"action": "local_only",
"fingerprint": incident.fingerprint,
"issue_number": None,
"title": incident.title,
}
for incident in incidents
]
if args.json:
print(json.dumps(payload, indent=2))
else:
print(f"Generated at: {payload['generated_at']}")
if not incidents:
print("No autonomous infrastructure incidents detected.")
for incident in incidents:
print(f"- {incident.title} [{incident.fingerprint}]")
for result in payload["results"]:
print(f" -> {result['action']}: {result['title']}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,131 @@
from datetime import datetime, timezone
from pathlib import Path
import pytest
from scripts.autonomous_issue_creator import (
Incident,
build_incidents,
heartbeat_is_stale,
load_restart_counts,
sync_incidents,
)
class FakeGiteaClient:
def __init__(self, open_issues=None):
self._open_issues = list(open_issues or [])
self.created = []
self.commented = []
def list_open_issues(self):
return list(self._open_issues)
def create_issue(self, title, body):
issue = {"number": 100 + len(self.created), "title": title, "body": body}
self.created.append(issue)
return issue
def comment_issue(self, issue_number, body):
self.commented.append({"issue_number": issue_number, "body": body})
def test_load_restart_counts_reads_only_count_files(tmp_path):
(tmp_path / "act_runner.count").write_text("4\n")
(tmp_path / "worker.count").write_text("2\n")
(tmp_path / "notes.txt").write_text("ignore me")
(tmp_path / "bad.count").write_text("not-an-int")
counts = load_restart_counts(tmp_path)
assert counts == {"act_runner": 4, "worker": 2}
def test_heartbeat_is_stale_handles_missing_and_old_files(tmp_path):
now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
missing = heartbeat_is_stale(tmp_path / "missing.last", now=now, max_age_seconds=900)
assert missing is True
heartbeat = tmp_path / "fleet_health.last"
heartbeat.write_text("")
old = now.timestamp() - 1800
recent = now.timestamp() - 60
heartbeat.touch()
os = __import__("os")
os.utime(heartbeat, (old, old))
assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is True
os.utime(heartbeat, (recent, recent))
assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is False
def test_build_incidents_captures_offline_hosts_restart_escalations_and_stale_probe():
now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
failover_status = {
"timestamp": 1713148800.0,
"fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"},
}
incidents = build_incidents(
failover_status=failover_status,
restart_counts={"act_runner": 4, "worker": 2},
heartbeat_stale=True,
now=now,
restart_escalation_threshold=3,
)
fingerprints = {incident.fingerprint for incident in incidents}
assert fingerprints == {
"host-offline:bezalel",
"restart-escalation:act_runner",
"probe-stale:fleet-health",
}
titles = {incident.title for incident in incidents}
assert "[AUTO] Fleet host offline: bezalel" in titles
assert "[AUTO] Restart escalation: act_runner" in titles
assert "[AUTO] Fleet health probe stale" in titles
def test_sync_incidents_reuses_open_issues_and_creates_missing_ones():
client = FakeGiteaClient(
open_issues=[
{
"number": 71,
"title": "[AUTO] Fleet host offline: bezalel",
"body": "Fingerprint: host-offline:bezalel\n",
}
]
)
incidents = [
Incident(
fingerprint="host-offline:bezalel",
title="[AUTO] Fleet host offline: bezalel",
body="Fingerprint: host-offline:bezalel\nHost unreachable",
),
Incident(
fingerprint="probe-stale:fleet-health",
title="[AUTO] Fleet health probe stale",
body="Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
),
]
results = sync_incidents(incidents, client, apply=True, comment_existing=True)
assert [result["action"] for result in results] == ["commented", "created"]
assert client.commented == [
{
"issue_number": 71,
"body": "Autonomous infrastructure detector saw the same incident again.\n\nFingerprint: host-offline:bezalel\n\nLatest evidence:\nHost unreachable",
}
]
assert client.created == [
{
"number": 100,
"title": "[AUTO] Fleet health probe stale",
"body": "Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
}
]