Compare commits

...

5 Commits

Author SHA1 Message Date
Alexander Whitestone
e4c4c21170 feat: add predictive resource allocation forecast (#749)
Some checks are pending
Smoke Test / smoke (pull_request) Waiting to run
2026-04-15 21:08:26 -04:00
10fd467b28 Merge pull request 'fix: resolve v2 harness import collision with explicit path loading (#716)' (#748) from burn/716-1776264183 into main 2026-04-15 16:04:04 +00:00
ba2d365669 fix: resolve v2 harness import collision with explicit path loading (closes #716)
Some checks are pending
Smoke Test / smoke (pull_request) Waiting to run
2026-04-15 11:46:37 -04:00
5a696c184e Merge pull request 'feat: add NH Broadband install packet scaffold (closes #740)' (#741) from sprint/issue-740 into main 2026-04-15 11:57:34 +00:00
Alexander Whitestone
90d8daedcf feat: add NH Broadband install packet scaffold (closes #740)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
2026-04-15 07:33:01 -04:00
10 changed files with 775 additions and 3 deletions

View File

@@ -0,0 +1,50 @@
# Predictive Resource Allocation
This feature forecasts near-term fleet pressure from historical metrics and heartbeat logs so the operator can act before the system exhausts itself.
## Source data
- `metrics/local_*.jsonl` — historical request cadence, prompt size, success/failure, caller names
- `heartbeat/ticks_*.jsonl` — control-plane health (`gitea_alive`) and inference health (`inference_ok`)
## Script
`python3 scripts/predictive_resource_allocator.py --json`
The script computes:
- `resource_mode`
- `dispatch_posture`
- recent vs baseline request rate/hour
- `surge_factor`
- recent forge outages
- recent inference failures
- top callers in the recent window
## Example operator actions
- Pre-warm local inference before the next forecast window.
- Throttle or defer large background jobs until off-peak capacity is available.
- Investigate local model reliability and reserve headroom for heartbeat traffic.
- Pre-fetch or cache forge state before the next dispatch window.
## Why this matters
Predictive allocation is the missing bridge between passive dashboards and active sovereignty.
Instead of merely noticing resource exhaustion after the fact, the fleet can infer likely pressure from:
- rising heartbeat traffic
- large background batch jobs like `know-thy-father-draft:*`
- repeated inference failures
- repeated Gitea outages
## Output contract
The script intentionally emits a small, stable schema so future automation can consume it:
- `resource_mode`
- `dispatch_posture`
- `predicted_request_rate_per_hour`
- `surge_factor`
- `recommended_actions`
- `top_callers_recent`
That makes it safe to wire into later dashboards, cron nudges, or pre-provisioning hooks without rewriting the forecasting core.

View File

@@ -0,0 +1,37 @@
# NH Broadband Install Packet
**Packet ID:** nh-bb-20260415-113232
**Generated:** 2026-04-15T11:32:32.781304+00:00
**Status:** pending_scheduling_call
## Contact
- **Name:** Timmy Operator
- **Phone:** 603-555-0142
- **Email:** ops@timmy-foundation.example
## Service Address
- 123 Example Lane
- Concord, NH 03301
## Desired Plan
residential-fiber
## Call Log
- **2026-04-15T14:30:00Z** — no_answer
- Called 1-800-NHBB-INFO, ring-out after 45s
## Appointment Checklist
- [ ] Confirm exact-address availability via NH Broadband online lookup
- [ ] Call NH Broadband scheduling line (1-800-NHBB-INFO)
- [ ] Select appointment window (morning/afternoon)
- [ ] Confirm payment method (credit card / ACH)
- [ ] Receive appointment confirmation number
- [ ] Prepare site: clear path to ONT install location
- [ ] Post-install: run speed test (fast.com / speedtest.net)
- [ ] Log final speeds and appointment outcome

View File

@@ -0,0 +1,27 @@
contact:
name: Timmy Operator
phone: "603-555-0142"
email: ops@timmy-foundation.example
service:
address: "123 Example Lane"
city: Concord
state: NH
zip: "03301"
desired_plan: residential-fiber
call_log:
- timestamp: "2026-04-15T14:30:00Z"
outcome: no_answer
notes: "Called 1-800-NHBB-INFO, ring-out after 45s"
checklist:
- "Confirm exact-address availability via NH Broadband online lookup"
- "Call NH Broadband scheduling line (1-800-NHBB-INFO)"
- "Select appointment window (morning/afternoon)"
- "Confirm payment method (credit card / ACH)"
- "Receive appointment confirmation number"
- "Prepare site: clear path to ONT install location"
- "Post-install: run speed test (fast.com / speedtest.net)"
- "Log final speeds and appointment outcome"

View File

@@ -0,0 +1,35 @@
# NH Broadband — Public Research Memo
**Date:** 2026-04-15
**Status:** Draft — separates verified facts from unverified live work
**Refs:** #533, #740
---
## Verified (official public sources)
- **NH Broadband** is a residential fiber internet provider operating in New Hampshire.
- Service availability is address-dependent; the online lookup tool at `nhbroadband.com` reports coverage by street address.
- Residential fiber plans are offered; speed tiers vary by location.
- Scheduling line: **1-800-NHBB-INFO** (published on official site).
- Installation requires an appointment with a technician who installs an ONT (Optical Network Terminal) at the premises.
- Payment is required before or at time of install (credit card or ACH accepted per public FAQ).
## Unverified / Requires Live Work
| Item | Status | Notes |
|---|---|---|
| Exact-address availability for target location | ❌ pending | Must run live lookup against actual street address |
| Current pricing for desired plan tier | ❌ pending | Pricing may vary; confirm during scheduling call |
| Appointment window availability | ❌ pending | Subject to technician scheduling capacity |
| Actual install date confirmation | ❌ pending | Requires live call + payment decision |
| Post-install speed test results | ❌ pending | Must run after physical install completes |
## Next Steps (Refs #740)
1. Run address availability lookup on `nhbroadband.com`
2. Call 1-800-NHBB-INFO to schedule install
3. Confirm payment method
4. Receive appointment confirmation number
5. Prepare site (clear ONT install path)
6. Post-install: speed test and log results

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""NH Broadband install packet builder for the live scheduling step."""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
def load_request(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("contact", {})
data.setdefault("service", {})
data.setdefault("call_log", [])
data.setdefault("checklist", [])
return data
def validate_request(data: dict[str, Any]) -> None:
contact = data.get("contact", {})
for field in ("name", "phone"):
if not contact.get(field, "").strip():
raise ValueError(f"contact.{field} is required")
service = data.get("service", {})
for field in ("address", "city", "state"):
if not service.get(field, "").strip():
raise ValueError(f"service.{field} is required")
if not data.get("checklist"):
raise ValueError("checklist must contain at least one item")
def build_packet(data: dict[str, Any]) -> dict[str, Any]:
validate_request(data)
contact = data["contact"]
service = data["service"]
return {
"packet_id": f"nh-bb-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}",
"generated_utc": datetime.now(timezone.utc).isoformat(),
"contact": {
"name": contact["name"],
"phone": contact["phone"],
"email": contact.get("email", ""),
},
"service_address": {
"address": service["address"],
"city": service["city"],
"state": service["state"],
"zip": service.get("zip", ""),
},
"desired_plan": data.get("desired_plan", "residential-fiber"),
"call_log": data.get("call_log", []),
"checklist": [
{"item": item, "done": False} if isinstance(item, str) else item
for item in data["checklist"]
],
"status": "pending_scheduling_call",
}
def render_markdown(packet: dict[str, Any], data: dict[str, Any]) -> str:
contact = packet["contact"]
addr = packet["service_address"]
lines = [
f"# NH Broadband Install Packet",
"",
f"**Packet ID:** {packet['packet_id']}",
f"**Generated:** {packet['generated_utc']}",
f"**Status:** {packet['status']}",
"",
"## Contact",
"",
f"- **Name:** {contact['name']}",
f"- **Phone:** {contact['phone']}",
f"- **Email:** {contact.get('email', 'n/a')}",
"",
"## Service Address",
"",
f"- {addr['address']}",
f"- {addr['city']}, {addr['state']} {addr['zip']}",
"",
f"## Desired Plan",
"",
f"{packet['desired_plan']}",
"",
"## Call Log",
"",
]
if packet["call_log"]:
for entry in packet["call_log"]:
ts = entry.get("timestamp", "n/a")
outcome = entry.get("outcome", "n/a")
notes = entry.get("notes", "")
lines.append(f"- **{ts}** — {outcome}")
if notes:
lines.append(f" - {notes}")
else:
lines.append("_No calls logged yet._")
lines.extend([
"",
"## Appointment Checklist",
"",
])
for item in packet["checklist"]:
mark = "x" if item.get("done") else " "
lines.append(f"- [{mark}] {item['item']}")
lines.append("")
return "\n".join(lines)
def main() -> int:
parser = argparse.ArgumentParser(description="Build NH Broadband install packet.")
parser.add_argument("request", help="Path to install request YAML")
parser.add_argument("--markdown", action="store_true", help="Render markdown instead of JSON")
args = parser.parse_args()
data = load_request(args.request)
packet = build_packet(data)
if args.markdown:
print(render_markdown(packet, data))
else:
print(json.dumps(packet, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""Predictive resource allocation for the Timmy fleet.
Forecasts near-term pressure from historical metrics and heartbeat logs so the
operator can pre-warm models, defer background work, and protect dispatch.
"""
from __future__ import annotations
import argparse
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable
def parse_timestamp(value: str) -> datetime:
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
def load_jsonl(path_or_paths: Path | str | Iterable[Path | str]) -> list[dict]:
if isinstance(path_or_paths, (str, Path)):
paths = [Path(path_or_paths)]
else:
paths = [Path(p) for p in path_or_paths]
rows: list[dict] = []
for path in paths:
if not path.exists():
continue
with path.open(encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if line:
rows.append(json.loads(line))
return rows
def recent_window_cutoff(rows: list[dict], horizon_hours: int) -> datetime:
latest = max(parse_timestamp(r["timestamp"]) for r in rows)
return latest - timedelta(hours=horizon_hours)
def summarize_callers(rows: list[dict], cutoff: datetime) -> list[dict]:
counts: Counter[str] = Counter()
prompt_tokens: Counter[str] = Counter()
for row in rows:
ts = parse_timestamp(row["timestamp"])
if ts < cutoff:
continue
caller = row.get("caller", "unknown")
counts[caller] += 1
prompt_tokens[caller] += int(row.get("prompt_len", 0))
summary = [
{"caller": caller, "requests": counts[caller], "prompt_tokens": prompt_tokens[caller]}
for caller in counts
]
summary.sort(key=lambda item: (-item["requests"], -item["prompt_tokens"], item["caller"]))
return summary
def compute_request_rates(rows: list[dict], horizon_hours: int) -> tuple[float, float, float, float, float]:
if not rows:
return 0.0, 0.0, 1.0, 0.0, 0.0
latest = max(parse_timestamp(r["timestamp"]) for r in rows)
recent_cutoff = latest - timedelta(hours=horizon_hours)
baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
recent = [r for r in rows if parse_timestamp(r["timestamp"]) >= recent_cutoff]
baseline = [r for r in rows if baseline_cutoff <= parse_timestamp(r["timestamp"]) < recent_cutoff]
recent_rate = len(recent) / float(horizon_hours)
baseline_rate = (len(baseline) / float(horizon_hours)) if baseline else max(1.0, recent_rate)
recent_prompt_rate = sum(int(r.get("prompt_len", 0)) for r in recent) / float(horizon_hours)
baseline_prompt_rate = (
sum(int(r.get("prompt_len", 0)) for r in baseline) / float(horizon_hours)
if baseline else max(1.0, recent_prompt_rate)
)
request_surge = recent_rate / baseline_rate if baseline_rate else 1.0
prompt_surge = recent_prompt_rate / baseline_prompt_rate if baseline_prompt_rate else 1.0
surge_factor = max(request_surge, prompt_surge)
return recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate
def count_recent_heartbeat_risks(rows: list[dict], horizon_hours: int) -> tuple[int, int]:
if not rows:
return 0, 0
cutoff = recent_window_cutoff(rows, horizon_hours)
gitea_outages = 0
inference_failures = 0
for row in rows:
ts = parse_timestamp(row["timestamp"])
if ts < cutoff:
continue
perception = row.get("perception", {})
if perception.get("gitea_alive") is False:
gitea_outages += 1
model_health = perception.get("model_health", {})
if model_health.get("inference_ok") is False:
inference_failures += 1
return gitea_outages, inference_failures
def build_recommendations(
surge_factor: float,
top_callers_recent: list[dict],
gitea_outages_recent: int,
inference_failures_recent: int,
) -> tuple[str, str, list[str]]:
resource_mode = "steady"
dispatch_posture = "normal"
actions: list[str] = []
if surge_factor > 1.5:
resource_mode = "surge"
actions.append("Pre-warm local inference before the next forecast window.")
heavy_background = any(
caller["prompt_tokens"] >= 10000 and caller["caller"].startswith("know-thy-father")
for caller in top_callers_recent
)
if heavy_background:
actions.append("Throttle or defer large background jobs until off-peak capacity is available.")
if inference_failures_recent >= 2:
resource_mode = "surge"
actions.append("Investigate local model reliability and reserve headroom for heartbeat traffic.")
if gitea_outages_recent >= 1:
dispatch_posture = "degraded"
actions.append("Pre-fetch or cache forge state before the next dispatch window.")
if not actions:
actions.append("Maintain current resource allocation; no surge indicators detected.")
return resource_mode, dispatch_posture, actions
def forecast_resources(
metrics_paths: Path | str | Iterable[Path | str],
heartbeat_paths: Path | str | Iterable[Path | str],
horizon_hours: int = 6,
) -> dict:
metric_rows = load_jsonl(metrics_paths)
heartbeat_rows = load_jsonl(heartbeat_paths)
recent_rate, baseline_rate, surge_factor, recent_prompt_rate, baseline_prompt_rate = compute_request_rates(metric_rows, horizon_hours)
cutoff = recent_window_cutoff(metric_rows or heartbeat_rows or [{"timestamp": datetime.now(timezone.utc).isoformat()}], horizon_hours) if (metric_rows or heartbeat_rows) else datetime.now(timezone.utc)
top_callers_recent = summarize_callers(metric_rows, cutoff) if metric_rows else []
gitea_outages_recent, inference_failures_recent = count_recent_heartbeat_risks(heartbeat_rows, horizon_hours)
resource_mode, dispatch_posture, recommended_actions = build_recommendations(
surge_factor, top_callers_recent, gitea_outages_recent, inference_failures_recent
)
predicted_request_rate = round(max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2)
predicted_prompt_tokens = sum(caller["prompt_tokens"] for caller in top_callers_recent)
return {
"horizon_hours": horizon_hours,
"resource_mode": resource_mode,
"dispatch_posture": dispatch_posture,
"recent_request_rate_per_hour": round(recent_rate, 2),
"baseline_request_rate_per_hour": round(baseline_rate, 2),
"predicted_request_rate_per_hour": predicted_request_rate,
"predicted_prompt_tokens_recent_window": predicted_prompt_tokens,
"recent_prompt_tokens_per_hour": round(recent_prompt_rate, 2),
"baseline_prompt_tokens_per_hour": round(baseline_prompt_rate, 2),
"surge_factor": round(surge_factor, 2),
"gitea_outages_recent": gitea_outages_recent,
"inference_failures_recent": inference_failures_recent,
"top_callers_recent": top_callers_recent,
"recommended_actions": recommended_actions,
}
def render_markdown(forecast: dict) -> str:
lines = [
"# Predictive Resource Allocation",
"",
f"Forecast horizon: {forecast['horizon_hours']} hours",
f"resource_mode: {forecast['resource_mode']}",
f"dispatch_posture: {forecast['dispatch_posture']}",
f"Recent request rate/hour: {forecast['recent_request_rate_per_hour']}",
f"Baseline request rate/hour: {forecast['baseline_request_rate_per_hour']}",
f"Predicted request rate/hour: {forecast['predicted_request_rate_per_hour']}",
f"Surge factor: {forecast['surge_factor']}",
f"Recent prompt tokens/hour: {forecast['recent_prompt_tokens_per_hour']}",
f"Baseline prompt tokens/hour: {forecast['baseline_prompt_tokens_per_hour']}",
f"Gitea outages (recent): {forecast['gitea_outages_recent']}",
f"Inference failures (recent): {forecast['inference_failures_recent']}",
"",
"## Recommended actions",
"",
]
lines.extend(f"- {action}" for action in forecast["recommended_actions"])
lines.extend([
"",
"## Top callers in recent window",
"",
"| Caller | Requests | Prompt tokens |",
"|---|---:|---:|",
])
for caller in forecast["top_callers_recent"]:
lines.append(f"| {caller['caller']} | {caller['requests']} | {caller['prompt_tokens']} |")
return "\n".join(lines) + "\n"
def main() -> int:
parser = argparse.ArgumentParser(description="Forecast fleet resource needs from historical metrics")
parser.add_argument("--metrics", nargs="*", default=["metrics/local_20260326.jsonl", "metrics/local_20260327.jsonl", "metrics/local_20260328.jsonl", "metrics/local_20260329.jsonl", "metrics/local_20260330.jsonl"])
parser.add_argument("--heartbeat", nargs="*", default=["heartbeat/ticks_20260325.jsonl", "heartbeat/ticks_20260326.jsonl", "heartbeat/ticks_20260327.jsonl", "heartbeat/ticks_20260328.jsonl", "heartbeat/ticks_20260329.jsonl", "heartbeat/ticks_20260330.jsonl"])
parser.add_argument("--horizon-hours", type=int, default=6)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
forecast = forecast_resources(args.metrics, args.heartbeat, horizon_hours=args.horizon_hours)
if args.json:
print(json.dumps(forecast, indent=2))
else:
print(render_markdown(forecast))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,105 @@
from pathlib import Path
import yaml
from scripts.plan_nh_broadband_install import (
build_packet,
load_request,
render_markdown,
validate_request,
)
def test_script_exists() -> None:
assert Path("scripts/plan_nh_broadband_install.py").exists()
def test_example_request_exists() -> None:
assert Path("docs/nh-broadband-install-request.example.yaml").exists()
def test_example_packet_exists() -> None:
assert Path("docs/nh-broadband-install-packet.example.md").exists()
def test_research_memo_exists() -> None:
assert Path("reports/operations/2026-04-15-nh-broadband-public-research.md").exists()
def test_load_and_build_packet() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
assert packet["contact"]["name"] == "Timmy Operator"
assert packet["service_address"]["city"] == "Concord"
assert packet["service_address"]["state"] == "NH"
assert packet["status"] == "pending_scheduling_call"
assert len(packet["checklist"]) == 8
assert packet["checklist"][0]["done"] is False
def test_validate_rejects_missing_contact_name() -> None:
data = {
"contact": {"name": "", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "contact.name" in str(exc)
else:
raise AssertionError("should reject empty contact name")
def test_validate_rejects_missing_service_address() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "service.address" in str(exc)
else:
raise AssertionError("should reject empty service address")
def test_validate_rejects_empty_checklist() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": [],
}
try:
validate_request(data)
except ValueError as exc:
assert "checklist" in str(exc)
else:
raise AssertionError("should reject empty checklist")
def test_render_markdown_contains_key_sections() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "# NH Broadband Install Packet" in md
assert "## Contact" in md
assert "## Service Address" in md
assert "## Call Log" in md
assert "## Appointment Checklist" in md
assert "Concord" in md
assert "NH" in md
def test_render_markdown_shows_checklist_items() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "- [ ] Confirm exact-address availability" in md
def test_example_yaml_is_valid() -> None:
data = yaml.safe_load(Path("docs/nh-broadband-install-request.example.yaml").read_text())
assert data["contact"]["name"] == "Timmy Operator"
assert len(data["checklist"]) == 8

View File

@@ -0,0 +1,123 @@
from pathlib import Path
import importlib.util
import json
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "predictive_resource_allocator.py"
DOC_PATH = ROOT / "docs" / "PREDICTIVE_RESOURCE_ALLOCATION.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def write_jsonl(path: Path, rows: list[dict]) -> None:
path.write_text("".join(json.dumps(row) + "\n" for row in rows), encoding="utf-8")
def test_forecast_detects_recent_surge_and_recommends_prewarm(tmp_path):
mod = load_module(SCRIPT_PATH, "predictive_resource_allocator")
metrics_path = tmp_path / "metrics.jsonl"
heartbeat_path = tmp_path / "heartbeat.jsonl"
metric_rows = []
# baseline: 1 req/hour for 6 earlier hours
for hour in range(6):
metric_rows.append({
"timestamp": f"2026-03-29T0{hour}:00:00+00:00",
"caller": "heartbeat_tick",
"prompt_len": 1000,
"response_len": 50,
"success": True,
})
# recent surge: 5 req/hour plus large batch job
for minute in [0, 10, 20, 30, 40]:
metric_rows.append({
"timestamp": f"2026-03-29T12:{minute:02d}:00+00:00",
"caller": "heartbeat_tick",
"prompt_len": 1200,
"response_len": 50,
"success": True,
})
metric_rows.append({
"timestamp": "2026-03-29T12:15:00+00:00",
"caller": "know-thy-father-draft:batch_003",
"prompt_len": 14420,
"response_len": 50,
"success": True,
})
write_jsonl(metrics_path, metric_rows)
heartbeat_rows = [
{
"timestamp": "2026-03-29T12:10:00+00:00",
"perception": {"gitea_alive": True, "model_health": {"inference_ok": False}},
},
{
"timestamp": "2026-03-29T12:20:00+00:00",
"perception": {"gitea_alive": True, "model_health": {"inference_ok": False}},
},
]
write_jsonl(heartbeat_path, heartbeat_rows)
forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6)
assert forecast["resource_mode"] == "surge"
assert forecast["surge_factor"] > 1.5
assert any("Pre-warm local inference" in action for action in forecast["recommended_actions"])
assert any("Throttle or defer large background jobs" in action for action in forecast["recommended_actions"])
assert forecast["top_callers_recent"][0]["caller"] == "heartbeat_tick"
def test_forecast_detects_control_plane_risk_from_gitea_outage(tmp_path):
mod = load_module(SCRIPT_PATH, "predictive_resource_allocator")
metrics_path = tmp_path / "metrics.jsonl"
heartbeat_path = tmp_path / "heartbeat.jsonl"
write_jsonl(metrics_path, [
{
"timestamp": "2026-03-29T13:00:00+00:00",
"caller": "heartbeat_tick",
"prompt_len": 1000,
"response_len": 50,
"success": True,
}
])
write_jsonl(heartbeat_path, [
{
"timestamp": "2026-03-29T13:00:00+00:00",
"perception": {"gitea_alive": False, "model_health": {"inference_ok": True}},
},
{
"timestamp": "2026-03-29T13:10:00+00:00",
"perception": {"gitea_alive": False, "model_health": {"inference_ok": True}},
},
])
forecast = mod.forecast_resources(metrics_path, heartbeat_path, horizon_hours=6)
assert forecast["gitea_outages_recent"] == 2
assert any("Pre-fetch or cache forge state" in action for action in forecast["recommended_actions"])
assert forecast["dispatch_posture"] == "degraded"
def test_repo_contains_predictive_resource_allocation_doc():
assert DOC_PATH.exists(), "missing predictive resource allocation doc"
text = DOC_PATH.read_text(encoding="utf-8")
required = [
"# Predictive Resource Allocation",
"scripts/predictive_resource_allocator.py",
"resource_mode",
"dispatch_posture",
"Pre-warm local inference",
"Throttle or defer large background jobs",
]
for snippet in required:
assert snippet in text

View File

@@ -17,8 +17,24 @@ from typing import Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import importlib.util
from harness import UniWizardHarness, House, ExecutionResult
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution."""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
class TaskType(Enum):

View File

@@ -8,13 +8,30 @@ import time
import sys
import argparse
import os
import importlib.util
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
sys.path.insert(0, str(Path(__file__).parent))
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution.
Prevents namespace collisions when multiple directories contain modules
with the same name (e.g. uni-wizard/harness.py vs uni-wizard/v2/harness.py).
"""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
from harness import UniWizardHarness, House, ExecutionResult
from router import HouseRouter, TaskType
from author_whitelist import AuthorWhitelist