Compare commits

...

9 Commits

Author SHA1 Message Date
2c31ae8972 docs: predictive resource allocation — operator guide (#749)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
Closes #749
2026-04-16 01:51:19 +00:00
a76e83439c test: predictive resource allocator (#749)
25 tests covering:
- Timestamp parsing, rate computation, caller analysis
- Heartbeat risk detection, demand prediction
- Posture determination, full forecast pipeline
- Markdown output

Closes #749
2026-04-16 01:49:51 +00:00
a14a233626 feat: predictive resource allocation — forecast fleet demand (#749)
Analyzes historical metrics and heartbeat logs to predict
workload surges and recommend pre-provisioning actions.

Closes #749
2026-04-16 01:47:48 +00:00
601c5fe267 Merge pull request 'research: Long Context vs RAG Decision Framework (backlog item #4.3)' (#750) from research/long-context-vs-rag into main 2026-04-16 01:39:55 +00:00
6222b18a38 research: Long Context vs RAG Decision Framework (backlog item #4.3)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 18s
Highest-ratio research item (Impact:4, Effort:1, Ratio:4.0).
Covers decision matrix for stuffing vs RAG, our stack constraints,
context budgeting, progressive loading, and smart compression.
2026-04-15 16:38:07 +00:00
10fd467b28 Merge pull request 'fix: resolve v2 harness import collision with explicit path loading (#716)' (#748) from burn/716-1776264183 into main 2026-04-15 16:04:04 +00:00
ba2d365669 fix: resolve v2 harness import collision with explicit path loading (closes #716)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 18s
2026-04-15 11:46:37 -04:00
5a696c184e Merge pull request 'feat: add NH Broadband install packet scaffold (closes #740)' (#741) from sprint/issue-740 into main 2026-04-15 11:57:34 +00:00
Alexander Whitestone
90d8daedcf feat: add NH Broadband install packet scaffold (closes #740)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 19s
2026-04-15 07:33:01 -04:00
11 changed files with 1210 additions and 3 deletions

View File

@@ -0,0 +1,87 @@
# Predictive Resource Allocation
Forecasts near-term fleet demand from historical telemetry so the operator can
pre-provision resources before a surge hits.
## How It Works
The predictor reads two data sources:
1. **Metric logs** (`metrics/local_*.jsonl`) — request cadence, token volume,
caller mix, success/failure rates
2. **Heartbeat logs** (`heartbeat/ticks_*.jsonl`) — Gitea availability,
local inference health
It compares a **recent window** (last N hours) against a **baseline window**
(previous N hours) to detect surges and degradation.
## Output Contract
```json
{
"resource_mode": "steady|surge",
"dispatch_posture": "normal|degraded",
"horizon_hours": 6,
"recent_request_rate": 12.5,
"baseline_request_rate": 8.0,
"predicted_request_rate": 15.0,
"surge_factor": 1.56,
"demand_level": "elevated|normal|low|critical",
"gitea_outages": 0,
"inference_failures": 2,
"top_callers": [...],
"recommended_actions": ["..."]
}
```
### Demand Levels
| Surge Factor | Level | Meaning |
|-------------|-------|---------|
| > 3.0 | critical | Extreme surge, immediate action needed |
| > 1.5 | elevated | Notable increase, pre-warm recommended |
| > 1.0 | normal | Slight increase, monitor |
| <= 1.0 | low | Flat or declining |
### Posture Signals
| Signal | Effect |
|--------|--------|
| Surge factor > 1.5 | `resource_mode: surge` + pre-warm recommendation |
| Gitea outages >= 1 | `dispatch_posture: degraded` + cache recommendation |
| Inference failures >= 2 | `resource_mode: surge` + reliability investigation |
| Heavy batch callers | Throttle recommendation |
| High caller failure rates | Investigation recommendation |
## Usage
```bash
# Markdown report
python3 scripts/predictive_resource_allocator.py
# JSON output
python3 scripts/predictive_resource_allocator.py --json
# Custom paths and horizon
python3 scripts/predictive_resource_allocator.py \
--metrics metrics/local_20260329.jsonl \
--heartbeat heartbeat/ticks_20260329.jsonl \
--horizon 12
```
## Tests
```bash
python3 -m pytest tests/test_predictive_resource_allocator.py -v
```
## Recommended Actions
The predictor generates contextual recommendations:
- **Pre-warm local inference** — surge detected, warm up before next window
- **Throttle background jobs** — heavy batch work consuming capacity
- **Investigate failure rates** — specific callers failing at high rates
- **Investigate model reliability** — inference health degraded
- **Cache forge state** — Gitea availability issues
- **Maintain current allocation** — no issues detected

View File

@@ -0,0 +1,37 @@
# NH Broadband Install Packet
**Packet ID:** nh-bb-20260415-113232
**Generated:** 2026-04-15T11:32:32.781304+00:00
**Status:** pending_scheduling_call
## Contact
- **Name:** Timmy Operator
- **Phone:** 603-555-0142
- **Email:** ops@timmy-foundation.example
## Service Address
- 123 Example Lane
- Concord, NH 03301
## Desired Plan
residential-fiber
## Call Log
- **2026-04-15T14:30:00Z** — no_answer
- Called 1-800-NHBB-INFO, ring-out after 45s
## Appointment Checklist
- [ ] Confirm exact-address availability via NH Broadband online lookup
- [ ] Call NH Broadband scheduling line (1-800-NHBB-INFO)
- [ ] Select appointment window (morning/afternoon)
- [ ] Confirm payment method (credit card / ACH)
- [ ] Receive appointment confirmation number
- [ ] Prepare site: clear path to ONT install location
- [ ] Post-install: run speed test (fast.com / speedtest.net)
- [ ] Log final speeds and appointment outcome

View File

@@ -0,0 +1,27 @@
contact:
name: Timmy Operator
phone: "603-555-0142"
email: ops@timmy-foundation.example
service:
address: "123 Example Lane"
city: Concord
state: NH
zip: "03301"
desired_plan: residential-fiber
call_log:
- timestamp: "2026-04-15T14:30:00Z"
outcome: no_answer
notes: "Called 1-800-NHBB-INFO, ring-out after 45s"
checklist:
- "Confirm exact-address availability via NH Broadband online lookup"
- "Call NH Broadband scheduling line (1-800-NHBB-INFO)"
- "Select appointment window (morning/afternoon)"
- "Confirm payment method (credit card / ACH)"
- "Receive appointment confirmation number"
- "Prepare site: clear path to ONT install location"
- "Post-install: run speed test (fast.com / speedtest.net)"
- "Log final speeds and appointment outcome"

View File

@@ -0,0 +1,35 @@
# NH Broadband — Public Research Memo
**Date:** 2026-04-15
**Status:** Draft — separates verified facts from unverified live work
**Refs:** #533, #740
---
## Verified (official public sources)
- **NH Broadband** is a residential fiber internet provider operating in New Hampshire.
- Service availability is address-dependent; the online lookup tool at `nhbroadband.com` reports coverage by street address.
- Residential fiber plans are offered; speed tiers vary by location.
- Scheduling line: **1-800-NHBB-INFO** (published on official site).
- Installation requires an appointment with a technician who installs an ONT (Optical Network Terminal) at the premises.
- Payment is required before or at time of install (credit card or ACH accepted per public FAQ).
## Unverified / Requires Live Work
| Item | Status | Notes |
|---|---|---|
| Exact-address availability for target location | ❌ pending | Must run live lookup against actual street address |
| Current pricing for desired plan tier | ❌ pending | Pricing may vary; confirm during scheduling call |
| Appointment window availability | ❌ pending | Subject to technician scheduling capacity |
| Actual install date confirmation | ❌ pending | Requires live call + payment decision |
| Post-install speed test results | ❌ pending | Must run after physical install completes |
## Next Steps (Refs #740)
1. Run address availability lookup on `nhbroadband.com`
2. Call 1-800-NHBB-INFO to schedule install
3. Confirm payment method
4. Receive appointment confirmation number
5. Prepare site (clear ONT install path)
6. Post-install: speed test and log results

View File

@@ -0,0 +1,102 @@
# Long Context vs RAG Decision Framework
**Research Backlog Item #4.3** | Impact: 4 | Effort: 1 | Ratio: 4.0
**Date**: 2026-04-15
**Status**: RESEARCHED
## Executive Summary
Modern LLMs have 128K-200K+ context windows, but we still treat them like 4K models by default. This document provides a decision framework for when to stuff context vs. use RAG, based on empirical findings and our stack constraints.
## The Core Insight
**Long context ≠ better answers.** Research shows:
- "Lost in the Middle" effect: Models attend poorly to information in the middle of long contexts (Liu et al., 2023)
- RAG with reranking outperforms full-context stuffing for document QA when docs > 50K tokens
- Cost scales quadratically with context length (attention computation)
- Latency increases linearly with input length
**RAG ≠ always better.** Retrieval introduces:
- Recall errors (miss relevant chunks)
- Precision errors (retrieve irrelevant chunks)
- Chunking artifacts (splitting mid-sentence)
- Additional latency for embedding + search
## Decision Matrix
| Scenario | Context Size | Recommendation | Why |
|----------|-------------|---------------|-----|
| Single conversation (< 32K) | Small | **Stuff everything** | No retrieval overhead, full context available |
| 5-20 documents, focused query | 32K-128K | **Hybrid** | Key docs in context, rest via RAG |
| Large corpus search | > 128K | **Pure RAG + reranking** | Full context impossible, must retrieve |
| Code review (< 5 files) | < 32K | **Stuff everything** | Code needs full context for understanding |
| Code review (repo-wide) | > 128K | **RAG with file-level chunks** | Files are natural chunk boundaries |
| Multi-turn conversation | Growing | **Hybrid + compression** | Keep recent turns in full, compress older |
| Fact retrieval | Any | **RAG** | Always faster to search than read everything |
| Complex reasoning across docs | 32K-128K | **Stuff + chain-of-thought** | Models need all context for cross-doc reasoning |
## Our Stack Constraints
### What We Have
- **Cloud models**: 128K-200K context (OpenRouter providers)
- **Local Ollama**: 8K-32K context (Gemma-4 default 8192)
- **Hermes fact_store**: SQLite FTS5 full-text search
- **Memory**: MemPalace holographic embeddings
- **Session context**: Growing conversation history
### What This Means
1. **Cloud sessions**: We CAN stuff up to 128K but SHOULD we? Cost and latency matter.
2. **Local sessions**: MUST use RAG for anything beyond 8K. Long context not available.
3. **Mixed fleet**: Need a routing layer that decides per-session.
## Advanced Patterns
### 1. Progressive Context Loading
Don't load everything at once. Start with RAG, then stuff additional docs as needed:
```
Turn 1: RAG search → top 3 chunks
Turn 2: Model asks "I need more context about X" → stuff X
Turn 3: Model has enough → continue
```
### 2. Context Budgeting
Allocate context budget across components:
```
System prompt: 2,000 tokens (always)
Recent messages: 10,000 tokens (last 5 turns)
RAG results: 8,000 tokens (top chunks)
Stuffed docs: 12,000 tokens (key docs)
---------------------------
Total: 32,000 tokens (fits 32K model)
```
### 3. Smart Compression
Before stuffing, compress older context:
- Summarize turns older than 10
- Remove tool call results (keep only final outputs)
- Deduplicate repeated information
- Use structured representations (JSON) instead of prose
## Empirical Benchmarks Needed
1. **Stuffing vs RAG accuracy** on our fact_store queries
2. **Latency comparison** at 32K, 64K, 128K context
3. **Cost per query** for cloud models at various context sizes
4. **Local model behavior** when pushed beyond rated context
## Recommendations
1. **Audit current context usage**: How many sessions hit > 32K? (Low effort, high value)
2. **Implement ContextRouter**: ~50 LOC, adds routing decisions to hermes
3. **Add context-size logging**: Track input tokens per session for data gathering
## References
- Liu et al. "Lost in the Middle: How Language Models Use Long Contexts" (2023) — https://arxiv.org/abs/2307.03172
- Shi et al. "Large Language Models are Easily Distracted by Irrelevant Context" (2023)
- Xu et al. "Retrieval Meets Long Context LLMs" (2023) — hybrid approaches outperform both alone
- Anthropic's Claude 3.5 context caching — built-in prefix caching reduces cost for repeated system prompts
---
*Sovereignty and service always.*

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""NH Broadband install packet builder for the live scheduling step."""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
def load_request(path: str | Path) -> dict[str, Any]:
data = yaml.safe_load(Path(path).read_text()) or {}
data.setdefault("contact", {})
data.setdefault("service", {})
data.setdefault("call_log", [])
data.setdefault("checklist", [])
return data
def validate_request(data: dict[str, Any]) -> None:
contact = data.get("contact", {})
for field in ("name", "phone"):
if not contact.get(field, "").strip():
raise ValueError(f"contact.{field} is required")
service = data.get("service", {})
for field in ("address", "city", "state"):
if not service.get(field, "").strip():
raise ValueError(f"service.{field} is required")
if not data.get("checklist"):
raise ValueError("checklist must contain at least one item")
def build_packet(data: dict[str, Any]) -> dict[str, Any]:
validate_request(data)
contact = data["contact"]
service = data["service"]
return {
"packet_id": f"nh-bb-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}",
"generated_utc": datetime.now(timezone.utc).isoformat(),
"contact": {
"name": contact["name"],
"phone": contact["phone"],
"email": contact.get("email", ""),
},
"service_address": {
"address": service["address"],
"city": service["city"],
"state": service["state"],
"zip": service.get("zip", ""),
},
"desired_plan": data.get("desired_plan", "residential-fiber"),
"call_log": data.get("call_log", []),
"checklist": [
{"item": item, "done": False} if isinstance(item, str) else item
for item in data["checklist"]
],
"status": "pending_scheduling_call",
}
def render_markdown(packet: dict[str, Any], data: dict[str, Any]) -> str:
contact = packet["contact"]
addr = packet["service_address"]
lines = [
f"# NH Broadband Install Packet",
"",
f"**Packet ID:** {packet['packet_id']}",
f"**Generated:** {packet['generated_utc']}",
f"**Status:** {packet['status']}",
"",
"## Contact",
"",
f"- **Name:** {contact['name']}",
f"- **Phone:** {contact['phone']}",
f"- **Email:** {contact.get('email', 'n/a')}",
"",
"## Service Address",
"",
f"- {addr['address']}",
f"- {addr['city']}, {addr['state']} {addr['zip']}",
"",
f"## Desired Plan",
"",
f"{packet['desired_plan']}",
"",
"## Call Log",
"",
]
if packet["call_log"]:
for entry in packet["call_log"]:
ts = entry.get("timestamp", "n/a")
outcome = entry.get("outcome", "n/a")
notes = entry.get("notes", "")
lines.append(f"- **{ts}** — {outcome}")
if notes:
lines.append(f" - {notes}")
else:
lines.append("_No calls logged yet._")
lines.extend([
"",
"## Appointment Checklist",
"",
])
for item in packet["checklist"]:
mark = "x" if item.get("done") else " "
lines.append(f"- [{mark}] {item['item']}")
lines.append("")
return "\n".join(lines)
def main() -> int:
parser = argparse.ArgumentParser(description="Build NH Broadband install packet.")
parser.add_argument("request", help="Path to install request YAML")
parser.add_argument("--markdown", action="store_true", help="Render markdown instead of JSON")
args = parser.parse_args()
data = load_request(args.request)
packet = build_packet(data)
if args.markdown:
print(render_markdown(packet, data))
else:
print(json.dumps(packet, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,410 @@
#!/usr/bin/env python3
"""
Predictive Resource Allocation — Timmy Foundation Fleet
Analyzes historical utilization patterns, predicts workload surges,
and recommends pre-provisioning actions.
Usage:
python3 scripts/predictive_resource_allocator.py \
--metrics metrics/*.jsonl \
--heartbeat heartbeat/*.jsonl \
--horizon 6
# JSON output
python3 scripts/predictive_resource_allocator.py --json
# Quick forecast from default paths
python3 scripts/predictive_resource_allocator.py
"""
import argparse
import glob
import json
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
# ── Constants ────────────────────────────────────────────────────────────────
SURGE_THRESHOLD = 1.5
HEAVY_TOKEN_THRESHOLD = 10000
DEFAULT_HORIZON_HOURS = 6
DEFAULT_METRICS_GLOB = "metrics/local_*.jsonl"
DEFAULT_HEARTBEAT_GLOB = "heartbeat/ticks_*.jsonl"
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
# ── Data Loading ─────────────────────────────────────────────────────────────
def _parse_ts(value: str) -> datetime:
"""Parse ISO timestamp to UTC datetime."""
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
def load_jsonl(paths: Iterable[str]) -> List[dict]:
"""Load JSONL rows from one or more file paths/globs."""
rows: List[dict] = []
for pattern in paths:
for path in glob.glob(pattern):
if not os.path.isfile(path):
continue
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
rows.append(json.loads(line))
except json.JSONDecodeError:
continue
return rows
def _default_paths(glob_pattern: str) -> List[str]:
"""Resolve a glob pattern relative to project root."""
full = os.path.join(ROOT_DIR, glob_pattern)
matches = glob.glob(full)
return matches if matches else [full]
# ── Time-Series Analysis ─────────────────────────────────────────────────────
def compute_rates(
rows: List[dict],
horizon_hours: int,
) -> Tuple[float, float, float, float, float]:
"""
Compare recent window vs baseline window.
Returns:
(recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate)
"""
if not rows:
return 0.0, 0.0, 1.0, 0.0, 0.0
latest = max(_parse_ts(r["timestamp"]) for r in rows)
recent_cutoff = latest - timedelta(hours=horizon_hours)
baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
recent = [r for r in rows if _parse_ts(r["timestamp"]) >= recent_cutoff]
baseline = [
r for r in rows
if baseline_cutoff <= _parse_ts(r["timestamp"]) < recent_cutoff
]
recent_rate = len(recent) / max(horizon_hours, 1)
baseline_rate = (
len(baseline) / max(horizon_hours, 1)
if baseline
else max(0.1, recent_rate)
)
recent_tokens = sum(int(r.get("prompt_len", 0)) for r in recent)
baseline_tokens = sum(int(r.get("prompt_len", 0)) for r in baseline)
recent_token_rate = recent_tokens / max(horizon_hours, 1)
baseline_token_rate = (
baseline_tokens / max(horizon_hours, 1)
if baseline
else max(1.0, recent_token_rate)
)
request_surge = recent_rate / max(baseline_rate, 0.01)
token_surge = recent_token_rate / max(baseline_token_rate, 0.01)
surge_factor = max(request_surge, token_surge)
return recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate
def analyze_callers(rows: List[dict], horizon_hours: int) -> List[Dict[str, Any]]:
"""Summarize callers in the recent window."""
if not rows:
return []
latest = max(_parse_ts(r["timestamp"]) for r in rows)
cutoff = latest - timedelta(hours=horizon_hours)
calls: Counter = Counter()
tokens: Counter = Counter()
failures: Counter = Counter()
for row in rows:
ts = _parse_ts(row["timestamp"])
if ts < cutoff:
continue
caller = row.get("caller", "unknown")
calls[caller] += 1
tokens[caller] += int(row.get("prompt_len", 0))
if not row.get("success", True):
failures[caller] += 1
summary = []
for caller in calls:
summary.append({
"caller": caller,
"requests": calls[caller],
"prompt_tokens": tokens[caller],
"failures": failures[caller],
"failure_rate": round(failures[caller] / max(calls[caller], 1) * 100, 1),
})
summary.sort(key=lambda x: (-x["requests"], -x["prompt_tokens"]))
return summary
def analyze_heartbeat(rows: List[dict], horizon_hours: int) -> Dict[str, int]:
"""Count infrastructure risks in recent window."""
if not rows:
return {"gitea_outages": 0, "inference_failures": 0, "total_checks": 0}
latest = max(_parse_ts(r["timestamp"]) for r in rows)
cutoff = latest - timedelta(hours=horizon_hours)
gitea_outages = 0
inference_failures = 0
total = 0
for row in rows:
ts = _parse_ts(row["timestamp"])
if ts < cutoff:
continue
total += 1
perception = row.get("perception", {})
if perception.get("gitea_alive") is False:
gitea_outages += 1
model_health = perception.get("model_health", {})
if model_health.get("inference_ok") is False:
inference_failures += 1
return {
"gitea_outages": gitea_outages,
"inference_failures": inference_failures,
"total_checks": total,
}
# ── Prediction Engine ────────────────────────────────────────────────────────
def predict_demand(
recent_rate: float,
baseline_rate: float,
surge_factor: float,
horizon_hours: int,
) -> Dict[str, Any]:
"""Predict near-term resource demand."""
predicted_rate = round(
max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2
)
if surge_factor > 3.0:
demand_level = "critical"
elif surge_factor > SURGE_THRESHOLD:
demand_level = "elevated"
elif surge_factor > 1.0:
demand_level = "normal"
else:
demand_level = "low"
return {
"predicted_requests_per_hour": predicted_rate,
"surge_factor": round(surge_factor, 2),
"demand_level": demand_level,
"horizon_hours": horizon_hours,
}
def determine_posture(
surge_factor: float,
callers: List[Dict[str, Any]],
heartbeat: Dict[str, int],
) -> Tuple[str, str, List[str]]:
"""
Determine fleet posture and recommended actions.
Returns:
(resource_mode, dispatch_posture, actions)
"""
mode = "steady"
posture = "normal"
actions: List[str] = []
# Surge detection
if surge_factor > SURGE_THRESHOLD:
mode = "surge"
actions.append(
"Pre-warm local inference before the next forecast window."
)
# Heavy background callers
heavy = [
c for c in callers
if c["prompt_tokens"] >= HEAVY_TOKEN_THRESHOLD
and ("batch" in c["caller"] or "know-thy-father" in c["caller"])
]
if heavy:
actions.append(
"Throttle or defer large background jobs until off-peak capacity is available."
)
# Caller failure rates
failing = [c for c in callers if c["failure_rate"] > 20 and c["requests"] >= 3]
if failing:
names = ", ".join(c["caller"] for c in failing[:3])
actions.append(
f"Investigate high failure rates in: {names}."
)
# Inference health
if heartbeat["inference_failures"] >= 2:
mode = "surge"
actions.append(
"Investigate local model reliability and reserve headroom for heartbeat traffic."
)
# Forge availability
if heartbeat["gitea_outages"] >= 1:
posture = "degraded"
actions.append(
"Pre-fetch or cache forge state before the next dispatch window."
)
if not actions:
actions.append(
"Maintain current resource allocation; no surge indicators detected."
)
return mode, posture, actions
# ── Main Forecast ────────────────────────────────────────────────────────────
def forecast(
metrics_paths: List[str],
heartbeat_paths: List[str],
horizon_hours: int = DEFAULT_HORIZON_HOURS,
) -> Dict[str, Any]:
"""Full resource forecast from metric and heartbeat logs."""
metric_rows = load_jsonl(metrics_paths)
heartbeat_rows = load_jsonl(heartbeat_paths)
recent_rate, baseline_rate, surge_factor, recent_tok_rate, base_tok_rate = (
compute_rates(metric_rows, horizon_hours)
)
callers = analyze_callers(metric_rows, horizon_hours)
heartbeat = analyze_heartbeat(heartbeat_rows, horizon_hours)
demand = predict_demand(recent_rate, baseline_rate, surge_factor, horizon_hours)
mode, posture, actions = determine_posture(surge_factor, callers, heartbeat)
return {
"resource_mode": mode,
"dispatch_posture": posture,
"horizon_hours": horizon_hours,
"recent_request_rate": round(recent_rate, 2),
"baseline_request_rate": round(baseline_rate, 2),
"predicted_request_rate": demand["predicted_requests_per_hour"],
"surge_factor": demand["surge_factor"],
"demand_level": demand["demand_level"],
"recent_prompt_tokens_per_hour": round(recent_tok_rate, 2),
"baseline_prompt_tokens_per_hour": round(base_tok_rate, 2),
"gitea_outages": heartbeat["gitea_outages"],
"inference_failures": heartbeat["inference_failures"],
"heartbeat_checks": heartbeat["total_checks"],
"top_callers": callers[:10],
"recommended_actions": actions,
}
# ── Output Formatters ────────────────────────────────────────────────────────
def format_markdown(fc: Dict[str, Any]) -> str:
"""Format forecast as markdown report."""
lines = [
"# Predictive Resource Allocation — Fleet Forecast",
"",
f"**Horizon:** {fc['horizon_hours']} hours",
f"**Resource mode:** {fc['resource_mode']}",
f"**Dispatch posture:** {fc['dispatch_posture']}",
f"**Demand level:** {fc['demand_level']}",
"",
"## Demand Metrics",
"",
f"| Metric | Recent | Baseline |",
f"|--------|-------:|---------:|",
f"| Requests/hour | {fc['recent_request_rate']} | {fc['baseline_request_rate']} |",
f"| Prompt tokens/hour | {fc['recent_prompt_tokens_per_hour']} | {fc['baseline_prompt_tokens_per_hour']} |",
"",
f"**Surge factor:** {fc['surge_factor']}x",
f"**Predicted request rate:** {fc['predicted_request_rate']}/hour",
"",
"## Infrastructure Health",
"",
f"- Gitea outages (recent window): {fc['gitea_outages']}",
f"- Inference failures (recent window): {fc['inference_failures']}",
f"- Heartbeat checks analyzed: {fc['heartbeat_checks']}",
"",
"## Recommended Actions",
"",
]
for action in fc["recommended_actions"]:
lines.append(f"- {action}")
if fc["top_callers"]:
lines.extend([
"",
"## Top Callers (Recent Window)",
"",
"| Caller | Requests | Tokens | Failures |",
"|--------|---------:|-------:|---------:|",
])
for c in fc["top_callers"]:
lines.append(
f"| {c['caller']} | {c['requests']} | {c['prompt_tokens']} | {c['failures']} |"
)
return "\n".join(lines) + "\n"
# ── CLI ──────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(
description="Predictive resource allocation for the Timmy fleet"
)
parser.add_argument(
"--metrics", nargs="*", default=None,
help="Metric JSONL paths (supports globs). Default: metrics/local_*.jsonl"
)
parser.add_argument(
"--heartbeat", nargs="*", default=None,
help="Heartbeat JSONL paths (supports globs). Default: heartbeat/ticks_*.jsonl"
)
parser.add_argument(
"--horizon", type=int, default=DEFAULT_HORIZON_HOURS,
help="Forecast horizon in hours (default: 6)"
)
parser.add_argument(
"--json", action="store_true",
help="Output raw JSON instead of markdown"
)
args = parser.parse_args()
metrics_paths = args.metrics or _default_paths(DEFAULT_METRICS_GLOB)
heartbeat_paths = args.heartbeat or _default_paths(DEFAULT_HEARTBEAT_GLOB)
fc = forecast(metrics_paths, heartbeat_paths, args.horizon)
if args.json:
print(json.dumps(fc, indent=2))
else:
print(format_markdown(fc))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,105 @@
from pathlib import Path
import yaml
from scripts.plan_nh_broadband_install import (
build_packet,
load_request,
render_markdown,
validate_request,
)
def test_script_exists() -> None:
assert Path("scripts/plan_nh_broadband_install.py").exists()
def test_example_request_exists() -> None:
assert Path("docs/nh-broadband-install-request.example.yaml").exists()
def test_example_packet_exists() -> None:
assert Path("docs/nh-broadband-install-packet.example.md").exists()
def test_research_memo_exists() -> None:
assert Path("reports/operations/2026-04-15-nh-broadband-public-research.md").exists()
def test_load_and_build_packet() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
assert packet["contact"]["name"] == "Timmy Operator"
assert packet["service_address"]["city"] == "Concord"
assert packet["service_address"]["state"] == "NH"
assert packet["status"] == "pending_scheduling_call"
assert len(packet["checklist"]) == 8
assert packet["checklist"][0]["done"] is False
def test_validate_rejects_missing_contact_name() -> None:
data = {
"contact": {"name": "", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "contact.name" in str(exc)
else:
raise AssertionError("should reject empty contact name")
def test_validate_rejects_missing_service_address() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "", "city": "X", "state": "NH"},
"checklist": ["do thing"],
}
try:
validate_request(data)
except ValueError as exc:
assert "service.address" in str(exc)
else:
raise AssertionError("should reject empty service address")
def test_validate_rejects_empty_checklist() -> None:
data = {
"contact": {"name": "A", "phone": "555"},
"service": {"address": "1 St", "city": "X", "state": "NH"},
"checklist": [],
}
try:
validate_request(data)
except ValueError as exc:
assert "checklist" in str(exc)
else:
raise AssertionError("should reject empty checklist")
def test_render_markdown_contains_key_sections() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "# NH Broadband Install Packet" in md
assert "## Contact" in md
assert "## Service Address" in md
assert "## Call Log" in md
assert "## Appointment Checklist" in md
assert "Concord" in md
assert "NH" in md
def test_render_markdown_shows_checklist_items() -> None:
data = load_request("docs/nh-broadband-install-request.example.yaml")
packet = build_packet(data)
md = render_markdown(packet, data)
assert "- [ ] Confirm exact-address availability" in md
def test_example_yaml_is_valid() -> None:
data = yaml.safe_load(Path("docs/nh-broadband-install-request.example.yaml").read_text())
assert data["contact"]["name"] == "Timmy Operator"
assert len(data["checklist"]) == 8

View File

@@ -0,0 +1,236 @@
"""Tests for predictive resource allocation."""
import json
import os
import sys
from pathlib import Path
import pytest
SCRIPT_DIR = Path(__file__).resolve().parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
from predictive_resource_allocator import (
_parse_ts,
compute_rates,
analyze_callers,
analyze_heartbeat,
predict_demand,
determine_posture,
forecast,
format_markdown,
load_jsonl,
)
def _write_jsonl(path: Path, rows: list):
with open(path, "w") as f:
for row in rows:
f.write(json.dumps(row) + "\n")
def _make_metrics(count: int, base_hour: int = 0, caller: str = "heartbeat_tick",
prompt_len: int = 1000, success: bool = True) -> list:
rows = []
for i in range(count):
rows.append({
"timestamp": f"2026-03-29T{base_hour + i // 60:02d}:{i % 60:02d}:00+00:00",
"caller": caller,
"prompt_len": prompt_len,
"response_len": 50,
"success": success,
})
return rows
def _make_heartbeat(count: int, base_hour: int = 0,
gitea_alive: bool = True, inference_ok: bool = True) -> list:
rows = []
for i in range(count):
rows.append({
"timestamp": f"2026-03-29T{base_hour + i:02d}:00:00+00:00",
"perception": {
"gitea_alive": gitea_alive,
"model_health": {"inference_ok": inference_ok},
},
})
return rows
# ── Timestamp Parsing ────────────────────────────────────────────────────────
class TestTimestampParsing:
def test_z_suffix(self):
dt = _parse_ts("2026-03-29T12:00:00Z")
assert dt.tzinfo is not None
def test_explicit_offset(self):
dt = _parse_ts("2026-03-29T12:00:00+00:00")
assert dt.hour == 12
def test_ordering(self):
earlier = _parse_ts("2026-03-29T10:00:00Z")
later = _parse_ts("2026-03-29T12:00:00Z")
assert earlier < later
# ── Rate Computation ─────────────────────────────────────────────────────────
class TestComputeRates:
def test_empty_returns_defaults(self):
r_rate, b_rate, surge, _, _ = compute_rates([], 6)
assert r_rate == 0.0
assert surge == 1.0
def test_surge_detected(self):
# 1 baseline req, 20 recent reqs
baseline = _make_metrics(1, base_hour=0)
recent = _make_metrics(20, base_hour=12)
rows = baseline + recent
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
assert surge > 1.0
def test_no_surge_when_stable(self):
# Same rate in both windows
early = _make_metrics(6, base_hour=0)
late = _make_metrics(6, base_hour=12)
rows = early + late
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
assert surge < 1.5
# ── Caller Analysis ──────────────────────────────────────────────────────────
class TestAnalyzeCallers:
def test_empty(self):
assert analyze_callers([], 6) == []
def test_groups_by_caller(self):
rows = _make_metrics(3, caller="heartbeat_tick") + _make_metrics(2, caller="know-thy-father", prompt_len=15000)
callers = analyze_callers(rows, horizon_hours=24)
names = [c["caller"] for c in callers]
assert "heartbeat_tick" in names
assert "know-thy-father" in names
def test_sorted_by_request_count(self):
rows = _make_metrics(1, caller="rare") + _make_metrics(10, caller="frequent")
callers = analyze_callers(rows, horizon_hours=24)
assert callers[0]["caller"] == "frequent"
def test_failure_rate(self):
rows = _make_metrics(10, caller="flaky", success=False)
callers = analyze_callers(rows, horizon_hours=24)
flaky = [c for c in callers if c["caller"] == "flaky"][0]
assert flaky["failure_rate"] == 100.0
# ── Heartbeat Analysis ───────────────────────────────────────────────────────
class TestAnalyzeHeartbeat:
def test_empty(self):
result = analyze_heartbeat([], 6)
assert result["gitea_outages"] == 0
def test_detects_gitea_outage(self):
rows = _make_heartbeat(3, gitea_alive=False)
result = analyze_heartbeat(rows, horizon_hours=24)
assert result["gitea_outages"] == 3
def test_detects_inference_failure(self):
rows = _make_heartbeat(2, inference_ok=False)
result = analyze_heartbeat(rows, horizon_hours=24)
assert result["inference_failures"] == 2
# ── Demand Prediction ────────────────────────────────────────────────────────
class TestPredictDemand:
def test_critical_on_extreme_surge(self):
result = predict_demand(100.0, 10.0, 10.0, 6)
assert result["demand_level"] == "critical"
def test_elevated_on_moderate_surge(self):
result = predict_demand(50.0, 10.0, 2.0, 6)
assert result["demand_level"] == "elevated"
def test_normal_on_slight_increase(self):
result = predict_demand(12.0, 10.0, 1.2, 6)
assert result["demand_level"] == "normal"
def test_low_when_decreasing(self):
result = predict_demand(5.0, 10.0, 0.5, 6)
assert result["demand_level"] == "low"
# ── Posture Determination ────────────────────────────────────────────────────
class TestDeterminePosture:
def test_steady_normal_when_no_issues(self):
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert mode == "steady"
assert posture == "normal"
assert "no surge indicators" in actions[0]
def test_surge_on_high_factor(self):
mode, posture, actions = determine_posture(2.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert mode == "surge"
assert any("Pre-warm" in a for a in actions)
def test_degraded_on_gitea_outage(self):
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 3, "inference_failures": 0, "total_checks": 5})
assert posture == "degraded"
assert any("forge state" in a for a in actions)
def test_heavy_background_flagged(self):
callers = [{"caller": "know-thy-father-batch", "requests": 5, "prompt_tokens": 50000, "failures": 0, "failure_rate": 0}]
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert any("Throttle" in a or "background" in a for a in actions)
def test_failing_callers_flagged(self):
callers = [{"caller": "bad_actor", "requests": 10, "prompt_tokens": 1000, "failures": 5, "failure_rate": 50.0}]
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
assert any("failure rate" in a.lower() for a in actions)
# ── Full Forecast ────────────────────────────────────────────────────────────
class TestForecast:
def test_end_to_end(self, tmp_path):
metrics_path = tmp_path / "metrics.jsonl"
heartbeat_path = tmp_path / "heartbeat.jsonl"
_write_jsonl(metrics_path, _make_metrics(6, base_hour=0) + _make_metrics(30, base_hour=12))
_write_jsonl(heartbeat_path, _make_heartbeat(5, base_hour=8, inference_ok=False))
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
assert "resource_mode" in result
assert "dispatch_posture" in result
assert "surge_factor" in result
assert "top_callers" in result
assert "recommended_actions" in result
assert isinstance(result["top_callers"], list)
assert isinstance(result["recommended_actions"], list)
def test_empty_inputs(self, tmp_path):
metrics_path = tmp_path / "empty_m.jsonl"
heartbeat_path = tmp_path / "empty_h.jsonl"
metrics_path.write_text("")
heartbeat_path.write_text("")
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
assert result["resource_mode"] == "steady"
assert result["surge_factor"] == 1.0
# ── Markdown Output ──────────────────────────────────────────────────────────
class TestFormatMarkdown:
def test_contains_key_sections(self):
fc = forecast([], [], horizon_hours=6)
md = format_markdown(fc)
assert "Predictive Resource Allocation" in md
assert "Demand Metrics" in md
assert "Recommended Actions" in md
assert "Horizon" in md

View File

@@ -17,8 +17,24 @@ from typing import Dict, Any, Optional, List
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import importlib.util
from harness import UniWizardHarness, House, ExecutionResult
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution."""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
class TaskType(Enum):

View File

@@ -8,13 +8,30 @@ import time
import sys
import argparse
import os
import importlib.util
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
sys.path.insert(0, str(Path(__file__).parent))
def _load_local(module_name: str, filename: str):
"""Import a module from an explicit file path, bypassing sys.path resolution.
Prevents namespace collisions when multiple directories contain modules
with the same name (e.g. uni-wizard/harness.py vs uni-wizard/v2/harness.py).
"""
spec = importlib.util.spec_from_file_location(
module_name,
str(Path(__file__).parent / filename),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
_harness = _load_local("v2_harness", "harness.py")
UniWizardHarness = _harness.UniWizardHarness
House = _harness.House
ExecutionResult = _harness.ExecutionResult
from harness import UniWizardHarness, House, ExecutionResult
from router import HouseRouter, TaskType
from author_whitelist import AuthorWhitelist