forked from Rockachopa/Timmy-time-dashboard
372 lines
13 KiB
Python
372 lines
13 KiB
Python
"""Energy Budget Monitor — estimates GPU/CPU power draw during LLM inference.
|
||
|
||
Tracks estimated power consumption to optimize for "metabolic efficiency".
|
||
Three estimation strategies attempted in priority order:
|
||
|
||
1. Battery discharge via ioreg (macOS — works without sudo, on-battery only)
|
||
2. CPU utilisation proxy via sysctl hw.cpufrequency + top
|
||
3. Model-size heuristic (tokens/s × model_size_gb × 2W/GB estimate)
|
||
|
||
Energy Efficiency score (0–10):
|
||
efficiency = tokens_per_second / estimated_watts, normalised to 0–10.
|
||
|
||
Low Power Mode:
|
||
Activated manually or automatically when draw exceeds the configured
|
||
threshold. In low power mode the cascade router is advised to prefer the
|
||
configured low_power_model (e.g. qwen3:1b or similar compact model).
|
||
|
||
Refs: #1009
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import subprocess
|
||
import time
|
||
from collections import deque
|
||
from dataclasses import dataclass, field
|
||
from datetime import UTC, datetime
|
||
from typing import Any
|
||
|
||
from config import settings
|
||
|
||
# Module logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)

# Approximate model-size lookup (GB) used for heuristic power estimate.
# Keys are lowercase substring matches against the model name.
# NOTE: insertion order matters — _model_size_gb() falls back to a substring
# scan over this dict in order, so the first matching key wins.
_MODEL_SIZE_GB: dict[str, float] = {
    "qwen3:1b": 0.8,
    "qwen3:3b": 2.0,
    "qwen3:4b": 2.5,
    "qwen3:8b": 5.5,
    "qwen3:14b": 9.0,
    "qwen3:30b": 20.0,
    "qwen3:32b": 20.0,
    "llama3:8b": 5.5,
    "llama3:70b": 45.0,
    "mistral:7b": 4.5,
    "gemma3:4b": 2.5,
    "gemma3:12b": 8.0,
    "gemma3:27b": 17.0,
    "phi4:14b": 9.0,
}
_DEFAULT_MODEL_SIZE_GB = 5.0  # fallback when model not in table
_WATTS_PER_GB_HEURISTIC = 2.0  # rough W/GB for Apple Silicon unified memory

# Efficiency score normalisation: score 10 at this efficiency (tok/s per W).
_EFFICIENCY_SCORE_CEILING = 5.0  # tok/s per W → score 10

# Rolling window for recent samples (deque maxlen in EnergyBudgetMonitor)
_HISTORY_MAXLEN = 60
|
||
|
||
|
||
@dataclass
class InferenceSample:
    """A single inference event captured by record_inference().

    Immutable record of one LLM inference's measured throughput together
    with the power estimate that was current at the time.
    """

    timestamp: str  # ISO-8601 UTC timestamp of the event
    model: str  # Ollama model name, e.g. "qwen3:8b"
    tokens_per_second: float  # measured decode throughput
    estimated_watts: float  # power estimate used for this sample
    efficiency: float  # tokens/s per watt
    efficiency_score: float  # 0–10
|
||
|
||
|
||
@dataclass
class EnergyReport:
    """Snapshot of current energy budget state."""

    timestamp: str
    low_power_mode: bool
    current_watts: float
    strategy: str  # "battery", "cpu_proxy", "heuristic", "unavailable"
    efficiency_score: float  # 0–10; -1 if no inference samples yet
    recent_samples: list[InferenceSample]
    recommendation: str
    details: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the report to a JSON-compatible dict (floats rounded)."""

        def _sample_payload(sample: InferenceSample) -> dict[str, Any]:
            # One entry per recent sample, with display-friendly rounding.
            return {
                "timestamp": sample.timestamp,
                "model": sample.model,
                "tokens_per_second": round(sample.tokens_per_second, 1),
                "estimated_watts": round(sample.estimated_watts, 2),
                "efficiency": round(sample.efficiency, 3),
                "efficiency_score": round(sample.efficiency_score, 2),
            }

        payload: dict[str, Any] = {
            "timestamp": self.timestamp,
            "low_power_mode": self.low_power_mode,
            "current_watts": round(self.current_watts, 2),
            "strategy": self.strategy,
            "efficiency_score": round(self.efficiency_score, 2),
            "recent_samples": [_sample_payload(s) for s in self.recent_samples],
            "recommendation": self.recommendation,
            "details": self.details,
        }
        return payload
|
||
|
||
|
||
class EnergyBudgetMonitor:
    """Estimates power consumption and tracks LLM inference efficiency.

    All blocking I/O (subprocess calls) is wrapped in asyncio.to_thread()
    so the event loop is never blocked. Results are cached for
    _POWER_CACHE_TTL seconds.

    Usage::

        # Record an inference event
        energy_monitor.record_inference("qwen3:8b", tokens_per_second=42.0)

        # Get the current report
        report = await energy_monitor.get_report()

        # Toggle low power mode
        energy_monitor.set_low_power_mode(True)
    """

    _POWER_CACHE_TTL = 10.0  # seconds between fresh power readings

    def __init__(self) -> None:
        self._low_power_mode: bool = False
        # Rolling window of recent inference events (oldest evicted first).
        self._samples: deque[InferenceSample] = deque(maxlen=_HISTORY_MAXLEN)
        self._cached_watts: float = 0.0  # 0.0 means "no live reading yet"
        self._cached_strategy: str = "unavailable"
        self._cache_ts: float = 0.0  # monotonic timestamp of last refresh

    # ── Public API ────────────────────────────────────────────────────────────

    @property
    def low_power_mode(self) -> bool:
        """Whether low power mode is currently engaged."""
        return self._low_power_mode

    def set_low_power_mode(self, enabled: bool) -> None:
        """Enable or disable low power mode."""
        self._low_power_mode = enabled
        state = "enabled" if enabled else "disabled"
        logger.info("Energy budget: low power mode %s", state)

    def record_inference(self, model: str, tokens_per_second: float) -> InferenceSample:
        """Record an inference event for efficiency tracking.

        Call this after each LLM inference completes with the model name and
        measured throughput. The current power estimate is used to compute
        the efficiency score.

        Args:
            model: Ollama model name (e.g. "qwen3:8b").
            tokens_per_second: Measured decode throughput.

        Returns:
            The recorded InferenceSample.
        """
        # Prefer the cached live reading; a cached 0.0 means no strategy has
        # produced a reading yet, so fall back to the model-size heuristic.
        watts = self._cached_watts if self._cached_watts > 0 else self._estimate_watts_sync(model)
        efficiency = tokens_per_second / max(watts, 0.1)  # clamp: avoid div-by-zero
        score = min(10.0, (efficiency / _EFFICIENCY_SCORE_CEILING) * 10.0)

        sample = InferenceSample(
            timestamp=datetime.now(UTC).isoformat(),
            model=model,
            tokens_per_second=tokens_per_second,
            estimated_watts=watts,
            efficiency=efficiency,
            efficiency_score=score,
        )
        self._samples.append(sample)

        # Auto-engage low power mode when the estimate exceeds the threshold.
        # Disengaging is manual — see set_low_power_mode().
        threshold = getattr(settings, "energy_budget_watts_threshold", 15.0)
        if watts > threshold and not self._low_power_mode:
            logger.info(
                "Energy budget: %.1fW exceeds threshold %.1fW — auto-engaging low power mode",
                watts,
                threshold,
            )
            self.set_low_power_mode(True)

        return sample

    async def get_report(self) -> EnergyReport:
        """Return the current energy budget report.

        Refreshes the power estimate if the cache is stale.
        """
        await self._refresh_power_cache()

        score = self._compute_mean_efficiency_score()
        recommendation = self._build_recommendation(score)

        return EnergyReport(
            timestamp=datetime.now(UTC).isoformat(),
            low_power_mode=self._low_power_mode,
            current_watts=self._cached_watts,
            strategy=self._cached_strategy,
            efficiency_score=score,
            recent_samples=list(self._samples)[-10:],
            recommendation=recommendation,
            details={"sample_count": len(self._samples)},
        )

    # ── Power estimation ──────────────────────────────────────────────────────

    async def _refresh_power_cache(self) -> None:
        """Refresh the cached power reading if stale.

        Failures are demoted to a debug log and cached as "unavailable" so a
        broken probe does not retry more often than the TTL.
        """
        now = time.monotonic()
        if now - self._cache_ts < self._POWER_CACHE_TTL:
            return

        try:
            # subprocess calls block — run them off the event loop.
            watts, strategy = await asyncio.to_thread(self._read_power)
        except Exception as exc:
            logger.debug("Energy: power read failed: %s", exc)
            watts, strategy = 0.0, "unavailable"

        self._cached_watts = watts
        self._cached_strategy = strategy
        self._cache_ts = now

    def _read_power(self) -> tuple[float, str]:
        """Synchronous power reading — tries strategies in priority order.

        Returns:
            Tuple of (watts, strategy_name). (0.0, "unavailable") when no
            live strategy produced a reading; record_inference() then falls
            back to the model-size heuristic via _estimate_watts_sync().
        """
        # Strategy 1: battery discharge via ioreg (on-battery Macs)
        try:
            watts = self._read_battery_watts()
            if watts > 0:
                return watts, "battery"
        except Exception:
            pass  # best-effort: fall through to the next strategy

        # Strategy 2: CPU utilisation proxy via top
        try:
            cpu_pct = self._read_cpu_pct()
            if cpu_pct >= 0:
                # M3 Max TDP ≈ 40W; scale linearly
                watts = (cpu_pct / 100.0) * 40.0
                return watts, "cpu_proxy"
        except Exception:
            pass  # best-effort: fall through

        # Strategy 3 (model-size heuristic) needs to know which model is
        # running, so it is applied per-inference in record_inference(),
        # not here.
        return 0.0, "unavailable"

    def _estimate_watts_sync(self, model: str) -> float:
        """Estimate watts from model size when no live reading is available."""
        size_gb = self._model_size_gb(model)
        return size_gb * _WATTS_PER_GB_HEURISTIC

    def _read_battery_watts(self) -> float:
        """Read instantaneous battery discharge via ioreg.

        Returns watts if on battery, 0.0 if plugged in or unavailable.
        Requires macOS; no sudo needed.
        """
        result = subprocess.run(
            ["ioreg", "-r", "-c", "AppleSmartBattery", "-d", "1"],
            capture_output=True,
            text=True,
            timeout=3,
        )
        amperage_ma = 0.0
        voltage_mv = 0.0
        is_charging = True  # assume charging unless we see ExternalConnected = No

        for line in result.stdout.splitlines():
            stripped = line.strip()
            if '"InstantAmperage"' in stripped:
                try:
                    amperage_ma = float(stripped.split("=")[-1].strip())
                except ValueError:
                    pass
            elif '"Voltage"' in stripped:
                try:
                    voltage_mv = float(stripped.split("=")[-1].strip())
                except ValueError:
                    pass
            elif '"ExternalConnected"' in stripped:
                is_charging = "Yes" in stripped

        # ioreg prints InstantAmperage as an unsigned 64-bit integer, so a
        # discharge current (negative) appears as a huge positive number —
        # map it back into signed range first.
        if amperage_ma > 2**63:
            amperage_ma -= 2**64

        # BUGFIX: discharge is a *negative* amperage (hence abs() below).
        # The previous guard `amperage_ma <= 0` rejected exactly those
        # readings, so the battery strategy always returned 0.0.
        if is_charging or voltage_mv <= 0 or amperage_ma >= 0:
            return 0.0

        # ioreg reports amperage in mA, voltage in mV → W = mA·mV / 1e6
        return (abs(amperage_ma) * voltage_mv) / 1_000_000

    def _read_cpu_pct(self) -> float:
        """Read CPU utilisation from macOS top.

        Returns aggregate CPU% (0–100), or -1.0 on failure.
        """
        result = subprocess.run(
            ["top", "-l", "1", "-n", "0", "-stats", "cpu"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        for line in result.stdout.splitlines():
            if "CPU usage:" in line:
                # "CPU usage: 12.5% user, 8.3% sys, 79.1% idle"
                parts = line.split()
                try:
                    user = float(parts[2].rstrip("%"))
                    sys_ = float(parts[4].rstrip("%"))
                    return user + sys_
                except (IndexError, ValueError):
                    pass
        return -1.0

    # ── Helpers ───────────────────────────────────────────────────────────────

    @staticmethod
    def _model_size_gb(model: str) -> float:
        """Look up approximate model size in GB by name substring."""
        lower = model.lower()
        # Exact match first
        if lower in _MODEL_SIZE_GB:
            return _MODEL_SIZE_GB[lower]
        # Substring match — table insertion order decides ties
        for key, size in _MODEL_SIZE_GB.items():
            if key in lower:
                return size
        return _DEFAULT_MODEL_SIZE_GB

    def _compute_mean_efficiency_score(self) -> float:
        """Mean efficiency score over the last 10 samples, or -1 if none."""
        if not self._samples:
            return -1.0
        recent = list(self._samples)[-10:]
        return sum(s.efficiency_score for s in recent) / len(recent)

    def _build_recommendation(self, score: float) -> str:
        """Generate a human-readable recommendation from the efficiency score."""
        threshold = getattr(settings, "energy_budget_watts_threshold", 15.0)
        low_power_model = getattr(settings, "energy_low_power_model", "qwen3:1b")

        if score < 0:
            return "No inference data yet — run some tasks to populate efficiency metrics."

        if self._low_power_mode:
            return (
                f"Low power mode active — routing to {low_power_model}. "
                "Disable when power draw normalises."
            )

        if score < 3.0:
            return (
                f"Low efficiency (score {score:.1f}/10). "
                f"Consider enabling low power mode to favour smaller models "
                f"(threshold: {threshold}W)."
            )

        if score < 6.0:
            return f"Moderate efficiency (score {score:.1f}/10). System operating normally."

        return f"Good efficiency (score {score:.1f}/10). No action needed."
|
||
|
||
|
||
# Module-level singleton: import `energy_monitor` rather than constructing
# EnergyBudgetMonitor directly so all callers share one sample history,
# power cache, and low-power flag.
energy_monitor = EnergyBudgetMonitor()
|