Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b4b20f62e |
@@ -1,329 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Create or refresh fleet incidents on Gitea from local infrastructure signals.
|
||||
|
||||
Refs: timmy-home #553
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
from urllib import request
|
||||
|
||||
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
DEFAULT_OWNER = "Timmy_Foundation"
|
||||
DEFAULT_REPO = "timmy-home"
|
||||
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
|
||||
DEFAULT_FAILOVER_STATUS = Path.home() / ".timmy" / "failover_status.json"
|
||||
DEFAULT_RESTART_STATE_DIR = Path("/var/lib/timmy/restarts")
|
||||
DEFAULT_HEARTBEAT_FILE = Path("/var/lib/timmy/heartbeats/fleet_health.last")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class Incident:
    """Immutable record of one detected infrastructure incident."""

    # Stable identity string used to match against already-open issues.
    fingerprint: str
    # Issue title to create on the tracker.
    title: str
    # Full issue body; its first non-blank line is "Fingerprint: ...".
    body: str

    def latest_evidence(self) -> str:
        """Return the body's evidence text with the fingerprint header removed."""
        meaningful = [text for text in self.body.splitlines() if text.strip()]
        if meaningful and meaningful[0].startswith("Fingerprint: "):
            del meaningful[0]
        return "\n".join(meaningful).strip()
|
||||
|
||||
|
||||
class GiteaClient:
    """Thin wrapper over the Gitea REST API for one owner/repo pair."""

    def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
        self.token = token
        self.owner = owner
        self.repo = repo
        # Normalise once so path concatenation below never doubles slashes.
        self.base_url = base_url.rstrip("/")

    def _request(self, path: str, *, method: str = "GET", data: dict | None = None):
        """Perform one authenticated API call and decode the JSON response."""
        if data is None:
            payload = None
        else:
            payload = json.dumps(data).encode()
        headers = {"Authorization": f"token {self.token}"}
        if payload is not None:
            headers["Content-Type"] = "application/json"
        url = f"{self.base_url}{path}"
        req = request.Request(url, data=payload, headers=headers, method=method)
        with request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read().decode())

    def list_open_issues(self):
        """Return open issues; Gitea lists PRs here too, so filter them out."""
        raw = self._request(f"/repos/{self.owner}/{self.repo}/issues?state=open&limit=100")
        return [entry for entry in raw if not entry.get("pull_request")]

    def create_issue(self, title: str, body: str):
        """Open a new issue and return the created issue payload."""
        path = f"/repos/{self.owner}/{self.repo}/issues"
        return self._request(path, method="POST", data={"title": title, "body": body})

    def comment_issue(self, issue_number: int, body: str):
        """Append a comment to an existing issue."""
        path = f"/repos/{self.owner}/{self.repo}/issues/{issue_number}/comments"
        return self._request(path, method="POST", data={"body": body})
|
||||
|
||||
|
||||
def load_json(path: Path):
    """Parse *path* as JSON, or return None when the file does not exist."""
    return json.loads(path.read_text()) if path.exists() else None
|
||||
|
||||
|
||||
def load_restart_counts(state_dir: Path) -> dict[str, int]:
    """Read per-process restart counters from ``*.count`` files in *state_dir*.

    Files whose content is not an integer are skipped; a missing directory
    yields an empty mapping.
    """
    if not state_dir.exists():
        return {}
    result: dict[str, int] = {}
    for counter_file in sorted(state_dir.glob("*.count")):
        raw = counter_file.read_text().strip()
        try:
            parsed = int(raw)
        except ValueError:
            # Corrupt counter file: ignore rather than abort the whole scan.
            continue
        result[counter_file.stem] = parsed
    return result
|
||||
|
||||
|
||||
def heartbeat_is_stale(path: Path, *, now: datetime | None = None, max_age_seconds: int = 900) -> bool:
|
||||
if now is None:
|
||||
now = datetime.now(timezone.utc)
|
||||
if not path.exists():
|
||||
return True
|
||||
age = now.timestamp() - path.stat().st_mtime
|
||||
return age > max_age_seconds
|
||||
|
||||
|
||||
def _iso(dt: datetime) -> str:
|
||||
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def _build_body(fingerprint: str, *details: str) -> str:
|
||||
detail_lines = [detail for detail in details if detail]
|
||||
return "\n".join([f"Fingerprint: {fingerprint}", *detail_lines])
|
||||
|
||||
|
||||
def build_incidents(
    *,
    failover_status: dict | None,
    restart_counts: dict[str, int],
    heartbeat_stale: bool,
    now: datetime | None = None,
    restart_escalation_threshold: int = 3,
) -> list[Incident]:
    """Translate raw fleet signals into a list of Incident records.

    Three sources are inspected: per-host failover statuses, restart
    counters, and the fleet-health heartbeat freshness flag.
    """
    moment = now if now is not None else datetime.now(timezone.utc)
    detected_line = f"Detected at: {_iso(moment)}"

    stamp = failover_status.get("timestamp") if failover_status else None
    hosts = (failover_status.get("fleet") or {}) if failover_status else {}
    if stamp is not None:
        stamp_line = f"Failover status timestamp: {stamp}"
    else:
        stamp_line = "Failover status timestamp: unknown"

    found: list[Incident] = []

    # One incident per host the failover monitor does not report ONLINE.
    for host_name, host_status in sorted(hosts.items()):
        if str(host_status).upper() == "ONLINE":
            continue
        key = f"host-offline:{host_name}"
        found.append(
            Incident(
                fingerprint=key,
                title=f"[AUTO] Fleet host offline: {host_name}",
                body=_build_body(
                    key,
                    detected_line,
                    stamp_line,
                    f"Host `{host_name}` reported `{host_status}` by failover monitor.",
                ),
            )
        )

    # One incident per process strictly above the escalation threshold.
    for proc, restarts in sorted(restart_counts.items()):
        if restarts <= restart_escalation_threshold:
            continue
        key = f"restart-escalation:{proc}"
        found.append(
            Incident(
                fingerprint=key,
                title=f"[AUTO] Restart escalation: {proc}",
                body=_build_body(
                    key,
                    detected_line,
                    f"Process `{proc}` has crossed the restart escalation threshold with count={restarts}.",
                ),
            )
        )

    # At most one incident for a stale fleet-health heartbeat.
    if heartbeat_stale:
        key = "probe-stale:fleet-health"
        found.append(
            Incident(
                fingerprint=key,
                title="[AUTO] Fleet health probe stale",
                body=_build_body(
                    key,
                    detected_line,
                    "Heartbeat missing or older than the configured fleet health maximum age.",
                ),
            )
        )

    return found
|
||||
|
||||
|
||||
def find_matching_issue(incident: Incident, open_issues: Iterable[dict]) -> dict | None:
    """Return the first open issue matching *incident*, or None.

    A match is the fingerprint appearing anywhere in the issue's title or
    body, or an exact title match.
    """
    for candidate in open_issues:
        title = candidate.get("title") or ""
        body = candidate.get("body") or ""
        searchable = f"{title}\n{body}"
        if incident.fingerprint in searchable or incident.title == candidate.get("title"):
            return candidate
    return None
|
||||
|
||||
|
||||
def build_repeat_comment(incident: Incident) -> str:
    """Render the comment posted when an already-open incident recurs."""
    header = "Autonomous infrastructure detector saw the same incident again.\n\n"
    fingerprint_line = f"Fingerprint: {incident.fingerprint}\n\n"
    evidence = f"Latest evidence:\n{incident.latest_evidence()}"
    return header + fingerprint_line + evidence
|
||||
|
||||
|
||||
def sync_incidents(
    incidents: Iterable[Incident],
    client: GiteaClient,
    *,
    apply: bool = False,
    comment_existing: bool = True,
):
    """Reconcile detected incidents against the repo's open issues.

    For each incident: if an open issue already matches, optionally comment
    on it; otherwise create a new issue, or (in the default dry-run mode)
    record that one would be created.  Returns a list of dicts describing
    the action taken per incident.
    """
    open_issues = list(client.list_open_issues())
    results = []

    for incident in incidents:
        existing = find_matching_issue(incident, open_issues)
        if existing:
            action = "existing"
            # Only touch the remote issue when both switches allow it.
            if apply and comment_existing:
                client.comment_issue(existing["number"], build_repeat_comment(incident))
                action = "commented"
            results.append(
                {
                    "action": action,
                    "fingerprint": incident.fingerprint,
                    "issue_number": existing["number"],
                    "title": existing.get("title"),
                }
            )
            continue

        if apply:
            created = client.create_issue(incident.title, incident.body)
            # Track the new issue so a duplicate incident later in the same
            # run matches it instead of creating a second issue.
            open_issues.append(created)
            results.append(
                {
                    "action": "created",
                    "fingerprint": incident.fingerprint,
                    "issue_number": created["number"],
                    "title": created.get("title"),
                }
            )
        else:
            results.append(
                {
                    "action": "would_create",
                    "fingerprint": incident.fingerprint,
                    "issue_number": None,
                    "title": incident.title,
                }
            )

    return results
|
||||
|
||||
|
||||
def parse_args():
    """Parse command-line options for the incident detector."""
    parser = argparse.ArgumentParser(description="Create or refresh fleet incidents on Gitea from local infrastructure signals.")
    # Gitea target and credentials.
    parser.add_argument("--owner", default=DEFAULT_OWNER)
    parser.add_argument("--repo", default=DEFAULT_REPO)
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE)
    # Local infrastructure signal inputs.
    parser.add_argument("--failover-status", type=Path, default=DEFAULT_FAILOVER_STATUS)
    parser.add_argument("--restart-state-dir", type=Path, default=DEFAULT_RESTART_STATE_DIR)
    parser.add_argument("--heartbeat-file", type=Path, default=DEFAULT_HEARTBEAT_FILE)
    parser.add_argument("--heartbeat-max-age-seconds", type=int, default=900)
    parser.add_argument("--restart-escalation-threshold", type=int, default=3)
    # Behaviour switches; the tool is dry-run unless --apply is given.
    parser.add_argument("--apply", action="store_true", help="Create/comment issues instead of reporting what would happen.")
    parser.add_argument("--no-comment-existing", action="store_true", help="Do not comment on existing matching issues.")
    parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON output.")
    return parser.parse_args()
|
||||
|
||||
|
||||
def main():
    """Gather local signals, build incidents, and sync them to Gitea.

    Without a readable token file the run stays local-only; with --apply
    but no token it aborts, since applying requires API access.
    """
    args = parse_args()
    now = datetime.now(timezone.utc)

    # Collect the three local signal sources.
    failover_status = load_json(args.failover_status)
    restart_counts = load_restart_counts(args.restart_state_dir)
    heartbeat_stale = heartbeat_is_stale(
        args.heartbeat_file,
        now=now,
        max_age_seconds=args.heartbeat_max_age_seconds,
    )
    incidents = build_incidents(
        failover_status=failover_status,
        restart_counts=restart_counts,
        heartbeat_stale=heartbeat_stale,
        now=now,
        restart_escalation_threshold=args.restart_escalation_threshold,
    )

    # __dict__ works on the frozen dataclass and gives JSON-ready fields.
    payload = {
        "generated_at": _iso(now),
        "incidents": [incident.__dict__ for incident in incidents],
        "results": [],
    }

    token = None
    if args.token_file.exists():
        token = args.token_file.read_text().strip()

    # --apply without a token is an error; a dry run may proceed without one.
    if args.apply and not token:
        raise SystemExit(f"Token file not found: {args.token_file}")

    if token:
        client = GiteaClient(token=token, owner=args.owner, repo=args.repo, base_url=args.base_url)
        payload["results"] = sync_incidents(
            incidents,
            client,
            apply=args.apply,
            comment_existing=not args.no_comment_existing,
        )
    else:
        # No token: report incidents locally without contacting the API.
        payload["results"] = [
            {
                "action": "local_only",
                "fingerprint": incident.fingerprint,
                "issue_number": None,
                "title": incident.title,
            }
            for incident in incidents
        ]

    if args.json:
        print(json.dumps(payload, indent=2))
    else:
        print(f"Generated at: {payload['generated_at']}")
        if not incidents:
            print("No autonomous infrastructure incidents detected.")
        for incident in incidents:
            print(f"- {incident.title} [{incident.fingerprint}]")
        for result in payload["results"]:
            print(f" -> {result['action']}: {result['title']}")
|
||||
|
||||
|
||||
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
|
||||
@@ -1,31 +1,169 @@
|
||||
#!/usr/bin/env python3
"""Dynamic dispatch optimizer for fleet-wide coordination.

Refs: timmy-home #552

Takes a fleet dispatch spec plus optional failover status and produces a
capacity-aware assignment plan. Safe by default: it prints the plan and only
writes an output file when explicitly requested.
"""

# NOTE: the docstring and the __future__ import must be the first statements
# in the module; the previous header placed legacy imports (json, os, yaml)
# above both, which was a SyntaxError and pulled in the unused third-party
# PyYAML dependency.
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

# Default file locations; each is overridable from the command line.
STATUS_FILE = Path.home() / ".timmy" / "failover_status.json"
CONFIG_FILE = Path.home() / "timmy" / "config.yaml"
SPEC_FILE = Path.home() / ".timmy" / "fleet_dispatch.json"
OUTPUT_FILE = Path.home() / ".timmy" / "dispatch_plan.json"
|
||||
|
||||
|
||||
def load_json(path: Path, default: Any):
    """Parse *path* as JSON, falling back to *default* when the file is absent."""
    if path.exists():
        return json.loads(path.read_text())
    return default
|
||||
|
||||
|
||||
def _host_status(host: dict[str, Any], failover_status: dict[str, Any]) -> str:
|
||||
if host.get("always_available"):
|
||||
return "ONLINE"
|
||||
fleet = failover_status.get("fleet") or {}
|
||||
return str(fleet.get(host["name"], "ONLINE")).upper()
|
||||
|
||||
|
||||
def _lane_matches(host: dict[str, Any], lane: str) -> bool:
|
||||
host_lanes = set(host.get("lanes") or ["general"])
|
||||
if host.get("always_available", False):
|
||||
return True
|
||||
if lane == "general":
|
||||
return "general" in host_lanes
|
||||
return lane in host_lanes
|
||||
|
||||
|
||||
def _choose_candidate(task: dict[str, Any], hosts: list[dict[str, Any]]):
    """Pick the host that should run *task*, or None when nothing fits.

    Selection order:
      1. the task's preferred hosts, in the order given, if they have
         capacity and match the lane;
      2. any lane-matching host with spare capacity, least-loaded first;
      3. any ``always_available`` host with spare capacity.
    """
    lane = task.get("lane", "general")
    preferred = task.get("preferred_hosts") or []

    preferred_map = {host["name"]: host for host in hosts}
    for host_name in preferred:
        host = preferred_map.get(host_name)
        if not host:
            continue
        if host["remaining_capacity"] <= 0:
            continue
        if _lane_matches(host, lane):
            return host

    # Tie-break: fewest assignments, then most remaining capacity, then
    # name for determinism.
    matching = [host for host in hosts if host["remaining_capacity"] > 0 and _lane_matches(host, lane)]
    if matching:
        matching.sort(key=lambda host: (host["assigned_count"], -host["remaining_capacity"], host["name"]))
        return matching[0]

    # NOTE(review): _lane_matches already accepts always_available hosts,
    # so any fallback with capacity would have matched above — this branch
    # appears unreachable; confirm before relying on it.
    fallbacks = [host for host in hosts if host["remaining_capacity"] > 0 and host.get("always_available")]
    if fallbacks:
        fallbacks.sort(key=lambda host: (host["assigned_count"], -host["remaining_capacity"], host["name"]))
        return fallbacks[0]

    return None
|
||||
|
||||
|
||||
def generate_plan(spec: dict[str, Any], failover_status: dict[str, Any] | None = None) -> dict[str, Any]:
    """Build a capacity-aware assignment plan from *spec* and fleet health.

    Returns a dict with ``assignments`` (task -> host), ``offline_hosts``
    (sorted names excluded from scheduling), and ``unassigned`` (tasks no
    online host could take, with a reason).
    """
    failover_status = failover_status or {}
    raw_hosts = spec.get("hosts") or []
    tasks = list(spec.get("tasks") or [])

    # Normalise host entries into mutable scheduling records; only hosts
    # resolving to ONLINE participate.
    online_hosts = []
    offline_hosts = []
    for host in raw_hosts:
        normalized = {
            "name": host["name"],
            "capacity": int(host.get("capacity", 1)),
            "remaining_capacity": int(host.get("capacity", 1)),
            "assigned_count": 0,
            "lanes": list(host.get("lanes") or ["general"]),
            "always_available": bool(host.get("always_available", False)),
            "status": _host_status(host, failover_status),
        }
        if normalized["status"] == "ONLINE":
            online_hosts.append(normalized)
        else:
            offline_hosts.append(normalized["name"])

    # Highest priority first; ties broken by task id for determinism.
    ordered_tasks = sorted(
        tasks,
        key=lambda item: (-int(item.get("priority", 0)), str(item.get("id", ""))),
    )

    assignments = []
    unassigned = []
    for task in ordered_tasks:
        candidate = _choose_candidate(task, online_hosts)
        if candidate is None:
            unassigned.append({
                "task_id": task.get("id"),
                "reason": f"no_online_host_for_lane:{task.get('lane', 'general')}",
            })
            continue

        # _choose_candidate returns a reference into online_hosts, so these
        # mutations update the shared capacity bookkeeping.
        candidate["remaining_capacity"] -= 1
        candidate["assigned_count"] += 1
        assignments.append({
            "task_id": task.get("id"),
            "host": candidate["name"],
            "lane": task.get("lane", "general"),
            "priority": int(task.get("priority", 0)),
        })

    return {
        "assignments": assignments,
        "offline_hosts": sorted(offline_hosts),
        "unassigned": unassigned,
    }
|
||||
|
||||
|
||||
def write_plan(plan: dict[str, Any], output_path: Path):
    """Serialise *plan* as pretty-printed JSON, creating parent dirs as needed."""
    serialized = json.dumps(plan, indent=2)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(serialized)
|
||||
|
||||
|
||||
def parse_args():
    """Parse command-line options for the dispatch planner."""
    parser = argparse.ArgumentParser(description="Generate a fleet dispatch plan from host health and task demand.")
    parser.add_argument("--spec-file", type=Path, default=SPEC_FILE, help="JSON fleet spec with hosts[] and tasks[]")
    parser.add_argument("--status-file", type=Path, default=STATUS_FILE, help="Failover monitor JSON payload")
    parser.add_argument("--output", type=Path, default=OUTPUT_FILE, help="Output path for the generated plan")
    # The plan is only persisted when explicitly requested.
    parser.add_argument("--write-output", action="store_true", help="Persist the generated plan to --output")
    parser.add_argument("--json", action="store_true", help="Print JSON only")
    return parser.parse_args()
|
||||
|
||||
|
||||
def main():
    """CLI entry point: build, optionally persist, and report the dispatch plan.

    Fixes over the previous revision: removed leftover legacy code that
    re-read STATUS_FILE directly (ignoring --status-file and raising
    FileNotFoundError when the file was absent) and printed a misleading
    "Updated config.yaml" message without performing any update.
    """
    args = parse_args()

    # Missing inputs degrade gracefully to empty spec / unknown statuses.
    spec = load_json(args.spec_file, {"hosts": [], "tasks": []})
    failover_status = load_json(args.status_file, {})
    plan = generate_plan(spec, failover_status)

    if args.write_output:
        write_plan(plan, args.output)

    if args.json:
        # Machine-readable mode: emit the plan and nothing else.
        print(json.dumps(plan, indent=2))
        return

    print("--- Dynamic Dispatch Optimizer ---")
    print(f"Assignments: {len(plan['assignments'])}")
    if plan["offline_hosts"]:
        print("Offline hosts: " + ", ".join(plan["offline_hosts"]))
    for assignment in plan["assignments"]:
        print(f"- {assignment['task_id']} -> {assignment['host']} ({assignment['lane']}, p={assignment['priority']})")
    if plan["unassigned"]:
        print("Unassigned:")
        for item in plan["unassigned"]:
            print(f"- {item['task_id']}: {item['reason']}")
    if args.write_output:
        print(f"Wrote plan to {args.output}")
|
||||
|
||||
|
||||
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
|
||||
|
||||
@@ -1,131 +0,0 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from scripts.autonomous_issue_creator import (
|
||||
Incident,
|
||||
build_incidents,
|
||||
heartbeat_is_stale,
|
||||
load_restart_counts,
|
||||
sync_incidents,
|
||||
)
|
||||
|
||||
|
||||
class FakeGiteaClient:
    """In-memory stand-in for GiteaClient that records every write call."""

    def __init__(self, open_issues=None):
        self._open_issues = list(open_issues or [])
        self.created = []
        self.commented = []

    def list_open_issues(self):
        # Copy so callers mutating the result do not affect the fake's state.
        return list(self._open_issues)

    def create_issue(self, title, body):
        # Issue numbers start at 100 and increase with each created issue.
        number = 100 + len(self.created)
        record = {"number": number, "title": title, "body": body}
        self.created.append(record)
        return record

    def comment_issue(self, issue_number, body):
        self.commented.append({"issue_number": issue_number, "body": body})
|
||||
|
||||
|
||||
def test_load_restart_counts_reads_only_count_files(tmp_path):
    """Only well-formed ``*.count`` files should contribute counter entries."""
    (tmp_path / "act_runner.count").write_text("4\n")
    (tmp_path / "worker.count").write_text("2\n")
    (tmp_path / "notes.txt").write_text("ignore me")
    (tmp_path / "bad.count").write_text("not-an-int")

    assert load_restart_counts(tmp_path) == {"act_runner": 4, "worker": 2}
|
||||
|
||||
|
||||
def test_heartbeat_is_stale_handles_missing_and_old_files(tmp_path):
    """Missing heartbeat files and files older than max age are stale.

    Fixes over the previous revision: uses a plain import instead of the
    ``os = __import__("os")`` hack, and drops the redundant ``touch()``
    right after ``write_text("")`` (write_text already creates the file).
    """
    import os

    now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)

    # A file that does not exist is always considered stale.
    missing = heartbeat_is_stale(tmp_path / "missing.last", now=now, max_age_seconds=900)
    assert missing is True

    heartbeat = tmp_path / "fleet_health.last"
    heartbeat.write_text("")
    old = now.timestamp() - 1800
    recent = now.timestamp() - 60

    # Older than the 900s window -> stale.
    os.utime(heartbeat, (old, old))
    assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is True

    # Fresh within the window -> not stale.
    os.utime(heartbeat, (recent, recent))
    assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is False
|
||||
|
||||
|
||||
def test_build_incidents_captures_offline_hosts_restart_escalations_and_stale_probe():
    """All three signal categories should surface as distinct incidents."""
    moment = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
    status_payload = {
        "timestamp": 1713148800.0,
        "fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"},
    }

    produced = build_incidents(
        failover_status=status_payload,
        restart_counts={"act_runner": 4, "worker": 2},
        heartbeat_stale=True,
        now=moment,
        restart_escalation_threshold=3,
    )

    # One incident per category; "worker" stays below the threshold.
    assert {item.fingerprint for item in produced} == {
        "host-offline:bezalel",
        "restart-escalation:act_runner",
        "probe-stale:fleet-health",
    }

    seen_titles = {item.title for item in produced}
    assert "[AUTO] Fleet host offline: bezalel" in seen_titles
    assert "[AUTO] Restart escalation: act_runner" in seen_titles
    assert "[AUTO] Fleet health probe stale" in seen_titles
|
||||
|
||||
|
||||
def test_sync_incidents_reuses_open_issues_and_creates_missing_ones():
    """An incident matching an open issue is commented; others are created."""
    # Issue 71 already carries the host-offline fingerprint in its body.
    client = FakeGiteaClient(
        open_issues=[
            {
                "number": 71,
                "title": "[AUTO] Fleet host offline: bezalel",
                "body": "Fingerprint: host-offline:bezalel\n",
            }
        ]
    )
    incidents = [
        Incident(
            fingerprint="host-offline:bezalel",
            title="[AUTO] Fleet host offline: bezalel",
            body="Fingerprint: host-offline:bezalel\nHost unreachable",
        ),
        Incident(
            fingerprint="probe-stale:fleet-health",
            title="[AUTO] Fleet health probe stale",
            body="Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
        ),
    ]

    results = sync_incidents(incidents, client, apply=True, comment_existing=True)

    # First incident matched issue 71 (commented); second had no match (created).
    assert [result["action"] for result in results] == ["commented", "created"]
    # The repeat comment strips the fingerprint header from the evidence.
    assert client.commented == [
        {
            "issue_number": 71,
            "body": "Autonomous infrastructure detector saw the same incident again.\n\nFingerprint: host-offline:bezalel\n\nLatest evidence:\nHost unreachable",
        }
    ]
    # FakeGiteaClient numbers created issues starting at 100.
    assert client.created == [
        {
            "number": 100,
            "title": "[AUTO] Fleet health probe stale",
            "body": "Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
        }
    ]
|
||||
79
tests/test_dynamic_dispatch_optimizer.py
Normal file
79
tests/test_dynamic_dispatch_optimizer.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import json
|
||||
|
||||
from scripts.dynamic_dispatch_optimizer import generate_plan, write_plan
|
||||
|
||||
|
||||
def test_generate_plan_rebalances_offline_host_tasks_to_online_capacity():
    """Tasks for an offline host's lane fall back to always-available capacity."""
    fleet_spec = {
        "hosts": [
            {"name": "ezra", "capacity": 2, "lanes": ["research", "general"]},
            {"name": "bezalel", "capacity": 2, "lanes": ["build", "general"]},
            {"name": "local", "capacity": 1, "lanes": ["general"], "always_available": True},
        ],
        "tasks": [
            {"id": "ISSUE-1", "lane": "build", "priority": 100},
            {"id": "ISSUE-2", "lane": "general", "priority": 80},
            {"id": "ISSUE-3", "lane": "research", "priority": 60},
        ],
    }

    plan = generate_plan(fleet_spec, {"fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"}})

    by_task = {entry["task_id"]: entry["host"] for entry in plan["assignments"]}
    assert by_task == {
        "ISSUE-1": "local",
        "ISSUE-2": "ezra",
        "ISSUE-3": "ezra",
    }
    assert plan["offline_hosts"] == ["bezalel"]
    assert plan["unassigned"] == []
|
||||
|
||||
|
||||
def test_generate_plan_prefers_preferred_host_when_online():
    """A preferred host with capacity wins over the load-balanced choice."""
    fleet_spec = {
        "hosts": [
            {"name": "ezra", "capacity": 2, "lanes": ["general"]},
            {"name": "bezalel", "capacity": 2, "lanes": ["general"]},
        ],
        "tasks": [
            {"id": "ISSUE-9", "lane": "general", "priority": 100, "preferred_hosts": ["bezalel", "ezra"]},
        ],
    }

    plan = generate_plan(fleet_spec, {"fleet": {"ezra": "ONLINE", "bezalel": "ONLINE"}})

    expected = [{"task_id": "ISSUE-9", "host": "bezalel", "lane": "general", "priority": 100}]
    assert plan["assignments"] == expected
|
||||
|
||||
|
||||
def test_generate_plan_reports_unassigned_when_no_host_matches_lane():
    """A task whose lane no online host serves is reported, not dropped."""
    fleet_spec = {
        "hosts": [
            {"name": "ezra", "capacity": 1, "lanes": ["research"]},
        ],
        "tasks": [
            {"id": "ISSUE-5", "lane": "build", "priority": 50},
        ],
    }

    plan = generate_plan(fleet_spec, {"fleet": {"ezra": "ONLINE"}})

    assert plan["assignments"] == []
    assert plan["unassigned"] == [
        {"task_id": "ISSUE-5", "reason": "no_online_host_for_lane:build"}
    ]
|
||||
|
||||
|
||||
def test_write_plan_persists_json(tmp_path):
    """write_plan should round-trip the plan through JSON on disk."""
    destination = tmp_path / "dispatch-plan.json"
    sample_plan = {
        "assignments": [{"task_id": "ISSUE-1", "host": "ezra", "lane": "general", "priority": 10}],
        "offline_hosts": [],
        "unassigned": [],
    }

    write_plan(sample_plan, destination)

    assert json.loads(destination.read_text()) == sample_plan
|
||||
Reference in New Issue
Block a user