Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b4b20f62e |
@@ -1,31 +1,169 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import json
|
"""Dynamic dispatch optimizer for fleet-wide coordination.
|
||||||
import os
|
|
||||||
import yaml
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
# Dynamic Dispatch Optimizer
|
Refs: timmy-home #552
|
||||||
# Automatically updates routing based on fleet health.
|
|
||||||
|
Takes a fleet dispatch spec plus optional failover status and produces a
|
||||||
|
capacity-aware assignment plan. Safe by default: it prints the plan and only
|
||||||
|
writes an output file when explicitly requested.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
STATUS_FILE = Path.home() / ".timmy" / "failover_status.json"
|
STATUS_FILE = Path.home() / ".timmy" / "failover_status.json"
|
||||||
CONFIG_FILE = Path.home() / "timmy" / "config.yaml"
|
SPEC_FILE = Path.home() / ".timmy" / "fleet_dispatch.json"
|
||||||
|
OUTPUT_FILE = Path.home() / ".timmy" / "dispatch_plan.json"
|
||||||
|
|
||||||
|
|
||||||
|
def load_json(path: Path, default: Any):
|
||||||
|
if not path.exists():
|
||||||
|
return default
|
||||||
|
return json.loads(path.read_text())
|
||||||
|
|
||||||
|
|
||||||
|
def _host_status(host: dict[str, Any], failover_status: dict[str, Any]) -> str:
|
||||||
|
if host.get("always_available"):
|
||||||
|
return "ONLINE"
|
||||||
|
fleet = failover_status.get("fleet") or {}
|
||||||
|
return str(fleet.get(host["name"], "ONLINE")).upper()
|
||||||
|
|
||||||
|
|
||||||
|
def _lane_matches(host: dict[str, Any], lane: str) -> bool:
|
||||||
|
host_lanes = set(host.get("lanes") or ["general"])
|
||||||
|
if host.get("always_available", False):
|
||||||
|
return True
|
||||||
|
if lane == "general":
|
||||||
|
return "general" in host_lanes
|
||||||
|
return lane in host_lanes
|
||||||
|
|
||||||
|
|
||||||
|
def _choose_candidate(task: dict[str, Any], hosts: list[dict[str, Any]]):
|
||||||
|
lane = task.get("lane", "general")
|
||||||
|
preferred = task.get("preferred_hosts") or []
|
||||||
|
|
||||||
|
preferred_map = {host["name"]: host for host in hosts}
|
||||||
|
for host_name in preferred:
|
||||||
|
host = preferred_map.get(host_name)
|
||||||
|
if not host:
|
||||||
|
continue
|
||||||
|
if host["remaining_capacity"] <= 0:
|
||||||
|
continue
|
||||||
|
if _lane_matches(host, lane):
|
||||||
|
return host
|
||||||
|
|
||||||
|
matching = [host for host in hosts if host["remaining_capacity"] > 0 and _lane_matches(host, lane)]
|
||||||
|
if matching:
|
||||||
|
matching.sort(key=lambda host: (host["assigned_count"], -host["remaining_capacity"], host["name"]))
|
||||||
|
return matching[0]
|
||||||
|
|
||||||
|
fallbacks = [host for host in hosts if host["remaining_capacity"] > 0 and host.get("always_available")]
|
||||||
|
if fallbacks:
|
||||||
|
fallbacks.sort(key=lambda host: (host["assigned_count"], -host["remaining_capacity"], host["name"]))
|
||||||
|
return fallbacks[0]
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def generate_plan(spec: dict[str, Any], failover_status: dict[str, Any] | None = None) -> dict[str, Any]:
|
||||||
|
failover_status = failover_status or {}
|
||||||
|
raw_hosts = spec.get("hosts") or []
|
||||||
|
tasks = list(spec.get("tasks") or [])
|
||||||
|
|
||||||
|
online_hosts = []
|
||||||
|
offline_hosts = []
|
||||||
|
for host in raw_hosts:
|
||||||
|
normalized = {
|
||||||
|
"name": host["name"],
|
||||||
|
"capacity": int(host.get("capacity", 1)),
|
||||||
|
"remaining_capacity": int(host.get("capacity", 1)),
|
||||||
|
"assigned_count": 0,
|
||||||
|
"lanes": list(host.get("lanes") or ["general"]),
|
||||||
|
"always_available": bool(host.get("always_available", False)),
|
||||||
|
"status": _host_status(host, failover_status),
|
||||||
|
}
|
||||||
|
if normalized["status"] == "ONLINE":
|
||||||
|
online_hosts.append(normalized)
|
||||||
|
else:
|
||||||
|
offline_hosts.append(normalized["name"])
|
||||||
|
|
||||||
|
ordered_tasks = sorted(
|
||||||
|
tasks,
|
||||||
|
key=lambda item: (-int(item.get("priority", 0)), str(item.get("id", ""))),
|
||||||
|
)
|
||||||
|
|
||||||
|
assignments = []
|
||||||
|
unassigned = []
|
||||||
|
for task in ordered_tasks:
|
||||||
|
candidate = _choose_candidate(task, online_hosts)
|
||||||
|
if candidate is None:
|
||||||
|
unassigned.append({
|
||||||
|
"task_id": task.get("id"),
|
||||||
|
"reason": f"no_online_host_for_lane:{task.get('lane', 'general')}",
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
candidate["remaining_capacity"] -= 1
|
||||||
|
candidate["assigned_count"] += 1
|
||||||
|
assignments.append({
|
||||||
|
"task_id": task.get("id"),
|
||||||
|
"host": candidate["name"],
|
||||||
|
"lane": task.get("lane", "general"),
|
||||||
|
"priority": int(task.get("priority", 0)),
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
"assignments": assignments,
|
||||||
|
"offline_hosts": sorted(offline_hosts),
|
||||||
|
"unassigned": unassigned,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def write_plan(plan: dict[str, Any], output_path: Path):
|
||||||
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
output_path.write_text(json.dumps(plan, indent=2))
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
parser = argparse.ArgumentParser(description="Generate a fleet dispatch plan from host health and task demand.")
|
||||||
|
parser.add_argument("--spec-file", type=Path, default=SPEC_FILE, help="JSON fleet spec with hosts[] and tasks[]")
|
||||||
|
parser.add_argument("--status-file", type=Path, default=STATUS_FILE, help="Failover monitor JSON payload")
|
||||||
|
parser.add_argument("--output", type=Path, default=OUTPUT_FILE, help="Output path for the generated plan")
|
||||||
|
parser.add_argument("--write-output", action="store_true", help="Persist the generated plan to --output")
|
||||||
|
parser.add_argument("--json", action="store_true", help="Print JSON only")
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
print("--- Allegro's Dynamic Dispatch Optimizer ---")
|
args = parse_args()
|
||||||
if not STATUS_FILE.exists():
|
spec = load_json(args.spec_file, {"hosts": [], "tasks": []})
|
||||||
print("No failover status found.")
|
failover_status = load_json(args.status_file, {})
|
||||||
|
plan = generate_plan(spec, failover_status)
|
||||||
|
|
||||||
|
if args.write_output:
|
||||||
|
write_plan(plan, args.output)
|
||||||
|
|
||||||
|
if args.json:
|
||||||
|
print(json.dumps(plan, indent=2))
|
||||||
return
|
return
|
||||||
|
|
||||||
status = json.loads(STATUS_FILE.read_text())
|
print("--- Dynamic Dispatch Optimizer ---")
|
||||||
fleet = status.get("fleet", {})
|
print(f"Assignments: {len(plan['assignments'])}")
|
||||||
|
if plan["offline_hosts"]:
|
||||||
# Logic: If primary VPS is offline, switch fallback to local Ollama
|
print("Offline hosts: " + ", ".join(plan["offline_hosts"]))
|
||||||
if fleet.get("ezra") == "OFFLINE":
|
for assignment in plan["assignments"]:
|
||||||
print("Ezra (Primary) is OFFLINE. Optimizing for local-only fallback...")
|
print(f"- {assignment['task_id']} -> {assignment['host']} ({assignment['lane']}, p={assignment['priority']})")
|
||||||
# In a real scenario, this would update the YAML config
|
if plan["unassigned"]:
|
||||||
print("Updated config.yaml: fallback_model -> ollama:gemma4:12b")
|
print("Unassigned:")
|
||||||
else:
|
for item in plan["unassigned"]:
|
||||||
print("Fleet health is optimal. Maintaining high-performance routing.")
|
print(f"- {item['task_id']}: {item['reason']}")
|
||||||
|
if args.write_output:
|
||||||
|
print(f"Wrote plan to {args.output}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
79
tests/test_dynamic_dispatch_optimizer.py
Normal file
79
tests/test_dynamic_dispatch_optimizer.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
from scripts.dynamic_dispatch_optimizer import generate_plan, write_plan
|
||||||
|
|
||||||
|
|
||||||
|
def test_generate_plan_rebalances_offline_host_tasks_to_online_capacity():
|
||||||
|
spec = {
|
||||||
|
"hosts": [
|
||||||
|
{"name": "ezra", "capacity": 2, "lanes": ["research", "general"]},
|
||||||
|
{"name": "bezalel", "capacity": 2, "lanes": ["build", "general"]},
|
||||||
|
{"name": "local", "capacity": 1, "lanes": ["general"], "always_available": True},
|
||||||
|
],
|
||||||
|
"tasks": [
|
||||||
|
{"id": "ISSUE-1", "lane": "build", "priority": 100},
|
||||||
|
{"id": "ISSUE-2", "lane": "general", "priority": 80},
|
||||||
|
{"id": "ISSUE-3", "lane": "research", "priority": 60},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
failover_status = {"fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"}}
|
||||||
|
|
||||||
|
plan = generate_plan(spec, failover_status)
|
||||||
|
|
||||||
|
assignments = {item["task_id"]: item["host"] for item in plan["assignments"]}
|
||||||
|
assert assignments == {
|
||||||
|
"ISSUE-1": "local",
|
||||||
|
"ISSUE-2": "ezra",
|
||||||
|
"ISSUE-3": "ezra",
|
||||||
|
}
|
||||||
|
assert plan["offline_hosts"] == ["bezalel"]
|
||||||
|
assert plan["unassigned"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_generate_plan_prefers_preferred_host_when_online():
|
||||||
|
spec = {
|
||||||
|
"hosts": [
|
||||||
|
{"name": "ezra", "capacity": 2, "lanes": ["general"]},
|
||||||
|
{"name": "bezalel", "capacity": 2, "lanes": ["general"]},
|
||||||
|
],
|
||||||
|
"tasks": [
|
||||||
|
{"id": "ISSUE-9", "lane": "general", "priority": 100, "preferred_hosts": ["bezalel", "ezra"]},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
plan = generate_plan(spec, {"fleet": {"ezra": "ONLINE", "bezalel": "ONLINE"}})
|
||||||
|
|
||||||
|
assert plan["assignments"] == [
|
||||||
|
{"task_id": "ISSUE-9", "host": "bezalel", "lane": "general", "priority": 100}
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_generate_plan_reports_unassigned_when_no_host_matches_lane():
|
||||||
|
spec = {
|
||||||
|
"hosts": [
|
||||||
|
{"name": "ezra", "capacity": 1, "lanes": ["research"]},
|
||||||
|
],
|
||||||
|
"tasks": [
|
||||||
|
{"id": "ISSUE-5", "lane": "build", "priority": 50},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
plan = generate_plan(spec, {"fleet": {"ezra": "ONLINE"}})
|
||||||
|
|
||||||
|
assert plan["assignments"] == []
|
||||||
|
assert plan["unassigned"] == [
|
||||||
|
{"task_id": "ISSUE-5", "reason": "no_online_host_for_lane:build"}
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_plan_persists_json(tmp_path):
|
||||||
|
plan = {
|
||||||
|
"assignments": [{"task_id": "ISSUE-1", "host": "ezra", "lane": "general", "priority": 10}],
|
||||||
|
"offline_hosts": [],
|
||||||
|
"unassigned": [],
|
||||||
|
}
|
||||||
|
output_path = tmp_path / "dispatch-plan.json"
|
||||||
|
|
||||||
|
write_plan(plan, output_path)
|
||||||
|
|
||||||
|
assert json.loads(output_path.read_text()) == plan
|
||||||
Reference in New Issue
Block a user