Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
0b4b20f62e feat: add fleet dispatch planning scaffold (#552)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 18s
2026-04-15 00:48:41 -04:00
6 changed files with 237 additions and 373 deletions

View File

@@ -1,61 +0,0 @@
# [PHASE-1] Survival - Keep the Lights On
Phase 1 is the manual-clicker stage of the fleet. The machines exist. The services exist. The human is still the automation loop.
## Phase Definition
- Current state: fleet exists, agents run, everything important still depends on human vigilance.
- Resources tracked here: Capacity, Uptime.
- Next phase: [PHASE-2] Automation - Self-Healing Infrastructure
## Current Buildings
- VPS hosts: Ezra, Allegro, Bezalel
- Agents: Timmy harness, Code Claw heartbeat, Gemini AI Studio worker
- Gitea forge
- Evennia worlds
## Current Resource Snapshot
- Fleet operational: yes
- Uptime baseline: 0.0%
- Days at or above 95% uptime: 0
- Capacity utilization: 0.0%
## Next Phase Trigger
To unlock [PHASE-2] Automation - Self-Healing Infrastructure, the fleet must hold both of these conditions at once:
- Uptime >= 95% for 30 consecutive days
- Capacity utilization > 60%
- Current trigger state: NOT READY
## Missing Requirements
- Uptime 0.0% / 95.0%
- Days at or above 95% uptime: 0/30
- Capacity utilization 0.0% / >60.0%
## Manual Clicker Interpretation
Paperclips analogy: Phase 1 = Manual clicker. You ARE the automation.
Every restart, every SSH, every check is a manual click.
## Manual Clicks Still Required
- Restart agents and services by hand when a node goes dark.
- SSH into machines to verify health, disk, and memory.
- Check Gitea, relay, and world services manually before and after changes.
- Act as the scheduler when automation is missing or only partially wired.
## Repo Signals Already Present
- `scripts/fleet_health_probe.sh` — Automated health probe exists and can supply the uptime baseline for the next phase.
- `scripts/fleet_milestones.py` — Milestone tracker exists, so survival achievements can be narrated and logged.
- `scripts/auto_restart_agent.sh` — Auto-restart tooling already exists as phase-2 groundwork.
- `scripts/backup_pipeline.sh` — Backup pipeline scaffold exists for post-survival automation work.
- `infrastructure/timmy-bridge/reports/generate_report.py` — Bridge reporting exists and can summarize heartbeat-driven uptime.
## Notes
- The fleet is alive, but the human is still the control loop.
- Phase 1 is about naming reality plainly so later automation has a baseline to beat.

View File

@@ -12,7 +12,6 @@ Quick-reference index for common operational tasks across the Timmy Foundation i
| Check fleet health | fleet-ops | `python3 scripts/fleet_readiness.py` |
| Agent scorecard | fleet-ops | `python3 scripts/agent_scorecard.py` |
| View fleet manifest | fleet-ops | `cat manifest.yaml` |
| Render Phase-1 survival report | timmy-home | `python3 scripts/fleet_phase_status.py --output docs/FLEET_PHASE_1_SURVIVAL.md` |
## the-nexus (Frontend + Brain)

View File

@@ -1,31 +1,169 @@
#!/usr/bin/env python3
import json
import os
import yaml
from pathlib import Path
"""Dynamic dispatch optimizer for fleet-wide coordination.
# Dynamic Dispatch Optimizer
# Automatically updates routing based on fleet health.
Refs: timmy-home #552
Takes a fleet dispatch spec plus optional failover status and produces a
capacity-aware assignment plan. Safe by default: it prints the plan and only
writes an output file when explicitly requested.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Default for --status-file: the failover monitor JSON payload.
STATUS_FILE = Path.home() / ".timmy" / "failover_status.json"
# NOTE(review): none of the planner functions visible here read CONFIG_FILE, and it
# points at ~/timmy (no leading dot) unlike its neighbours — confirm it is still needed.
CONFIG_FILE = Path.home() / "timmy" / "config.yaml"
# Default for --spec-file: JSON fleet spec with hosts[] and tasks[].
SPEC_FILE = Path.home() / ".timmy" / "fleet_dispatch.json"
# Default for --output: only written when --write-output is passed.
OUTPUT_FILE = Path.home() / ".timmy" / "dispatch_plan.json"
def load_json(path: Path, default: Any):
    """Return the JSON document stored at *path*, or *default* when it is absent.

    Args:
        path: File to read.
        default: Value returned unchanged when *path* does not exist.

    Returns:
        The decoded JSON value, or *default*.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    try:
        # JSON is exchanged as UTF-8; do not depend on the platform locale encoding.
        raw = path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # EAFP: a separate exists() check can race with the file being removed.
        return default
    return json.loads(raw)
def _host_status(host: dict[str, Any], failover_status: dict[str, Any]) -> str:
    """Return the upper-cased health status of *host*.

    Hosts flagged ``always_available`` are reported as ``"ONLINE"`` without
    consulting *failover_status*. Any other host is looked up by name in
    ``failover_status["fleet"]`` and defaults to ``"ONLINE"`` when unlisted.
    """
    if not host.get("always_available"):
        reported = (failover_status.get("fleet") or {}).get(host["name"], "ONLINE")
        return str(reported).upper()
    return "ONLINE"
def _serves_lane(host: dict[str, Any], lane: str) -> bool:
    """Return True when *lane* is one of the lanes *host* lists (default ``["general"]``)."""
    return lane in (host.get("lanes") or ["general"])


def _lane_matches(host: dict[str, Any], lane: str) -> bool:
    """Return True when *host* is allowed to run work from *lane*.

    Always-available hosts accept every lane; any other host accepts only the
    lanes it lists.
    """
    if host.get("always_available", False):
        return True
    return _serves_lane(host, lane)


def _least_loaded(hosts: list[dict[str, Any]]):
    """Return the host with the fewest assignments, or None when *hosts* is empty.

    Ties are broken by the most remaining capacity, then by name, so the result
    is deterministic.
    """
    return min(
        hosts,
        key=lambda host: (host["assigned_count"], -host["remaining_capacity"], host["name"]),
        default=None,
    )


def _choose_candidate(task: dict[str, Any], hosts: list[dict[str, Any]]):
    """Pick the host that should receive *task*, or None when nothing can take it.

    Selection runs in three tiers:

    1. The first entry of ``task["preferred_hosts"]`` that is present in
       *hosts*, has spare capacity and is allowed to run the task's lane.
    2. The least-loaded host with spare capacity that lists the lane itself.
    3. The least-loaded always-available host with spare capacity.

    Tier 3 is a genuine last resort: previously always-available hosts were
    already swept into tier 2, which left the fallback branch unreachable and
    let a fallback host outrank a host dedicated to the lane.

    Args:
        task: Task record; ``lane`` defaults to ``"general"``.
        hosts: Normalized host records carrying ``name``, ``remaining_capacity``
            and ``assigned_count``.

    Returns:
        The chosen host dict (the same object found in *hosts*) or None.
    """
    lane = task.get("lane", "general")
    hosts_by_name = {host["name"]: host for host in hosts}

    for host_name in task.get("preferred_hosts") or []:
        host = hosts_by_name.get(host_name)
        if host and host["remaining_capacity"] > 0 and _lane_matches(host, lane):
            return host

    available = [host for host in hosts if host["remaining_capacity"] > 0]

    native = [host for host in available if _serves_lane(host, lane)]
    if native:
        return _least_loaded(native)

    fallbacks = [host for host in available if host.get("always_available")]
    return _least_loaded(fallbacks)
def generate_plan(spec: dict[str, Any], failover_status: dict[str, Any] | None = None) -> dict[str, Any]:
    """Build a dispatch plan from a fleet spec and an optional health payload.

    Hosts whose status is not ``"ONLINE"`` are excluded and reported by name.
    Tasks are handled from highest to lowest priority (ties ordered by id);
    each one is offered to ``_choose_candidate`` and consumes one unit of the
    chosen host's capacity.

    Args:
        spec: Mapping with optional ``hosts`` and ``tasks`` lists.
        failover_status: Health payload; treated as empty when falsy.

    Returns:
        A dict with ``assignments``, ``offline_hosts`` (sorted names) and
        ``unassigned`` (task id plus reason).
    """
    health = failover_status or {}
    host_entries = spec.get("hosts") or []
    queue = list(spec.get("tasks") or [])

    online: list[dict[str, Any]] = []
    offline_names: list[str] = []
    for entry in host_entries:
        record = {"name": entry["name"]}
        slots = int(entry.get("capacity", 1))
        record["capacity"] = slots
        record["remaining_capacity"] = slots
        record["assigned_count"] = 0
        record["lanes"] = list(entry.get("lanes") or ["general"])
        record["always_available"] = bool(entry.get("always_available", False))
        record["status"] = _host_status(entry, health)
        if record["status"] != "ONLINE":
            offline_names.append(record["name"])
        else:
            online.append(record)

    def _rank(task: dict[str, Any]):
        # Highest priority first; the stringified id keeps ordering deterministic.
        return (-int(task.get("priority", 0)), str(task.get("id", "")))

    assignments: list[dict[str, Any]] = []
    unassigned: list[dict[str, Any]] = []
    for task in sorted(queue, key=_rank):
        chosen = _choose_candidate(task, online)
        if chosen is not None:
            chosen["remaining_capacity"] -= 1
            chosen["assigned_count"] += 1
            assignments.append({
                "task_id": task.get("id"),
                "host": chosen["name"],
                "lane": task.get("lane", "general"),
                "priority": int(task.get("priority", 0)),
            })
        else:
            unassigned.append({
                "task_id": task.get("id"),
                "reason": f"no_online_host_for_lane:{task.get('lane', 'general')}",
            })

    return {
        "assignments": assignments,
        "offline_hosts": sorted(offline_names),
        "unassigned": unassigned,
    }
def write_plan(plan: dict[str, Any], output_path: Path):
    """Serialize *plan* as indented JSON to *output_path*, creating parent directories."""
    directory = output_path.parent
    directory.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(plan, indent=2)
    with output_path.open("w") as handle:
        handle.write(serialized)
def parse_args():
    """Parse the command line and return the resulting argparse namespace.

    Three path options (``--spec-file``, ``--status-file``, ``--output``) default
    to the module-level path constants; ``--write-output`` and ``--json`` are
    boolean switches.
    """
    cli = argparse.ArgumentParser(description="Generate a fleet dispatch plan from host health and task demand.")

    path_options = (
        ("--spec-file", SPEC_FILE, "JSON fleet spec with hosts[] and tasks[]"),
        ("--status-file", STATUS_FILE, "Failover monitor JSON payload"),
        ("--output", OUTPUT_FILE, "Output path for the generated plan"),
    )
    for flag, fallback, description in path_options:
        cli.add_argument(flag, type=Path, default=fallback, help=description)

    switches = (
        ("--write-output", "Persist the generated plan to --output"),
        ("--json", "Print JSON only"),
    )
    for flag, description in switches:
        cli.add_argument(flag, action="store_true", help=description)

    return cli.parse_args()
def main():
    """Command-line entry point: build the dispatch plan and report it.

    The plan is derived only from the files named by ``--spec-file`` and
    ``--status-file``; a missing file is treated as an empty spec or status.
    Nothing is written unless ``--write-output`` is given. With ``--json`` the
    plan is printed as JSON and no human-readable summary follows.

    Stale pre-change lines interleaved into this function were dropped: they
    read the hard-coded ``STATUS_FILE`` unconditionally (crashing when it was
    absent and ignoring ``--status-file``) and printed that ``config.yaml``
    had been updated although nothing was written.
    """
    args = parse_args()
    spec = load_json(args.spec_file, {"hosts": [], "tasks": []})
    failover_status = load_json(args.status_file, {})
    plan = generate_plan(spec, failover_status)

    if args.write_output:
        write_plan(plan, args.output)

    if args.json:
        # Machine-readable mode: emit the plan and nothing else.
        print(json.dumps(plan, indent=2))
        return

    print("--- Dynamic Dispatch Optimizer ---")
    print(f"Assignments: {len(plan['assignments'])}")
    if plan["offline_hosts"]:
        print("Offline hosts: " + ", ".join(plan["offline_hosts"]))
    for assignment in plan["assignments"]:
        print(f"- {assignment['task_id']} -> {assignment['host']} ({assignment['lane']}, p={assignment['priority']})")
    if plan["unassigned"]:
        print("Unassigned:")
        for item in plan["unassigned"]:
            print(f"- {item['task_id']}: {item['reason']}")
    if args.write_output:
        print(f"Wrote plan to {args.output}")
if __name__ == "__main__":
main()

View File

@@ -1,224 +0,0 @@
#!/usr/bin/env python3
"""Render the current fleet survival phase as a durable report."""
from __future__ import annotations
import argparse
import json
from copy import deepcopy
from pathlib import Path
from typing import Any
# Report title and the name of the phase this one unlocks.
PHASE_NAME = "[PHASE-1] Survival - Keep the Lights On"
NEXT_PHASE_NAME = "[PHASE-2] Automation - Self-Healing Infrastructure"
# Gate thresholds checked by compute_phase_status: uptime must reach the
# percentage, the day count must reach the target, and capacity utilization
# must strictly exceed its percentage.
TARGET_UPTIME_PERCENT = 95.0
TARGET_UPTIME_DAYS = 30
TARGET_CAPACITY_PERCENT = 60.0
# Fallback "Current Buildings" bullets when a snapshot omits them.
DEFAULT_BUILDINGS = [
"VPS hosts: Ezra, Allegro, Bezalel",
"Agents: Timmy harness, Code Claw heartbeat, Gemini AI Studio worker",
"Gitea forge",
"Evennia worlds",
]
# Fallback "Manual Clicks Still Required" bullets when a snapshot omits them.
DEFAULT_MANUAL_CLICKS = [
"Restart agents and services by hand when a node goes dark.",
"SSH into machines to verify health, disk, and memory.",
"Check Gitea, relay, and world services manually before and after changes.",
"Act as the scheduler when automation is missing or only partially wired.",
]
# Repo-relative path -> description; an entry is reported only if the path exists.
REPO_SIGNAL_FILES = {
"scripts/fleet_health_probe.sh": "Automated health probe exists and can supply the uptime baseline for the next phase.",
"scripts/fleet_milestones.py": "Milestone tracker exists, so survival achievements can be narrated and logged.",
"scripts/auto_restart_agent.sh": "Auto-restart tooling already exists as phase-2 groundwork.",
"scripts/backup_pipeline.sh": "Backup pipeline scaffold exists for post-survival automation work.",
"infrastructure/timmy-bridge/reports/generate_report.py": "Bridge reporting exists and can summarize heartbeat-driven uptime.",
}
# Baseline snapshot; load_snapshot deep-merges an optional JSON override onto a copy.
DEFAULT_SNAPSHOT = {
"fleet_operational": True,
"resources": {
"uptime_percent": 0.0,
"days_at_or_above_95_percent": 0,
"capacity_utilization_percent": 0.0,
},
"current_buildings": DEFAULT_BUILDINGS,
"manual_clicks": DEFAULT_MANUAL_CLICKS,
"notes": [
"The fleet is alive, but the human is still the control loop.",
"Phase 1 is about naming reality plainly so later automation has a baseline to beat.",
],
}
def default_snapshot() -> dict[str, Any]:
    """Return a fresh deep copy of ``DEFAULT_SNAPSHOT`` that callers may mutate freely."""
    baseline = deepcopy(DEFAULT_SNAPSHOT)
    return baseline
def _deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict holding *base* with *override* merged on top.

    Where both sides hold a dict under the same key the two are merged
    recursively; in every other case the override value replaces the base
    value. Neither argument is modified, and the result shares no mutable
    objects with either of them.

    Args:
        base: Mapping supplying the defaults.
        override: Mapping whose entries take precedence.

    Returns:
        The merged mapping.
    """
    result = deepcopy(base)
    for key, value in override.items():
        current = result.get(key)
        if isinstance(value, dict) and isinstance(current, dict):
            result[key] = _deep_merge(current, value)
        else:
            # Copy so later edits to the result cannot leak back into *override*.
            result[key] = deepcopy(value)
    return result
def load_snapshot(snapshot_path: Path | None = None) -> dict[str, Any]:
    """Return the default snapshot, optionally overlaid with a JSON file.

    Args:
        snapshot_path: UTF-8 JSON file whose contents are deep-merged over the
            defaults; when None the defaults are returned untouched.
    """
    baseline = default_snapshot()
    if snapshot_path is not None:
        overrides = json.loads(snapshot_path.read_text(encoding="utf-8"))
        return _deep_merge(baseline, overrides)
    return baseline
def collect_repo_signals(repo_root: Path) -> list[str]:
    """List a Markdown-ready line for every ``REPO_SIGNAL_FILES`` path found under *repo_root*.

    Each line has the form "`<relative path>` — <description>", in the
    iteration order of ``REPO_SIGNAL_FILES``.
    """
    return [
        f"`{relative}` — {summary}"
        for relative, summary in REPO_SIGNAL_FILES.items()
        if (repo_root / relative).exists()
    ]
def compute_phase_status(snapshot: dict[str, Any], repo_root: Path | None = None) -> dict[str, Any]:
    """Evaluate *snapshot* against the phase-2 gate and return a status mapping.

    Args:
        snapshot: Mapping with optional ``fleet_operational``, ``resources``,
            ``current_buildings``, ``manual_clicks`` and ``notes`` entries.
        repo_root: Directory scanned for repo signals; defaults to the parent
            of the directory containing this file.

    Returns:
        The normalized resources, the unmet requirements as text, and whether
        the next phase is ready (operational and nothing missing).
    """
    root = repo_root or Path(__file__).resolve().parents[1]

    metrics = snapshot.get("resources", {})
    uptime = float(metrics.get("uptime_percent", 0.0))
    streak_days = int(metrics.get("days_at_or_above_95_percent", 0))
    utilization = float(metrics.get("capacity_utilization_percent", 0.0))
    operational = bool(snapshot.get("fleet_operational", False))

    # (gate failed?, message) pairs, in the order they appear in the report.
    gate_failures = (
        (not operational, "Fleet operational flag is false."),
        (uptime < TARGET_UPTIME_PERCENT, f"Uptime {uptime:.1f}% / {TARGET_UPTIME_PERCENT:.1f}%"),
        (streak_days < TARGET_UPTIME_DAYS, f"Days at or above 95% uptime: {streak_days}/{TARGET_UPTIME_DAYS}"),
        (
            utilization <= TARGET_CAPACITY_PERCENT,
            f"Capacity utilization {utilization:.1f}% / >{TARGET_CAPACITY_PERCENT:.1f}%",
        ),
    )
    missing: list[str] = [message for failed, message in gate_failures if failed]

    status: dict[str, Any] = {}
    status["title"] = PHASE_NAME
    status["current_phase"] = "PHASE-1 Survival"
    status["fleet_operational"] = operational
    status["resources"] = {
        "uptime_percent": uptime,
        "days_at_or_above_95_percent": streak_days,
        "capacity_utilization_percent": utilization,
    }
    status["current_buildings"] = list(snapshot.get("current_buildings", DEFAULT_BUILDINGS))
    status["manual_clicks"] = list(snapshot.get("manual_clicks", DEFAULT_MANUAL_CLICKS))
    status["notes"] = list(snapshot.get("notes", []))
    status["repo_signals"] = collect_repo_signals(root)
    status["next_phase"] = NEXT_PHASE_NAME
    status["next_phase_ready"] = operational and not missing
    status["missing_requirements"] = missing
    return status
def render_markdown(status: dict[str, Any]) -> str:
    """Render a status mapping from ``compute_phase_status`` as a Markdown report.

    Sections appear in a fixed order; the Notes section is emitted only when
    notes are present. The text ends with exactly one trailing newline.
    """

    def bullets(items):
        # One Markdown list item per entry.
        return [f"- {entry}" for entry in items]

    metrics = status["resources"]
    gaps = status["missing_requirements"]
    if status["next_phase_ready"]:
        trigger_state = "READY"
    else:
        trigger_state = "NOT READY"

    report = [
        f"# {status['title']}",
        "",
        "Phase 1 is the manual-clicker stage of the fleet. The machines exist. The services exist. The human is still the automation loop.",
        "",
        "## Phase Definition",
        "",
        "- Current state: fleet exists, agents run, everything important still depends on human vigilance.",
        "- Resources tracked here: Capacity, Uptime.",
        f"- Next phase: {status['next_phase']}",
        "",
        "## Current Buildings",
        "",
        *bullets(status["current_buildings"]),
        "",
        "## Current Resource Snapshot",
        "",
        f"- Fleet operational: {'yes' if status['fleet_operational'] else 'no'}",
        f"- Uptime baseline: {metrics['uptime_percent']:.1f}%",
        f"- Days at or above 95% uptime: {metrics['days_at_or_above_95_percent']}",
        f"- Capacity utilization: {metrics['capacity_utilization_percent']:.1f}%",
        "",
        "## Next Phase Trigger",
        "",
        f"To unlock {status['next_phase']}, the fleet must hold both of these conditions at once:",
        f"- Uptime >= {TARGET_UPTIME_PERCENT:.0f}% for {TARGET_UPTIME_DAYS} consecutive days",
        f"- Capacity utilization > {TARGET_CAPACITY_PERCENT:.0f}%",
        f"- Current trigger state: {trigger_state}",
        "",
        "## Missing Requirements",
        "",
        *(bullets(gaps) if gaps else ["- None. Phase 2 can unlock now."]),
        "",
        "## Manual Clicker Interpretation",
        "",
        "Paperclips analogy: Phase 1 = Manual clicker. You ARE the automation.",
        "Every restart, every SSH, every check is a manual click.",
        "",
        "## Manual Clicks Still Required",
        "",
        *bullets(status["manual_clicks"]),
        "",
        "## Repo Signals Already Present",
        "",
        *(
            bullets(status["repo_signals"])
            if status["repo_signals"]
            else ["- No survival-adjacent repo signals detected."]
        ),
    ]

    if status["notes"]:
        report += ["", "## Notes", "", *bullets(status["notes"])]

    return "\n".join(report).rstrip() + "\n"
def main() -> None:
    """Command-line entry point: render the phase report to stdout or to a file.

    ``--snapshot`` supplies an optional JSON override, ``--json`` switches the
    output from Markdown to JSON, and ``--output`` writes the result to a file
    (creating parent directories) instead of printing it.
    """
    cli = argparse.ArgumentParser(description="Render the fleet phase-1 survival report")
    cli.add_argument("--snapshot", help="Optional JSON snapshot overriding the default phase-1 baseline")
    cli.add_argument("--output", help="Write markdown report to this path")
    cli.add_argument("--json", action="store_true", help="Print computed status as JSON instead of markdown")
    options = cli.parse_args()

    snapshot_file = Path(options.snapshot).expanduser() if options.snapshot else None
    snapshot = load_snapshot(snapshot_file)
    status = compute_phase_status(snapshot, repo_root=Path(__file__).resolve().parents[1])

    rendered = json.dumps(status, indent=2) if options.json else render_markdown(status)

    if not options.output:
        print(rendered)
        return

    destination = Path(options.output).expanduser()
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(rendered, encoding="utf-8")
    print(f"Phase status written to {destination}")

View File

@@ -0,0 +1,79 @@
import json
from scripts.dynamic_dispatch_optimizer import generate_plan, write_plan
def test_generate_plan_rebalances_offline_host_tasks_to_online_capacity():
    """With the build host offline, build work lands on the always-available host."""
    hosts = [
        {"name": "ezra", "capacity": 2, "lanes": ["research", "general"]},
        {"name": "bezalel", "capacity": 2, "lanes": ["build", "general"]},
        {"name": "local", "capacity": 1, "lanes": ["general"], "always_available": True},
    ]
    tasks = [
        {"id": "ISSUE-1", "lane": "build", "priority": 100},
        {"id": "ISSUE-2", "lane": "general", "priority": 80},
        {"id": "ISSUE-3", "lane": "research", "priority": 60},
    ]
    health = {"fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"}}

    plan = generate_plan({"hosts": hosts, "tasks": tasks}, health)

    placed = dict((row["task_id"], row["host"]) for row in plan["assignments"])
    assert placed == {
        "ISSUE-1": "local",
        "ISSUE-2": "ezra",
        "ISSUE-3": "ezra",
    }
    assert plan["offline_hosts"] == ["bezalel"]
    assert plan["unassigned"] == []
def test_generate_plan_prefers_preferred_host_when_online():
    """The first online entry of ``preferred_hosts`` receives the task."""
    hosts = [
        {"name": "ezra", "capacity": 2, "lanes": ["general"]},
        {"name": "bezalel", "capacity": 2, "lanes": ["general"]},
    ]
    task = {"id": "ISSUE-9", "lane": "general", "priority": 100, "preferred_hosts": ["bezalel", "ezra"]}
    health = {"fleet": {"ezra": "ONLINE", "bezalel": "ONLINE"}}

    plan = generate_plan({"hosts": hosts, "tasks": [task]}, health)

    expected = {"task_id": "ISSUE-9", "host": "bezalel", "lane": "general", "priority": 100}
    assert plan["assignments"] == [expected]
def test_generate_plan_reports_unassigned_when_no_host_matches_lane():
    """A task whose lane no online host serves is reported as unassigned."""
    research_only = {"name": "ezra", "capacity": 1, "lanes": ["research"]}
    build_task = {"id": "ISSUE-5", "lane": "build", "priority": 50}

    plan = generate_plan(
        {"hosts": [research_only], "tasks": [build_task]},
        {"fleet": {"ezra": "ONLINE"}},
    )

    assert plan["assignments"] == []
    expected = {"task_id": "ISSUE-5", "reason": "no_online_host_for_lane:build"}
    assert plan["unassigned"] == [expected]
def test_write_plan_persists_json(tmp_path):
    """``write_plan`` stores the plan so that reading it back yields an equal dict."""
    destination = tmp_path / "dispatch-plan.json"
    assignment = {"task_id": "ISSUE-1", "host": "ezra", "lane": "general", "priority": 10}
    plan = {"assignments": [assignment], "offline_hosts": [], "unassigned": []}

    write_plan(plan, destination)

    stored = json.loads(destination.read_text())
    assert stored == plan

View File

@@ -1,67 +0,0 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT_PATH = ROOT / "scripts" / "fleet_phase_status.py"
DOC_PATH = ROOT / "docs" / "FLEET_PHASE_1_SURVIVAL.md"
def _load_module(path: Path, name: str):
    """Import the file at *path* as a module called *name* and return it.

    Fails the calling test when the file is missing or no loader is available.
    """
    assert path.exists(), f"missing {path.relative_to(ROOT)}"
    module_spec = importlib.util.spec_from_file_location(name, path)
    assert module_spec
    assert module_spec.loader
    loaded = importlib.util.module_from_spec(module_spec)
    module_spec.loader.exec_module(loaded)
    return loaded
def test_compute_phase_status_tracks_survival_gate_requirements() -> None:
    """Every unmet gate shows up in ``missing_requirements`` with its numbers."""
    module = _load_module(SCRIPT_PATH, "fleet_phase_status")
    snapshot = {
        "fleet_operational": True,
        "resources": {
            "uptime_percent": 94.5,
            "days_at_or_above_95_percent": 12,
            "capacity_utilization_percent": 45.0,
        },
    }

    status = module.compute_phase_status(snapshot)

    assert status["current_phase"] == "PHASE-1 Survival"
    assert status["next_phase_ready"] is False
    gaps = status["missing_requirements"]
    for fragment in ("94.5% / 95.0%", "12/30", "45.0% / >60.0%"):
        assert any(fragment in line for line in gaps)
def test_render_markdown_preserves_phase_buildings_and_manual_clicker_language() -> None:
    """The default report contains the title, the buildings and the clicker wording."""
    module = _load_module(SCRIPT_PATH, "fleet_phase_status")
    report = module.render_markdown(module.compute_phase_status(module.default_snapshot()))

    expected_snippets = (
        "# [PHASE-1] Survival - Keep the Lights On",
        "VPS hosts: Ezra, Allegro, Bezalel",
        "Timmy harness",
        "Gitea forge",
        "Evennia worlds",
        "Every restart, every SSH, every check is a manual click.",
    )
    absent = [snippet for snippet in expected_snippets if snippet not in report]
    assert not absent
def test_repo_contains_generated_phase_1_doc() -> None:
    """The committed phase-1 document exists and carries its main headings."""
    assert DOC_PATH.exists(), "missing committed phase-1 survival doc"
    document = DOC_PATH.read_text(encoding="utf-8")

    headings = (
        "# [PHASE-1] Survival - Keep the Lights On",
        "## Current Buildings",
        "## Next Phase Trigger",
        "## Manual Clicker Interpretation",
    )
    absent = [heading for heading in headings if heading not in document]
    assert not absent