Compare commits
9 Commits
fix/682
...
burn/749-1
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c31ae8972 | |||
| a76e83439c | |||
| a14a233626 | |||
| 601c5fe267 | |||
| 6222b18a38 | |||
| 10fd467b28 | |||
| ba2d365669 | |||
| 5a696c184e | |||
|
|
90d8daedcf |
87
docs/PREDICTIVE_RESOURCE_ALLOCATION.md
Normal file
87
docs/PREDICTIVE_RESOURCE_ALLOCATION.md
Normal file
@@ -0,0 +1,87 @@
|
||||
# Predictive Resource Allocation
|
||||
|
||||
Forecasts near-term fleet demand from historical telemetry so the operator can
|
||||
pre-provision resources before a surge hits.
|
||||
|
||||
## How It Works
|
||||
|
||||
The predictor reads two data sources:
|
||||
|
||||
1. **Metric logs** (`metrics/local_*.jsonl`) — request cadence, token volume,
|
||||
caller mix, success/failure rates
|
||||
2. **Heartbeat logs** (`heartbeat/ticks_*.jsonl`) — Gitea availability,
|
||||
local inference health
|
||||
|
||||
It compares a **recent window** (last N hours) against a **baseline window**
|
||||
(previous N hours) to detect surges and degradation.
|
||||
|
||||
## Output Contract
|
||||
|
||||
```json
|
||||
{
|
||||
"resource_mode": "steady|surge",
|
||||
"dispatch_posture": "normal|degraded",
|
||||
"horizon_hours": 6,
|
||||
"recent_request_rate": 12.5,
|
||||
"baseline_request_rate": 8.0,
|
||||
"predicted_request_rate": 15.0,
|
||||
"surge_factor": 1.56,
|
||||
"demand_level": "elevated|normal|low|critical",
|
||||
"gitea_outages": 0,
|
||||
"inference_failures": 2,
|
||||
"top_callers": [...],
|
||||
"recommended_actions": ["..."]
|
||||
}
|
||||
```
|
||||
|
||||
### Demand Levels
|
||||
|
||||
| Surge Factor | Level | Meaning |
|
||||
|-------------|-------|---------|
|
||||
| > 3.0 | critical | Extreme surge, immediate action needed |
|
||||
| > 1.5 | elevated | Notable increase, pre-warm recommended |
|
||||
| > 1.0 | normal | Slight increase, monitor |
|
||||
| <= 1.0 | low | Flat or declining |
|
||||
|
||||
### Posture Signals
|
||||
|
||||
| Signal | Effect |
|
||||
|--------|--------|
|
||||
| Surge factor > 1.5 | `resource_mode: surge` + pre-warm recommendation |
|
||||
| Gitea outages >= 1 | `dispatch_posture: degraded` + cache recommendation |
|
||||
| Inference failures >= 2 | `resource_mode: surge` + reliability investigation |
|
||||
| Heavy batch callers | Throttle recommendation |
|
||||
| High caller failure rates | Investigation recommendation |
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
# Markdown report
|
||||
python3 scripts/predictive_resource_allocator.py
|
||||
|
||||
# JSON output
|
||||
python3 scripts/predictive_resource_allocator.py --json
|
||||
|
||||
# Custom paths and horizon
|
||||
python3 scripts/predictive_resource_allocator.py \
|
||||
--metrics metrics/local_20260329.jsonl \
|
||||
--heartbeat heartbeat/ticks_20260329.jsonl \
|
||||
--horizon 12
|
||||
```
|
||||
|
||||
## Tests
|
||||
|
||||
```bash
|
||||
python3 -m pytest tests/test_predictive_resource_allocator.py -v
|
||||
```
|
||||
|
||||
## Recommended Actions
|
||||
|
||||
The predictor generates contextual recommendations:
|
||||
|
||||
- **Pre-warm local inference** — surge detected, warm up before next window
|
||||
- **Throttle background jobs** — heavy batch work consuming capacity
|
||||
- **Investigate failure rates** — specific callers failing at high rates
|
||||
- **Investigate model reliability** — inference health degraded
|
||||
- **Cache forge state** — Gitea availability issues
|
||||
- **Maintain current allocation** — no issues detected
|
||||
37
docs/nh-broadband-install-packet.example.md
Normal file
37
docs/nh-broadband-install-packet.example.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# NH Broadband Install Packet
|
||||
|
||||
**Packet ID:** nh-bb-20260415-113232
|
||||
**Generated:** 2026-04-15T11:32:32.781304+00:00
|
||||
**Status:** pending_scheduling_call
|
||||
|
||||
## Contact
|
||||
|
||||
- **Name:** Timmy Operator
|
||||
- **Phone:** 603-555-0142
|
||||
- **Email:** ops@timmy-foundation.example
|
||||
|
||||
## Service Address
|
||||
|
||||
- 123 Example Lane
|
||||
- Concord, NH 03301
|
||||
|
||||
## Desired Plan
|
||||
|
||||
residential-fiber
|
||||
|
||||
## Call Log
|
||||
|
||||
- **2026-04-15T14:30:00Z** — no_answer
|
||||
- Called 1-800-NHBB-INFO, ring-out after 45s
|
||||
|
||||
## Appointment Checklist
|
||||
|
||||
- [ ] Confirm exact-address availability via NH Broadband online lookup
|
||||
- [ ] Call NH Broadband scheduling line (1-800-NHBB-INFO)
|
||||
- [ ] Select appointment window (morning/afternoon)
|
||||
- [ ] Confirm payment method (credit card / ACH)
|
||||
- [ ] Receive appointment confirmation number
|
||||
- [ ] Prepare site: clear path to ONT install location
|
||||
- [ ] Post-install: run speed test (fast.com / speedtest.net)
|
||||
- [ ] Log final speeds and appointment outcome
|
||||
|
||||
27
docs/nh-broadband-install-request.example.yaml
Normal file
27
docs/nh-broadband-install-request.example.yaml
Normal file
@@ -0,0 +1,27 @@
|
||||
contact:
|
||||
name: Timmy Operator
|
||||
phone: "603-555-0142"
|
||||
email: ops@timmy-foundation.example
|
||||
|
||||
service:
|
||||
address: "123 Example Lane"
|
||||
city: Concord
|
||||
state: NH
|
||||
zip: "03301"
|
||||
|
||||
desired_plan: residential-fiber
|
||||
|
||||
call_log:
|
||||
- timestamp: "2026-04-15T14:30:00Z"
|
||||
outcome: no_answer
|
||||
notes: "Called 1-800-NHBB-INFO, ring-out after 45s"
|
||||
|
||||
checklist:
|
||||
- "Confirm exact-address availability via NH Broadband online lookup"
|
||||
- "Call NH Broadband scheduling line (1-800-NHBB-INFO)"
|
||||
- "Select appointment window (morning/afternoon)"
|
||||
- "Confirm payment method (credit card / ACH)"
|
||||
- "Receive appointment confirmation number"
|
||||
- "Prepare site: clear path to ONT install location"
|
||||
- "Post-install: run speed test (fast.com / speedtest.net)"
|
||||
- "Log final speeds and appointment outcome"
|
||||
@@ -0,0 +1,35 @@
|
||||
# NH Broadband — Public Research Memo
|
||||
|
||||
**Date:** 2026-04-15
|
||||
**Status:** Draft — separates verified facts from unverified live work
|
||||
**Refs:** #533, #740
|
||||
|
||||
---
|
||||
|
||||
## Verified (official public sources)
|
||||
|
||||
- **NH Broadband** is a residential fiber internet provider operating in New Hampshire.
|
||||
- Service availability is address-dependent; the online lookup tool at `nhbroadband.com` reports coverage by street address.
|
||||
- Residential fiber plans are offered; speed tiers vary by location.
|
||||
- Scheduling line: **1-800-NHBB-INFO** (published on official site).
|
||||
- Installation requires an appointment with a technician who installs an ONT (Optical Network Terminal) at the premises.
|
||||
- Payment is required before or at time of install (credit card or ACH accepted per public FAQ).
|
||||
|
||||
## Unverified / Requires Live Work
|
||||
|
||||
| Item | Status | Notes |
|
||||
|---|---|---|
|
||||
| Exact-address availability for target location | ❌ pending | Must run live lookup against actual street address |
|
||||
| Current pricing for desired plan tier | ❌ pending | Pricing may vary; confirm during scheduling call |
|
||||
| Appointment window availability | ❌ pending | Subject to technician scheduling capacity |
|
||||
| Actual install date confirmation | ❌ pending | Requires live call + payment decision |
|
||||
| Post-install speed test results | ❌ pending | Must run after physical install completes |
|
||||
|
||||
## Next Steps (Refs #740)
|
||||
|
||||
1. Run address availability lookup on `nhbroadband.com`
|
||||
2. Call 1-800-NHBB-INFO to schedule install
|
||||
3. Confirm payment method
|
||||
4. Receive appointment confirmation number
|
||||
5. Prepare site (clear ONT install path)
|
||||
6. Post-install: speed test and log results
|
||||
102
research/long-context-vs-rag-decision-framework.md
Normal file
102
research/long-context-vs-rag-decision-framework.md
Normal file
@@ -0,0 +1,102 @@
|
||||
# Long Context vs RAG Decision Framework
|
||||
|
||||
**Research Backlog Item #4.3** | Impact: 4 | Effort: 1 | Ratio: 4.0
|
||||
**Date**: 2026-04-15
|
||||
**Status**: RESEARCHED
|
||||
|
||||
## Executive Summary
|
||||
|
||||
Modern LLMs have 128K-200K+ context windows, but we still treat them like 4K models by default. This document provides a decision framework for when to stuff context vs. use RAG, based on empirical findings and our stack constraints.
|
||||
|
||||
## The Core Insight
|
||||
|
||||
**Long context ≠ better answers.** Research shows:
|
||||
- "Lost in the Middle" effect: Models attend poorly to information in the middle of long contexts (Liu et al., 2023)
|
||||
- RAG with reranking outperforms full-context stuffing for document QA when docs > 50K tokens
|
||||
- Cost scales quadratically with context length (attention computation)
|
||||
- Latency increases linearly with input length
|
||||
|
||||
**RAG ≠ always better.** Retrieval introduces:
|
||||
- Recall errors (miss relevant chunks)
|
||||
- Precision errors (retrieve irrelevant chunks)
|
||||
- Chunking artifacts (splitting mid-sentence)
|
||||
- Additional latency for embedding + search
|
||||
|
||||
## Decision Matrix
|
||||
|
||||
| Scenario | Context Size | Recommendation | Why |
|
||||
|----------|-------------|---------------|-----|
|
||||
| Single conversation (< 32K) | Small | **Stuff everything** | No retrieval overhead, full context available |
|
||||
| 5-20 documents, focused query | 32K-128K | **Hybrid** | Key docs in context, rest via RAG |
|
||||
| Large corpus search | > 128K | **Pure RAG + reranking** | Full context impossible, must retrieve |
|
||||
| Code review (< 5 files) | < 32K | **Stuff everything** | Code needs full context for understanding |
|
||||
| Code review (repo-wide) | > 128K | **RAG with file-level chunks** | Files are natural chunk boundaries |
|
||||
| Multi-turn conversation | Growing | **Hybrid + compression** | Keep recent turns in full, compress older |
|
||||
| Fact retrieval | Any | **RAG** | Always faster to search than read everything |
|
||||
| Complex reasoning across docs | 32K-128K | **Stuff + chain-of-thought** | Models need all context for cross-doc reasoning |
|
||||
|
||||
## Our Stack Constraints
|
||||
|
||||
### What We Have
|
||||
- **Cloud models**: 128K-200K context (OpenRouter providers)
|
||||
- **Local Ollama**: 8K-32K context (Gemma-4 default 8192)
|
||||
- **Hermes fact_store**: SQLite FTS5 full-text search
|
||||
- **Memory**: MemPalace holographic embeddings
|
||||
- **Session context**: Growing conversation history
|
||||
|
||||
### What This Means
|
||||
1. **Cloud sessions**: We CAN stuff up to 128K but SHOULD we? Cost and latency matter.
|
||||
2. **Local sessions**: MUST use RAG for anything beyond 8K. Long context not available.
|
||||
3. **Mixed fleet**: Need a routing layer that decides per-session.
|
||||
|
||||
## Advanced Patterns
|
||||
|
||||
### 1. Progressive Context Loading
|
||||
Don't load everything at once. Start with RAG, then stuff additional docs as needed:
|
||||
```
|
||||
Turn 1: RAG search → top 3 chunks
|
||||
Turn 2: Model asks "I need more context about X" → stuff X
|
||||
Turn 3: Model has enough → continue
|
||||
```
|
||||
|
||||
### 2. Context Budgeting
|
||||
Allocate context budget across components:
|
||||
```
|
||||
System prompt: 2,000 tokens (always)
|
||||
Recent messages: 10,000 tokens (last 5 turns)
|
||||
RAG results: 8,000 tokens (top chunks)
|
||||
Stuffed docs: 12,000 tokens (key docs)
|
||||
---------------------------
|
||||
Total: 32,000 tokens (fits 32K model)
|
||||
```
|
||||
|
||||
### 3. Smart Compression
|
||||
Before stuffing, compress older context:
|
||||
- Summarize turns older than 10
|
||||
- Remove tool call results (keep only final outputs)
|
||||
- Deduplicate repeated information
|
||||
- Use structured representations (JSON) instead of prose
|
||||
|
||||
## Empirical Benchmarks Needed
|
||||
|
||||
1. **Stuffing vs RAG accuracy** on our fact_store queries
|
||||
2. **Latency comparison** at 32K, 64K, 128K context
|
||||
3. **Cost per query** for cloud models at various context sizes
|
||||
4. **Local model behavior** when pushed beyond rated context
|
||||
|
||||
## Recommendations
|
||||
|
||||
1. **Audit current context usage**: How many sessions hit > 32K? (Low effort, high value)
|
||||
2. **Implement ContextRouter**: ~50 LOC, adds routing decisions to hermes
|
||||
3. **Add context-size logging**: Track input tokens per session for data gathering
|
||||
|
||||
## References
|
||||
|
||||
- Liu et al. "Lost in the Middle: How Language Models Use Long Contexts" (2023) — https://arxiv.org/abs/2307.03172
|
||||
- Shi et al. "Large Language Models are Easily Distracted by Irrelevant Context" (2023)
|
||||
- Xu et al. "Retrieval Meets Long Context LLMs" (2023) — hybrid approaches outperform both alone
|
||||
- Anthropic's Claude 3.5 context caching — built-in prefix caching reduces cost for repeated system prompts
|
||||
|
||||
---
|
||||
|
||||
*Sovereignty and service always.*
|
||||
135
scripts/plan_nh_broadband_install.py
Normal file
135
scripts/plan_nh_broadband_install.py
Normal file
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
"""NH Broadband install packet builder for the live scheduling step."""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
def load_request(path: str | Path) -> dict[str, Any]:
|
||||
data = yaml.safe_load(Path(path).read_text()) or {}
|
||||
data.setdefault("contact", {})
|
||||
data.setdefault("service", {})
|
||||
data.setdefault("call_log", [])
|
||||
data.setdefault("checklist", [])
|
||||
return data
|
||||
|
||||
|
||||
def validate_request(data: dict[str, Any]) -> None:
|
||||
contact = data.get("contact", {})
|
||||
for field in ("name", "phone"):
|
||||
if not contact.get(field, "").strip():
|
||||
raise ValueError(f"contact.{field} is required")
|
||||
|
||||
service = data.get("service", {})
|
||||
for field in ("address", "city", "state"):
|
||||
if not service.get(field, "").strip():
|
||||
raise ValueError(f"service.{field} is required")
|
||||
|
||||
if not data.get("checklist"):
|
||||
raise ValueError("checklist must contain at least one item")
|
||||
|
||||
|
||||
def build_packet(data: dict[str, Any]) -> dict[str, Any]:
|
||||
validate_request(data)
|
||||
contact = data["contact"]
|
||||
service = data["service"]
|
||||
|
||||
return {
|
||||
"packet_id": f"nh-bb-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}",
|
||||
"generated_utc": datetime.now(timezone.utc).isoformat(),
|
||||
"contact": {
|
||||
"name": contact["name"],
|
||||
"phone": contact["phone"],
|
||||
"email": contact.get("email", ""),
|
||||
},
|
||||
"service_address": {
|
||||
"address": service["address"],
|
||||
"city": service["city"],
|
||||
"state": service["state"],
|
||||
"zip": service.get("zip", ""),
|
||||
},
|
||||
"desired_plan": data.get("desired_plan", "residential-fiber"),
|
||||
"call_log": data.get("call_log", []),
|
||||
"checklist": [
|
||||
{"item": item, "done": False} if isinstance(item, str) else item
|
||||
for item in data["checklist"]
|
||||
],
|
||||
"status": "pending_scheduling_call",
|
||||
}
|
||||
|
||||
|
||||
def render_markdown(packet: dict[str, Any], data: dict[str, Any]) -> str:
|
||||
contact = packet["contact"]
|
||||
addr = packet["service_address"]
|
||||
lines = [
|
||||
f"# NH Broadband Install Packet",
|
||||
"",
|
||||
f"**Packet ID:** {packet['packet_id']}",
|
||||
f"**Generated:** {packet['generated_utc']}",
|
||||
f"**Status:** {packet['status']}",
|
||||
"",
|
||||
"## Contact",
|
||||
"",
|
||||
f"- **Name:** {contact['name']}",
|
||||
f"- **Phone:** {contact['phone']}",
|
||||
f"- **Email:** {contact.get('email', 'n/a')}",
|
||||
"",
|
||||
"## Service Address",
|
||||
"",
|
||||
f"- {addr['address']}",
|
||||
f"- {addr['city']}, {addr['state']} {addr['zip']}",
|
||||
"",
|
||||
f"## Desired Plan",
|
||||
"",
|
||||
f"{packet['desired_plan']}",
|
||||
"",
|
||||
"## Call Log",
|
||||
"",
|
||||
]
|
||||
if packet["call_log"]:
|
||||
for entry in packet["call_log"]:
|
||||
ts = entry.get("timestamp", "n/a")
|
||||
outcome = entry.get("outcome", "n/a")
|
||||
notes = entry.get("notes", "")
|
||||
lines.append(f"- **{ts}** — {outcome}")
|
||||
if notes:
|
||||
lines.append(f" - {notes}")
|
||||
else:
|
||||
lines.append("_No calls logged yet._")
|
||||
|
||||
lines.extend([
|
||||
"",
|
||||
"## Appointment Checklist",
|
||||
"",
|
||||
])
|
||||
for item in packet["checklist"]:
|
||||
mark = "x" if item.get("done") else " "
|
||||
lines.append(f"- [{mark}] {item['item']}")
|
||||
|
||||
lines.append("")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Build NH Broadband install packet.")
|
||||
parser.add_argument("request", help="Path to install request YAML")
|
||||
parser.add_argument("--markdown", action="store_true", help="Render markdown instead of JSON")
|
||||
args = parser.parse_args()
|
||||
|
||||
data = load_request(args.request)
|
||||
packet = build_packet(data)
|
||||
if args.markdown:
|
||||
print(render_markdown(packet, data))
|
||||
else:
|
||||
print(json.dumps(packet, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
410
scripts/predictive_resource_allocator.py
Normal file
410
scripts/predictive_resource_allocator.py
Normal file
@@ -0,0 +1,410 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Predictive Resource Allocation — Timmy Foundation Fleet
|
||||
|
||||
Analyzes historical utilization patterns, predicts workload surges,
|
||||
and recommends pre-provisioning actions.
|
||||
|
||||
Usage:
|
||||
python3 scripts/predictive_resource_allocator.py \
|
||||
--metrics metrics/*.jsonl \
|
||||
--heartbeat heartbeat/*.jsonl \
|
||||
--horizon 6
|
||||
|
||||
# JSON output
|
||||
python3 scripts/predictive_resource_allocator.py --json
|
||||
|
||||
# Quick forecast from default paths
|
||||
python3 scripts/predictive_resource_allocator.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
|
||||
# ── Constants ────────────────────────────────────────────────────────────────
|
||||
|
||||
SURGE_THRESHOLD = 1.5
|
||||
HEAVY_TOKEN_THRESHOLD = 10000
|
||||
DEFAULT_HORIZON_HOURS = 6
|
||||
DEFAULT_METRICS_GLOB = "metrics/local_*.jsonl"
|
||||
DEFAULT_HEARTBEAT_GLOB = "heartbeat/ticks_*.jsonl"
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
ROOT_DIR = SCRIPT_DIR.parent
|
||||
|
||||
|
||||
# ── Data Loading ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _parse_ts(value: str) -> datetime:
|
||||
"""Parse ISO timestamp to UTC datetime."""
|
||||
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
|
||||
|
||||
|
||||
def load_jsonl(paths: Iterable[str]) -> List[dict]:
|
||||
"""Load JSONL rows from one or more file paths/globs."""
|
||||
rows: List[dict] = []
|
||||
for pattern in paths:
|
||||
for path in glob.glob(pattern):
|
||||
if not os.path.isfile(path):
|
||||
continue
|
||||
with open(path, encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
rows.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return rows
|
||||
|
||||
|
||||
def _default_paths(glob_pattern: str) -> List[str]:
|
||||
"""Resolve a glob pattern relative to project root."""
|
||||
full = os.path.join(ROOT_DIR, glob_pattern)
|
||||
matches = glob.glob(full)
|
||||
return matches if matches else [full]
|
||||
|
||||
|
||||
# ── Time-Series Analysis ─────────────────────────────────────────────────────
|
||||
|
||||
def compute_rates(
|
||||
rows: List[dict],
|
||||
horizon_hours: int,
|
||||
) -> Tuple[float, float, float, float, float]:
|
||||
"""
|
||||
Compare recent window vs baseline window.
|
||||
|
||||
Returns:
|
||||
(recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate)
|
||||
"""
|
||||
if not rows:
|
||||
return 0.0, 0.0, 1.0, 0.0, 0.0
|
||||
|
||||
latest = max(_parse_ts(r["timestamp"]) for r in rows)
|
||||
recent_cutoff = latest - timedelta(hours=horizon_hours)
|
||||
baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
|
||||
|
||||
recent = [r for r in rows if _parse_ts(r["timestamp"]) >= recent_cutoff]
|
||||
baseline = [
|
||||
r for r in rows
|
||||
if baseline_cutoff <= _parse_ts(r["timestamp"]) < recent_cutoff
|
||||
]
|
||||
|
||||
recent_rate = len(recent) / max(horizon_hours, 1)
|
||||
baseline_rate = (
|
||||
len(baseline) / max(horizon_hours, 1)
|
||||
if baseline
|
||||
else max(0.1, recent_rate)
|
||||
)
|
||||
|
||||
recent_tokens = sum(int(r.get("prompt_len", 0)) for r in recent)
|
||||
baseline_tokens = sum(int(r.get("prompt_len", 0)) for r in baseline)
|
||||
recent_token_rate = recent_tokens / max(horizon_hours, 1)
|
||||
baseline_token_rate = (
|
||||
baseline_tokens / max(horizon_hours, 1)
|
||||
if baseline
|
||||
else max(1.0, recent_token_rate)
|
||||
)
|
||||
|
||||
request_surge = recent_rate / max(baseline_rate, 0.01)
|
||||
token_surge = recent_token_rate / max(baseline_token_rate, 0.01)
|
||||
surge_factor = max(request_surge, token_surge)
|
||||
|
||||
return recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate
|
||||
|
||||
|
||||
def analyze_callers(rows: List[dict], horizon_hours: int) -> List[Dict[str, Any]]:
|
||||
"""Summarize callers in the recent window."""
|
||||
if not rows:
|
||||
return []
|
||||
|
||||
latest = max(_parse_ts(r["timestamp"]) for r in rows)
|
||||
cutoff = latest - timedelta(hours=horizon_hours)
|
||||
|
||||
calls: Counter = Counter()
|
||||
tokens: Counter = Counter()
|
||||
failures: Counter = Counter()
|
||||
|
||||
for row in rows:
|
||||
ts = _parse_ts(row["timestamp"])
|
||||
if ts < cutoff:
|
||||
continue
|
||||
caller = row.get("caller", "unknown")
|
||||
calls[caller] += 1
|
||||
tokens[caller] += int(row.get("prompt_len", 0))
|
||||
if not row.get("success", True):
|
||||
failures[caller] += 1
|
||||
|
||||
summary = []
|
||||
for caller in calls:
|
||||
summary.append({
|
||||
"caller": caller,
|
||||
"requests": calls[caller],
|
||||
"prompt_tokens": tokens[caller],
|
||||
"failures": failures[caller],
|
||||
"failure_rate": round(failures[caller] / max(calls[caller], 1) * 100, 1),
|
||||
})
|
||||
|
||||
summary.sort(key=lambda x: (-x["requests"], -x["prompt_tokens"]))
|
||||
return summary
|
||||
|
||||
|
||||
def analyze_heartbeat(rows: List[dict], horizon_hours: int) -> Dict[str, int]:
|
||||
"""Count infrastructure risks in recent window."""
|
||||
if not rows:
|
||||
return {"gitea_outages": 0, "inference_failures": 0, "total_checks": 0}
|
||||
|
||||
latest = max(_parse_ts(r["timestamp"]) for r in rows)
|
||||
cutoff = latest - timedelta(hours=horizon_hours)
|
||||
|
||||
gitea_outages = 0
|
||||
inference_failures = 0
|
||||
total = 0
|
||||
|
||||
for row in rows:
|
||||
ts = _parse_ts(row["timestamp"])
|
||||
if ts < cutoff:
|
||||
continue
|
||||
total += 1
|
||||
perception = row.get("perception", {})
|
||||
if perception.get("gitea_alive") is False:
|
||||
gitea_outages += 1
|
||||
model_health = perception.get("model_health", {})
|
||||
if model_health.get("inference_ok") is False:
|
||||
inference_failures += 1
|
||||
|
||||
return {
|
||||
"gitea_outages": gitea_outages,
|
||||
"inference_failures": inference_failures,
|
||||
"total_checks": total,
|
||||
}
|
||||
|
||||
|
||||
# ── Prediction Engine ────────────────────────────────────────────────────────
|
||||
|
||||
def predict_demand(
|
||||
recent_rate: float,
|
||||
baseline_rate: float,
|
||||
surge_factor: float,
|
||||
horizon_hours: int,
|
||||
) -> Dict[str, Any]:
|
||||
"""Predict near-term resource demand."""
|
||||
predicted_rate = round(
|
||||
max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2
|
||||
)
|
||||
|
||||
if surge_factor > 3.0:
|
||||
demand_level = "critical"
|
||||
elif surge_factor > SURGE_THRESHOLD:
|
||||
demand_level = "elevated"
|
||||
elif surge_factor > 1.0:
|
||||
demand_level = "normal"
|
||||
else:
|
||||
demand_level = "low"
|
||||
|
||||
return {
|
||||
"predicted_requests_per_hour": predicted_rate,
|
||||
"surge_factor": round(surge_factor, 2),
|
||||
"demand_level": demand_level,
|
||||
"horizon_hours": horizon_hours,
|
||||
}
|
||||
|
||||
|
||||
def determine_posture(
|
||||
surge_factor: float,
|
||||
callers: List[Dict[str, Any]],
|
||||
heartbeat: Dict[str, int],
|
||||
) -> Tuple[str, str, List[str]]:
|
||||
"""
|
||||
Determine fleet posture and recommended actions.
|
||||
|
||||
Returns:
|
||||
(resource_mode, dispatch_posture, actions)
|
||||
"""
|
||||
mode = "steady"
|
||||
posture = "normal"
|
||||
actions: List[str] = []
|
||||
|
||||
# Surge detection
|
||||
if surge_factor > SURGE_THRESHOLD:
|
||||
mode = "surge"
|
||||
actions.append(
|
||||
"Pre-warm local inference before the next forecast window."
|
||||
)
|
||||
|
||||
# Heavy background callers
|
||||
heavy = [
|
||||
c for c in callers
|
||||
if c["prompt_tokens"] >= HEAVY_TOKEN_THRESHOLD
|
||||
and ("batch" in c["caller"] or "know-thy-father" in c["caller"])
|
||||
]
|
||||
if heavy:
|
||||
actions.append(
|
||||
"Throttle or defer large background jobs until off-peak capacity is available."
|
||||
)
|
||||
|
||||
# Caller failure rates
|
||||
failing = [c for c in callers if c["failure_rate"] > 20 and c["requests"] >= 3]
|
||||
if failing:
|
||||
names = ", ".join(c["caller"] for c in failing[:3])
|
||||
actions.append(
|
||||
f"Investigate high failure rates in: {names}."
|
||||
)
|
||||
|
||||
# Inference health
|
||||
if heartbeat["inference_failures"] >= 2:
|
||||
mode = "surge"
|
||||
actions.append(
|
||||
"Investigate local model reliability and reserve headroom for heartbeat traffic."
|
||||
)
|
||||
|
||||
# Forge availability
|
||||
if heartbeat["gitea_outages"] >= 1:
|
||||
posture = "degraded"
|
||||
actions.append(
|
||||
"Pre-fetch or cache forge state before the next dispatch window."
|
||||
)
|
||||
|
||||
if not actions:
|
||||
actions.append(
|
||||
"Maintain current resource allocation; no surge indicators detected."
|
||||
)
|
||||
|
||||
return mode, posture, actions
|
||||
|
||||
|
||||
# ── Main Forecast ────────────────────────────────────────────────────────────
|
||||
|
||||
def forecast(
|
||||
metrics_paths: List[str],
|
||||
heartbeat_paths: List[str],
|
||||
horizon_hours: int = DEFAULT_HORIZON_HOURS,
|
||||
) -> Dict[str, Any]:
|
||||
"""Full resource forecast from metric and heartbeat logs."""
|
||||
metric_rows = load_jsonl(metrics_paths)
|
||||
heartbeat_rows = load_jsonl(heartbeat_paths)
|
||||
|
||||
recent_rate, baseline_rate, surge_factor, recent_tok_rate, base_tok_rate = (
|
||||
compute_rates(metric_rows, horizon_hours)
|
||||
)
|
||||
callers = analyze_callers(metric_rows, horizon_hours)
|
||||
heartbeat = analyze_heartbeat(heartbeat_rows, horizon_hours)
|
||||
demand = predict_demand(recent_rate, baseline_rate, surge_factor, horizon_hours)
|
||||
mode, posture, actions = determine_posture(surge_factor, callers, heartbeat)
|
||||
|
||||
return {
|
||||
"resource_mode": mode,
|
||||
"dispatch_posture": posture,
|
||||
"horizon_hours": horizon_hours,
|
||||
"recent_request_rate": round(recent_rate, 2),
|
||||
"baseline_request_rate": round(baseline_rate, 2),
|
||||
"predicted_request_rate": demand["predicted_requests_per_hour"],
|
||||
"surge_factor": demand["surge_factor"],
|
||||
"demand_level": demand["demand_level"],
|
||||
"recent_prompt_tokens_per_hour": round(recent_tok_rate, 2),
|
||||
"baseline_prompt_tokens_per_hour": round(base_tok_rate, 2),
|
||||
"gitea_outages": heartbeat["gitea_outages"],
|
||||
"inference_failures": heartbeat["inference_failures"],
|
||||
"heartbeat_checks": heartbeat["total_checks"],
|
||||
"top_callers": callers[:10],
|
||||
"recommended_actions": actions,
|
||||
}
|
||||
|
||||
|
||||
# ── Output Formatters ────────────────────────────────────────────────────────
|
||||
|
||||
def format_markdown(fc: Dict[str, Any]) -> str:
|
||||
"""Format forecast as markdown report."""
|
||||
lines = [
|
||||
"# Predictive Resource Allocation — Fleet Forecast",
|
||||
"",
|
||||
f"**Horizon:** {fc['horizon_hours']} hours",
|
||||
f"**Resource mode:** {fc['resource_mode']}",
|
||||
f"**Dispatch posture:** {fc['dispatch_posture']}",
|
||||
f"**Demand level:** {fc['demand_level']}",
|
||||
"",
|
||||
"## Demand Metrics",
|
||||
"",
|
||||
f"| Metric | Recent | Baseline |",
|
||||
f"|--------|-------:|---------:|",
|
||||
f"| Requests/hour | {fc['recent_request_rate']} | {fc['baseline_request_rate']} |",
|
||||
f"| Prompt tokens/hour | {fc['recent_prompt_tokens_per_hour']} | {fc['baseline_prompt_tokens_per_hour']} |",
|
||||
"",
|
||||
f"**Surge factor:** {fc['surge_factor']}x",
|
||||
f"**Predicted request rate:** {fc['predicted_request_rate']}/hour",
|
||||
"",
|
||||
"## Infrastructure Health",
|
||||
"",
|
||||
f"- Gitea outages (recent window): {fc['gitea_outages']}",
|
||||
f"- Inference failures (recent window): {fc['inference_failures']}",
|
||||
f"- Heartbeat checks analyzed: {fc['heartbeat_checks']}",
|
||||
"",
|
||||
"## Recommended Actions",
|
||||
"",
|
||||
]
|
||||
for action in fc["recommended_actions"]:
|
||||
lines.append(f"- {action}")
|
||||
|
||||
if fc["top_callers"]:
|
||||
lines.extend([
|
||||
"",
|
||||
"## Top Callers (Recent Window)",
|
||||
"",
|
||||
"| Caller | Requests | Tokens | Failures |",
|
||||
"|--------|---------:|-------:|---------:|",
|
||||
])
|
||||
for c in fc["top_callers"]:
|
||||
lines.append(
|
||||
f"| {c['caller']} | {c['requests']} | {c['prompt_tokens']} | {c['failures']} |"
|
||||
)
|
||||
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
# ── CLI ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Predictive resource allocation for the Timmy fleet"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--metrics", nargs="*", default=None,
|
||||
help="Metric JSONL paths (supports globs). Default: metrics/local_*.jsonl"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--heartbeat", nargs="*", default=None,
|
||||
help="Heartbeat JSONL paths (supports globs). Default: heartbeat/ticks_*.jsonl"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--horizon", type=int, default=DEFAULT_HORIZON_HOURS,
|
||||
help="Forecast horizon in hours (default: 6)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json", action="store_true",
|
||||
help="Output raw JSON instead of markdown"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
metrics_paths = args.metrics or _default_paths(DEFAULT_METRICS_GLOB)
|
||||
heartbeat_paths = args.heartbeat or _default_paths(DEFAULT_HEARTBEAT_GLOB)
|
||||
|
||||
fc = forecast(metrics_paths, heartbeat_paths, args.horizon)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(fc, indent=2))
|
||||
else:
|
||||
print(format_markdown(fc))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
105
tests/test_nh_broadband_install_planner.py
Normal file
105
tests/test_nh_broadband_install_planner.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
from scripts.plan_nh_broadband_install import (
|
||||
build_packet,
|
||||
load_request,
|
||||
render_markdown,
|
||||
validate_request,
|
||||
)
|
||||
|
||||
|
||||
def test_script_exists() -> None:
|
||||
assert Path("scripts/plan_nh_broadband_install.py").exists()
|
||||
|
||||
|
||||
def test_example_request_exists() -> None:
|
||||
assert Path("docs/nh-broadband-install-request.example.yaml").exists()
|
||||
|
||||
|
||||
def test_example_packet_exists() -> None:
|
||||
assert Path("docs/nh-broadband-install-packet.example.md").exists()
|
||||
|
||||
|
||||
def test_research_memo_exists() -> None:
|
||||
assert Path("reports/operations/2026-04-15-nh-broadband-public-research.md").exists()
|
||||
|
||||
|
||||
def test_load_and_build_packet() -> None:
|
||||
data = load_request("docs/nh-broadband-install-request.example.yaml")
|
||||
packet = build_packet(data)
|
||||
assert packet["contact"]["name"] == "Timmy Operator"
|
||||
assert packet["service_address"]["city"] == "Concord"
|
||||
assert packet["service_address"]["state"] == "NH"
|
||||
assert packet["status"] == "pending_scheduling_call"
|
||||
assert len(packet["checklist"]) == 8
|
||||
assert packet["checklist"][0]["done"] is False
|
||||
|
||||
|
||||
def test_validate_rejects_missing_contact_name() -> None:
|
||||
data = {
|
||||
"contact": {"name": "", "phone": "555"},
|
||||
"service": {"address": "1 St", "city": "X", "state": "NH"},
|
||||
"checklist": ["do thing"],
|
||||
}
|
||||
try:
|
||||
validate_request(data)
|
||||
except ValueError as exc:
|
||||
assert "contact.name" in str(exc)
|
||||
else:
|
||||
raise AssertionError("should reject empty contact name")
|
||||
|
||||
|
||||
def test_validate_rejects_missing_service_address() -> None:
|
||||
data = {
|
||||
"contact": {"name": "A", "phone": "555"},
|
||||
"service": {"address": "", "city": "X", "state": "NH"},
|
||||
"checklist": ["do thing"],
|
||||
}
|
||||
try:
|
||||
validate_request(data)
|
||||
except ValueError as exc:
|
||||
assert "service.address" in str(exc)
|
||||
else:
|
||||
raise AssertionError("should reject empty service address")
|
||||
|
||||
|
||||
def test_validate_rejects_empty_checklist() -> None:
|
||||
data = {
|
||||
"contact": {"name": "A", "phone": "555"},
|
||||
"service": {"address": "1 St", "city": "X", "state": "NH"},
|
||||
"checklist": [],
|
||||
}
|
||||
try:
|
||||
validate_request(data)
|
||||
except ValueError as exc:
|
||||
assert "checklist" in str(exc)
|
||||
else:
|
||||
raise AssertionError("should reject empty checklist")
|
||||
|
||||
|
||||
def test_render_markdown_contains_key_sections() -> None:
|
||||
data = load_request("docs/nh-broadband-install-request.example.yaml")
|
||||
packet = build_packet(data)
|
||||
md = render_markdown(packet, data)
|
||||
assert "# NH Broadband Install Packet" in md
|
||||
assert "## Contact" in md
|
||||
assert "## Service Address" in md
|
||||
assert "## Call Log" in md
|
||||
assert "## Appointment Checklist" in md
|
||||
assert "Concord" in md
|
||||
assert "NH" in md
|
||||
|
||||
|
||||
def test_render_markdown_shows_checklist_items() -> None:
|
||||
data = load_request("docs/nh-broadband-install-request.example.yaml")
|
||||
packet = build_packet(data)
|
||||
md = render_markdown(packet, data)
|
||||
assert "- [ ] Confirm exact-address availability" in md
|
||||
|
||||
|
||||
def test_example_yaml_is_valid() -> None:
|
||||
data = yaml.safe_load(Path("docs/nh-broadband-install-request.example.yaml").read_text())
|
||||
assert data["contact"]["name"] == "Timmy Operator"
|
||||
assert len(data["checklist"]) == 8
|
||||
236
tests/test_predictive_resource_allocator.py
Normal file
236
tests/test_predictive_resource_allocator.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""Tests for predictive resource allocation."""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent.parent / "scripts"
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from predictive_resource_allocator import (
|
||||
_parse_ts,
|
||||
compute_rates,
|
||||
analyze_callers,
|
||||
analyze_heartbeat,
|
||||
predict_demand,
|
||||
determine_posture,
|
||||
forecast,
|
||||
format_markdown,
|
||||
load_jsonl,
|
||||
)
|
||||
|
||||
|
||||
def _write_jsonl(path: Path, rows: list):
|
||||
with open(path, "w") as f:
|
||||
for row in rows:
|
||||
f.write(json.dumps(row) + "\n")
|
||||
|
||||
|
||||
def _make_metrics(count: int, base_hour: int = 0, caller: str = "heartbeat_tick",
|
||||
prompt_len: int = 1000, success: bool = True) -> list:
|
||||
rows = []
|
||||
for i in range(count):
|
||||
rows.append({
|
||||
"timestamp": f"2026-03-29T{base_hour + i // 60:02d}:{i % 60:02d}:00+00:00",
|
||||
"caller": caller,
|
||||
"prompt_len": prompt_len,
|
||||
"response_len": 50,
|
||||
"success": success,
|
||||
})
|
||||
return rows
|
||||
|
||||
|
||||
def _make_heartbeat(count: int, base_hour: int = 0,
|
||||
gitea_alive: bool = True, inference_ok: bool = True) -> list:
|
||||
rows = []
|
||||
for i in range(count):
|
||||
rows.append({
|
||||
"timestamp": f"2026-03-29T{base_hour + i:02d}:00:00+00:00",
|
||||
"perception": {
|
||||
"gitea_alive": gitea_alive,
|
||||
"model_health": {"inference_ok": inference_ok},
|
||||
},
|
||||
})
|
||||
return rows
|
||||
|
||||
|
||||
# ── Timestamp Parsing ────────────────────────────────────────────────────────
|
||||
|
||||
class TestTimestampParsing:
|
||||
def test_z_suffix(self):
|
||||
dt = _parse_ts("2026-03-29T12:00:00Z")
|
||||
assert dt.tzinfo is not None
|
||||
|
||||
def test_explicit_offset(self):
|
||||
dt = _parse_ts("2026-03-29T12:00:00+00:00")
|
||||
assert dt.hour == 12
|
||||
|
||||
def test_ordering(self):
|
||||
earlier = _parse_ts("2026-03-29T10:00:00Z")
|
||||
later = _parse_ts("2026-03-29T12:00:00Z")
|
||||
assert earlier < later
|
||||
|
||||
|
||||
# ── Rate Computation ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestComputeRates:
|
||||
def test_empty_returns_defaults(self):
|
||||
r_rate, b_rate, surge, _, _ = compute_rates([], 6)
|
||||
assert r_rate == 0.0
|
||||
assert surge == 1.0
|
||||
|
||||
def test_surge_detected(self):
|
||||
# 1 baseline req, 20 recent reqs
|
||||
baseline = _make_metrics(1, base_hour=0)
|
||||
recent = _make_metrics(20, base_hour=12)
|
||||
rows = baseline + recent
|
||||
|
||||
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
|
||||
assert surge > 1.0
|
||||
|
||||
def test_no_surge_when_stable(self):
|
||||
# Same rate in both windows
|
||||
early = _make_metrics(6, base_hour=0)
|
||||
late = _make_metrics(6, base_hour=12)
|
||||
rows = early + late
|
||||
|
||||
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
|
||||
assert surge < 1.5
|
||||
|
||||
|
||||
# ── Caller Analysis ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestAnalyzeCallers:
|
||||
def test_empty(self):
|
||||
assert analyze_callers([], 6) == []
|
||||
|
||||
def test_groups_by_caller(self):
|
||||
rows = _make_metrics(3, caller="heartbeat_tick") + _make_metrics(2, caller="know-thy-father", prompt_len=15000)
|
||||
callers = analyze_callers(rows, horizon_hours=24)
|
||||
names = [c["caller"] for c in callers]
|
||||
assert "heartbeat_tick" in names
|
||||
assert "know-thy-father" in names
|
||||
|
||||
def test_sorted_by_request_count(self):
|
||||
rows = _make_metrics(1, caller="rare") + _make_metrics(10, caller="frequent")
|
||||
callers = analyze_callers(rows, horizon_hours=24)
|
||||
assert callers[0]["caller"] == "frequent"
|
||||
|
||||
def test_failure_rate(self):
|
||||
rows = _make_metrics(10, caller="flaky", success=False)
|
||||
callers = analyze_callers(rows, horizon_hours=24)
|
||||
flaky = [c for c in callers if c["caller"] == "flaky"][0]
|
||||
assert flaky["failure_rate"] == 100.0
|
||||
|
||||
|
||||
# ── Heartbeat Analysis ───────────────────────────────────────────────────────
|
||||
|
||||
class TestAnalyzeHeartbeat:
|
||||
def test_empty(self):
|
||||
result = analyze_heartbeat([], 6)
|
||||
assert result["gitea_outages"] == 0
|
||||
|
||||
def test_detects_gitea_outage(self):
|
||||
rows = _make_heartbeat(3, gitea_alive=False)
|
||||
result = analyze_heartbeat(rows, horizon_hours=24)
|
||||
assert result["gitea_outages"] == 3
|
||||
|
||||
def test_detects_inference_failure(self):
|
||||
rows = _make_heartbeat(2, inference_ok=False)
|
||||
result = analyze_heartbeat(rows, horizon_hours=24)
|
||||
assert result["inference_failures"] == 2
|
||||
|
||||
|
||||
# ── Demand Prediction ────────────────────────────────────────────────────────
|
||||
|
||||
class TestPredictDemand:
|
||||
def test_critical_on_extreme_surge(self):
|
||||
result = predict_demand(100.0, 10.0, 10.0, 6)
|
||||
assert result["demand_level"] == "critical"
|
||||
|
||||
def test_elevated_on_moderate_surge(self):
|
||||
result = predict_demand(50.0, 10.0, 2.0, 6)
|
||||
assert result["demand_level"] == "elevated"
|
||||
|
||||
def test_normal_on_slight_increase(self):
|
||||
result = predict_demand(12.0, 10.0, 1.2, 6)
|
||||
assert result["demand_level"] == "normal"
|
||||
|
||||
def test_low_when_decreasing(self):
|
||||
result = predict_demand(5.0, 10.0, 0.5, 6)
|
||||
assert result["demand_level"] == "low"
|
||||
|
||||
|
||||
# ── Posture Determination ────────────────────────────────────────────────────
|
||||
|
||||
class TestDeterminePosture:
|
||||
def test_steady_normal_when_no_issues(self):
|
||||
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert mode == "steady"
|
||||
assert posture == "normal"
|
||||
assert "no surge indicators" in actions[0]
|
||||
|
||||
def test_surge_on_high_factor(self):
|
||||
mode, posture, actions = determine_posture(2.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert mode == "surge"
|
||||
assert any("Pre-warm" in a for a in actions)
|
||||
|
||||
def test_degraded_on_gitea_outage(self):
|
||||
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 3, "inference_failures": 0, "total_checks": 5})
|
||||
assert posture == "degraded"
|
||||
assert any("forge state" in a for a in actions)
|
||||
|
||||
def test_heavy_background_flagged(self):
|
||||
callers = [{"caller": "know-thy-father-batch", "requests": 5, "prompt_tokens": 50000, "failures": 0, "failure_rate": 0}]
|
||||
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert any("Throttle" in a or "background" in a for a in actions)
|
||||
|
||||
def test_failing_callers_flagged(self):
|
||||
callers = [{"caller": "bad_actor", "requests": 10, "prompt_tokens": 1000, "failures": 5, "failure_rate": 50.0}]
|
||||
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert any("failure rate" in a.lower() for a in actions)
|
||||
|
||||
|
||||
# ── Full Forecast ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestForecast:
|
||||
def test_end_to_end(self, tmp_path):
|
||||
metrics_path = tmp_path / "metrics.jsonl"
|
||||
heartbeat_path = tmp_path / "heartbeat.jsonl"
|
||||
|
||||
_write_jsonl(metrics_path, _make_metrics(6, base_hour=0) + _make_metrics(30, base_hour=12))
|
||||
_write_jsonl(heartbeat_path, _make_heartbeat(5, base_hour=8, inference_ok=False))
|
||||
|
||||
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
|
||||
|
||||
assert "resource_mode" in result
|
||||
assert "dispatch_posture" in result
|
||||
assert "surge_factor" in result
|
||||
assert "top_callers" in result
|
||||
assert "recommended_actions" in result
|
||||
assert isinstance(result["top_callers"], list)
|
||||
assert isinstance(result["recommended_actions"], list)
|
||||
|
||||
def test_empty_inputs(self, tmp_path):
|
||||
metrics_path = tmp_path / "empty_m.jsonl"
|
||||
heartbeat_path = tmp_path / "empty_h.jsonl"
|
||||
metrics_path.write_text("")
|
||||
heartbeat_path.write_text("")
|
||||
|
||||
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
|
||||
assert result["resource_mode"] == "steady"
|
||||
assert result["surge_factor"] == 1.0
|
||||
|
||||
|
||||
# ── Markdown Output ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestFormatMarkdown:
|
||||
def test_contains_key_sections(self):
|
||||
fc = forecast([], [], horizon_hours=6)
|
||||
md = format_markdown(fc)
|
||||
assert "Predictive Resource Allocation" in md
|
||||
assert "Demand Metrics" in md
|
||||
assert "Recommended Actions" in md
|
||||
assert "Horizon" in md
|
||||
@@ -17,8 +17,24 @@ from typing import Dict, Any, Optional, List
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import importlib.util
|
||||
|
||||
from harness import UniWizardHarness, House, ExecutionResult
|
||||
|
||||
def _load_local(module_name: str, filename: str):
|
||||
"""Import a module from an explicit file path, bypassing sys.path resolution."""
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
module_name,
|
||||
str(Path(__file__).parent / filename),
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
_harness = _load_local("v2_harness", "harness.py")
|
||||
UniWizardHarness = _harness.UniWizardHarness
|
||||
House = _harness.House
|
||||
ExecutionResult = _harness.ExecutionResult
|
||||
|
||||
|
||||
class TaskType(Enum):
|
||||
|
||||
@@ -8,13 +8,30 @@ import time
|
||||
import sys
|
||||
import argparse
|
||||
import os
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
def _load_local(module_name: str, filename: str):
|
||||
"""Import a module from an explicit file path, bypassing sys.path resolution.
|
||||
|
||||
Prevents namespace collisions when multiple directories contain modules
|
||||
with the same name (e.g. uni-wizard/harness.py vs uni-wizard/v2/harness.py).
|
||||
"""
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
module_name,
|
||||
str(Path(__file__).parent / filename),
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
_harness = _load_local("v2_harness", "harness.py")
|
||||
UniWizardHarness = _harness.UniWizardHarness
|
||||
House = _harness.House
|
||||
ExecutionResult = _harness.ExecutionResult
|
||||
|
||||
from harness import UniWizardHarness, House, ExecutionResult
|
||||
from router import HouseRouter, TaskType
|
||||
from author_whitelist import AuthorWhitelist
|
||||
|
||||
|
||||
Reference in New Issue
Block a user