98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from typing import Dict, List, Optional
|
|
|
|
class AdaptiveCalibrator:
    """
    Provides online learning for cost estimation accuracy in the sovereign AI stack.

    Tracks predicted vs actual metrics (latency, tokens, etc.) and adjusts a
    multiplicative calibration factor to improve future estimates. The state
    is persisted as JSON at ``storage_path`` after every accepted sample.
    Persistence is best-effort: I/O problems are printed, never raised.
    """

    # Maximum number of samples retained in the rolling history.
    MAX_HISTORY = 50

    def __init__(self, storage_path: str = "nexus/calibration_state.json"):
        """
        Create a calibrator and restore any previously saved state.

        Args:
            storage_path: Path of the JSON file used to persist the state.
        """
        self.storage_path = storage_path
        self.state = {
            "factor": 1.0,
            "history": [],
            "last_updated": 0,
            "total_samples": 0,
            "learning_rate": 0.1,
        }
        self.load()

    def load(self):
        """
        Merge the persisted state (if any) into the in-memory defaults.

        A missing file is not an error. Unreadable or malformed content is
        reported on stdout and the current in-memory state is kept.
        """
        if not os.path.exists(self.storage_path):
            return

        try:
            with open(self.storage_path, 'r', encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception as e:
            # Deliberately broad: a corrupt state file must never prevent
            # the calibrator from starting with its defaults.
            print(f"Error loading calibration state: {e}")
            return

        if not isinstance(loaded, dict):
            print(
                "Error loading calibration state: "
                f"expected a JSON object, got {type(loaded).__name__}"
            )
            return

        self.state.update(loaded)
        # update() appends to the history, so it must be a list.
        if not isinstance(self.state.get("history"), list):
            self.state["history"] = []

    def save(self):
        """
        Persist the current state as JSON (best-effort).

        The parent directory is created when missing, and the data is written
        to a temporary sibling file that is then moved over the target, so an
        interrupted write cannot leave a truncated state file behind. Failures
        are reported on stdout and otherwise ignored.
        """
        try:
            parent = os.path.dirname(self.storage_path)
            if parent:
                os.makedirs(parent, exist_ok=True)

            tmp_path = f"{self.storage_path}.tmp"
            with open(tmp_path, 'w', encoding="utf-8") as f:
                json.dump(self.state, f, indent=2)
            os.replace(tmp_path, self.storage_path)
        except Exception as e:
            # Deliberately broad: persistence must not break estimation.
            print(f"Error saving calibration state: {e}")

    def predict(self, base_estimate: float) -> float:
        """Apply the current calibration factor to a base estimate."""
        return base_estimate * self.state["factor"]

    def update(self, predicted: float, actual: float):
        """
        Update the calibration factor based on a new sample.

        The factor is nudged multiplicatively towards ``factor * ratio`` where
        ``ratio = actual / predicted``, weighted by the stored learning rate
        (an exponential update, not a windowed average). Samples that are not
        strictly positive finite numbers are ignored, because a single NaN or
        infinite value would otherwise corrupt the factor permanently.

        Args:
            predicted: The estimate that was produced for the sample.
            actual: The value that was actually observed.
        """
        inf = float("inf")
        # Chained comparisons are False for NaN, so this rejects NaN, +/-inf
        # and non-positive values in one test.
        if not (0 < predicted < inf and 0 < actual < inf):
            return

        # Ratio of actual to predicted
        # If actual > predicted, ratio > 1 (we underestimated, factor should increase)
        # If actual < predicted, ratio < 1 (we overestimated, factor should decrease)
        ratio = actual / predicted
        if not 0 < ratio < inf:
            # Extreme magnitudes can overflow to inf or underflow to 0.
            return

        # Update factor using learning rate
        lr = self.state["learning_rate"]
        factor = self.state["factor"]
        self.state["factor"] = (1 - lr) * factor + lr * (factor * ratio)

        # Record history (keep only the most recent MAX_HISTORY samples)
        now = time.time()
        history = self.state["history"]
        history.append({
            "timestamp": now,
            "predicted": predicted,
            "actual": actual,
            "ratio": ratio,
        })
        if len(history) > self.MAX_HISTORY:
            # Slice deletion also trims an oversize history loaded from disk.
            del history[:-self.MAX_HISTORY]

        self.state["total_samples"] += 1
        self.state["last_updated"] = now
        self.save()

    def get_metrics(self) -> Dict:
        """
        Return current calibration metrics.

        Returns:
            A dict with ``current_factor``, ``total_samples`` and
            ``average_ratio`` (mean ratio over the retained history, or 1.0
            when the history is empty).
        """
        history = self.state["history"]
        if history:
            average_ratio = sum(h["ratio"] for h in history) / len(history)
        else:
            average_ratio = 1.0

        return {
            "current_factor": self.state["factor"],
            "total_samples": self.state["total_samples"],
            "average_ratio": average_ratio,
        }
|
|
|
|
if __name__ == "__main__":
    # Simple test/demo: feed the calibrator samples that are consistently
    # 20% above the base estimate and print the factor after each update.
    demo_path = "nexus/test_calibration.json"
    calibrator = AdaptiveCalibrator(demo_path)

    try:
        print(f"Initial factor: {calibrator.state['factor']}")

        # Simulate some samples where we consistently underestimate by 20%
        for _ in range(10):
            base = 100.0
            pred = calibrator.predict(base)
            actual = 120.0  # Reality is 20% higher
            calibrator.update(pred, actual)
            print(f"Pred: {pred:.2f}, Actual: {actual:.2f}, New Factor: {calibrator.state['factor']:.4f}")

        print("Final metrics:", calibrator.get_metrics())
    finally:
        # Saving is best-effort, so the demo file may never have been
        # written; clean up only what exists, and do so even if the demo
        # fails, so a stale file is not loaded by the next run.
        try:
            os.remove(demo_path)
        except FileNotFoundError:
            pass
|