diff --git a/docs/PREDICTIVE_RESOURCE_ALLOCATION.md b/docs/PREDICTIVE_RESOURCE_ALLOCATION.md
new file mode 100644
index 0000000..5111d13
--- /dev/null
+++ b/docs/PREDICTIVE_RESOURCE_ALLOCATION.md
@@ -0,0 +1,50 @@
+# Predictive Resource Allocation
+
+This feature forecasts near-term fleet pressure from historical metrics and heartbeat logs so the operator can act before the fleet exhausts its capacity.
+
+## Source data
+
+- `metrics/local_*.jsonl` — historical request cadence, prompt size, success/failure, and caller names
+- `heartbeat/ticks_*.jsonl` — control-plane health (`gitea_alive`) and inference health (`inference_ok`)
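+
+One line from each source, as a sketch based on the fields the allocator actually reads (`timestamp`, `caller`, and `prompt_len` from metrics; `perception.gitea_alive` and `perception.model_health.inference_ok` from heartbeats); real logs may carry additional fields:
+
+```jsonl
+{"timestamp": "2026-03-29T12:00:00+00:00", "caller": "heartbeat_tick", "prompt_len": 1200, "response_len": 50, "success": true}
+{"timestamp": "2026-03-29T12:10:00+00:00", "perception": {"gitea_alive": true, "model_health": {"inference_ok": false}}}
+```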
+""" + +from __future__ import annotations + +import argparse +import json +from collections import Counter, defaultdict +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Iterable + + +def parse_timestamp(value: str) -> datetime: + return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc) + + +def load_jsonl(path_or_paths: Path | str | Iterable[Path | str]) -> list[dict]: + if isinstance(path_or_paths, (str, Path)): + paths = [Path(path_or_paths)] + else: + paths = [Path(p) for p in path_or_paths] + rows: list[dict] = [] + for path in paths: + if not path.exists(): + continue + with path.open(encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if line: + rows.append(json.loads(line)) + return rows + + +def recent_window_cutoff(rows: list[dict], horizon_hours: int) -> datetime: + latest = max(parse_timestamp(r["timestamp"]) for r in rows) + return latest - timedelta(hours=horizon_hours) + + +def summarize_callers(rows: list[dict], cutoff: datetime) -> list[dict]: + counts: Counter[str] = Counter() + prompt_tokens: Counter[str] = Counter() + for row in rows: + ts = parse_timestamp(row["timestamp"]) + if ts < cutoff: + continue + caller = row.get("caller", "unknown") + counts[caller] += 1 + prompt_tokens[caller] += int(row.get("prompt_len", 0)) + summary = [ + {"caller": caller, "requests": counts[caller], "prompt_tokens": prompt_tokens[caller]} + for caller in counts + ] + summary.sort(key=lambda item: (-item["requests"], -item["prompt_tokens"], item["caller"])) + return summary + + +def compute_request_rates(rows: list[dict], horizon_hours: int) -> tuple[float, float, float, float, float]: + if not rows: + return 0.0, 0.0, 1.0, 0.0, 0.0 + latest = max(parse_timestamp(r["timestamp"]) for r in rows) + recent_cutoff = latest - timedelta(hours=horizon_hours) + baseline_cutoff = latest - timedelta(hours=horizon_hours * 2) + + recent = [r for r in rows if parse_timestamp(r["timestamp"]) >= recent_cutoff] + baseline = [r for r in rows if baseline_cutoff <= parse_timestamp(r["timestamp"]) < recent_cutoff] + + recent_rate = len(recent) / float(horizon_hours) + baseline_rate = (len(baseline) / float(horizon_hours)) if baseline else max(1.0, recent_rate) + + recent_prompt_rate = sum(int(r.get("prompt_len", 0)) for r in recent) / float(horizon_hours) + baseline_prompt_rate = ( + sum(int(r.get("prompt_len", 0)) for r in baseline) / float(horizon_hours) + if baseline else max(1.0, recent_prompt_rate) + ) + + request_surge = recent_rate / baseline_rate if baseline_rate else 1.0 + prompt_surge = recent_prompt_rate / baseline_prompt_rate if baseline_prompt_rate else 1.0 + surge_factor = max(request_surge, prompt_surge) + return recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate + + +def count_recent_heartbeat_risks(rows: list[dict], horizon_hours: int) -> tuple[int, int]: + if not rows: + return 0, 0 + cutoff = recent_window_cutoff(rows, horizon_hours) + gitea_outages = 0 + inference_failures = 0 + for row in rows: + ts = parse_timestamp(row["timestamp"]) + if ts < cutoff: + continue + perception = row.get("perception", {}) + if perception.get("gitea_alive") is False: + gitea_outages += 1 + model_health = perception.get("model_health", {}) + if model_health.get("inference_ok") is False: + inference_failures += 1 + return gitea_outages, inference_failures + + +def build_recommendations( + surge_factor: float, + top_callers_recent: list[dict], + gitea_outages_recent: int, + 
+
+## Example operator actions
+
+- Pre-warm local inference before the next forecast window.
+- Throttle or defer large background jobs until off-peak capacity is available.
+- Investigate local model reliability and reserve headroom for heartbeat traffic.
+- Pre-fetch or cache forge state before the next dispatch window.
+
+## Why this matters
+
+Predictive allocation is the missing bridge between passive dashboards and active sovereignty.
+Instead of merely noticing resource exhaustion after the fact, the fleet can infer likely pressure from:
+
+- rising heartbeat traffic
+- large background batch jobs like `know-thy-father-draft:*`
+- repeated inference failures
+- repeated Gitea outages
+
+## Output contract
+
+The script intentionally emits a small, stable schema so future automation can consume it:
+
+- `resource_mode`
+- `dispatch_posture`
+- `predicted_request_rate_per_hour`
+- `surge_factor`
+- `recommended_actions`
+- `top_callers_recent`
+
+That makes it safe to wire into later dashboards, cron nudges, or pre-provisioning hooks without rewriting the forecasting core, as in the sketch below.
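+
+A minimal consumer sketch, assuming the forecast has been captured with `python3 scripts/predictive_resource_allocator.py --json > forecast.json`; the `prewarm_inference` and `defer_background_jobs` hooks are hypothetical placeholders, not existing fleet APIs:
+
+```python
+import json
+from pathlib import Path
+
+
+def prewarm_inference() -> None:
+    """Hypothetical hook: warm the local model before the forecast window."""
+    print("pre-warming local inference")
+
+
+def defer_background_jobs() -> None:
+    """Hypothetical hook: push large batch jobs to off-peak hours."""
+    print("deferring large background jobs")
+
+
+# Read the forecast emitted by the allocator; react only to the small,
+# stable keys named in the output contract above.
+forecast = json.loads(Path("forecast.json").read_text(encoding="utf-8"))
+
+if forecast["resource_mode"] == "surge":
+    prewarm_inference()
+    defer_background_jobs()
+if forecast["dispatch_posture"] == "degraded":
+    print("forge degraded; pre-fetch state before the next dispatch window")
+```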
diff --git a/scripts/predictive_resource_allocator.py b/scripts/predictive_resource_allocator.py
new file mode 100644
index 0000000..2e81bfb
--- /dev/null
+++ b/scripts/predictive_resource_allocator.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python3
+"""Predictive resource allocation for the Timmy fleet.
+
+Forecasts near-term pressure from historical metrics and heartbeat logs so the
+operator can pre-warm models, defer background work, and protect dispatch.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+from collections import Counter
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Iterable
+
+
+def parse_timestamp(value: str) -> datetime:
+    """Parse an ISO-8601 timestamp (including a trailing ``Z``) into aware UTC."""
+    return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
+
+
+def load_jsonl(path_or_paths: Path | str | Iterable[Path | str]) -> list[dict]:
+    """Load rows from one or many JSONL files, skipping files that do not exist."""
+    if isinstance(path_or_paths, (str, Path)):
+        paths = [Path(path_or_paths)]
+    else:
+        paths = [Path(p) for p in path_or_paths]
+    rows: list[dict] = []
+    for path in paths:
+        if not path.exists():
+            continue
+        with path.open(encoding="utf-8") as fh:
+            for line in fh:
+                line = line.strip()
+                if line:
+                    rows.append(json.loads(line))
+    return rows
+
+
+def recent_window_cutoff(rows: list[dict], horizon_hours: int) -> datetime:
+    """Return the start of the recent window, anchored at the newest row."""
+    latest = max(parse_timestamp(r["timestamp"]) for r in rows)
+    return latest - timedelta(hours=horizon_hours)
+
+
+def summarize_callers(rows: list[dict], cutoff: datetime) -> list[dict]:
+    """Rank callers in the recent window by request count, then prompt volume."""
+    counts: Counter[str] = Counter()
+    prompt_tokens: Counter[str] = Counter()
+    for row in rows:
+        ts = parse_timestamp(row["timestamp"])
+        if ts < cutoff:
+            continue
+        caller = row.get("caller", "unknown")
+        counts[caller] += 1
+        prompt_tokens[caller] += int(row.get("prompt_len", 0))
+    summary = [
+        {"caller": caller, "requests": counts[caller], "prompt_tokens": prompt_tokens[caller]}
+        for caller in counts
+    ]
+    summary.sort(key=lambda item: (-item["requests"], -item["prompt_tokens"], item["caller"]))
+    return summary
+
+
+def compute_request_rates(rows: list[dict], horizon_hours: int) -> tuple[float, float, float, float, float]:
+    """Compare the recent window against the window immediately before it.
+
+    Returns (recent rate, baseline rate, surge factor, recent prompt rate,
+    baseline prompt rate); the surge factor is the larger of the request-rate
+    and prompt-token-rate ratios.
+    """
+    if not rows:
+        return 0.0, 0.0, 1.0, 0.0, 0.0
+    latest = max(parse_timestamp(r["timestamp"]) for r in rows)
+    recent_cutoff = latest - timedelta(hours=horizon_hours)
+    baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
+
+    recent = [r for r in rows if parse_timestamp(r["timestamp"]) >= recent_cutoff]
+    baseline = [r for r in rows if baseline_cutoff <= parse_timestamp(r["timestamp"]) < recent_cutoff]
+
+    recent_rate = len(recent) / float(horizon_hours)
+    baseline_rate = (len(baseline) / float(horizon_hours)) if baseline else max(1.0, recent_rate)
+
+    recent_prompt_rate = sum(int(r.get("prompt_len", 0)) for r in recent) / float(horizon_hours)
+    baseline_prompt_rate = (
+        sum(int(r.get("prompt_len", 0)) for r in baseline) / float(horizon_hours)
+        if baseline else max(1.0, recent_prompt_rate)
+    )
+
+    request_surge = recent_rate / baseline_rate if baseline_rate else 1.0
+    prompt_surge = recent_prompt_rate / baseline_prompt_rate if baseline_prompt_rate else 1.0
+    surge_factor = max(request_surge, prompt_surge)
+    return recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate
+
+
+def count_recent_heartbeat_risks(rows: list[dict], horizon_hours: int) -> tuple[int, int]:
+    """Count Gitea outages and inference failures inside the recent window."""
+    if not rows:
+        return 0, 0
+    cutoff = recent_window_cutoff(rows, horizon_hours)
+    gitea_outages = 0
+    inference_failures = 0
+    for row in rows:
+        ts = parse_timestamp(row["timestamp"])
+        if ts < cutoff:
+            continue
+        perception = row.get("perception", {})
+        if perception.get("gitea_alive") is False:
+            gitea_outages += 1
+        model_health = perception.get("model_health", {})
+        if model_health.get("inference_ok") is False:
+            inference_failures += 1
+    return gitea_outages, inference_failures
+
+
+def build_recommendations(
+    surge_factor: float,
+    top_callers_recent: list[dict],
+    gitea_outages_recent: int,
+    inference_failures_recent: int,
+) -> tuple[str, str, list[str]]:
+    """Translate the raw signals into a mode, a posture, and operator actions."""
+    resource_mode = "steady"
+    dispatch_posture = "normal"
+    actions: list[str] = []
+
+    if surge_factor > 1.5:
+        resource_mode = "surge"
+        actions.append("Pre-warm local inference before the next forecast window.")
+
+    heavy_background = any(
+        caller["prompt_tokens"] >= 10000 and caller["caller"].startswith("know-thy-father")
+        for caller in top_callers_recent
+    )
+    if heavy_background:
+        actions.append("Throttle or defer large background jobs until off-peak capacity is available.")
+
+    if inference_failures_recent >= 2:
+        resource_mode = "surge"
+        actions.append("Investigate local model reliability and reserve headroom for heartbeat traffic.")
+
+    if gitea_outages_recent >= 1:
+        dispatch_posture = "degraded"
+        actions.append("Pre-fetch or cache forge state before the next dispatch window.")
+
+    if not actions:
+        actions.append("Maintain current resource allocation; no surge indicators detected.")
+
+    return resource_mode, dispatch_posture, actions
+
+
+def forecast_resources(
+    metrics_paths: Path | str | Iterable[Path | str],
+    heartbeat_paths: Path | str | Iterable[Path | str],
+    horizon_hours: int = 6,
+) -> dict:
+    metric_rows = load_jsonl(metrics_paths)
+    heartbeat_rows = load_jsonl(heartbeat_paths)
+
+    recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate = (
+        compute_request_rates(metric_rows, horizon_hours)
+    )
+    # The caller summary only matters when metric rows exist; in that case the
+    # cutoff is anchored at the newest metric row.
+    top_callers_recent = (
+        summarize_callers(metric_rows, recent_window_cutoff(metric_rows, horizon_hours))
+        if metric_rows else []
+    )
+    gitea_outages_recent, inference_failures_recent = count_recent_heartbeat_risks(heartbeat_rows, horizon_hours)
+    resource_mode, dispatch_posture, recommended_actions = build_recommendations(
+        surge_factor, top_callers_recent, gitea_outages_recent, inference_failures_recent
+    )
+
+    predicted_request_rate = round(max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2)
+    predicted_prompt_tokens = sum(caller["prompt_tokens"] for caller in top_callers_recent)
+
+    return {
+        "horizon_hours": horizon_hours,
+        "resource_mode": resource_mode,
+        "dispatch_posture": dispatch_posture,
+        "recent_request_rate_per_hour": round(recent_rate, 2),
+        "baseline_request_rate_per_hour": round(baseline_rate, 2),
+        "predicted_request_rate_per_hour": predicted_request_rate,
+        "predicted_prompt_tokens_recent_window": predicted_prompt_tokens,
+        "recent_prompt_tokens_per_hour": round(recent_prompt_rate, 2),
+        "baseline_prompt_tokens_per_hour": round(baseline_prompt_rate, 2),
+        "surge_factor": round(surge_factor, 2),
+        "gitea_outages_recent": gitea_outages_recent,
+        "inference_failures_recent": inference_failures_recent,
+        "top_callers_recent": top_callers_recent,
+        "recommended_actions": recommended_actions,
+    }
+
+
+def render_markdown(forecast: dict) -> str:
+    """Render the forecast as a small markdown report for human operators."""
+    lines = [
+        "# Predictive Resource Allocation",
+        "",
+        f"Forecast horizon: {forecast['horizon_hours']} hours",
+        f"resource_mode: {forecast['resource_mode']}",
+        f"dispatch_posture: {forecast['dispatch_posture']}",
+        f"Recent request rate/hour: {forecast['recent_request_rate_per_hour']}",
+        f"Baseline request rate/hour: {forecast['baseline_request_rate_per_hour']}",
+        f"Predicted request rate/hour: {forecast['predicted_request_rate_per_hour']}",
+        f"Surge factor: {forecast['surge_factor']}",
+        f"Recent prompt tokens/hour: {forecast['recent_prompt_tokens_per_hour']}",
+        f"Baseline prompt tokens/hour: {forecast['baseline_prompt_tokens_per_hour']}",
+        f"Gitea outages (recent): {forecast['gitea_outages_recent']}",
+        f"Inference failures (recent): {forecast['inference_failures_recent']}",
+        "",
+        "## Recommended actions",
+        "",
+    ]
+    lines.extend(f"- {action}" for action in forecast["recommended_actions"])
+    lines.extend([
+        "",
+        "## Top callers in recent window",
+        "",
+        "| Caller | Requests | Prompt tokens |",
+        "|---|---:|---:|",
+    ])
+    for caller in forecast["top_callers_recent"]:
+        lines.append(f"| {caller['caller']} | {caller['requests']} | {caller['prompt_tokens']} |")
+    return "\n".join(lines) + "\n"
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description="Forecast fleet resource needs from historical metrics")
+    parser.add_argument(
+        "--metrics",
+        nargs="*",
+        default=[
+            "metrics/local_20260326.jsonl",
+            "metrics/local_20260327.jsonl",
+            "metrics/local_20260328.jsonl",
+            "metrics/local_20260329.jsonl",
+            "metrics/local_20260330.jsonl",
+        ],
+    )
+    parser.add_argument(
+        "--heartbeat",
+        nargs="*",
+        default=[
+            "heartbeat/ticks_20260325.jsonl",
+            "heartbeat/ticks_20260326.jsonl",
+            "heartbeat/ticks_20260327.jsonl",
+            "heartbeat/ticks_20260328.jsonl",
+            "heartbeat/ticks_20260329.jsonl",
+            "heartbeat/ticks_20260330.jsonl",
+        ],
+    )
+    parser.add_argument("--horizon-hours", type=int, default=6)
+    parser.add_argument("--json", action="store_true")
+    args = parser.parse_args()
+
+    forecast = forecast_resources(args.metrics, args.heartbeat, horizon_hours=args.horizon_hours)
+    if args.json:
+        print(json.dumps(forecast, indent=2))
+    else:
+        print(render_markdown(forecast))
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
"success": True, + }) + write_jsonl(metrics_path, metric_rows) + + heartbeat_rows = [ + { + "timestamp": "2026-03-29T12:10:00+00:00", + "perception": {"gitea_alive": True, "model_health": {"inference_ok": False}}, + }, + { + "timestamp": "2026-03-29T12:20:00+00:00", + "perception": {"gitea_alive": True, "model_health": {"inference_ok": False}}, + }, + ] + write_jsonl(heartbeat_path, heartbeat_rows) + + forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6) + + assert forecast["resource_mode"] == "surge" + assert forecast["surge_factor"] > 1.5 + assert any("Pre-warm local inference" in action for action in forecast["recommended_actions"]) + assert any("Throttle or defer large background jobs" in action for action in forecast["recommended_actions"]) + assert forecast["top_callers_recent"][0]["caller"] == "heartbeat_tick" + + +def test_forecast_detects_control_plane_risk_from_gitea_outage(tmp_path): + mod = load_module(SCRIPT_PATH, "predictive_resource_allocator") + + metrics_path = tmp_path / "metrics.jsonl" + heartbeat_path = tmp_path / "heartbeat.jsonl" + write_jsonl(metrics_path, [ + { + "timestamp": "2026-03-29T13:00:00+00:00", + "caller": "heartbeat_tick", + "prompt_len": 1000, + "response_len": 50, + "success": True, + } + ]) + write_jsonl(heartbeat_path, [ + { + "timestamp": "2026-03-29T13:00:00+00:00", + "perception": {"gitea_alive": False, "model_health": {"inference_ok": True}}, + }, + { + "timestamp": "2026-03-29T13:10:00+00:00", + "perception": {"gitea_alive": False, "model_health": {"inference_ok": True}}, + }, + ]) + + forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6) + + assert forecast["gitea_outages_recent"] == 2 + assert any("Pre-fetch or cache forge state" in action for action in forecast["recommended_actions"]) + assert forecast["dispatch_posture"] == "degraded" + + +def test_repo_contains_predictive_resource_allocation_doc(): + assert DOC_PATH.exists(), "missing predictive resource allocation doc" + text = DOC_PATH.read_text(encoding="utf-8") + required = [ + "# Predictive Resource Allocation", + "scripts/predictive_resource_allocator.py", + "resource_mode", + "dispatch_posture", + "Pre-warm local inference", + "Throttle or defer large background jobs", + ] + for snippet in required: + assert snippet in text