Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
e4c4c21170 feat: add predictive resource allocation forecast (#749)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 21s
2026-04-15 21:08:26 -04:00
3 changed files with 400 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
# Predictive Resource Allocation
This feature forecasts near-term fleet pressure from historical metrics and heartbeat logs so the operator can act before the system exhausts itself.
## Source data
- `metrics/local_*.jsonl` — historical request cadence, prompt size, success/failure, caller names
- `heartbeat/ticks_*.jsonl` — control-plane health (`gitea_alive`) and inference health (`inference_ok`)
## Script
`python3 scripts/predictive_resource_allocator.py --json`
The script computes:
- `resource_mode`
- `dispatch_posture`
- recent vs baseline request rate/hour
- `surge_factor`
- recent forge outages
- recent inference failures
- top callers in the recent window
## Example operator actions
- Pre-warm local inference before the next forecast window.
- Throttle or defer large background jobs until off-peak capacity is available.
- Investigate local model reliability and reserve headroom for heartbeat traffic.
- Pre-fetch or cache forge state before the next dispatch window.
## Why this matters
Predictive allocation is the missing bridge between passive dashboards and active sovereignty.
Instead of merely noticing resource exhaustion after the fact, the fleet can infer likely pressure from:
- rising heartbeat traffic
- large background batch jobs like `know-thy-father-draft:*`
- repeated inference failures
- repeated Gitea outages
## Output contract
The script intentionally emits a small, stable schema so future automation can consume it:
- `resource_mode`
- `dispatch_posture`
- `predicted_request_rate_per_hour`
- `surge_factor`
- `recommended_actions`
- `top_callers_recent`
That makes it safe to wire into later dashboards, cron nudges, or pre-provisioning hooks without rewriting the forecasting core.

View File

@@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""Predictive resource allocation for the Timmy fleet.
Forecasts near-term pressure from historical metrics and heartbeat logs so the
operator can pre-warm models, defer background work, and protect dispatch.
"""
from __future__ import annotations
import argparse
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable
def parse_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp (accepting a trailing ``Z``) into UTC."""
    normalized = value.replace("Z", "+00:00")
    return datetime.fromisoformat(normalized).astimezone(timezone.utc)
def load_jsonl(path_or_paths: Path | str | Iterable[Path | str]) -> list[dict]:
    """Read JSON objects, one per line, from one or more JSONL files.

    Accepts a single path (``str`` or ``Path``) or an iterable of paths.
    Missing files are skipped silently and blank lines are ignored.
    """
    if isinstance(path_or_paths, (str, Path)):
        candidates = [Path(path_or_paths)]
    else:
        candidates = [Path(entry) for entry in path_or_paths]
    records: list[dict] = []
    for candidate in candidates:
        if not candidate.exists():
            continue  # tolerate gaps in the log history
        with candidate.open(encoding="utf-8") as handle:
            records.extend(
                json.loads(stripped)
                for raw in handle
                if (stripped := raw.strip())
            )
    return records
def recent_window_cutoff(rows: list[dict], horizon_hours: int) -> datetime:
    """Return the start of the recent window: newest row timestamp minus *horizon_hours*.

    Raises ``ValueError`` when *rows* is empty (``max`` of an empty sequence).
    """
    newest = max(parse_timestamp(row["timestamp"]) for row in rows)
    return newest - timedelta(hours=horizon_hours)
def summarize_callers(rows: list[dict], cutoff: datetime) -> list[dict]:
    """Aggregate per-caller request counts and prompt tokens at or after *cutoff*.

    Returns one dict per caller, sorted by request count (desc), then prompt
    tokens (desc), then caller name (asc).
    """
    requests: Counter[str] = Counter()
    tokens: Counter[str] = Counter()
    for row in rows:
        if parse_timestamp(row["timestamp"]) < cutoff:
            continue  # outside the recent window
        name = row.get("caller", "unknown")
        requests[name] += 1
        tokens[name] += int(row.get("prompt_len", 0))
    return sorted(
        (
            {"caller": name, "requests": requests[name], "prompt_tokens": tokens[name]}
            for name in requests
        ),
        key=lambda entry: (-entry["requests"], -entry["prompt_tokens"], entry["caller"]),
    )
def compute_request_rates(rows: list[dict], horizon_hours: int) -> tuple[float, float, float, float, float]:
    """Compute recent/baseline request and prompt-token rates plus a surge factor.

    Returns ``(recent_rate, baseline_rate, surge_factor, recent_prompt_rate,
    baseline_prompt_rate)``. The recent window covers the last *horizon_hours*
    before the newest timestamp; the baseline covers the *horizon_hours*
    before that. With no baseline rows, baseline rates fall back to
    ``max(1.0, recent rate)`` so the surge factor stays conservative.
    """
    if not rows:
        return 0.0, 0.0, 1.0, 0.0, 0.0
    newest = max(parse_timestamp(row["timestamp"]) for row in rows)
    recent_start = newest - timedelta(hours=horizon_hours)
    baseline_start = newest - timedelta(hours=horizon_hours * 2)
    recent_rows: list[dict] = []
    baseline_rows: list[dict] = []
    for row in rows:
        stamp = parse_timestamp(row["timestamp"])
        if stamp >= recent_start:
            recent_rows.append(row)
        elif stamp >= baseline_start:
            baseline_rows.append(row)
    hours = float(horizon_hours)
    recent_rate = len(recent_rows) / hours
    recent_prompt_rate = sum(int(row.get("prompt_len", 0)) for row in recent_rows) / hours
    if baseline_rows:
        baseline_rate = len(baseline_rows) / hours
        baseline_prompt_rate = sum(int(row.get("prompt_len", 0)) for row in baseline_rows) / hours
    else:
        # No history to compare against: assume at least 1/hour so a brand-new
        # log does not register as an artificial surge.
        baseline_rate = max(1.0, recent_rate)
        baseline_prompt_rate = max(1.0, recent_prompt_rate)
    request_surge = recent_rate / baseline_rate if baseline_rate else 1.0
    prompt_surge = recent_prompt_rate / baseline_prompt_rate if baseline_prompt_rate else 1.0
    surge_factor = max(request_surge, prompt_surge)
    return recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate
def count_recent_heartbeat_risks(rows: list[dict], horizon_hours: int) -> tuple[int, int]:
    """Count Gitea outages and inference failures inside the recent window.

    Returns ``(gitea_outages, inference_failures)``; both are zero when there
    are no heartbeat rows at all.
    """
    if not rows:
        return 0, 0
    cutoff = recent_window_cutoff(rows, horizon_hours)
    outages = 0
    failures = 0
    for row in rows:
        if parse_timestamp(row["timestamp"]) < cutoff:
            continue  # older than the forecast window
        perception = row.get("perception", {})
        # Only an explicit False counts as a risk; a missing key is treated
        # as healthy rather than unknown.
        if perception.get("gitea_alive") is False:
            outages += 1
        if perception.get("model_health", {}).get("inference_ok") is False:
            failures += 1
    return outages, failures
def build_recommendations(
    surge_factor: float,
    top_callers_recent: list[dict],
    gitea_outages_recent: int,
    inference_failures_recent: int,
) -> tuple[str, str, list[str]]:
    """Translate surge and outage signals into a mode, posture, and action list.

    Returns ``(resource_mode, dispatch_posture, actions)`` where
    ``resource_mode`` is ``"steady"`` or ``"surge"``, ``dispatch_posture`` is
    ``"normal"`` or ``"degraded"``, and ``actions`` is never empty.
    """
    actions: list[str] = []
    resource_mode = "steady"
    dispatch_posture = "normal"
    if surge_factor > 1.5:
        resource_mode = "surge"
        actions.append("Pre-warm local inference before the next forecast window.")
    # One heavy know-thy-father batch (>= 10k prompt tokens) is enough to
    # recommend deferring background work.
    for caller in top_callers_recent:
        if caller["caller"].startswith("know-thy-father") and caller["prompt_tokens"] >= 10000:
            actions.append("Throttle or defer large background jobs until off-peak capacity is available.")
            break
    if inference_failures_recent >= 2:
        resource_mode = "surge"
        actions.append("Investigate local model reliability and reserve headroom for heartbeat traffic.")
    if gitea_outages_recent >= 1:
        dispatch_posture = "degraded"
        actions.append("Pre-fetch or cache forge state before the next dispatch window.")
    if not actions:
        actions.append("Maintain current resource allocation; no surge indicators detected.")
    return resource_mode, dispatch_posture, actions
def forecast_resources(
    metrics_paths: Path | str | Iterable[Path | str],
    heartbeat_paths: Path | str | Iterable[Path | str],
    horizon_hours: int = 6,
) -> dict:
    """Build the full forecast payload from metrics and heartbeat JSONL logs.

    Returns the stable output schema (``resource_mode``, ``dispatch_posture``,
    request/prompt rates, ``surge_factor``, outage/failure counts,
    ``top_callers_recent``, ``recommended_actions``).
    """
    metric_rows = load_jsonl(metrics_paths)
    heartbeat_rows = load_jsonl(heartbeat_paths)
    (
        recent_rate,
        baseline_rate,
        surge_factor,
        recent_prompt_rate,
        baseline_prompt_rate,
    ) = compute_request_rates(metric_rows, horizon_hours)
    # The caller summary only exists when there are metric rows; the cutoff is
    # derived from the newest metric timestamp. (Previously a cutoff was also
    # computed from heartbeat rows — and an unreachable synthetic-row fallback
    # existed — even though the result was discarded whenever metric_rows was
    # empty.)
    if metric_rows:
        cutoff = recent_window_cutoff(metric_rows, horizon_hours)
        top_callers_recent = summarize_callers(metric_rows, cutoff)
    else:
        top_callers_recent = []
    gitea_outages_recent, inference_failures_recent = count_recent_heartbeat_risks(
        heartbeat_rows, horizon_hours
    )
    resource_mode, dispatch_posture, recommended_actions = build_recommendations(
        surge_factor, top_callers_recent, gitea_outages_recent, inference_failures_recent
    )
    # Forecast = current rate, or the baseline scaled by a damped surge factor,
    # whichever is larger.
    predicted_request_rate = round(
        max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2
    )
    predicted_prompt_tokens = sum(entry["prompt_tokens"] for entry in top_callers_recent)
    return {
        "horizon_hours": horizon_hours,
        "resource_mode": resource_mode,
        "dispatch_posture": dispatch_posture,
        "recent_request_rate_per_hour": round(recent_rate, 2),
        "baseline_request_rate_per_hour": round(baseline_rate, 2),
        "predicted_request_rate_per_hour": predicted_request_rate,
        "predicted_prompt_tokens_recent_window": predicted_prompt_tokens,
        "recent_prompt_tokens_per_hour": round(recent_prompt_rate, 2),
        "baseline_prompt_tokens_per_hour": round(baseline_prompt_rate, 2),
        "surge_factor": round(surge_factor, 2),
        "gitea_outages_recent": gitea_outages_recent,
        "inference_failures_recent": inference_failures_recent,
        "top_callers_recent": top_callers_recent,
        "recommended_actions": recommended_actions,
    }
def render_markdown(forecast: dict) -> str:
    """Render the forecast dict as an operator-facing markdown report."""
    header = [
        "# Predictive Resource Allocation",
        "",
        f"Forecast horizon: {forecast['horizon_hours']} hours",
        f"resource_mode: {forecast['resource_mode']}",
        f"dispatch_posture: {forecast['dispatch_posture']}",
        f"Recent request rate/hour: {forecast['recent_request_rate_per_hour']}",
        f"Baseline request rate/hour: {forecast['baseline_request_rate_per_hour']}",
        f"Predicted request rate/hour: {forecast['predicted_request_rate_per_hour']}",
        f"Surge factor: {forecast['surge_factor']}",
        f"Recent prompt tokens/hour: {forecast['recent_prompt_tokens_per_hour']}",
        f"Baseline prompt tokens/hour: {forecast['baseline_prompt_tokens_per_hour']}",
        f"Gitea outages (recent): {forecast['gitea_outages_recent']}",
        f"Inference failures (recent): {forecast['inference_failures_recent']}",
        "",
        "## Recommended actions",
        "",
    ]
    action_lines = [f"- {action}" for action in forecast["recommended_actions"]]
    table = [
        "",
        "## Top callers in recent window",
        "",
        "| Caller | Requests | Prompt tokens |",
        "|---|---:|---:|",
    ]
    table += [
        f"| {entry['caller']} | {entry['requests']} | {entry['prompt_tokens']} |"
        for entry in forecast["top_callers_recent"]
    ]
    return "\n".join(header + action_lines + table) + "\n"
def main() -> int:
    """CLI entry point: print the forecast as JSON (``--json``) or markdown.

    Returns 0 so the caller can hand it to ``SystemExit``.
    """
    parser = argparse.ArgumentParser(description="Forecast fleet resource needs from historical metrics")
    # Defaults discover whatever logs exist on disk — matching the
    # `metrics/local_*.jsonl` / `heartbeat/ticks_*.jsonl` contract in the
    # operator doc — instead of a hard-coded list of dates that silently
    # goes stale as new days are logged. Explicit flags behave as before.
    parser.add_argument("--metrics", nargs="*", default=None,
                        help="metrics JSONL files (default: metrics/local_*.jsonl)")
    parser.add_argument("--heartbeat", nargs="*", default=None,
                        help="heartbeat JSONL files (default: heartbeat/ticks_*.jsonl)")
    parser.add_argument("--horizon-hours", type=int, default=6)
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()
    metrics = args.metrics if args.metrics is not None else sorted(Path("metrics").glob("local_*.jsonl"))
    heartbeat = args.heartbeat if args.heartbeat is not None else sorted(Path("heartbeat").glob("ticks_*.jsonl"))
    forecast = forecast_resources(metrics, heartbeat, horizon_hours=args.horizon_hours)
    if args.json:
        print(json.dumps(forecast, indent=2))
    else:
        print(render_markdown(forecast))
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code to the shell as the process exit status.
    raise SystemExit(main())

View File

@@ -0,0 +1,123 @@
from pathlib import Path
import importlib.util
import json
# Repository root: this test file's grandparent directory.
ROOT = Path(__file__).resolve().parent.parent
# Artifacts under test: the forecaster script and its operator-facing doc.
SCRIPT_PATH = ROOT / "scripts" / "predictive_resource_allocator.py"
DOC_PATH = ROOT / "docs" / "PREDICTIVE_RESOURCE_ALLOCATION.md"
def load_module(path: Path, name: str):
    """Import the file at *path* as a fresh module named *name*.

    Fails the calling test with a clear message if the file is missing.
    """
    assert path.exists(), f"missing {path.relative_to(ROOT)}"
    spec = importlib.util.spec_from_file_location(name, path)
    assert spec and spec.loader
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded
def write_jsonl(path: Path, rows: list[dict]) -> None:
    """Serialize *rows* to *path*, one compact JSON object per line."""
    lines = [json.dumps(row) + "\n" for row in rows]
    path.write_text("".join(lines), encoding="utf-8")
def test_forecast_detects_recent_surge_and_recommends_prewarm(tmp_path):
    """A 5x request spike plus one heavy batch job must trigger surge mode."""
    mod = load_module(SCRIPT_PATH, "predictive_resource_allocator")
    metrics_path = tmp_path / "metrics.jsonl"
    heartbeat_path = tmp_path / "heartbeat.jsonl"
    # Baseline window: one heartbeat request per hour for six earlier hours.
    baseline_rows = [
        {
            "timestamp": f"2026-03-29T{hour:02d}:00:00+00:00",
            "caller": "heartbeat_tick",
            "prompt_len": 1000,
            "response_len": 50,
            "success": True,
        }
        for hour in range(6)
    ]
    # Recent window: five requests inside one hour, plus a large batch job.
    surge_rows = [
        {
            "timestamp": f"2026-03-29T12:{minute:02d}:00+00:00",
            "caller": "heartbeat_tick",
            "prompt_len": 1200,
            "response_len": 50,
            "success": True,
        }
        for minute in (0, 10, 20, 30, 40)
    ]
    surge_rows.append(
        {
            "timestamp": "2026-03-29T12:15:00+00:00",
            "caller": "know-thy-father-draft:batch_003",
            "prompt_len": 14420,
            "response_len": 50,
            "success": True,
        }
    )
    write_jsonl(metrics_path, baseline_rows + surge_rows)
    # Two recent inference failures with the forge itself still healthy.
    write_jsonl(
        heartbeat_path,
        [
            {
                "timestamp": f"2026-03-29T12:{minute}:00+00:00",
                "perception": {"gitea_alive": True, "model_health": {"inference_ok": False}},
            }
            for minute in (10, 20)
        ],
    )
    forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6)
    assert forecast["resource_mode"] == "surge"
    assert forecast["surge_factor"] > 1.5
    actions = forecast["recommended_actions"]
    assert any("Pre-warm local inference" in action for action in actions)
    assert any("Throttle or defer large background jobs" in action for action in actions)
    assert forecast["top_callers_recent"][0]["caller"] == "heartbeat_tick"
def test_forecast_detects_control_plane_risk_from_gitea_outage(tmp_path):
    """Two recent Gitea outages must degrade the dispatch posture."""
    mod = load_module(SCRIPT_PATH, "predictive_resource_allocator")
    metrics_path = tmp_path / "metrics.jsonl"
    heartbeat_path = tmp_path / "heartbeat.jsonl"
    write_jsonl(
        metrics_path,
        [
            {
                "timestamp": "2026-03-29T13:00:00+00:00",
                "caller": "heartbeat_tick",
                "prompt_len": 1000,
                "response_len": 50,
                "success": True,
            }
        ],
    )
    # Forge down on both ticks while inference itself stays healthy.
    outage = {"gitea_alive": False, "model_health": {"inference_ok": True}}
    write_jsonl(
        heartbeat_path,
        [
            {"timestamp": "2026-03-29T13:00:00+00:00", "perception": outage},
            {"timestamp": "2026-03-29T13:10:00+00:00", "perception": outage},
        ],
    )
    forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6)
    assert forecast["gitea_outages_recent"] == 2
    assert forecast["dispatch_posture"] == "degraded"
    assert any("Pre-fetch or cache forge state" in action for action in forecast["recommended_actions"])
def test_repo_contains_predictive_resource_allocation_doc():
    """The operator doc must exist and mention the key schema and action phrases."""
    assert DOC_PATH.exists(), "missing predictive resource allocation doc"
    text = DOC_PATH.read_text(encoding="utf-8")
    for snippet in (
        "# Predictive Resource Allocation",
        "scripts/predictive_resource_allocator.py",
        "resource_mode",
        "dispatch_posture",
        "Pre-warm local inference",
        "Throttle or defer large background jobs",
    ):
        assert snippet in text