Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e4c4c21170 |
50
docs/PREDICTIVE_RESOURCE_ALLOCATION.md
Normal file
50
docs/PREDICTIVE_RESOURCE_ALLOCATION.md
Normal file
@@ -0,0 +1,50 @@
# Predictive Resource Allocation

This feature forecasts near-term fleet pressure from historical metrics and heartbeat logs so the operator can act before the system exhausts itself.

## Source data

- `metrics/local_*.jsonl` — historical request cadence, prompt size, success/failure, caller names
- `heartbeat/ticks_*.jsonl` — control-plane health (`gitea_alive`) and inference health (`inference_ok`)

## Script

`python3 scripts/predictive_resource_allocator.py --json`

The script computes:

- `resource_mode`
- `dispatch_posture`
- recent vs baseline request rate/hour
- `surge_factor`
- recent forge outages
- recent inference failures
- top callers in the recent window

## Example operator actions

- Pre-warm local inference before the next forecast window.
- Throttle or defer large background jobs until off-peak capacity is available.
- Investigate local model reliability and reserve headroom for heartbeat traffic.
- Pre-fetch or cache forge state before the next dispatch window.

## Why this matters

Predictive allocation is the missing bridge between passive dashboards and active sovereignty. Instead of merely noticing resource exhaustion after the fact, the fleet can infer likely pressure from:

- rising heartbeat traffic
- large background batch jobs like `know-thy-father-draft:*`
- repeated inference failures
- repeated Gitea outages

## Output contract

The script intentionally emits a small, stable schema so future automation can consume it:

- `resource_mode`
- `dispatch_posture`
- `predicted_request_rate_per_hour`
- `surge_factor`
- `recommended_actions`
- `top_callers_recent`

That makes it safe to wire into later dashboards, cron nudges, or pre-provisioning hooks without rewriting the forecasting core.
227
scripts/predictive_resource_allocator.py
Normal file
227
scripts/predictive_resource_allocator.py
Normal file
@@ -0,0 +1,227 @@
|
||||
#!/usr/bin/env python3
"""Predictive resource allocation for the Timmy fleet.

Forecasts near-term pressure from historical metrics and heartbeat logs so the
operator can pre-warm models, defer background work, and protect dispatch.
"""

from __future__ import annotations

import argparse
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable
||||
def parse_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp (accepting a trailing 'Z') into aware UTC."""
    normalized = value.replace("Z", "+00:00")
    return datetime.fromisoformat(normalized).astimezone(timezone.utc)
|
||||
|
||||
|
||||
def load_jsonl(path_or_paths: Path | str | Iterable[Path | str]) -> list[dict]:
    """Read JSON objects, one per line, from one or more JSONL files.

    Missing files and blank lines are skipped silently; rows from all
    files are concatenated in the order the paths were given.
    """
    if isinstance(path_or_paths, (str, Path)):
        candidates = [Path(path_or_paths)]
    else:
        candidates = [Path(item) for item in path_or_paths]

    records: list[dict] = []
    for candidate in candidates:
        if not candidate.exists():
            continue
        with candidate.open(encoding="utf-8") as handle:
            records.extend(
                json.loads(text)
                for text in (raw.strip() for raw in handle)
                if text
            )
    return records
|
||||
|
||||
|
||||
def recent_window_cutoff(rows: list[dict], horizon_hours: int) -> datetime:
    """Return the UTC instant `horizon_hours` before the newest row timestamp.

    Assumes `rows` is non-empty; callers guard the empty case.
    """
    newest = max(parse_timestamp(row["timestamp"]) for row in rows)
    return newest - timedelta(hours=horizon_hours)
|
||||
|
||||
|
||||
def summarize_callers(rows: list[dict], cutoff: datetime) -> list[dict]:
    """Aggregate per-caller request counts and prompt tokens at/after `cutoff`.

    Rows older than `cutoff` are ignored. The result is sorted by requests
    descending, then prompt tokens descending, then caller name.
    """
    requests_by_caller: Counter[str] = Counter()
    tokens_by_caller: Counter[str] = Counter()
    for row in rows:
        if parse_timestamp(row["timestamp"]) < cutoff:
            continue
        name = row.get("caller", "unknown")
        requests_by_caller[name] += 1
        tokens_by_caller[name] += int(row.get("prompt_len", 0))

    entries = [
        {
            "caller": name,
            "requests": requests_by_caller[name],
            "prompt_tokens": tokens_by_caller[name],
        }
        for name in requests_by_caller
    ]
    entries.sort(key=lambda entry: (-entry["requests"], -entry["prompt_tokens"], entry["caller"]))
    return entries
|
||||
|
||||
|
||||
def compute_request_rates(rows: list[dict], horizon_hours: int) -> tuple[float, float, float, float, float]:
    """Compare the newest `horizon_hours` of requests against the window before it.

    Returns (recent_rate, baseline_rate, surge_factor, recent_prompt_rate,
    baseline_prompt_rate). Rates are per hour; `surge_factor` is the larger of
    the request-count ratio and the prompt-token ratio (recent / baseline).
    """
    if not rows:
        return 0.0, 0.0, 1.0, 0.0, 0.0

    stamped = [(parse_timestamp(row["timestamp"]), row) for row in rows]
    newest = max(ts for ts, _ in stamped)
    recent_start = newest - timedelta(hours=horizon_hours)
    baseline_start = newest - timedelta(hours=horizon_hours * 2)

    recent = [row for ts, row in stamped if ts >= recent_start]
    baseline = [row for ts, row in stamped if baseline_start <= ts < recent_start]

    hours = float(horizon_hours)
    recent_rate = len(recent) / hours
    # No baseline traffic: floor the denominator so the surge ratio stays sane.
    baseline_rate = len(baseline) / hours if baseline else max(1.0, recent_rate)

    recent_prompt_rate = sum(int(row.get("prompt_len", 0)) for row in recent) / hours
    if baseline:
        baseline_prompt_rate = sum(int(row.get("prompt_len", 0)) for row in baseline) / hours
    else:
        baseline_prompt_rate = max(1.0, recent_prompt_rate)

    request_surge = recent_rate / baseline_rate if baseline_rate else 1.0
    prompt_surge = recent_prompt_rate / baseline_prompt_rate if baseline_prompt_rate else 1.0
    surge_factor = max(request_surge, prompt_surge)
    return recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate
|
||||
|
||||
|
||||
def count_recent_heartbeat_risks(rows: list[dict], horizon_hours: int) -> tuple[int, int]:
    """Count control-plane and inference failures in the recent window.

    Returns (gitea_outages, inference_failures) over the last `horizon_hours`
    of heartbeat rows; (0, 0) when there are no rows at all.
    """
    if not rows:
        return 0, 0

    cutoff = recent_window_cutoff(rows, horizon_hours)
    outages = 0
    failures = 0
    for row in rows:
        if parse_timestamp(row["timestamp"]) < cutoff:
            continue
        perception = row.get("perception", {})
        # Only an explicit False counts as a failure; missing keys read as healthy.
        if perception.get("gitea_alive") is False:
            outages += 1
        if perception.get("model_health", {}).get("inference_ok") is False:
            failures += 1
    return outages, failures
|
||||
|
||||
|
||||
def build_recommendations(
    surge_factor: float,
    top_callers_recent: list[dict],
    gitea_outages_recent: int,
    inference_failures_recent: int,
) -> tuple[str, str, list[str]]:
    """Translate forecast signals into (resource_mode, dispatch_posture, actions).

    resource_mode flips to "surge" on a >1.5x surge or repeated inference
    failures; dispatch_posture degrades on any recent Gitea outage. Always
    returns at least one action.
    """
    resource_mode = "steady"
    dispatch_posture = "normal"
    actions: list[str] = []

    if surge_factor > 1.5:
        resource_mode = "surge"
        actions.append("Pre-warm local inference before the next forecast window.")

    has_heavy_batch = any(
        entry["prompt_tokens"] >= 10000 and entry["caller"].startswith("know-thy-father")
        for entry in top_callers_recent
    )
    if has_heavy_batch:
        actions.append("Throttle or defer large background jobs until off-peak capacity is available.")

    if inference_failures_recent >= 2:
        resource_mode = "surge"
        actions.append("Investigate local model reliability and reserve headroom for heartbeat traffic.")

    if gitea_outages_recent >= 1:
        dispatch_posture = "degraded"
        actions.append("Pre-fetch or cache forge state before the next dispatch window.")

    if not actions:
        actions.append("Maintain current resource allocation; no surge indicators detected.")

    return resource_mode, dispatch_posture, actions
|
||||
|
||||
|
||||
def forecast_resources(
    metrics_paths: Path | str | Iterable[Path | str],
    heartbeat_paths: Path | str | Iterable[Path | str],
    horizon_hours: int = 6,
) -> dict:
    """Forecast near-term fleet pressure from metrics and heartbeat JSONL logs.

    Args:
        metrics_paths: request-metrics JSONL file(s) with timestamp/caller/
            prompt_len rows.
        heartbeat_paths: heartbeat JSONL file(s) carrying `perception` health
            payloads (`gitea_alive`, `model_health.inference_ok`).
        horizon_hours: size of the "recent" window; the preceding window of
            the same length serves as the baseline.

    Returns:
        A small, stable schema dict: resource_mode, dispatch_posture,
        request/prompt rates, surge_factor, recent risk counts,
        top_callers_recent, and recommended_actions.
    """
    metric_rows = load_jsonl(metrics_paths)
    heartbeat_rows = load_jsonl(heartbeat_paths)

    recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate = compute_request_rates(metric_rows, horizon_hours)

    # Cutoff for the "recent" caller summary. The original expression carried an
    # inline `or [{"timestamp": ...}]` fallback that was unreachable (the same
    # condition already guarded the call); an explicit branch replaces it.
    if metric_rows or heartbeat_rows:
        cutoff = recent_window_cutoff(metric_rows or heartbeat_rows, horizon_hours)
    else:
        cutoff = datetime.now(timezone.utc)
    top_callers_recent = summarize_callers(metric_rows, cutoff) if metric_rows else []
    gitea_outages_recent, inference_failures_recent = count_recent_heartbeat_risks(heartbeat_rows, horizon_hours)
    resource_mode, dispatch_posture, recommended_actions = build_recommendations(
        surge_factor, top_callers_recent, gitea_outages_recent, inference_failures_recent
    )

    # Heuristic forecast: scale the baseline by a damped surge factor, but never
    # predict below the rate already observed in the recent window.
    predicted_request_rate = round(max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2)
    predicted_prompt_tokens = sum(caller["prompt_tokens"] for caller in top_callers_recent)

    return {
        "horizon_hours": horizon_hours,
        "resource_mode": resource_mode,
        "dispatch_posture": dispatch_posture,
        "recent_request_rate_per_hour": round(recent_rate, 2),
        "baseline_request_rate_per_hour": round(baseline_rate, 2),
        "predicted_request_rate_per_hour": predicted_request_rate,
        "predicted_prompt_tokens_recent_window": predicted_prompt_tokens,
        "recent_prompt_tokens_per_hour": round(recent_prompt_rate, 2),
        "baseline_prompt_tokens_per_hour": round(baseline_prompt_rate, 2),
        "surge_factor": round(surge_factor, 2),
        "gitea_outages_recent": gitea_outages_recent,
        "inference_failures_recent": inference_failures_recent,
        "top_callers_recent": top_callers_recent,
        "recommended_actions": recommended_actions,
    }
|
||||
|
||||
|
||||
def render_markdown(forecast: dict) -> str:
    """Render a forecast dict (from `forecast_resources`) as a Markdown report."""
    summary_pairs = [
        ("Forecast horizon", f"{forecast['horizon_hours']} hours"),
        ("resource_mode", forecast["resource_mode"]),
        ("dispatch_posture", forecast["dispatch_posture"]),
        ("Recent request rate/hour", forecast["recent_request_rate_per_hour"]),
        ("Baseline request rate/hour", forecast["baseline_request_rate_per_hour"]),
        ("Predicted request rate/hour", forecast["predicted_request_rate_per_hour"]),
        ("Surge factor", forecast["surge_factor"]),
        ("Recent prompt tokens/hour", forecast["recent_prompt_tokens_per_hour"]),
        ("Baseline prompt tokens/hour", forecast["baseline_prompt_tokens_per_hour"]),
        ("Gitea outages (recent)", forecast["gitea_outages_recent"]),
        ("Inference failures (recent)", forecast["inference_failures_recent"]),
    ]

    out = ["# Predictive Resource Allocation", ""]
    out.extend(f"{label}: {value}" for label, value in summary_pairs)
    out.extend(["", "## Recommended actions", ""])
    out.extend(f"- {action}" for action in forecast["recommended_actions"])
    out.extend([
        "",
        "## Top callers in recent window",
        "",
        "| Caller | Requests | Prompt tokens |",
        "|---|---:|---:|",
    ])
    out.extend(
        f"| {entry['caller']} | {entry['requests']} | {entry['prompt_tokens']} |"
        for entry in forecast["top_callers_recent"]
    )
    return "\n".join(out) + "\n"
|
||||
|
||||
|
||||
def main() -> int:
    """CLI entry point: parse arguments, run the forecast, print JSON or Markdown."""
    default_metrics = [
        "metrics/local_20260326.jsonl",
        "metrics/local_20260327.jsonl",
        "metrics/local_20260328.jsonl",
        "metrics/local_20260329.jsonl",
        "metrics/local_20260330.jsonl",
    ]
    default_heartbeat = [
        "heartbeat/ticks_20260325.jsonl",
        "heartbeat/ticks_20260326.jsonl",
        "heartbeat/ticks_20260327.jsonl",
        "heartbeat/ticks_20260328.jsonl",
        "heartbeat/ticks_20260329.jsonl",
        "heartbeat/ticks_20260330.jsonl",
    ]

    parser = argparse.ArgumentParser(description="Forecast fleet resource needs from historical metrics")
    parser.add_argument("--metrics", nargs="*", default=default_metrics)
    parser.add_argument("--heartbeat", nargs="*", default=default_heartbeat)
    parser.add_argument("--horizon-hours", type=int, default=6)
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    forecast = forecast_resources(args.metrics, args.heartbeat, horizon_hours=args.horizon_hours)
    output = json.dumps(forecast, indent=2) if args.json else render_markdown(forecast)
    print(output)
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Propagate main()'s integer return value to the shell as the exit code.
    raise SystemExit(main())
|
||||
123
tests/test_predictive_resource_allocator.py
Normal file
123
tests/test_predictive_resource_allocator.py
Normal file
@@ -0,0 +1,123 @@
|
||||
from pathlib import Path
import importlib.util
import json


# Repository root, resolved relative to this test file.
ROOT = Path(__file__).resolve().parent.parent
# Artifacts under test: the allocator script and its operator-facing doc.
SCRIPT_PATH = ROOT / "scripts" / "predictive_resource_allocator.py"
DOC_PATH = ROOT / "docs" / "PREDICTIVE_RESOURCE_ALLOCATION.md"
|
||||
|
||||
|
||||
def load_module(path: Path, name: str):
    """Import the Python file at `path` as a standalone module called `name`."""
    assert path.exists(), f"missing {path.relative_to(ROOT)}"
    spec = importlib.util.spec_from_file_location(name, path)
    assert spec and spec.loader
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded
|
||||
|
||||
|
||||
def write_jsonl(path: Path, rows: list[dict]) -> None:
    """Serialize `rows` to `path` as JSONL: one JSON object per line."""
    payload = "".join(f"{json.dumps(row)}\n" for row in rows)
    path.write_text(payload, encoding="utf-8")
|
||||
|
||||
|
||||
def test_forecast_detects_recent_surge_and_recommends_prewarm(tmp_path):
    """Surge + heavy batch + repeated inference failures flips the forecast to surge."""
    mod = load_module(SCRIPT_PATH, "predictive_resource_allocator")

    metrics_path = tmp_path / "metrics.jsonl"
    heartbeat_path = tmp_path / "heartbeat.jsonl"

    def metric(timestamp, caller, prompt_len):
        return {
            "timestamp": timestamp,
            "caller": caller,
            "prompt_len": prompt_len,
            "response_len": 50,
            "success": True,
        }

    # baseline: 1 req/hour for 6 earlier hours
    rows = [
        metric(f"2026-03-29T0{hour}:00:00+00:00", "heartbeat_tick", 1000)
        for hour in range(6)
    ]
    # recent surge: 5 req/hour plus large batch job
    rows += [
        metric(f"2026-03-29T12:{minute:02d}:00+00:00", "heartbeat_tick", 1200)
        for minute in (0, 10, 20, 30, 40)
    ]
    rows.append(metric("2026-03-29T12:15:00+00:00", "know-thy-father-draft:batch_003", 14420))
    write_jsonl(metrics_path, rows)

    write_jsonl(heartbeat_path, [
        {
            "timestamp": stamp,
            "perception": {"gitea_alive": True, "model_health": {"inference_ok": False}},
        }
        for stamp in ("2026-03-29T12:10:00+00:00", "2026-03-29T12:20:00+00:00")
    ])

    forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6)

    assert forecast["resource_mode"] == "surge"
    assert forecast["surge_factor"] > 1.5
    actions = forecast["recommended_actions"]
    assert any("Pre-warm local inference" in action for action in actions)
    assert any("Throttle or defer large background jobs" in action for action in actions)
    assert forecast["top_callers_recent"][0]["caller"] == "heartbeat_tick"
|
||||
|
||||
|
||||
def test_forecast_detects_control_plane_risk_from_gitea_outage(tmp_path):
    """Repeated gitea_alive=False heartbeats degrade the dispatch posture."""
    mod = load_module(SCRIPT_PATH, "predictive_resource_allocator")

    metrics_path = tmp_path / "metrics.jsonl"
    heartbeat_path = tmp_path / "heartbeat.jsonl"

    write_jsonl(metrics_path, [
        {
            "timestamp": "2026-03-29T13:00:00+00:00",
            "caller": "heartbeat_tick",
            "prompt_len": 1000,
            "response_len": 50,
            "success": True,
        }
    ])
    outage = {"gitea_alive": False, "model_health": {"inference_ok": True}}
    write_jsonl(heartbeat_path, [
        {"timestamp": "2026-03-29T13:00:00+00:00", "perception": outage},
        {"timestamp": "2026-03-29T13:10:00+00:00", "perception": outage},
    ])

    forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6)

    assert forecast["gitea_outages_recent"] == 2
    assert any("Pre-fetch or cache forge state" in action for action in forecast["recommended_actions"])
    assert forecast["dispatch_posture"] == "degraded"
|
||||
|
||||
|
||||
def test_repo_contains_predictive_resource_allocation_doc():
    """The operator-facing doc exists and mentions the key contract terms."""
    assert DOC_PATH.exists(), "missing predictive resource allocation doc"
    doc_text = DOC_PATH.read_text(encoding="utf-8")
    for snippet in (
        "# Predictive Resource Allocation",
        "scripts/predictive_resource_allocator.py",
        "resource_mode",
        "dispatch_posture",
        "Pre-warm local inference",
        "Throttle or defer large background jobs",
    ):
        assert snippet in doc_text
|
||||
Reference in New Issue
Block a user