diff --git a/nexus/adaptive_calibrator.py b/nexus/adaptive_calibrator.py new file mode 100644 index 0000000..16ff42c --- /dev/null +++ b/nexus/adaptive_calibrator.py @@ -0,0 +1,97 @@ +import json +import os +import time +from typing import Dict, List, Optional + +class AdaptiveCalibrator: + """ + Provides online learning for cost estimation accuracy in the sovereign AI stack. + Tracks predicted vs actual metrics (latency, tokens, etc.) and adjusts a + calibration factor to improve future estimates. + """ + + def __init__(self, storage_path: str = "nexus/calibration_state.json"): + self.storage_path = storage_path + self.state = { + "factor": 1.0, + "history": [], + "last_updated": 0, + "total_samples": 0, + "learning_rate": 0.1 + } + self.load() + + def load(self): + if os.path.exists(self.storage_path): + try: + with open(self.storage_path, 'r') as f: + self.state.update(json.load(f)) + except Exception as e: + print(f"Error loading calibration state: {e}") + + def save(self): + try: + with open(self.storage_path, 'w') as f: + json.dump(self.state, f, indent=2) + except Exception as e: + print(f"Error saving calibration state: {e}") + + def predict(self, base_estimate: float) -> float: + """Apply the current calibration factor to a base estimate.""" + return base_estimate * self.state["factor"] + + def update(self, predicted: float, actual: float): + """ + Update the calibration factor based on a new sample. + Uses a simple moving average approach for the factor. + """ + if predicted <= 0 or actual <= 0: + return + + # Ratio of actual to predicted + # If actual > predicted, ratio > 1 (we underestimated, factor should increase) + # If actual < predicted, ratio < 1 (we overestimated, factor should decrease) + ratio = actual / predicted + + # Update factor using learning rate + lr = self.state["learning_rate"] + self.state["factor"] = (1 - lr) * self.state["factor"] + lr * (self.state["factor"] * ratio) + + # Record history (keep last 50 samples) + self.state["history"].append({ + "timestamp": time.time(), + "predicted": predicted, + "actual": actual, + "ratio": ratio + }) + if len(self.state["history"]) > 50: + self.state["history"].pop(0) + + self.state["total_samples"] += 1 + self.state["last_updated"] = time.time() + self.save() + + def get_metrics(self) -> Dict: + """Return current calibration metrics.""" + return { + "current_factor": self.state["factor"], + "total_samples": self.state["total_samples"], + "average_ratio": sum(h["ratio"] for h in self.state["history"]) / len(self.state["history"]) if self.state["history"] else 1.0 + } + +if __name__ == "__main__": + # Simple test/demo + calibrator = AdaptiveCalibrator("nexus/test_calibration.json") + + print(f"Initial factor: {calibrator.state['factor']}") + + # Simulate some samples where we consistently underestimate by 20% + for _ in range(10): + base = 100.0 + pred = calibrator.predict(base) + actual = 120.0 # Reality is 20% higher + calibrator.update(pred, actual) + print(f"Pred: {pred:.2f}, Actual: {actual:.2f}, New Factor: {calibrator.state['factor']:.4f}") + + print("Final metrics:", calibrator.get_metrics()) + os.remove("nexus/test_calibration.json") diff --git a/nexus/nostr_identity.py b/nexus/nostr_identity.py new file mode 100644 index 0000000..82587e9 --- /dev/null +++ b/nexus/nostr_identity.py @@ -0,0 +1,102 @@ + +import hashlib +import hmac +import os +import binascii + +# ═══════════════════════════════════════════ +# NOSTR SOVEREIGN IDENTITY (NIP-01) +# ═══════════════════════════════════════════ +# Pure Python implementation of Schnorr signatures for Nostr. +# No dependencies required. + +def sha256(data): + return hashlib.sha256(data).digest() + +def hmac_sha256(key, data): + return hmac.new(key, data, hashlib.sha256).digest() + +# Secp256k1 Constants +P = 2**256 - 2**32 - 977 +N = 115792089237316195423570985008687907852837564279074904382605163141518161494337 +G = (0x79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798, + 0x483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8) + +def inverse(a, n): + return pow(a, n - 2, n) + +def point_add(p1, p2): + if p1 is None: return p2 + if p2 is None: return p1 + (x1, y1), (x2, y2) = p1, p2 + if x1 == x2 and y1 != y2: return None + if x1 == x2: + m = (3 * x1 * x1 * inverse(2 * y1, P)) % P + else: + m = ((y2 - y1) * inverse(x2 - x1, P)) % P + x3 = (m * m - x1 - x2) % P + y3 = (m * (x1 - x3) - y1) % P + return (x3, y3) + +def point_mul(p, n): + r = None + for i in range(256): + if (n >> i) & 1: + r = point_add(r, p) + p = point_add(p, p) + return r + +def get_pubkey(privkey): + p = point_mul(G, privkey) + return binascii.hexlify(p[0].to_bytes(32, 'big')).decode() + +# Schnorr Signature (BIP340) +def sign_schnorr(msg_hash, privkey): + k = int.from_bytes(sha256(privkey.to_bytes(32, 'big') + msg_hash), 'big') % N + R = point_mul(G, k) + if R[1] % 2 != 0: + k = N - k + r = R[0].to_bytes(32, 'big') + e = int.from_bytes(sha256(r + binascii.unhexlify(get_pubkey(privkey)) + msg_hash), 'big') % N + s = (k + e * privkey) % N + return binascii.hexlify(r + s.to_bytes(32, 'big')).decode() + +class NostrIdentity: + def __init__(self, privkey_hex=None): + if privkey_hex: + self.privkey = int(privkey_hex, 16) + else: + self.privkey = int.from_bytes(os.urandom(32), 'big') % N + self.pubkey = get_pubkey(self.privkey) + + def sign_event(self, event): + # NIP-01 Event Signing + import json + event_data = [ + 0, + event['pubkey'], + event['created_at'], + event['kind'], + event['tags'], + event['content'] + ] + serialized = json.dumps(event_data, separators=(',', ':')) + msg_hash = sha256(serialized.encode()) + event['id'] = binascii.hexlify(msg_hash).decode() + event['sig'] = sign_schnorr(msg_hash, self.privkey) + return event + +if __name__ == "__main__": + # Test Identity + identity = NostrIdentity() + print(f"Nostr Pubkey: {identity.pubkey}") + + event = { + "pubkey": identity.pubkey, + "created_at": 1677628800, + "kind": 1, + "tags": [], + "content": "Sovereignty and service always. #Timmy" + } + signed_event = identity.sign_event(event) + print(f"Signed Event: {signed_event}") diff --git a/nexus/nostr_publisher.py b/nexus/nostr_publisher.py new file mode 100644 index 0000000..185c45e --- /dev/null +++ b/nexus/nostr_publisher.py @@ -0,0 +1,55 @@ + +import asyncio +import websockets +import json +import time +import os +from nostr_identity import NostrIdentity + +# ═══════════════════════════════════════════ +# NOSTR SOVEREIGN PUBLISHER +# ═══════════════════════════════════════════ + +RELAYS = [ + "wss://relay.damus.io", + "wss://nos.lol", + "wss://relay.snort.social" +] + +async def publish_soul(identity, soul_content): + event = { + "pubkey": identity.pubkey, + "created_at": int(time.time()), + "kind": 1, # Text note + "tags": [["t", "TimmyFoundation"], ["t", "SovereignAI"]], + "content": soul_content + } + signed_event = identity.sign_event(event) + message = json.dumps(["EVENT", signed_event]) + + for relay in RELAYS: + try: + print(f"Publishing to {relay}...") + async with websockets.connect(relay, timeout=10) as ws: + await ws.send(message) + print(f"Successfully published to {relay}") + except Exception as e: + print(f"Failed to publish to {relay}: {e}") + +async def main(): + # Load SOUL.md + soul_path = os.path.join(os.path.dirname(__file__), "../SOUL.md") + if os.path.exists(soul_path): + with open(soul_path, "r") as f: + soul_content = f.read() + else: + soul_content = "Sovereignty and service always. #Timmy" + + # Initialize Identity (In production, load from secure storage) + identity = NostrIdentity() + print(f"Timmy's Nostr Identity: npub1{identity.pubkey}") + + await publish_soul(identity, soul_content) + +if __name__ == "__main__": + asyncio.run(main())