Compare commits
7 Commits
fix/716
...
burn/749-1
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c31ae8972 | |||
| a76e83439c | |||
| a14a233626 | |||
| 601c5fe267 | |||
| 6222b18a38 | |||
| 10fd467b28 | |||
| ba2d365669 |
87
docs/PREDICTIVE_RESOURCE_ALLOCATION.md
Normal file
87
docs/PREDICTIVE_RESOURCE_ALLOCATION.md
Normal file
@@ -0,0 +1,87 @@
|
||||
# Predictive Resource Allocation
|
||||
|
||||
Forecasts near-term fleet demand from historical telemetry so the operator can
|
||||
pre-provision resources before a surge hits.
|
||||
|
||||
## How It Works
|
||||
|
||||
The predictor reads two data sources:
|
||||
|
||||
1. **Metric logs** (`metrics/local_*.jsonl`) — request cadence, token volume,
|
||||
caller mix, success/failure rates
|
||||
2. **Heartbeat logs** (`heartbeat/ticks_*.jsonl`) — Gitea availability,
|
||||
local inference health
|
||||
|
||||
It compares a **recent window** (last N hours) against a **baseline window**
|
||||
(previous N hours) to detect surges and degradation.
|
||||
|
||||
## Output Contract
|
||||
|
||||
```json
|
||||
{
|
||||
"resource_mode": "steady|surge",
|
||||
"dispatch_posture": "normal|degraded",
|
||||
"horizon_hours": 6,
|
||||
"recent_request_rate": 12.5,
|
||||
"baseline_request_rate": 8.0,
|
||||
"predicted_request_rate": 15.0,
|
||||
"surge_factor": 1.56,
|
||||
"demand_level": "elevated|normal|low|critical",
|
||||
"gitea_outages": 0,
|
||||
"inference_failures": 2,
|
||||
"top_callers": [...],
|
||||
"recommended_actions": ["..."]
|
||||
}
|
||||
```
|
||||
|
||||
### Demand Levels
|
||||
|
||||
| Surge Factor | Level | Meaning |
|
||||
|-------------|-------|---------|
|
||||
| > 3.0 | critical | Extreme surge, immediate action needed |
|
||||
| > 1.5 | elevated | Notable increase, pre-warm recommended |
|
||||
| > 1.0 | normal | Slight increase, monitor |
|
||||
| <= 1.0 | low | Flat or declining |
|
||||
|
||||
### Posture Signals
|
||||
|
||||
| Signal | Effect |
|
||||
|--------|--------|
|
||||
| Surge factor > 1.5 | `resource_mode: surge` + pre-warm recommendation |
|
||||
| Gitea outages >= 1 | `dispatch_posture: degraded` + cache recommendation |
|
||||
| Inference failures >= 2 | `resource_mode: surge` + reliability investigation |
|
||||
| Heavy batch callers | Throttle recommendation |
|
||||
| High caller failure rates | Investigation recommendation |
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
# Markdown report
|
||||
python3 scripts/predictive_resource_allocator.py
|
||||
|
||||
# JSON output
|
||||
python3 scripts/predictive_resource_allocator.py --json
|
||||
|
||||
# Custom paths and horizon
|
||||
python3 scripts/predictive_resource_allocator.py \
|
||||
--metrics metrics/local_20260329.jsonl \
|
||||
--heartbeat heartbeat/ticks_20260329.jsonl \
|
||||
--horizon 12
|
||||
```
|
||||
|
||||
## Tests
|
||||
|
||||
```bash
|
||||
python3 -m pytest tests/test_predictive_resource_allocator.py -v
|
||||
```
|
||||
|
||||
## Recommended Actions
|
||||
|
||||
The predictor generates contextual recommendations:
|
||||
|
||||
- **Pre-warm local inference** — surge detected, warm up before next window
|
||||
- **Throttle background jobs** — heavy batch work consuming capacity
|
||||
- **Investigate failure rates** — specific callers failing at high rates
|
||||
- **Investigate model reliability** — inference health degraded
|
||||
- **Cache forge state** — Gitea availability issues
|
||||
- **Maintain current allocation** — no issues detected
|
||||
102
research/long-context-vs-rag-decision-framework.md
Normal file
102
research/long-context-vs-rag-decision-framework.md
Normal file
@@ -0,0 +1,102 @@
|
||||
# Long Context vs RAG Decision Framework
|
||||
|
||||
**Research Backlog Item #4.3** | Impact: 4 | Effort: 1 | Ratio: 4.0
|
||||
**Date**: 2026-04-15
|
||||
**Status**: RESEARCHED
|
||||
|
||||
## Executive Summary
|
||||
|
||||
Modern LLMs have 128K-200K+ context windows, but we still treat them like 4K models by default. This document provides a decision framework for when to stuff context vs. use RAG, based on empirical findings and our stack constraints.
|
||||
|
||||
## The Core Insight
|
||||
|
||||
**Long context ≠ better answers.** Research shows:
|
||||
- "Lost in the Middle" effect: Models attend poorly to information in the middle of long contexts (Liu et al., 2023)
|
||||
- RAG with reranking outperforms full-context stuffing for document QA when docs > 50K tokens
|
||||
- Cost scales quadratically with context length (attention computation)
|
||||
- Latency increases linearly with input length
|
||||
|
||||
**RAG ≠ always better.** Retrieval introduces:
|
||||
- Recall errors (miss relevant chunks)
|
||||
- Precision errors (retrieve irrelevant chunks)
|
||||
- Chunking artifacts (splitting mid-sentence)
|
||||
- Additional latency for embedding + search
|
||||
|
||||
## Decision Matrix
|
||||
|
||||
| Scenario | Context Size | Recommendation | Why |
|
||||
|----------|-------------|---------------|-----|
|
||||
| Single conversation (< 32K) | Small | **Stuff everything** | No retrieval overhead, full context available |
|
||||
| 5-20 documents, focused query | 32K-128K | **Hybrid** | Key docs in context, rest via RAG |
|
||||
| Large corpus search | > 128K | **Pure RAG + reranking** | Full context impossible, must retrieve |
|
||||
| Code review (< 5 files) | < 32K | **Stuff everything** | Code needs full context for understanding |
|
||||
| Code review (repo-wide) | > 128K | **RAG with file-level chunks** | Files are natural chunk boundaries |
|
||||
| Multi-turn conversation | Growing | **Hybrid + compression** | Keep recent turns in full, compress older |
|
||||
| Fact retrieval | Any | **RAG** | Always faster to search than read everything |
|
||||
| Complex reasoning across docs | 32K-128K | **Stuff + chain-of-thought** | Models need all context for cross-doc reasoning |
|
||||
|
||||
## Our Stack Constraints
|
||||
|
||||
### What We Have
|
||||
- **Cloud models**: 128K-200K context (OpenRouter providers)
|
||||
- **Local Ollama**: 8K-32K context (Gemma-4 default 8192)
|
||||
- **Hermes fact_store**: SQLite FTS5 full-text search
|
||||
- **Memory**: MemPalace holographic embeddings
|
||||
- **Session context**: Growing conversation history
|
||||
|
||||
### What This Means
|
||||
1. **Cloud sessions**: We CAN stuff up to 128K but SHOULD we? Cost and latency matter.
|
||||
2. **Local sessions**: MUST use RAG for anything beyond 8K. Long context not available.
|
||||
3. **Mixed fleet**: Need a routing layer that decides per-session.
|
||||
|
||||
## Advanced Patterns
|
||||
|
||||
### 1. Progressive Context Loading
|
||||
Don't load everything at once. Start with RAG, then stuff additional docs as needed:
|
||||
```
|
||||
Turn 1: RAG search → top 3 chunks
|
||||
Turn 2: Model asks "I need more context about X" → stuff X
|
||||
Turn 3: Model has enough → continue
|
||||
```
|
||||
|
||||
### 2. Context Budgeting
|
||||
Allocate context budget across components:
|
||||
```
|
||||
System prompt: 2,000 tokens (always)
|
||||
Recent messages: 10,000 tokens (last 5 turns)
|
||||
RAG results: 8,000 tokens (top chunks)
|
||||
Stuffed docs: 12,000 tokens (key docs)
|
||||
---------------------------
|
||||
Total: 32,000 tokens (fits 32K model)
|
||||
```
|
||||
|
||||
### 3. Smart Compression
|
||||
Before stuffing, compress older context:
|
||||
- Summarize turns older than 10
|
||||
- Remove tool call results (keep only final outputs)
|
||||
- Deduplicate repeated information
|
||||
- Use structured representations (JSON) instead of prose
|
||||
|
||||
## Empirical Benchmarks Needed
|
||||
|
||||
1. **Stuffing vs RAG accuracy** on our fact_store queries
|
||||
2. **Latency comparison** at 32K, 64K, 128K context
|
||||
3. **Cost per query** for cloud models at various context sizes
|
||||
4. **Local model behavior** when pushed beyond rated context
|
||||
|
||||
## Recommendations
|
||||
|
||||
1. **Audit current context usage**: How many sessions hit > 32K? (Low effort, high value)
|
||||
2. **Implement ContextRouter**: ~50 LOC, adds routing decisions to hermes
|
||||
3. **Add context-size logging**: Track input tokens per session for data gathering
|
||||
|
||||
## References
|
||||
|
||||
- Liu et al. "Lost in the Middle: How Language Models Use Long Contexts" (2023) — https://arxiv.org/abs/2307.03172
|
||||
- Shi et al. "Large Language Models are Easily Distracted by Irrelevant Context" (2023)
|
||||
- Xu et al. "Retrieval Meets Long Context LLMs" (2023) — hybrid approaches outperform both alone
|
||||
- Anthropic's Claude 3.5 context caching — built-in prefix caching reduces cost for repeated system prompts
|
||||
|
||||
---
|
||||
|
||||
*Sovereignty and service always.*
|
||||
410
scripts/predictive_resource_allocator.py
Normal file
410
scripts/predictive_resource_allocator.py
Normal file
@@ -0,0 +1,410 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Predictive Resource Allocation — Timmy Foundation Fleet
|
||||
|
||||
Analyzes historical utilization patterns, predicts workload surges,
|
||||
and recommends pre-provisioning actions.
|
||||
|
||||
Usage:
|
||||
python3 scripts/predictive_resource_allocator.py \
|
||||
--metrics metrics/*.jsonl \
|
||||
--heartbeat heartbeat/*.jsonl \
|
||||
--horizon 6
|
||||
|
||||
# JSON output
|
||||
python3 scripts/predictive_resource_allocator.py --json
|
||||
|
||||
# Quick forecast from default paths
|
||||
python3 scripts/predictive_resource_allocator.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
|
||||
# ── Constants ────────────────────────────────────────────────────────────────
|
||||
|
||||
SURGE_THRESHOLD = 1.5
|
||||
HEAVY_TOKEN_THRESHOLD = 10000
|
||||
DEFAULT_HORIZON_HOURS = 6
|
||||
DEFAULT_METRICS_GLOB = "metrics/local_*.jsonl"
|
||||
DEFAULT_HEARTBEAT_GLOB = "heartbeat/ticks_*.jsonl"
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
ROOT_DIR = SCRIPT_DIR.parent
|
||||
|
||||
|
||||
# ── Data Loading ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _parse_ts(value: str) -> datetime:
|
||||
"""Parse ISO timestamp to UTC datetime."""
|
||||
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
|
||||
|
||||
|
||||
def load_jsonl(paths: Iterable[str]) -> List[dict]:
|
||||
"""Load JSONL rows from one or more file paths/globs."""
|
||||
rows: List[dict] = []
|
||||
for pattern in paths:
|
||||
for path in glob.glob(pattern):
|
||||
if not os.path.isfile(path):
|
||||
continue
|
||||
with open(path, encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
rows.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return rows
|
||||
|
||||
|
||||
def _default_paths(glob_pattern: str) -> List[str]:
|
||||
"""Resolve a glob pattern relative to project root."""
|
||||
full = os.path.join(ROOT_DIR, glob_pattern)
|
||||
matches = glob.glob(full)
|
||||
return matches if matches else [full]
|
||||
|
||||
|
||||
# ── Time-Series Analysis ─────────────────────────────────────────────────────
|
||||
|
||||
def compute_rates(
|
||||
rows: List[dict],
|
||||
horizon_hours: int,
|
||||
) -> Tuple[float, float, float, float, float]:
|
||||
"""
|
||||
Compare recent window vs baseline window.
|
||||
|
||||
Returns:
|
||||
(recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate)
|
||||
"""
|
||||
if not rows:
|
||||
return 0.0, 0.0, 1.0, 0.0, 0.0
|
||||
|
||||
latest = max(_parse_ts(r["timestamp"]) for r in rows)
|
||||
recent_cutoff = latest - timedelta(hours=horizon_hours)
|
||||
baseline_cutoff = latest - timedelta(hours=horizon_hours * 2)
|
||||
|
||||
recent = [r for r in rows if _parse_ts(r["timestamp"]) >= recent_cutoff]
|
||||
baseline = [
|
||||
r for r in rows
|
||||
if baseline_cutoff <= _parse_ts(r["timestamp"]) < recent_cutoff
|
||||
]
|
||||
|
||||
recent_rate = len(recent) / max(horizon_hours, 1)
|
||||
baseline_rate = (
|
||||
len(baseline) / max(horizon_hours, 1)
|
||||
if baseline
|
||||
else max(0.1, recent_rate)
|
||||
)
|
||||
|
||||
recent_tokens = sum(int(r.get("prompt_len", 0)) for r in recent)
|
||||
baseline_tokens = sum(int(r.get("prompt_len", 0)) for r in baseline)
|
||||
recent_token_rate = recent_tokens / max(horizon_hours, 1)
|
||||
baseline_token_rate = (
|
||||
baseline_tokens / max(horizon_hours, 1)
|
||||
if baseline
|
||||
else max(1.0, recent_token_rate)
|
||||
)
|
||||
|
||||
request_surge = recent_rate / max(baseline_rate, 0.01)
|
||||
token_surge = recent_token_rate / max(baseline_token_rate, 0.01)
|
||||
surge_factor = max(request_surge, token_surge)
|
||||
|
||||
return recent_rate, baseline_rate, surge_factor, recent_token_rate, baseline_token_rate
|
||||
|
||||
|
||||
def analyze_callers(rows: List[dict], horizon_hours: int) -> List[Dict[str, Any]]:
|
||||
"""Summarize callers in the recent window."""
|
||||
if not rows:
|
||||
return []
|
||||
|
||||
latest = max(_parse_ts(r["timestamp"]) for r in rows)
|
||||
cutoff = latest - timedelta(hours=horizon_hours)
|
||||
|
||||
calls: Counter = Counter()
|
||||
tokens: Counter = Counter()
|
||||
failures: Counter = Counter()
|
||||
|
||||
for row in rows:
|
||||
ts = _parse_ts(row["timestamp"])
|
||||
if ts < cutoff:
|
||||
continue
|
||||
caller = row.get("caller", "unknown")
|
||||
calls[caller] += 1
|
||||
tokens[caller] += int(row.get("prompt_len", 0))
|
||||
if not row.get("success", True):
|
||||
failures[caller] += 1
|
||||
|
||||
summary = []
|
||||
for caller in calls:
|
||||
summary.append({
|
||||
"caller": caller,
|
||||
"requests": calls[caller],
|
||||
"prompt_tokens": tokens[caller],
|
||||
"failures": failures[caller],
|
||||
"failure_rate": round(failures[caller] / max(calls[caller], 1) * 100, 1),
|
||||
})
|
||||
|
||||
summary.sort(key=lambda x: (-x["requests"], -x["prompt_tokens"]))
|
||||
return summary
|
||||
|
||||
|
||||
def analyze_heartbeat(rows: List[dict], horizon_hours: int) -> Dict[str, int]:
|
||||
"""Count infrastructure risks in recent window."""
|
||||
if not rows:
|
||||
return {"gitea_outages": 0, "inference_failures": 0, "total_checks": 0}
|
||||
|
||||
latest = max(_parse_ts(r["timestamp"]) for r in rows)
|
||||
cutoff = latest - timedelta(hours=horizon_hours)
|
||||
|
||||
gitea_outages = 0
|
||||
inference_failures = 0
|
||||
total = 0
|
||||
|
||||
for row in rows:
|
||||
ts = _parse_ts(row["timestamp"])
|
||||
if ts < cutoff:
|
||||
continue
|
||||
total += 1
|
||||
perception = row.get("perception", {})
|
||||
if perception.get("gitea_alive") is False:
|
||||
gitea_outages += 1
|
||||
model_health = perception.get("model_health", {})
|
||||
if model_health.get("inference_ok") is False:
|
||||
inference_failures += 1
|
||||
|
||||
return {
|
||||
"gitea_outages": gitea_outages,
|
||||
"inference_failures": inference_failures,
|
||||
"total_checks": total,
|
||||
}
|
||||
|
||||
|
||||
# ── Prediction Engine ────────────────────────────────────────────────────────
|
||||
|
||||
def predict_demand(
|
||||
recent_rate: float,
|
||||
baseline_rate: float,
|
||||
surge_factor: float,
|
||||
horizon_hours: int,
|
||||
) -> Dict[str, Any]:
|
||||
"""Predict near-term resource demand."""
|
||||
predicted_rate = round(
|
||||
max(recent_rate, baseline_rate * max(1.0, surge_factor * 0.75)), 2
|
||||
)
|
||||
|
||||
if surge_factor > 3.0:
|
||||
demand_level = "critical"
|
||||
elif surge_factor > SURGE_THRESHOLD:
|
||||
demand_level = "elevated"
|
||||
elif surge_factor > 1.0:
|
||||
demand_level = "normal"
|
||||
else:
|
||||
demand_level = "low"
|
||||
|
||||
return {
|
||||
"predicted_requests_per_hour": predicted_rate,
|
||||
"surge_factor": round(surge_factor, 2),
|
||||
"demand_level": demand_level,
|
||||
"horizon_hours": horizon_hours,
|
||||
}
|
||||
|
||||
|
||||
def determine_posture(
|
||||
surge_factor: float,
|
||||
callers: List[Dict[str, Any]],
|
||||
heartbeat: Dict[str, int],
|
||||
) -> Tuple[str, str, List[str]]:
|
||||
"""
|
||||
Determine fleet posture and recommended actions.
|
||||
|
||||
Returns:
|
||||
(resource_mode, dispatch_posture, actions)
|
||||
"""
|
||||
mode = "steady"
|
||||
posture = "normal"
|
||||
actions: List[str] = []
|
||||
|
||||
# Surge detection
|
||||
if surge_factor > SURGE_THRESHOLD:
|
||||
mode = "surge"
|
||||
actions.append(
|
||||
"Pre-warm local inference before the next forecast window."
|
||||
)
|
||||
|
||||
# Heavy background callers
|
||||
heavy = [
|
||||
c for c in callers
|
||||
if c["prompt_tokens"] >= HEAVY_TOKEN_THRESHOLD
|
||||
and ("batch" in c["caller"] or "know-thy-father" in c["caller"])
|
||||
]
|
||||
if heavy:
|
||||
actions.append(
|
||||
"Throttle or defer large background jobs until off-peak capacity is available."
|
||||
)
|
||||
|
||||
# Caller failure rates
|
||||
failing = [c for c in callers if c["failure_rate"] > 20 and c["requests"] >= 3]
|
||||
if failing:
|
||||
names = ", ".join(c["caller"] for c in failing[:3])
|
||||
actions.append(
|
||||
f"Investigate high failure rates in: {names}."
|
||||
)
|
||||
|
||||
# Inference health
|
||||
if heartbeat["inference_failures"] >= 2:
|
||||
mode = "surge"
|
||||
actions.append(
|
||||
"Investigate local model reliability and reserve headroom for heartbeat traffic."
|
||||
)
|
||||
|
||||
# Forge availability
|
||||
if heartbeat["gitea_outages"] >= 1:
|
||||
posture = "degraded"
|
||||
actions.append(
|
||||
"Pre-fetch or cache forge state before the next dispatch window."
|
||||
)
|
||||
|
||||
if not actions:
|
||||
actions.append(
|
||||
"Maintain current resource allocation; no surge indicators detected."
|
||||
)
|
||||
|
||||
return mode, posture, actions
|
||||
|
||||
|
||||
# ── Main Forecast ────────────────────────────────────────────────────────────
|
||||
|
||||
def forecast(
|
||||
metrics_paths: List[str],
|
||||
heartbeat_paths: List[str],
|
||||
horizon_hours: int = DEFAULT_HORIZON_HOURS,
|
||||
) -> Dict[str, Any]:
|
||||
"""Full resource forecast from metric and heartbeat logs."""
|
||||
metric_rows = load_jsonl(metrics_paths)
|
||||
heartbeat_rows = load_jsonl(heartbeat_paths)
|
||||
|
||||
recent_rate, baseline_rate, surge_factor, recent_tok_rate, base_tok_rate = (
|
||||
compute_rates(metric_rows, horizon_hours)
|
||||
)
|
||||
callers = analyze_callers(metric_rows, horizon_hours)
|
||||
heartbeat = analyze_heartbeat(heartbeat_rows, horizon_hours)
|
||||
demand = predict_demand(recent_rate, baseline_rate, surge_factor, horizon_hours)
|
||||
mode, posture, actions = determine_posture(surge_factor, callers, heartbeat)
|
||||
|
||||
return {
|
||||
"resource_mode": mode,
|
||||
"dispatch_posture": posture,
|
||||
"horizon_hours": horizon_hours,
|
||||
"recent_request_rate": round(recent_rate, 2),
|
||||
"baseline_request_rate": round(baseline_rate, 2),
|
||||
"predicted_request_rate": demand["predicted_requests_per_hour"],
|
||||
"surge_factor": demand["surge_factor"],
|
||||
"demand_level": demand["demand_level"],
|
||||
"recent_prompt_tokens_per_hour": round(recent_tok_rate, 2),
|
||||
"baseline_prompt_tokens_per_hour": round(base_tok_rate, 2),
|
||||
"gitea_outages": heartbeat["gitea_outages"],
|
||||
"inference_failures": heartbeat["inference_failures"],
|
||||
"heartbeat_checks": heartbeat["total_checks"],
|
||||
"top_callers": callers[:10],
|
||||
"recommended_actions": actions,
|
||||
}
|
||||
|
||||
|
||||
# ── Output Formatters ────────────────────────────────────────────────────────
|
||||
|
||||
def format_markdown(fc: Dict[str, Any]) -> str:
|
||||
"""Format forecast as markdown report."""
|
||||
lines = [
|
||||
"# Predictive Resource Allocation — Fleet Forecast",
|
||||
"",
|
||||
f"**Horizon:** {fc['horizon_hours']} hours",
|
||||
f"**Resource mode:** {fc['resource_mode']}",
|
||||
f"**Dispatch posture:** {fc['dispatch_posture']}",
|
||||
f"**Demand level:** {fc['demand_level']}",
|
||||
"",
|
||||
"## Demand Metrics",
|
||||
"",
|
||||
f"| Metric | Recent | Baseline |",
|
||||
f"|--------|-------:|---------:|",
|
||||
f"| Requests/hour | {fc['recent_request_rate']} | {fc['baseline_request_rate']} |",
|
||||
f"| Prompt tokens/hour | {fc['recent_prompt_tokens_per_hour']} | {fc['baseline_prompt_tokens_per_hour']} |",
|
||||
"",
|
||||
f"**Surge factor:** {fc['surge_factor']}x",
|
||||
f"**Predicted request rate:** {fc['predicted_request_rate']}/hour",
|
||||
"",
|
||||
"## Infrastructure Health",
|
||||
"",
|
||||
f"- Gitea outages (recent window): {fc['gitea_outages']}",
|
||||
f"- Inference failures (recent window): {fc['inference_failures']}",
|
||||
f"- Heartbeat checks analyzed: {fc['heartbeat_checks']}",
|
||||
"",
|
||||
"## Recommended Actions",
|
||||
"",
|
||||
]
|
||||
for action in fc["recommended_actions"]:
|
||||
lines.append(f"- {action}")
|
||||
|
||||
if fc["top_callers"]:
|
||||
lines.extend([
|
||||
"",
|
||||
"## Top Callers (Recent Window)",
|
||||
"",
|
||||
"| Caller | Requests | Tokens | Failures |",
|
||||
"|--------|---------:|-------:|---------:|",
|
||||
])
|
||||
for c in fc["top_callers"]:
|
||||
lines.append(
|
||||
f"| {c['caller']} | {c['requests']} | {c['prompt_tokens']} | {c['failures']} |"
|
||||
)
|
||||
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
|
||||
# ── CLI ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Predictive resource allocation for the Timmy fleet"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--metrics", nargs="*", default=None,
|
||||
help="Metric JSONL paths (supports globs). Default: metrics/local_*.jsonl"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--heartbeat", nargs="*", default=None,
|
||||
help="Heartbeat JSONL paths (supports globs). Default: heartbeat/ticks_*.jsonl"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--horizon", type=int, default=DEFAULT_HORIZON_HOURS,
|
||||
help="Forecast horizon in hours (default: 6)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json", action="store_true",
|
||||
help="Output raw JSON instead of markdown"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
metrics_paths = args.metrics or _default_paths(DEFAULT_METRICS_GLOB)
|
||||
heartbeat_paths = args.heartbeat or _default_paths(DEFAULT_HEARTBEAT_GLOB)
|
||||
|
||||
fc = forecast(metrics_paths, heartbeat_paths, args.horizon)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(fc, indent=2))
|
||||
else:
|
||||
print(format_markdown(fc))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
236
tests/test_predictive_resource_allocator.py
Normal file
236
tests/test_predictive_resource_allocator.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""Tests for predictive resource allocation."""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent.parent / "scripts"
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from predictive_resource_allocator import (
|
||||
_parse_ts,
|
||||
compute_rates,
|
||||
analyze_callers,
|
||||
analyze_heartbeat,
|
||||
predict_demand,
|
||||
determine_posture,
|
||||
forecast,
|
||||
format_markdown,
|
||||
load_jsonl,
|
||||
)
|
||||
|
||||
|
||||
def _write_jsonl(path: Path, rows: list):
|
||||
with open(path, "w") as f:
|
||||
for row in rows:
|
||||
f.write(json.dumps(row) + "\n")
|
||||
|
||||
|
||||
def _make_metrics(count: int, base_hour: int = 0, caller: str = "heartbeat_tick",
|
||||
prompt_len: int = 1000, success: bool = True) -> list:
|
||||
rows = []
|
||||
for i in range(count):
|
||||
rows.append({
|
||||
"timestamp": f"2026-03-29T{base_hour + i // 60:02d}:{i % 60:02d}:00+00:00",
|
||||
"caller": caller,
|
||||
"prompt_len": prompt_len,
|
||||
"response_len": 50,
|
||||
"success": success,
|
||||
})
|
||||
return rows
|
||||
|
||||
|
||||
def _make_heartbeat(count: int, base_hour: int = 0,
|
||||
gitea_alive: bool = True, inference_ok: bool = True) -> list:
|
||||
rows = []
|
||||
for i in range(count):
|
||||
rows.append({
|
||||
"timestamp": f"2026-03-29T{base_hour + i:02d}:00:00+00:00",
|
||||
"perception": {
|
||||
"gitea_alive": gitea_alive,
|
||||
"model_health": {"inference_ok": inference_ok},
|
||||
},
|
||||
})
|
||||
return rows
|
||||
|
||||
|
||||
# ── Timestamp Parsing ────────────────────────────────────────────────────────
|
||||
|
||||
class TestTimestampParsing:
|
||||
def test_z_suffix(self):
|
||||
dt = _parse_ts("2026-03-29T12:00:00Z")
|
||||
assert dt.tzinfo is not None
|
||||
|
||||
def test_explicit_offset(self):
|
||||
dt = _parse_ts("2026-03-29T12:00:00+00:00")
|
||||
assert dt.hour == 12
|
||||
|
||||
def test_ordering(self):
|
||||
earlier = _parse_ts("2026-03-29T10:00:00Z")
|
||||
later = _parse_ts("2026-03-29T12:00:00Z")
|
||||
assert earlier < later
|
||||
|
||||
|
||||
# ── Rate Computation ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestComputeRates:
|
||||
def test_empty_returns_defaults(self):
|
||||
r_rate, b_rate, surge, _, _ = compute_rates([], 6)
|
||||
assert r_rate == 0.0
|
||||
assert surge == 1.0
|
||||
|
||||
def test_surge_detected(self):
|
||||
# 1 baseline req, 20 recent reqs
|
||||
baseline = _make_metrics(1, base_hour=0)
|
||||
recent = _make_metrics(20, base_hour=12)
|
||||
rows = baseline + recent
|
||||
|
||||
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
|
||||
assert surge > 1.0
|
||||
|
||||
def test_no_surge_when_stable(self):
|
||||
# Same rate in both windows
|
||||
early = _make_metrics(6, base_hour=0)
|
||||
late = _make_metrics(6, base_hour=12)
|
||||
rows = early + late
|
||||
|
||||
_, _, surge, _, _ = compute_rates(rows, horizon_hours=6)
|
||||
assert surge < 1.5
|
||||
|
||||
|
||||
# ── Caller Analysis ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestAnalyzeCallers:
|
||||
def test_empty(self):
|
||||
assert analyze_callers([], 6) == []
|
||||
|
||||
def test_groups_by_caller(self):
|
||||
rows = _make_metrics(3, caller="heartbeat_tick") + _make_metrics(2, caller="know-thy-father", prompt_len=15000)
|
||||
callers = analyze_callers(rows, horizon_hours=24)
|
||||
names = [c["caller"] for c in callers]
|
||||
assert "heartbeat_tick" in names
|
||||
assert "know-thy-father" in names
|
||||
|
||||
def test_sorted_by_request_count(self):
|
||||
rows = _make_metrics(1, caller="rare") + _make_metrics(10, caller="frequent")
|
||||
callers = analyze_callers(rows, horizon_hours=24)
|
||||
assert callers[0]["caller"] == "frequent"
|
||||
|
||||
def test_failure_rate(self):
|
||||
rows = _make_metrics(10, caller="flaky", success=False)
|
||||
callers = analyze_callers(rows, horizon_hours=24)
|
||||
flaky = [c for c in callers if c["caller"] == "flaky"][0]
|
||||
assert flaky["failure_rate"] == 100.0
|
||||
|
||||
|
||||
# ── Heartbeat Analysis ───────────────────────────────────────────────────────
|
||||
|
||||
class TestAnalyzeHeartbeat:
|
||||
def test_empty(self):
|
||||
result = analyze_heartbeat([], 6)
|
||||
assert result["gitea_outages"] == 0
|
||||
|
||||
def test_detects_gitea_outage(self):
|
||||
rows = _make_heartbeat(3, gitea_alive=False)
|
||||
result = analyze_heartbeat(rows, horizon_hours=24)
|
||||
assert result["gitea_outages"] == 3
|
||||
|
||||
def test_detects_inference_failure(self):
|
||||
rows = _make_heartbeat(2, inference_ok=False)
|
||||
result = analyze_heartbeat(rows, horizon_hours=24)
|
||||
assert result["inference_failures"] == 2
|
||||
|
||||
|
||||
# ── Demand Prediction ────────────────────────────────────────────────────────
|
||||
|
||||
class TestPredictDemand:
|
||||
def test_critical_on_extreme_surge(self):
|
||||
result = predict_demand(100.0, 10.0, 10.0, 6)
|
||||
assert result["demand_level"] == "critical"
|
||||
|
||||
def test_elevated_on_moderate_surge(self):
|
||||
result = predict_demand(50.0, 10.0, 2.0, 6)
|
||||
assert result["demand_level"] == "elevated"
|
||||
|
||||
def test_normal_on_slight_increase(self):
|
||||
result = predict_demand(12.0, 10.0, 1.2, 6)
|
||||
assert result["demand_level"] == "normal"
|
||||
|
||||
def test_low_when_decreasing(self):
|
||||
result = predict_demand(5.0, 10.0, 0.5, 6)
|
||||
assert result["demand_level"] == "low"
|
||||
|
||||
|
||||
# ── Posture Determination ────────────────────────────────────────────────────
|
||||
|
||||
class TestDeterminePosture:
|
||||
def test_steady_normal_when_no_issues(self):
|
||||
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert mode == "steady"
|
||||
assert posture == "normal"
|
||||
assert "no surge indicators" in actions[0]
|
||||
|
||||
def test_surge_on_high_factor(self):
|
||||
mode, posture, actions = determine_posture(2.0, [], {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert mode == "surge"
|
||||
assert any("Pre-warm" in a for a in actions)
|
||||
|
||||
def test_degraded_on_gitea_outage(self):
|
||||
mode, posture, actions = determine_posture(1.0, [], {"gitea_outages": 3, "inference_failures": 0, "total_checks": 5})
|
||||
assert posture == "degraded"
|
||||
assert any("forge state" in a for a in actions)
|
||||
|
||||
def test_heavy_background_flagged(self):
|
||||
callers = [{"caller": "know-thy-father-batch", "requests": 5, "prompt_tokens": 50000, "failures": 0, "failure_rate": 0}]
|
||||
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert any("Throttle" in a or "background" in a for a in actions)
|
||||
|
||||
def test_failing_callers_flagged(self):
|
||||
callers = [{"caller": "bad_actor", "requests": 10, "prompt_tokens": 1000, "failures": 5, "failure_rate": 50.0}]
|
||||
_, _, actions = determine_posture(1.0, callers, {"gitea_outages": 0, "inference_failures": 0, "total_checks": 5})
|
||||
assert any("failure rate" in a.lower() for a in actions)
|
||||
|
||||
|
||||
# ── Full Forecast ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestForecast:
|
||||
def test_end_to_end(self, tmp_path):
|
||||
metrics_path = tmp_path / "metrics.jsonl"
|
||||
heartbeat_path = tmp_path / "heartbeat.jsonl"
|
||||
|
||||
_write_jsonl(metrics_path, _make_metrics(6, base_hour=0) + _make_metrics(30, base_hour=12))
|
||||
_write_jsonl(heartbeat_path, _make_heartbeat(5, base_hour=8, inference_ok=False))
|
||||
|
||||
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
|
||||
|
||||
assert "resource_mode" in result
|
||||
assert "dispatch_posture" in result
|
||||
assert "surge_factor" in result
|
||||
assert "top_callers" in result
|
||||
assert "recommended_actions" in result
|
||||
assert isinstance(result["top_callers"], list)
|
||||
assert isinstance(result["recommended_actions"], list)
|
||||
|
||||
def test_empty_inputs(self, tmp_path):
|
||||
metrics_path = tmp_path / "empty_m.jsonl"
|
||||
heartbeat_path = tmp_path / "empty_h.jsonl"
|
||||
metrics_path.write_text("")
|
||||
heartbeat_path.write_text("")
|
||||
|
||||
result = forecast([str(metrics_path)], [str(heartbeat_path)], horizon_hours=6)
|
||||
assert result["resource_mode"] == "steady"
|
||||
assert result["surge_factor"] == 1.0
|
||||
|
||||
|
||||
# ── Markdown Output ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestFormatMarkdown:
|
||||
def test_contains_key_sections(self):
|
||||
fc = forecast([], [], horizon_hours=6)
|
||||
md = format_markdown(fc)
|
||||
assert "Predictive Resource Allocation" in md
|
||||
assert "Demand Metrics" in md
|
||||
assert "Recommended Actions" in md
|
||||
assert "Horizon" in md
|
||||
@@ -17,8 +17,24 @@ from typing import Dict, Any, Optional, List
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import importlib.util
|
||||
|
||||
from harness import UniWizardHarness, House, ExecutionResult
|
||||
|
||||
def _load_local(module_name: str, filename: str):
|
||||
"""Import a module from an explicit file path, bypassing sys.path resolution."""
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
module_name,
|
||||
str(Path(__file__).parent / filename),
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
_harness = _load_local("v2_harness", "harness.py")
|
||||
UniWizardHarness = _harness.UniWizardHarness
|
||||
House = _harness.House
|
||||
ExecutionResult = _harness.ExecutionResult
|
||||
|
||||
|
||||
class TaskType(Enum):
|
||||
|
||||
@@ -8,13 +8,30 @@ import time
|
||||
import sys
|
||||
import argparse
|
||||
import os
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
def _load_local(module_name: str, filename: str):
|
||||
"""Import a module from an explicit file path, bypassing sys.path resolution.
|
||||
|
||||
Prevents namespace collisions when multiple directories contain modules
|
||||
with the same name (e.g. uni-wizard/harness.py vs uni-wizard/v2/harness.py).
|
||||
"""
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
module_name,
|
||||
str(Path(__file__).parent / filename),
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
_harness = _load_local("v2_harness", "harness.py")
|
||||
UniWizardHarness = _harness.UniWizardHarness
|
||||
House = _harness.House
|
||||
ExecutionResult = _harness.ExecutionResult
|
||||
|
||||
from harness import UniWizardHarness, House, ExecutionResult
|
||||
from router import HouseRouter, TaskType
|
||||
from author_whitelist import AuthorWhitelist
|
||||
|
||||
|
||||
Reference in New Issue
Block a user