Files
timmy-home/scripts/predictive_resource_allocator.py
Alexander Whitestone e4c4c21170
Some checks failed
Smoke Test / smoke (pull_request) Failing after 21s
feat: add predictive resource allocation forecast (#749)
2026-04-15 21:08:26 -04:00

228 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""Predictive resource allocation for the Timmy fleet.
Forecasts near-term pressure from historical metrics and heartbeat logs so the
operator can pre-warm models, defer background work, and protect dispatch.
"""
from __future__ import annotations
import argparse
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable
def parse_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    A trailing ``Z`` suffix is accepted by rewriting it to ``+00:00``
    (``datetime.fromisoformat`` rejects ``Z`` on older Pythons); any other
    offset is converted to UTC.
    """
    normalized = value.replace("Z", "+00:00")
    return datetime.fromisoformat(normalized).astimezone(timezone.utc)
def load_jsonl(path_or_paths: Path | str | Iterable[Path | str]) -> list[dict]:
if isinstance(path_or_paths, (str, Path)):
paths = [Path(path_or_paths)]
else:
paths = [Path(p) for p in path_or_paths]
rows: list[dict] = []
for path in paths:
if not path.exists():
continue
with path.open(encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if line:
rows.append(json.loads(line))
return rows
def recent_window_cutoff(rows: list[dict], horizon_hours: int) -> datetime:
    """Return the start of the recent window: newest timestamp minus the horizon.

    Raises ValueError when *rows* is empty (``max`` of an empty sequence);
    callers are expected to guard against that.
    """
    timestamps = [parse_timestamp(row["timestamp"]) for row in rows]
    return max(timestamps) - timedelta(hours=horizon_hours)
def summarize_callers(rows: list[dict], cutoff: datetime) -> list[dict]:
    """Aggregate per-caller request counts and prompt tokens at or after *cutoff*.

    Rows without a ``caller`` field are bucketed under ``"unknown"``. The
    result is sorted by requests desc, then prompt tokens desc, then caller
    name asc for a stable report ordering.
    """
    totals: dict[str, dict[str, int]] = {}
    for row in rows:
        if parse_timestamp(row["timestamp"]) < cutoff:
            continue
        name = row.get("caller", "unknown")
        entry = totals.setdefault(name, {"requests": 0, "prompt_tokens": 0})
        entry["requests"] += 1
        entry["prompt_tokens"] += int(row.get("prompt_len", 0))
    summary = [{"caller": name, **entry} for name, entry in totals.items()]
    summary.sort(key=lambda item: (-item["requests"], -item["prompt_tokens"], item["caller"]))
    return summary
def compute_request_rates(rows: list[dict], horizon_hours: int) -> tuple[float, float, float, float, float]:
    """Compare the most recent *horizon_hours* window against the window before it.

    Returns (recent request rate/h, baseline request rate/h, surge factor,
    recent prompt-token rate/h, baseline prompt-token rate/h). With no rows
    at all, everything is flat: (0, 0, 1, 0, 0). When the baseline window is
    empty, the baseline rate is clamped to at least 1.0 so a brand-new log
    does not report an infinite surge.
    """
    if not rows:
        return 0.0, 0.0, 1.0, 0.0, 0.0
    # Parse each timestamp once up front instead of per comparison.
    stamped = [(parse_timestamp(row["timestamp"]), int(row.get("prompt_len", 0))) for row in rows]
    newest = max(ts for ts, _ in stamped)
    window = timedelta(hours=horizon_hours)
    recent_start = newest - window
    baseline_start = newest - 2 * window
    hours = float(horizon_hours)
    recent_tokens = [tokens for ts, tokens in stamped if ts >= recent_start]
    baseline_tokens = [tokens for ts, tokens in stamped if baseline_start <= ts < recent_start]
    recent_rate = len(recent_tokens) / hours
    recent_prompt_rate = sum(recent_tokens) / hours
    if baseline_tokens:
        baseline_rate = len(baseline_tokens) / hours
        baseline_prompt_rate = sum(baseline_tokens) / hours
    else:
        baseline_rate = max(1.0, recent_rate)
        baseline_prompt_rate = max(1.0, recent_prompt_rate)
    request_surge = recent_rate / baseline_rate if baseline_rate else 1.0
    prompt_surge = recent_prompt_rate / baseline_prompt_rate if baseline_prompt_rate else 1.0
    surge_factor = max(request_surge, prompt_surge)
    return recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate
def count_recent_heartbeat_risks(rows: list[dict], horizon_hours: int) -> tuple[int, int]:
    """Count Gitea outages and inference failures inside the recent window.

    Only an explicit ``False`` counts as a failure — a missing key is treated
    as "no signal", not as an outage. Returns (gitea_outages, inference_failures).
    """
    if not rows:
        return 0, 0
    cutoff = recent_window_cutoff(rows, horizon_hours)
    outages = 0
    failures = 0
    for row in rows:
        if parse_timestamp(row["timestamp"]) < cutoff:
            continue
        perception = row.get("perception", {})
        # Bool-to-int addition: True adds 1, False adds 0.
        outages += perception.get("gitea_alive") is False
        failures += perception.get("model_health", {}).get("inference_ok") is False
    return outages, failures
def build_recommendations(
    surge_factor: float,
    top_callers_recent: list[dict],
    gitea_outages_recent: int,
    inference_failures_recent: int,
) -> tuple[str, str, list[str]]:
    """Translate forecast signals into a resource mode, dispatch posture, and actions.

    ``resource_mode`` flips to ``"surge"`` on a >1.5x surge factor or repeated
    inference failures; ``dispatch_posture`` degrades on any recent Gitea
    outage. Note the background-job throttle fires on heavy callers even when
    the surge factor is below threshold.
    """
    actions: list[str] = []
    in_surge = surge_factor > 1.5
    if in_surge:
        actions.append("Pre-warm local inference before the next forecast window.")
    # Checked regardless of surge: a single heavy background caller is enough.
    has_heavy_background = any(
        entry["prompt_tokens"] >= 10000 and entry["caller"].startswith("know-thy-father")
        for entry in top_callers_recent
    )
    if has_heavy_background:
        actions.append("Throttle or defer large background jobs until off-peak capacity is available.")
    if inference_failures_recent >= 2:
        in_surge = True
        actions.append("Investigate local model reliability and reserve headroom for heartbeat traffic.")
    degraded = gitea_outages_recent >= 1
    if degraded:
        actions.append("Pre-fetch or cache forge state before the next dispatch window.")
    if not actions:
        actions.append("Maintain current resource allocation; no surge indicators detected.")
    resource_mode = "surge" if in_surge else "steady"
    dispatch_posture = "degraded" if degraded else "normal"
    return resource_mode, dispatch_posture, actions
def forecast_resources(
    metrics_paths: Path | str | Iterable[Path | str],
    heartbeat_paths: Path | str | Iterable[Path | str],
    horizon_hours: int = 6,
) -> dict:
    """Build the full resource forecast from metrics and heartbeat JSONL logs.

    Loads both log sets, computes request/prompt rates and the surge factor
    over the forecast horizon, summarizes recent callers, counts recent
    heartbeat risks, and returns a JSON-serializable forecast dict.

    Fix: the original computed the caller cutoff with
    ``recent_window_cutoff(metric_rows or heartbeat_rows or [fallback], ...)``
    wrapped in a ternary whose guard made the fallback unreachable, and the
    heartbeat-derived cutoff was never used (the cutoff feeds only
    ``summarize_callers``, which is skipped when there are no metric rows).
    The cutoff is now computed only when metric rows exist — behavior is
    unchanged, dead code is gone.
    """
    metric_rows = load_jsonl(metrics_paths)
    heartbeat_rows = load_jsonl(heartbeat_paths)
    recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate = compute_request_rates(
        metric_rows, horizon_hours
    )
    if metric_rows:
        cutoff = recent_window_cutoff(metric_rows, horizon_hours)
        top_callers_recent = summarize_callers(metric_rows, cutoff)
    else:
        top_callers_recent = []
    gitea_outages_recent, inference_failures_recent = count_recent_heartbeat_risks(heartbeat_rows, horizon_hours)
    resource_mode, dispatch_posture, recommended_actions = build_recommendations(
        surge_factor, top_callers_recent, gitea_outages_recent, inference_failures_recent
    )
    # Predict forward from the baseline, damped to 75% of the observed surge,
    # but never below the currently observed recent rate.
    predicted_request_rate = round(max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2)
    predicted_prompt_tokens = sum(caller["prompt_tokens"] for caller in top_callers_recent)
    return {
        "horizon_hours": horizon_hours,
        "resource_mode": resource_mode,
        "dispatch_posture": dispatch_posture,
        "recent_request_rate_per_hour": round(recent_rate, 2),
        "baseline_request_rate_per_hour": round(baseline_rate, 2),
        "predicted_request_rate_per_hour": predicted_request_rate,
        "predicted_prompt_tokens_recent_window": predicted_prompt_tokens,
        "recent_prompt_tokens_per_hour": round(recent_prompt_rate, 2),
        "baseline_prompt_tokens_per_hour": round(baseline_prompt_rate, 2),
        "surge_factor": round(surge_factor, 2),
        "gitea_outages_recent": gitea_outages_recent,
        "inference_failures_recent": inference_failures_recent,
        "top_callers_recent": top_callers_recent,
        "recommended_actions": recommended_actions,
    }
def render_markdown(forecast: dict) -> str:
    """Render a forecast dict as a human-readable Markdown report.

    Sections: headline metrics, recommended actions as a bullet list, and a
    table of top callers. The result always ends with a trailing newline.
    """
    out: list[str] = []
    out.append("# Predictive Resource Allocation")
    out.append("")
    out.append(f"Forecast horizon: {forecast['horizon_hours']} hours")
    out.append(f"resource_mode: {forecast['resource_mode']}")
    out.append(f"dispatch_posture: {forecast['dispatch_posture']}")
    out.append(f"Recent request rate/hour: {forecast['recent_request_rate_per_hour']}")
    out.append(f"Baseline request rate/hour: {forecast['baseline_request_rate_per_hour']}")
    out.append(f"Predicted request rate/hour: {forecast['predicted_request_rate_per_hour']}")
    out.append(f"Surge factor: {forecast['surge_factor']}")
    out.append(f"Recent prompt tokens/hour: {forecast['recent_prompt_tokens_per_hour']}")
    out.append(f"Baseline prompt tokens/hour: {forecast['baseline_prompt_tokens_per_hour']}")
    out.append(f"Gitea outages (recent): {forecast['gitea_outages_recent']}")
    out.append(f"Inference failures (recent): {forecast['inference_failures_recent']}")
    out.append("")
    out.append("## Recommended actions")
    out.append("")
    for action in forecast["recommended_actions"]:
        out.append(f"- {action}")
    out.append("")
    out.append("## Top callers in recent window")
    out.append("")
    out.append("| Caller | Requests | Prompt tokens |")
    out.append("|---|---:|---:|")
    for entry in forecast["top_callers_recent"]:
        out.append(f"| {entry['caller']} | {entry['requests']} | {entry['prompt_tokens']} |")
    return "\n".join(out) + "\n"
def main() -> int:
    """CLI entry point: parse arguments, run the forecast, print JSON or Markdown."""
    parser = argparse.ArgumentParser(description="Forecast fleet resource needs from historical metrics")
    parser.add_argument(
        "--metrics",
        nargs="*",
        default=[
            "metrics/local_20260326.jsonl",
            "metrics/local_20260327.jsonl",
            "metrics/local_20260328.jsonl",
            "metrics/local_20260329.jsonl",
            "metrics/local_20260330.jsonl",
        ],
    )
    parser.add_argument(
        "--heartbeat",
        nargs="*",
        default=[
            "heartbeat/ticks_20260325.jsonl",
            "heartbeat/ticks_20260326.jsonl",
            "heartbeat/ticks_20260327.jsonl",
            "heartbeat/ticks_20260328.jsonl",
            "heartbeat/ticks_20260329.jsonl",
            "heartbeat/ticks_20260330.jsonl",
        ],
    )
    parser.add_argument("--horizon-hours", type=int, default=6)
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()
    forecast = forecast_resources(args.metrics, args.heartbeat, horizon_hours=args.horizon_hours)
    rendered = json.dumps(forecast, indent=2) if args.json else render_markdown(forecast)
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())