Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
0b4b20f62e feat: add fleet dispatch planning scaffold (#552)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 18s
2026-04-15 00:48:41 -04:00
4 changed files with 237 additions and 480 deletions

View File

@@ -1,329 +0,0 @@
#!/usr/bin/env python3
"""Create or refresh fleet incidents on Gitea from local infrastructure signals.
Refs: timmy-home #553
"""
from __future__ import annotations

import argparse
import json
import os
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from urllib import request
# Defaults for the Gitea target; used as GiteaClient argument defaults and as
# the defaults of the matching command-line flags.
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_REPO = "timmy-home"
# File holding the API token; main() reads it and strips surrounding whitespace.
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
# Local signal sources: the failover status JSON, the directory of "*.count"
# restart counters, and the heartbeat file whose mtime is checked for staleness.
DEFAULT_FAILOVER_STATUS = Path.home() / ".timmy" / "failover_status.json"
DEFAULT_RESTART_STATE_DIR = Path("/var/lib/timmy/restarts")
DEFAULT_HEARTBEAT_FILE = Path("/var/lib/timmy/heartbeats/fleet_health.last")
@dataclass(frozen=True)
class Incident:
    """Immutable description of one detected infrastructure incident.

    ``fingerprint`` is the stable key used to match the incident against
    already-open issues, ``title`` is the issue title and ``body`` is the
    issue text.
    """

    fingerprint: str
    title: str
    body: str

    def latest_evidence(self) -> str:
        """Return the body minus blank lines and minus a leading fingerprint line."""
        kept = [row for row in self.body.splitlines() if row.strip()]
        # Only the very first non-blank line is treated as the fingerprint header.
        skip = 1 if kept and kept[0].startswith("Fingerprint: ") else 0
        return "\n".join(kept[skip:]).strip()
class GiteaClient:
    """Small Gitea REST client for listing, creating and commenting on issues.

    Requests are authenticated with an ``Authorization: token ...`` header and
    sent to ``{base_url}/repos/{owner}/{repo}/...``.
    """

    # Safety cap so a server that ignores ``page`` cannot keep
    # list_open_issues() looping forever.
    MAX_PAGES = 100

    def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
        self.token = token
        self.owner = owner
        self.repo = repo
        # Request paths start with "/", so drop trailing slashes from the base.
        self.base_url = base_url.rstrip("/")

    def _request(self, path: str, *, method: str = "GET", data: dict | None = None):
        """Send one API request and return the decoded JSON response.

        Args:
            path: API path beginning with ``/``, appended to ``base_url``.
            method: HTTP method.
            data: Optional JSON-serialisable request body.

        Returns:
            The decoded JSON document, or ``None`` when the response body is
            empty.

        Raises:
            urllib.error.URLError: on transport or HTTP errors (from urlopen).
        """
        payload = None if data is None else json.dumps(data).encode()
        headers = {"Authorization": f"token {self.token}"}
        if payload is not None:
            headers["Content-Type"] = "application/json"
        req = request.Request(f"{self.base_url}{path}", data=payload, headers=headers, method=method)
        with request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
        # An empty body is not valid JSON; report it as None instead of raising.
        if not raw.strip():
            return None
        return json.loads(raw.decode())

    def list_open_issues(self):
        """Return every open issue of the repository, excluding pull requests.

        All result pages are fetched. Reading only the first page would hide
        issues beyond it from de-duplication and lead to duplicate incident
        issues being created.
        """
        issues = []
        for page in range(1, self.MAX_PAGES + 1):
            batch = self._request(
                f"/repos/{self.owner}/{self.repo}/issues?state=open&limit=100&page={page}"
            )
            # Stop on the first empty page rather than on a short page: the
            # server may cap ``limit`` below the requested 100.
            if not batch:
                break
            issues.extend(issue for issue in batch if not issue.get("pull_request"))
        return issues

    def create_issue(self, title: str, body: str):
        """Create an issue and return the API's JSON description of it."""
        return self._request(
            f"/repos/{self.owner}/{self.repo}/issues",
            method="POST",
            data={"title": title, "body": body},
        )

    def comment_issue(self, issue_number: int, body: str):
        """Add a comment to an existing issue and return the API response."""
        return self._request(
            f"/repos/{self.owner}/{self.repo}/issues/{issue_number}/comments",
            method="POST",
            data={"body": body},
        )
def load_json(path: Path):
    """Load a JSON document from ``path``.

    Returns:
        The decoded document, or ``None`` when the file does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    if not path.exists():
        return None
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding, which differs between platforms.
    return json.loads(path.read_text(encoding="utf-8"))
def load_restart_counts(state_dir: Path) -> dict[str, int]:
    """Read per-process restart counters from ``*.count`` files in ``state_dir``.

    The file stem is the process name and the stripped content is the count.
    Files whose content raises ``ValueError`` (for example non-integer text)
    are skipped. A missing directory yields an empty mapping.
    """
    tallies: dict[str, int] = {}
    if state_dir.exists():
        for counter_file in sorted(state_dir.glob("*.count")):
            try:
                value = int(counter_file.read_text().strip())
            except ValueError:
                continue
            tallies[counter_file.stem] = value
    return tallies
def heartbeat_is_stale(path: Path, *, now: datetime | None = None, max_age_seconds: int = 900) -> bool:
    """Report whether the heartbeat file is missing or too old.

    Args:
        path: Heartbeat file; its modification time is the last heartbeat.
        now: Reference time; defaults to the current UTC time.
        max_age_seconds: Age above which the heartbeat counts as stale.

    Returns:
        ``True`` when the file does not exist or its age is strictly greater
        than ``max_age_seconds``.
    """
    reference = datetime.now(timezone.utc) if now is None else now
    if not path.exists():
        return True
    return reference.timestamp() - path.stat().st_mtime > max_age_seconds
def _iso(dt: datetime) -> str:
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
def _build_body(fingerprint: str, *details: str) -> str:
detail_lines = [detail for detail in details if detail]
return "\n".join([f"Fingerprint: {fingerprint}", *detail_lines])
def build_incidents(
    *,
    failover_status: dict | None,
    restart_counts: dict[str, int],
    heartbeat_stale: bool,
    now: datetime | None = None,
    restart_escalation_threshold: int = 3,
) -> list[Incident]:
    """Turn local infrastructure signals into a list of incidents.

    Incidents are emitted in this order: one per fleet host whose status is
    not ``ONLINE`` (sorted by host), one per process whose restart count is
    strictly greater than ``restart_escalation_threshold`` (sorted by name),
    and finally one for a stale fleet health heartbeat.

    Args:
        failover_status: Failover monitor payload with optional ``timestamp``
            and ``fleet`` (host -> status) entries, or ``None``.
        restart_counts: Process name -> restart count.
        heartbeat_stale: Whether the fleet health heartbeat is stale.
        now: Detection time written into every body; defaults to current UTC.
        restart_escalation_threshold: Counts above this value escalate.
    """
    detected_at = datetime.now(timezone.utc) if now is None else now
    found: list[Incident] = []

    def record(fingerprint: str, title: str, *details: str) -> None:
        # Every body starts with the fingerprint and the detection time.
        found.append(
            Incident(
                fingerprint=fingerprint,
                title=title,
                body=_build_body(fingerprint, f"Detected at: {_iso(detected_at)}", *details),
            )
        )

    status_timestamp = None
    fleet = {}
    if failover_status:
        status_timestamp = failover_status.get("timestamp")
        fleet = failover_status.get("fleet") or {}

    for host, status in sorted(fleet.items()):
        if str(status).upper() != "ONLINE":
            if status_timestamp is not None:
                timestamp_line = f"Failover status timestamp: {status_timestamp}"
            else:
                timestamp_line = "Failover status timestamp: unknown"
            record(
                f"host-offline:{host}",
                f"[AUTO] Fleet host offline: {host}",
                timestamp_line,
                f"Host `{host}` reported `{status}` by failover monitor.",
            )

    for process_name, count in sorted(restart_counts.items()):
        if count > restart_escalation_threshold:
            record(
                f"restart-escalation:{process_name}",
                f"[AUTO] Restart escalation: {process_name}",
                f"Process `{process_name}` has crossed the restart escalation threshold with count={count}.",
            )

    if heartbeat_stale:
        record(
            "probe-stale:fleet-health",
            "[AUTO] Fleet health probe stale",
            "Heartbeat missing or older than the configured fleet health maximum age.",
        )

    return found
def find_matching_issue(incident: Incident, open_issues: Iterable[dict]) -> dict | None:
    """Return the first open issue that already tracks ``incident``.

    An issue matches when its title equals the incident title, or when the
    incident fingerprint appears in its title or body as a complete token.
    A plain substring test is not enough: the fingerprint
    ``host-offline:web`` would also match an issue about
    ``host-offline:web-01`` and the incident for ``web`` would never be filed.

    Args:
        incident: Incident to look up.
        open_issues: Issue dicts with optional ``title`` and ``body`` keys.

    Returns:
        The matching issue dict, or ``None`` when nothing matches.
    """
    # The fingerprint must not be glued to further name characters on either
    # side. A following "." only counts as an extension when another word
    # character comes after it, so a sentence-ending period still matches.
    token = re.compile(
        r"(?<![\w-])" + re.escape(incident.fingerprint) + r"(?![\w-]|\.\w)"
    )
    for issue in open_issues:
        title = issue.get("title")
        haystack = "\n".join([title or "", issue.get("body") or ""])
        if token.search(haystack) or incident.title == title:
            return issue
    return None
def build_repeat_comment(incident: Incident) -> str:
    """Compose the comment posted when an already-tracked incident recurs."""
    sections = [
        "Autonomous infrastructure detector saw the same incident again.",
        f"Fingerprint: {incident.fingerprint}",
        f"Latest evidence:\n{incident.latest_evidence()}",
    ]
    # Sections are separated by one blank line.
    return "\n\n".join(sections)
def sync_incidents(
    incidents: Iterable[Incident],
    client: GiteaClient,
    *,
    apply: bool = False,
    comment_existing: bool = True,
):
    """Reconcile incidents with the open issues and report what was done.

    Args:
        incidents: Incidents to reconcile.
        client: Object providing ``list_open_issues``, ``create_issue`` and
            ``comment_issue``.
        apply: When false, nothing is created or commented; the result only
            describes what would happen.
        comment_existing: When true (and ``apply`` is true), a repeat comment
            is posted on issues that already track an incident.

    Returns:
        One dict per incident with the keys ``action`` (``existing``,
        ``commented``, ``created`` or ``would_create``), ``fingerprint``,
        ``issue_number`` and ``title``.
    """
    known_issues = list(client.list_open_issues())
    outcomes = []

    def report(action, incident, number, title):
        outcomes.append(
            {
                "action": action,
                "fingerprint": incident.fingerprint,
                "issue_number": number,
                "title": title,
            }
        )

    for incident in incidents:
        match = find_matching_issue(incident, known_issues)
        if match:
            should_comment = apply and comment_existing
            if should_comment:
                client.comment_issue(match["number"], build_repeat_comment(incident))
            report(
                "commented" if should_comment else "existing",
                incident,
                match["number"],
                match.get("title"),
            )
        elif apply:
            created = client.create_issue(incident.title, incident.body)
            # Remember the new issue so a later incident with the same
            # fingerprint in this run matches it instead of creating another.
            known_issues.append(created)
            report("created", incident, created["number"], created.get("title"))
        else:
            report("would_create", incident, None, incident.title)
    return outcomes
def parse_args():
    """Parse the command-line options of the incident creator."""
    cli = argparse.ArgumentParser(description="Create or refresh fleet incidents on Gitea from local infrastructure signals.")

    # Gitea target (plain string options).
    for flag, default in (
        ("--owner", DEFAULT_OWNER),
        ("--repo", DEFAULT_REPO),
        ("--base-url", DEFAULT_BASE_URL),
    ):
        cli.add_argument(flag, default=default)

    # Filesystem locations of the token and the local signal sources.
    for flag, default in (
        ("--token-file", DEFAULT_TOKEN_FILE),
        ("--failover-status", DEFAULT_FAILOVER_STATUS),
        ("--restart-state-dir", DEFAULT_RESTART_STATE_DIR),
        ("--heartbeat-file", DEFAULT_HEARTBEAT_FILE),
    ):
        cli.add_argument(flag, type=Path, default=default)

    # Detection thresholds.
    for flag, default in (
        ("--heartbeat-max-age-seconds", 900),
        ("--restart-escalation-threshold", 3),
    ):
        cli.add_argument(flag, type=int, default=default)

    # Behaviour switches.
    for flag, help_text in (
        ("--apply", "Create/comment issues instead of reporting what would happen."),
        ("--no-comment-existing", "Do not comment on existing matching issues."),
        ("--json", "Emit machine-readable JSON output."),
    ):
        cli.add_argument(flag, action="store_true", help=help_text)

    return cli.parse_args()
def main():
    """Command-line entry point.

    Collects the local signals (failover status, restart counters, heartbeat
    age), builds the incident list, optionally syncs it with Gitea, and prints
    either a JSON payload or a short text report.

    Raises:
        SystemExit: when ``--apply`` is given but no token could be read.
    """
    args = parse_args()
    # One timestamp is shared by the staleness check, the incident bodies and
    # the report header.
    now = datetime.now(timezone.utc)
    failover_status = load_json(args.failover_status)
    restart_counts = load_restart_counts(args.restart_state_dir)
    heartbeat_stale = heartbeat_is_stale(
        args.heartbeat_file,
        now=now,
        max_age_seconds=args.heartbeat_max_age_seconds,
    )
    incidents = build_incidents(
        failover_status=failover_status,
        restart_counts=restart_counts,
        heartbeat_stale=heartbeat_stale,
        now=now,
        restart_escalation_threshold=args.restart_escalation_threshold,
    )
    payload = {
        "generated_at": _iso(now),
        "incidents": [incident.__dict__ for incident in incidents],
        "results": [],
    }
    token = None
    if args.token_file.exists():
        token = args.token_file.read_text().strip()
    # An existing but empty token file also ends up here with the same message.
    if args.apply and not token:
        raise SystemExit(f"Token file not found: {args.token_file}")
    if token:
        # With a token but without --apply, sync_incidents only reads the open
        # issues and reports what it would do.
        client = GiteaClient(token=token, owner=args.owner, repo=args.repo, base_url=args.base_url)
        payload["results"] = sync_incidents(
            incidents,
            client,
            apply=args.apply,
            comment_existing=not args.no_comment_existing,
        )
    else:
        # No token: report the detected incidents without contacting Gitea.
        payload["results"] = [
            {
                "action": "local_only",
                "fingerprint": incident.fingerprint,
                "issue_number": None,
                "title": incident.title,
            }
            for incident in incidents
        ]
    if args.json:
        print(json.dumps(payload, indent=2))
    else:
        print(f"Generated at: {payload['generated_at']}")
        if not incidents:
            print("No autonomous infrastructure incidents detected.")
        for incident in incidents:
            print(f"- {incident.title} [{incident.fingerprint}]")
        for result in payload["results"]:
            print(f" -> {result['action']}: {result['title']}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View File

@@ -1,31 +1,169 @@
#!/usr/bin/env python3
import json
import os
import yaml
from pathlib import Path
"""Dynamic dispatch optimizer for fleet-wide coordination.
# Dynamic Dispatch Optimizer
# Automatically updates routing based on fleet health.
Refs: timmy-home #552
Takes a fleet dispatch spec plus optional failover status and produces a
capacity-aware assignment plan. Safe by default: it prints the plan and only
writes an output file when explicitly requested.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Failover monitor payload; default of the --status-file option.
STATUS_FILE = Path.home() / ".timmy" / "failover_status.json"
# NOTE(review): not referenced by any function visible in this file; looks
# like a leftover from an earlier version. Confirm before removing.
CONFIG_FILE = Path.home() / "timmy" / "config.yaml"
# Fleet dispatch spec (hosts and tasks); default of the --spec-file option.
SPEC_FILE = Path.home() / ".timmy" / "fleet_dispatch.json"
# Destination of the generated plan; default of the --output option.
OUTPUT_FILE = Path.home() / ".timmy" / "dispatch_plan.json"
def load_json(path: Path, default: Any):
    """Load a JSON document from ``path``, falling back to ``default``.

    Args:
        path: File to read.
        default: Value returned when the file does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    if not path.exists():
        return default
    # JSON is UTF-8 by specification; do not depend on the locale's default
    # encoding, which differs between platforms.
    return json.loads(path.read_text(encoding="utf-8"))
def _host_status(host: dict[str, Any], failover_status: dict[str, Any]) -> str:
if host.get("always_available"):
return "ONLINE"
fleet = failover_status.get("fleet") or {}
return str(fleet.get(host["name"], "ONLINE")).upper()
def _lane_matches(host: dict[str, Any], lane: str) -> bool:
host_lanes = set(host.get("lanes") or ["general"])
if host.get("always_available", False):
return True
if lane == "general":
return "general" in host_lanes
return lane in host_lanes
def _choose_candidate(task: dict[str, Any], hosts: list[dict[str, Any]]):
    """Pick the host that should run ``task``, or ``None`` if none can.

    Selection order:
      1. the first entry of ``task["preferred_hosts"]`` that is known, has
         spare capacity and serves the task's lane;
      2. the least-loaded host with spare capacity that serves the lane;
      3. the least-loaded ``always_available`` host with spare capacity.

    "Least-loaded" means fewest assignments, then most remaining capacity,
    then smallest name.
    """
    lane = task.get("lane", "general")

    def has_room(host):
        return host["remaining_capacity"] > 0

    def load_key(host):
        return (host["assigned_count"], -host["remaining_capacity"], host["name"])

    by_name = {host["name"]: host for host in hosts}
    for wanted in task.get("preferred_hosts") or []:
        host = by_name.get(wanted)
        if host and has_room(host) and _lane_matches(host, lane):
            return host

    eligible = [host for host in hosts if has_room(host) and _lane_matches(host, lane)]
    if eligible:
        return min(eligible, key=load_key)

    reserve = [host for host in hosts if has_room(host) and host.get("always_available")]
    if reserve:
        return min(reserve, key=load_key)

    return None
def generate_plan(spec: dict[str, Any], failover_status: dict[str, Any] | None = None) -> dict[str, Any]:
    """Assign the tasks of ``spec`` to online hosts with spare capacity.

    Args:
        spec: Mapping with ``hosts`` (each with ``name`` and optional
            ``capacity``, ``lanes``, ``always_available``) and ``tasks``
            (each with optional ``id``, ``lane``, ``priority``,
            ``preferred_hosts``).
        failover_status: Optional failover monitor payload used to decide
            which hosts are online.

    Returns:
        A dict with ``assignments`` (in processing order), ``offline_hosts``
        (sorted names) and ``unassigned`` (task id plus reason).
    """
    health = failover_status or {}
    host_specs = spec.get("hosts") or []
    tasks = list(spec.get("tasks") or [])

    available = []
    down = []
    for entry in host_specs:
        name = entry["name"]
        capacity = int(entry.get("capacity", 1))
        record = {
            "name": name,
            "capacity": capacity,
            "remaining_capacity": capacity,
            "assigned_count": 0,
            "lanes": list(entry.get("lanes") or ["general"]),
            "always_available": bool(entry.get("always_available", False)),
            "status": _host_status(entry, health),
        }
        if record["status"] == "ONLINE":
            available.append(record)
        else:
            down.append(name)

    def urgency(task):
        # Highest priority first; ties are broken by task id.
        return (-int(task.get("priority", 0)), str(task.get("id", "")))

    placed = []
    leftover = []
    for task in sorted(tasks, key=urgency):
        target = _choose_candidate(task, available)
        if target is None:
            leftover.append(
                {
                    "task_id": task.get("id"),
                    "reason": f"no_online_host_for_lane:{task.get('lane', 'general')}",
                }
            )
            continue
        # Book the slot so later tasks see the updated load.
        target["remaining_capacity"] -= 1
        target["assigned_count"] += 1
        placed.append(
            {
                "task_id": task.get("id"),
                "host": target["name"],
                "lane": task.get("lane", "general"),
                "priority": int(task.get("priority", 0)),
            }
        )

    return {
        "assignments": placed,
        "offline_hosts": sorted(down),
        "unassigned": leftover,
    }
def write_plan(plan: dict[str, Any], output_path: Path):
    """Write ``plan`` to ``output_path`` as indented JSON, creating missing
    parent directories first."""
    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(plan, indent=2)
    output_path.write_text(serialized)
def parse_args():
    """Parse the command-line options of the dispatch planner."""
    cli = argparse.ArgumentParser(description="Generate a fleet dispatch plan from host health and task demand.")

    path_options = (
        ("--spec-file", SPEC_FILE, "JSON fleet spec with hosts[] and tasks[]"),
        ("--status-file", STATUS_FILE, "Failover monitor JSON payload"),
        ("--output", OUTPUT_FILE, "Output path for the generated plan"),
    )
    for flag, default, help_text in path_options:
        cli.add_argument(flag, type=Path, default=default, help=help_text)

    switches = (
        ("--write-output", "Persist the generated plan to --output"),
        ("--json", "Print JSON only"),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    return cli.parse_args()
def main():
    """Command-line entry point: build a dispatch plan and report it.

    Loads the spec and the failover status from the paths given on the
    command line, generates the plan, writes it to ``--output`` only when
    ``--write-output`` is set, and prints either the plan as JSON (``--json``)
    or a short text summary.

    Nothing is printed before the ``--json`` branch so that JSON output stays
    machine-readable, and the failover status is read only through
    ``--status-file`` so a missing file falls back to an empty status instead
    of raising.
    """
    args = parse_args()
    spec = load_json(args.spec_file, {"hosts": [], "tasks": []})
    failover_status = load_json(args.status_file, {})
    plan = generate_plan(spec, failover_status)

    if args.write_output:
        write_plan(plan, args.output)

    if args.json:
        print(json.dumps(plan, indent=2))
        return

    print("--- Dynamic Dispatch Optimizer ---")
    print(f"Assignments: {len(plan['assignments'])}")
    if plan["offline_hosts"]:
        print("Offline hosts: " + ", ".join(plan["offline_hosts"]))
    for assignment in plan["assignments"]:
        print(f"- {assignment['task_id']} -> {assignment['host']} ({assignment['lane']}, p={assignment['priority']})")
    if plan["unassigned"]:
        print("Unassigned:")
        for item in plan["unassigned"]:
            print(f"- {item['task_id']}: {item['reason']}")
    if args.write_output:
        print(f"Wrote plan to {args.output}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View File

@@ -1,131 +0,0 @@
from datetime import datetime, timezone
from pathlib import Path
import pytest
from scripts.autonomous_issue_creator import (
Incident,
build_incidents,
heartbeat_is_stale,
load_restart_counts,
sync_incidents,
)
class FakeGiteaClient:
    """In-memory stand-in for the Gitea client that records every write.

    ``created`` collects the issues handed out by ``create_issue`` (numbered
    from 100) and ``commented`` collects the comments passed to
    ``comment_issue``. The preset open issues are never modified.
    """

    def __init__(self, open_issues=None):
        self._open_issues = [*(open_issues or [])]
        self.created = []
        self.commented = []

    def list_open_issues(self):
        # Return a copy so callers cannot mutate the fixture.
        return self._open_issues[:]

    def create_issue(self, title, body):
        number = 100 + len(self.created)
        record = {"number": number, "title": title, "body": body}
        self.created.append(record)
        return record

    def comment_issue(self, issue_number, body):
        entry = {"issue_number": issue_number, "body": body}
        self.commented.append(entry)
def test_load_restart_counts_reads_only_count_files(tmp_path):
    """Only ``*.count`` files with integer content end up in the result."""
    fixtures = {
        "act_runner.count": "4\n",
        "worker.count": "2\n",
        "notes.txt": "ignore me",
        "bad.count": "not-an-int",
    }
    for filename, content in fixtures.items():
        (tmp_path / filename).write_text(content)

    assert load_restart_counts(tmp_path) == {"act_runner": 4, "worker": 2}
def test_heartbeat_is_stale_handles_missing_and_old_files(tmp_path):
    """A missing or too-old heartbeat is stale; a recent one is not."""
    # Imported locally because this module's import block does not provide os.
    import os

    now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
    missing = heartbeat_is_stale(tmp_path / "missing.last", now=now, max_age_seconds=900)
    assert missing is True

    heartbeat = tmp_path / "fleet_health.last"
    # write_text creates the file, so no separate touch() is needed.
    heartbeat.write_text("")
    old = now.timestamp() - 1800
    recent = now.timestamp() - 60

    # Back-date the file's mtime to simulate an old heartbeat.
    os.utime(heartbeat, (old, old))
    assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is True

    os.utime(heartbeat, (recent, recent))
    assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is False
def test_build_incidents_captures_offline_hosts_restart_escalations_and_stale_probe():
    """Each signal type yields one incident with the expected fingerprint and title."""
    detected_at = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
    status = {
        "timestamp": 1713148800.0,
        "fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"},
    }

    incidents = build_incidents(
        failover_status=status,
        restart_counts={"act_runner": 4, "worker": 2},
        heartbeat_stale=True,
        now=detected_at,
        restart_escalation_threshold=3,
    )

    # The online host and the below-threshold worker must not appear.
    assert {item.fingerprint for item in incidents} == {
        "host-offline:bezalel",
        "restart-escalation:act_runner",
        "probe-stale:fleet-health",
    }
    titles = {item.title for item in incidents}
    for expected_title in (
        "[AUTO] Fleet host offline: bezalel",
        "[AUTO] Restart escalation: act_runner",
        "[AUTO] Fleet health probe stale",
    ):
        assert expected_title in titles
def test_sync_incidents_reuses_open_issues_and_creates_missing_ones():
    """A tracked incident gets a comment; an untracked one gets a new issue."""
    offline_title = "[AUTO] Fleet host offline: bezalel"
    stale_title = "[AUTO] Fleet health probe stale"
    stale_body = "Fingerprint: probe-stale:fleet-health\nHeartbeat missing"

    tracked_issue = {
        "number": 71,
        "title": offline_title,
        "body": "Fingerprint: host-offline:bezalel\n",
    }
    client = FakeGiteaClient(open_issues=[tracked_issue])

    recurring = Incident(
        fingerprint="host-offline:bezalel",
        title=offline_title,
        body="Fingerprint: host-offline:bezalel\nHost unreachable",
    )
    fresh = Incident(
        fingerprint="probe-stale:fleet-health",
        title=stale_title,
        body=stale_body,
    )

    results = sync_incidents([recurring, fresh], client, apply=True, comment_existing=True)

    actions = [result["action"] for result in results]
    assert actions == ["commented", "created"]

    expected_comment = {
        "issue_number": 71,
        "body": "Autonomous infrastructure detector saw the same incident again.\n\nFingerprint: host-offline:bezalel\n\nLatest evidence:\nHost unreachable",
    }
    assert client.commented == [expected_comment]

    expected_issue = {"number": 100, "title": stale_title, "body": stale_body}
    assert client.created == [expected_issue]

View File

@@ -0,0 +1,79 @@
import json
from scripts.dynamic_dispatch_optimizer import generate_plan, write_plan
def test_generate_plan_rebalances_offline_host_tasks_to_online_capacity():
    """Work for an offline host's lane moves to the hosts that are still online."""
    hosts = [
        {"name": "ezra", "capacity": 2, "lanes": ["research", "general"]},
        {"name": "bezalel", "capacity": 2, "lanes": ["build", "general"]},
        {"name": "local", "capacity": 1, "lanes": ["general"], "always_available": True},
    ]
    tasks = [
        {"id": "ISSUE-1", "lane": "build", "priority": 100},
        {"id": "ISSUE-2", "lane": "general", "priority": 80},
        {"id": "ISSUE-3", "lane": "research", "priority": 60},
    ]
    health = {"fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"}}

    plan = generate_plan({"hosts": hosts, "tasks": tasks}, health)

    host_by_task = dict((row["task_id"], row["host"]) for row in plan["assignments"])
    # The only build host is offline, so the always-available host takes ISSUE-1.
    assert host_by_task == {
        "ISSUE-1": "local",
        "ISSUE-2": "ezra",
        "ISSUE-3": "ezra",
    }
    assert plan["offline_hosts"] == ["bezalel"]
    assert plan["unassigned"] == []
def test_generate_plan_prefers_preferred_host_when_online():
    """The first preferred host wins when it is online and has capacity."""
    hosts = [
        {"name": "ezra", "capacity": 2, "lanes": ["general"]},
        {"name": "bezalel", "capacity": 2, "lanes": ["general"]},
    ]
    task = {"id": "ISSUE-9", "lane": "general", "priority": 100, "preferred_hosts": ["bezalel", "ezra"]}
    all_online = {"fleet": {"ezra": "ONLINE", "bezalel": "ONLINE"}}

    plan = generate_plan({"hosts": hosts, "tasks": [task]}, all_online)

    expected = {"task_id": "ISSUE-9", "host": "bezalel", "lane": "general", "priority": 100}
    assert plan["assignments"] == [expected]
def test_generate_plan_reports_unassigned_when_no_host_matches_lane():
    """A task whose lane no online host serves is reported as unassigned."""
    research_only = {"name": "ezra", "capacity": 1, "lanes": ["research"]}
    build_task = {"id": "ISSUE-5", "lane": "build", "priority": 50}

    plan = generate_plan(
        {"hosts": [research_only], "tasks": [build_task]},
        {"fleet": {"ezra": "ONLINE"}},
    )

    assert plan["assignments"] == []
    expected = {"task_id": "ISSUE-5", "reason": "no_online_host_for_lane:build"}
    assert plan["unassigned"] == [expected]
def test_write_plan_persists_json(tmp_path):
    """The plan read back from disk equals the plan that was written."""
    assignment = {"task_id": "ISSUE-1", "host": "ezra", "lane": "general", "priority": 10}
    plan = {"assignments": [assignment], "offline_hosts": [], "unassigned": []}
    destination = tmp_path / "dispatch-plan.json"

    write_plan(plan, destination)

    stored = json.loads(destination.read_text())
    assert stored == plan