diff --git a/nexus/adaptive_calibrator.py b/nexus/adaptive_calibrator.py new file mode 100644 index 0000000..16ff42c --- /dev/null +++ b/nexus/adaptive_calibrator.py @@ -0,0 +1,97 @@ +import json +import os +import time +from typing import Dict, List, Optional + +class AdaptiveCalibrator: + """ + Provides online learning for cost estimation accuracy in the sovereign AI stack. + Tracks predicted vs actual metrics (latency, tokens, etc.) and adjusts a + calibration factor to improve future estimates. + """ + + def __init__(self, storage_path: str = "nexus/calibration_state.json"): + self.storage_path = storage_path + self.state = { + "factor": 1.0, + "history": [], + "last_updated": 0, + "total_samples": 0, + "learning_rate": 0.1 + } + self.load() + + def load(self): + if os.path.exists(self.storage_path): + try: + with open(self.storage_path, 'r') as f: + self.state.update(json.load(f)) + except Exception as e: + print(f"Error loading calibration state: {e}") + + def save(self): + try: + with open(self.storage_path, 'w') as f: + json.dump(self.state, f, indent=2) + except Exception as e: + print(f"Error saving calibration state: {e}") + + def predict(self, base_estimate: float) -> float: + """Apply the current calibration factor to a base estimate.""" + return base_estimate * self.state["factor"] + + def update(self, predicted: float, actual: float): + """ + Update the calibration factor based on a new sample. + Uses a simple moving average approach for the factor. + """ + if predicted <= 0 or actual <= 0: + return + + # Ratio of actual to predicted + # If actual > predicted, ratio > 1 (we underestimated, factor should increase) + # If actual < predicted, ratio < 1 (we overestimated, factor should decrease) + ratio = actual / predicted + + # Update factor using learning rate + lr = self.state["learning_rate"] + self.state["factor"] = (1 - lr) * self.state["factor"] + lr * (self.state["factor"] * ratio) + + # Record history (keep last 50 samples) + self.state["history"].append({ + "timestamp": time.time(), + "predicted": predicted, + "actual": actual, + "ratio": ratio + }) + if len(self.state["history"]) > 50: + self.state["history"].pop(0) + + self.state["total_samples"] += 1 + self.state["last_updated"] = time.time() + self.save() + + def get_metrics(self) -> Dict: + """Return current calibration metrics.""" + return { + "current_factor": self.state["factor"], + "total_samples": self.state["total_samples"], + "average_ratio": sum(h["ratio"] for h in self.state["history"]) / len(self.state["history"]) if self.state["history"] else 1.0 + } + +if __name__ == "__main__": + # Simple test/demo + calibrator = AdaptiveCalibrator("nexus/test_calibration.json") + + print(f"Initial factor: {calibrator.state['factor']}") + + # Simulate some samples where we consistently underestimate by 20% + for _ in range(10): + base = 100.0 + pred = calibrator.predict(base) + actual = 120.0 # Reality is 20% higher + calibrator.update(pred, actual) + print(f"Pred: {pred:.2f}, Actual: {actual:.2f}, New Factor: {calibrator.state['factor']:.4f}") + + print("Final metrics:", calibrator.get_metrics()) + os.remove("nexus/test_calibration.json")