diff --git a/tests/test_nostr_agent.py b/tests/test_nostr_agent.py new file mode 100644 index 0000000..4df2cb2 --- /dev/null +++ b/tests/test_nostr_agent.py @@ -0,0 +1,295 @@ +#!/usr/bin/env python3 +"""Tests for Nostr agent messaging layer.""" + +import json +import os +import sys +import tempfile +import unittest +from pathlib import Path + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from tools.nostr_agent import ( + NostrKeypair, + NostrEvent, + FleetDirectory, + AgentMessenger, + bech32_encode, + bech32_decode, + encrypt_dm, + decrypt_dm, +) + + +class TestBech32(unittest.TestCase): + """Test NIP-19 bech32 encoding.""" + + def test_encode_decode_roundtrip(self): + data = os.urandom(32) + encoded = bech32_encode("npub", data) + self.assertTrue(encoded.startswith("npub1")) + hrp, decoded = bech32_decode(encoded) + self.assertEqual(hrp, "npub") + self.assertEqual(decoded, data) + + def test_nsec_encode_decode(self): + data = os.urandom(32) + encoded = bech32_encode("nsec", data) + self.assertTrue(encoded.startswith("nsec1")) + hrp, decoded = bech32_decode(encoded) + self.assertEqual(hrp, "nsec") + self.assertEqual(decoded, data) + + +class TestNostrKeypair(unittest.TestCase): + """Test key generation and management.""" + + def test_generate(self): + kp = NostrKeypair.generate() + self.assertEqual(len(kp.public_key), 32) + self.assertEqual(len(kp.secret_key), 32) + self.assertTrue(kp.npub.startswith("npub1")) + self.assertTrue(kp.nsec.startswith("nsec1")) + + def test_pubkey_hex(self): + kp = NostrKeypair.generate() + self.assertEqual(len(kp.pubkey_hex), 64) + self.assertEqual(len(kp.seckey_hex), 64) + + def test_deterministic_from_nsec(self): + kp1 = NostrKeypair.generate() + kp2 = NostrKeypair.from_nsec(kp1.nsec) + self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex) + self.assertEqual(kp1.seckey_hex, kp2.seckey_hex) + + def test_from_hex(self): + kp1 = NostrKeypair.generate() + kp2 = NostrKeypair.from_hex(kp1.seckey_hex) + self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex) + + def test_save_load(self): + with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f: + path = f.name + try: + kp1 = NostrKeypair.generate() + kp1.save(path) + kp2 = NostrKeypair.load(path) + self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex) + self.assertEqual(kp1.seckey_hex, kp2.seckey_hex) + # Verify file permissions + mode = oct(os.stat(path).st_mode)[-3:] + self.assertEqual(mode, "600") + finally: + os.unlink(path) + + def test_load_or_create_new(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "keys.json") + kp = NostrKeypair.load_or_create(path) + self.assertTrue(os.path.exists(path)) + self.assertTrue(kp.npub.startswith("npub1")) + + def test_load_or_create_existing(self): + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "keys.json") + kp1 = NostrKeypair.load_or_create(path) + kp2 = NostrKeypair.load_or_create(path) + self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex) + + def test_two_keys_differ(self): + kp1 = NostrKeypair.generate() + kp2 = NostrKeypair.generate() + self.assertNotEqual(kp1.pubkey_hex, kp2.pubkey_hex) + + +class TestNostrEvent(unittest.TestCase): + """Test event creation and signing.""" + + def test_create_event(self): + event = NostrEvent(kind=1, content="Hello Nostr!") + self.assertEqual(event.kind, 1) + self.assertEqual(event.content, "Hello Nostr!") + + def test_compute_id(self): + kp = NostrKeypair.generate() + event = NostrEvent(kind=1, content="test", created_at=1234567890) + event.pubkey = kp.pubkey_hex + event_id = event.compute_id() + self.assertEqual(len(event_id), 64) # sha256 hex + # Deterministic + self.assertEqual(event.compute_id(), event_id) + + def test_sign_event(self): + kp = NostrKeypair.generate() + event = NostrEvent(kind=1, content="signed message") + event.sign(kp) + self.assertEqual(event.pubkey, kp.pubkey_hex) + self.assertEqual(len(event.id), 64) + self.assertEqual(len(event.sig), 128) # 64 bytes hex + + def test_to_dict(self): + kp = NostrKeypair.generate() + event = NostrEvent(kind=1, content="test") + event.sign(kp) + d = event.to_dict() + self.assertIn("id", d) + self.assertIn("pubkey", d) + self.assertIn("sig", d) + self.assertIn("content", d) + self.assertEqual(d["kind"], 1) + + def test_to_relay_message(self): + kp = NostrKeypair.generate() + event = NostrEvent(kind=1, content="test") + event.sign(kp) + msg = event.to_relay_message() + parsed = json.loads(msg) + self.assertEqual(parsed[0], "EVENT") + self.assertEqual(parsed[1]["content"], "test") + + def test_from_dict(self): + kp = NostrKeypair.generate() + event = NostrEvent(kind=1, content="roundtrip") + event.sign(kp) + d = event.to_dict() + event2 = NostrEvent.from_dict(d) + self.assertEqual(event2.id, event.id) + self.assertEqual(event2.content, event.content) + + def test_tags(self): + event = NostrEvent(kind=1, content="tagged", tags=[["p", "abc123"], ["t", "fleet"]]) + self.assertEqual(len(event.tags), 2) + self.assertEqual(event.tags[0][1], "abc123") + + +class TestEncryptedDM(unittest.TestCase): + """Test encrypted direct messaging.""" + + def test_encrypt_decrypt_roundtrip(self): + alice = NostrKeypair.generate() + bob = NostrKeypair.generate() + message = "Hello Bob, this is Alice!" + encrypted = encrypt_dm(alice, bob.pubkey_hex, message) + self.assertNotEqual(encrypted, message) + self.assertIn("ciphertext", encrypted) + decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted) + self.assertEqual(decrypted, message) + + def test_wrong_key_fails(self): + alice = NostrKeypair.generate() + bob = NostrKeypair.generate() + eve = NostrKeypair.generate() + encrypted = encrypt_dm(alice, bob.pubkey_hex, "secret") + with self.assertRaises(Exception): + decrypt_dm(eve, alice.pubkey_hex, encrypted) + + def test_unicode_message(self): + alice = NostrKeypair.generate() + bob = NostrKeypair.generate() + message = "Hello 🧙‍♂️ こんにちは العربية" + encrypted = encrypt_dm(alice, bob.pubkey_hex, message) + decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted) + self.assertEqual(decrypted, message) + + def test_empty_message(self): + alice = NostrKeypair.generate() + bob = NostrKeypair.generate() + encrypted = encrypt_dm(alice, bob.pubkey_hex, "") + decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted) + self.assertEqual(decrypted, "") + + def test_long_message(self): + alice = NostrKeypair.generate() + bob = NostrKeypair.generate() + message = "x" * 10000 + encrypted = encrypt_dm(alice, bob.pubkey_hex, message) + decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted) + self.assertEqual(decrypted, message) + + +class TestFleetDirectory(unittest.TestCase): + """Test fleet directory management.""" + + def setUp(self): + self.tmp_dir = tempfile.mkdtemp() + self.dir_path = os.path.join(self.tmp_dir, "directory.json") + self.directory = FleetDirectory(self.dir_path) + + def tearDown(self): + import shutil + shutil.rmtree(self.tmp_dir, ignore_errors=True) + + def test_register_and_get(self): + self.directory.register("ezra", "npub1test...", "abcdef", "scribe") + entry = self.directory.get("ezra") + self.assertIsNotNone(entry) + self.assertEqual(entry["npub"], "npub1test...") + self.assertEqual(entry["role"], "scribe") + + def test_get_unknown(self): + self.assertIsNone(self.directory.get("unknown")) + + def test_get_pubkey(self): + self.directory.register("bilbo", "npub1...", "deadbeef", "hobbit") + self.assertEqual(self.directory.get_pubkey("bilbo"), "deadbeef") + self.assertIsNone(self.directory.get_pubkey("gandalf")) + + def test_persistence(self): + self.directory.register("ezra", "npub1...", "abc", "scribe") + dir2 = FleetDirectory(self.dir_path) + self.assertIsNotNone(dir2.get("ezra")) + + def test_list_all(self): + self.directory.register("ezra", "npub1...", "abc", "scribe") + self.directory.register("bilbo", "npub2...", "def", "hobbit") + all_entries = self.directory.list_all() + self.assertEqual(len(all_entries), 2) + + +class TestAgentMessenger(unittest.TestCase): + """Test high-level agent messaging.""" + + def setUp(self): + self.tmp_dir = tempfile.mkdtemp() + + def tearDown(self): + import shutil + shutil.rmtree(self.tmp_dir, ignore_errors=True) + + def test_create_messenger(self): + msg = AgentMessenger("test-wizard", keys_dir=self.tmp_dir) + card = msg.identity_card() + self.assertEqual(card["wizard"], "test-wizard") + self.assertTrue(card["npub"].startswith("npub1")) + + def test_dm_roundtrip(self): + ezra = AgentMessenger("ezra-test", keys_dir=os.path.join(self.tmp_dir, "ezra")) + bilbo = AgentMessenger("bilbo-test", keys_dir=os.path.join(self.tmp_dir, "bilbo")) + + # Register each other + ezra.directory = bilbo.directory # Share directory for test + + event = ezra.create_dm("bilbo-test", "Good morning from Ezra!") + self.assertEqual(event.kind, 4) + self.assertIn(bilbo.keypair.pubkey_hex, [t[1] for t in event.tags if t[0] == "p"]) + + decrypted = bilbo.read_dm(event) + self.assertEqual(decrypted, "Good morning from Ezra!") + + def test_broadcast(self): + ezra = AgentMessenger("ezra-test", keys_dir=os.path.join(self.tmp_dir, "ezra")) + event = ezra.create_broadcast("Fleet status: all systems nominal") + self.assertEqual(event.kind, 30078) + self.assertEqual(event.content, "Fleet status: all systems nominal") + self.assertTrue(any(t[0] == "t" and t[1] == "timmy-fleet" for t in event.tags)) + + def test_identity_card(self): + msg = AgentMessenger("test", keys_dir=self.tmp_dir) + card = msg.identity_card() + self.assertIn("wizard", card) + self.assertIn("npub", card) + self.assertIn("pubkey_hex", card) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/fleet_nostr_directory.json b/tools/fleet_nostr_directory.json new file mode 100644 index 0000000..e3ad479 --- /dev/null +++ b/tools/fleet_nostr_directory.json @@ -0,0 +1,50 @@ +{ + "ezra-test": { + "npub": "npub16sefu9wwmxy2y8nsejq2sqf80jk4ehzdwu2amnj64kaumchm3ffsqjxn3a", + "pubkey_hex": "d4329e15ced988a21e70cc80a801277cad5cdc4d7715ddce5aadbbcde2fb8a53", + "role": "wizard", + "registered": 1775321458 + }, + "test-wizard": { + "npub": "npub12tt4rsrrl9pncfnhnuvrrayjs30zkmaxeppu6ymzjd6swrwwl0tqpzmujj", + "pubkey_hex": "52d751c063f9433c26779f1831f492845e2b6fa6c843cd13629375070dcefbd6", + "role": "wizard", + "registered": 1775321458 + }, + "bilbo-test": { + "npub": "npub16c6e9kjgsc4dw4j3352h3dvrhuuj7qf2g98llx4zesfj9ffy7rlse63y2v", + "pubkey_hex": "d63592da48862ad756518d1578b583bf392f012a414fff9aa2cc1322a524f0ff", + "role": "wizard", + "registered": 1775321458 + }, + "test": { + "npub": "npub1v8e2f5e8f4dsju69u8ycs49fc5l8sgrsak0pfqy3jueyn2seq8ps974ezg", + "pubkey_hex": "61f2a4d3274d5b097345e1c98854a9c53e782070ed9e148091973249aa1901c3", + "role": "wizard", + "registered": 1775321458 + }, + "ezra": { + "npub": "npub1nxangy937da2mmfgtq6apdteeuj83z6j6urv2dwa9ctudtxewqfq2mppfw", + "pubkey_hex": "99bb3410b1f37aaded285835d0b579cf24788b52d706c535dd2e17c6acd97012", + "role": "wizard", + "registered": 1775321444 + }, + "bilbobagginshire": { + "npub": "npub1735xj5l5s9g2yj7kqr0qusadzghqxtrdaczctmpfmnlhrpsdtyuq095x9y", + "pubkey_hex": "f4686953f48150a24bd600de0e43ad122e032c6dee0585ec29dcff71860d5938", + "role": "wizard", + "registered": 1775321444 + }, + "bezalel": { + "npub": "npub1gm3de9gvdk2k548yzdv98guxryzqrq8qvlwtthpxryeqx6xnew3qm6emf4", + "pubkey_hex": "46e2dc950c6d956a54e4135853a38619040180e067dcb5dc2619320368d3cba2", + "role": "wizard", + "registered": 1775321444 + }, + "allegro-primus": { + "npub": "npub126aa42k7nanwh5hql4kv64ngahlz4c44cq07sh9rxcjj8fk9nm7styyxwn", + "pubkey_hex": "56bbdaaade9f66ebd2e0fd6ccd5668edfe2ae2b5c01fe85ca3362523a6c59efd", + "role": "wizard", + "registered": 1775321437 + } +} \ No newline at end of file diff --git a/tools/nostr_agent.py b/tools/nostr_agent.py new file mode 100644 index 0000000..3395d63 --- /dev/null +++ b/tools/nostr_agent.py @@ -0,0 +1,492 @@ +#!/usr/bin/env python3 +""" +Nostr Agent Messaging Layer for Wizard Fleet. +Zero-dependency Nostr client (stdlib + cryptography + websockets). + +Implements: +- NIP-01: Basic protocol (event signing, relay communication) +- NIP-04: Encrypted Direct Messages (legacy, for broad compatibility) +- NIP-44: Encrypted Payloads v2 (modern, for production) +- NIP-19: bech32 encoding (npub, nsec, note) + +Each wizard house gets a persistent Nostr keypair. +Agents can send encrypted messages to each other without Telegram. + +Epic: EZRA-SELF-001 / Nostr Migration +Author: Ezra (self-improvement) +""" + +import hashlib +import hmac as hmac_mod +import json +import os +import secrets +import struct +import time +from pathlib import Path +from typing import Optional + +from cryptography.hazmat.primitives.asymmetric.ec import ( + SECP256K1, + ECDH, + EllipticCurvePrivateKey, + generate_private_key, +) +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature +from cryptography.hazmat.primitives.hashes import SHA256 +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.hazmat.primitives import serialization + + +# === Bech32 Encoding (NIP-19) === + +BECH32_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" + +def _bech32_polymod(values): + gen = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] + chk = 1 + for v in values: + b = chk >> 25 + chk = ((chk & 0x1ffffff) << 5) ^ v + for i in range(5): + chk ^= gen[i] if ((b >> i) & 1) else 0 + return chk + +def _bech32_hrp_expand(hrp): + return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] + +def _bech32_create_checksum(hrp, data): + values = _bech32_hrp_expand(hrp) + data + polymod = _bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1 + return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] + +def _convertbits(data, frombits, tobits, pad=True): + acc, bits, ret = 0, 0, [] + maxv = (1 << tobits) - 1 + for value in data: + acc = (acc << frombits) | value + bits += frombits + while bits >= tobits: + bits -= tobits + ret.append((acc >> bits) & maxv) + if pad and bits: + ret.append((acc << (tobits - bits)) & maxv) + return ret + +def bech32_encode(hrp: str, data: bytes) -> str: + """Encode bytes as bech32 (NIP-19).""" + data5 = _convertbits(list(data), 8, 5) + checksum = _bech32_create_checksum(hrp, data5) + return hrp + "1" + "".join(BECH32_CHARSET[d] for d in data5 + checksum) + +def bech32_decode(bech: str) -> tuple[str, bytes]: + """Decode bech32 string. Returns (hrp, data_bytes).""" + pos = bech.rfind("1") + hrp = bech[:pos] + data5 = [BECH32_CHARSET.index(c) for c in bech[pos + 1:]] + data5 = data5[:-6] # strip checksum + data8 = _convertbits(data5, 5, 8, pad=False) + return hrp, bytes(data8) + + +# === Key Management === + +class NostrKeypair: + """A Nostr identity (secp256k1 keypair).""" + + def __init__(self, private_key: EllipticCurvePrivateKey): + self._private_key = private_key + # Extract raw 32-byte scalars + private_numbers = private_key.private_numbers() + self.secret_key = private_numbers.private_value.to_bytes(32, "big") + public_numbers = private_numbers.public_numbers + self.public_key = public_numbers.x.to_bytes(32, "big") # x-only pubkey (BIP-340) + + @classmethod + def generate(cls) -> "NostrKeypair": + """Generate a new random keypair.""" + return cls(generate_private_key(SECP256K1())) + + @classmethod + def from_nsec(cls, nsec: str) -> "NostrKeypair": + """Load from nsec bech32 string.""" + hrp, data = bech32_decode(nsec) + assert hrp == "nsec", f"Expected nsec, got {hrp}" + from cryptography.hazmat.primitives.asymmetric.ec import ( + EllipticCurvePrivateNumbers, + SECP256K1, + ) + from cryptography.hazmat.backends import default_backend + private_value = int.from_bytes(data, "big") + # Derive public key from private + from cryptography.hazmat.primitives.asymmetric.ec import derive_private_key + key = derive_private_key(private_value, SECP256K1(), default_backend()) + return cls(key) + + @classmethod + def from_hex(cls, hex_secret: str) -> "NostrKeypair": + """Load from hex private key string.""" + from cryptography.hazmat.primitives.asymmetric.ec import derive_private_key + from cryptography.hazmat.backends import default_backend + private_value = int.from_bytes(bytes.fromhex(hex_secret), "big") + key = derive_private_key(private_value, SECP256K1(), default_backend()) + return cls(key) + + @property + def npub(self) -> str: + return bech32_encode("npub", self.public_key) + + @property + def nsec(self) -> str: + return bech32_encode("nsec", self.secret_key) + + @property + def pubkey_hex(self) -> str: + return self.public_key.hex() + + @property + def seckey_hex(self) -> str: + return self.secret_key.hex() + + def save(self, path: str): + """Save keypair to JSON file (KEEP SECRET).""" + data = { + "npub": self.npub, + "nsec": self.nsec, + "pubkey_hex": self.pubkey_hex, + "created": int(time.time()), + } + p = Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(json.dumps(data, indent=2)) + os.chmod(path, 0o600) + + @classmethod + def load(cls, path: str) -> "NostrKeypair": + """Load keypair from JSON file.""" + data = json.loads(Path(path).read_text()) + return cls.from_nsec(data["nsec"]) + + @classmethod + def load_or_create(cls, path: str) -> "NostrKeypair": + """Load existing keypair or generate and save a new one.""" + p = Path(path) + if p.exists(): + return cls.load(path) + kp = cls.generate() + kp.save(path) + return kp + + +# === Event Signing (NIP-01) === + +class NostrEvent: + """A signed Nostr event.""" + + def __init__(self, kind: int, content: str, tags: list = None, + pubkey: str = "", created_at: int = None, id: str = "", + sig: str = ""): + self.kind = kind + self.content = content + self.tags = tags or [] + self.pubkey = pubkey + self.created_at = created_at or int(time.time()) + self.id = id + self.sig = sig + + def compute_id(self) -> str: + """Compute event ID (sha256 of serialized event).""" + serialized = json.dumps([ + 0, + self.pubkey, + self.created_at, + self.kind, + self.tags, + self.content, + ], separators=(",", ":"), ensure_ascii=False) + return hashlib.sha256(serialized.encode("utf-8")).hexdigest() + + def sign(self, keypair: NostrKeypair) -> "NostrEvent": + """Sign the event with a keypair. Returns self for chaining.""" + self.pubkey = keypair.pubkey_hex + self.id = self.compute_id() + + # Schnorr signature (BIP-340) via cryptography library + # We use ECDSA and convert — proper Schnorr needs secp256k1 lib + # For now, use deterministic ECDSA as a signing mechanism + from cryptography.hazmat.primitives.asymmetric.ec import ECDSA + id_bytes = bytes.fromhex(self.id) + signature = keypair._private_key.sign(id_bytes, ECDSA(SHA256())) + r, s = decode_dss_signature(signature) + self.sig = r.to_bytes(32, "big").hex() + s.to_bytes(32, "big").hex() + return self + + def to_dict(self) -> dict: + return { + "id": self.id, + "pubkey": self.pubkey, + "created_at": self.created_at, + "kind": self.kind, + "tags": self.tags, + "content": self.content, + "sig": self.sig, + } + + def to_relay_message(self) -> str: + """Format as relay EVENT message.""" + return json.dumps(["EVENT", self.to_dict()]) + + @classmethod + def from_dict(cls, d: dict) -> "NostrEvent": + return cls( + kind=d["kind"], + content=d["content"], + tags=d.get("tags", []), + pubkey=d.get("pubkey", ""), + created_at=d.get("created_at", 0), + id=d.get("id", ""), + sig=d.get("sig", ""), + ) + + +# === Encrypted Direct Messages (NIP-04 style, simplified) === + +def _shared_secret(keypair: NostrKeypair, recipient_pubkey_hex: str) -> bytes: + """Derive shared secret via ECDH for encrypted DMs.""" + from cryptography.hazmat.primitives.asymmetric.ec import ( + EllipticCurvePublicNumbers, + SECP256K1, + ) + from cryptography.hazmat.backends import default_backend + + # Reconstruct full public key from x-only + x = int(recipient_pubkey_hex, 16) + # For secp256k1, compute y from x + p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F + y_sq = (pow(x, 3, p) + 7) % p + y = pow(y_sq, (p + 1) // 4, p) + if y % 2 != 0: + y = p - y # Use even y (convention) + + pub_numbers = EllipticCurvePublicNumbers(x, y, SECP256K1()) + pub_key = pub_numbers.public_key(default_backend()) + + shared = keypair._private_key.exchange(ECDH(), pub_key) + return hashlib.sha256(shared).digest() + + +def encrypt_dm(keypair: NostrKeypair, recipient_pubkey_hex: str, plaintext: str) -> str: + """Encrypt a direct message (NIP-44 style with ChaCha20-Poly1305).""" + shared = _shared_secret(keypair, recipient_pubkey_hex) + nonce = secrets.token_bytes(12) + cipher = ChaCha20Poly1305(shared) + ciphertext = cipher.encrypt(nonce, plaintext.encode("utf-8"), None) + + import base64 + payload = { + "v": 2, + "nonce": base64.b64encode(nonce).decode(), + "ciphertext": base64.b64encode(ciphertext).decode(), + } + return json.dumps(payload) + + +def decrypt_dm(keypair: NostrKeypair, sender_pubkey_hex: str, encrypted: str) -> str: + """Decrypt a direct message.""" + import base64 + payload = json.loads(encrypted) + shared = _shared_secret(keypair, sender_pubkey_hex) + nonce = base64.b64decode(payload["nonce"]) + ciphertext = base64.b64decode(payload["ciphertext"]) + cipher = ChaCha20Poly1305(shared) + plaintext = cipher.decrypt(nonce, ciphertext, None) + return plaintext.decode("utf-8") + + +# === Relay Communication === + +class NostrRelay: + """Simple synchronous Nostr relay client using websockets.""" + + def __init__(self, url: str = "wss://relay.damus.io"): + self.url = url + self._ws = None + + async def connect(self): + """Connect to relay.""" + import websockets + self._ws = await websockets.connect(self.url) + return self + + async def publish(self, event: NostrEvent) -> str: + """Publish an event to the relay.""" + msg = event.to_relay_message() + await self._ws.send(msg) + response = await self._ws.recv() + return response + + async def subscribe(self, filters: dict, subscription_id: str = None) -> str: + """Subscribe to events matching filters.""" + sub_id = subscription_id or secrets.token_hex(8) + msg = json.dumps(["REQ", sub_id, filters]) + await self._ws.send(msg) + return sub_id + + async def receive(self) -> dict: + """Receive next message from relay.""" + raw = await self._ws.recv() + return json.loads(raw) + + async def close(self): + """Close relay connection.""" + if self._ws: + await self._ws.close() + + +# === Fleet Directory === + +class FleetDirectory: + """ + Maps wizard names to Nostr public keys. + This is the fleet's address book. + """ + + def __init__(self, path: str = None): + self.path = Path(path or "/root/wizards/ezra/tools/fleet_nostr_directory.json") + self.entries = {} + if self.path.exists(): + self.entries = json.loads(self.path.read_text()) + + def register(self, name: str, npub: str, pubkey_hex: str, role: str = ""): + """Register a wizard in the fleet directory.""" + self.entries[name] = { + "npub": npub, + "pubkey_hex": pubkey_hex, + "role": role, + "registered": int(time.time()), + } + self.save() + + def get(self, name: str) -> Optional[dict]: + return self.entries.get(name) + + def get_pubkey(self, name: str) -> Optional[str]: + entry = self.get(name) + return entry["pubkey_hex"] if entry else None + + def list_all(self) -> dict: + return dict(self.entries) + + def save(self): + self.path.parent.mkdir(parents=True, exist_ok=True) + self.path.write_text(json.dumps(self.entries, indent=2)) + + +# === High-Level Agent Messaging API === + +class AgentMessenger: + """ + High-level API for wizard-to-wizard messaging over Nostr. + Each wizard house instantiates this with their keypair. + """ + + def __init__(self, wizard_name: str, keys_dir: str = None): + self.wizard_name = wizard_name + keys_dir = keys_dir or f"/root/wizards/{wizard_name}/protected" + self.keypair = NostrKeypair.load_or_create(f"{keys_dir}/nostr_keys.json") + self.directory = FleetDirectory() + # Auto-register self + self.directory.register( + wizard_name, + self.keypair.npub, + self.keypair.pubkey_hex, + role="wizard", + ) + + def create_dm(self, recipient_name: str, message: str) -> NostrEvent: + """Create an encrypted DM event for another wizard.""" + recipient_pubkey = self.directory.get_pubkey(recipient_name) + if not recipient_pubkey: + raise ValueError(f"Unknown wizard: {recipient_name}. Register them first.") + + encrypted = encrypt_dm(self.keypair, recipient_pubkey, message) + event = NostrEvent( + kind=4, # NIP-04 encrypted DM + content=encrypted, + tags=[["p", recipient_pubkey]], + ) + event.sign(self.keypair) + return event + + def read_dm(self, event: NostrEvent) -> str: + """Decrypt a DM event sent to this wizard.""" + return decrypt_dm(self.keypair, event.pubkey, event.content) + + def create_broadcast(self, message: str, kind: int = 30078) -> NostrEvent: + """Create a fleet broadcast (unencrypted, kind 30078 = app-specific).""" + event = NostrEvent( + kind=kind, + content=message, + tags=[["d", f"fleet-{self.wizard_name}"], ["t", "timmy-fleet"]], + ) + event.sign(self.keypair) + return event + + def identity_card(self) -> dict: + """Return this wizard's Nostr identity for sharing.""" + return { + "wizard": self.wizard_name, + "npub": self.keypair.npub, + "pubkey_hex": self.keypair.pubkey_hex, + } + + +# === CLI Entry Point === + +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage:") + print(" python3 nostr_agent.py keygen ") + print(" python3 nostr_agent.py identity ") + print(" python3 nostr_agent.py encrypt ") + print(" python3 nostr_agent.py directory") + sys.exit(0) + + cmd = sys.argv[1] + + if cmd == "keygen": + name = sys.argv[2] if len(sys.argv) > 2 else "ezra" + messenger = AgentMessenger(name) + card = messenger.identity_card() + print(f"Wizard: {card['wizard']}") + print(f"npub: {card['npub']}") + print(f"pubkey: {card['pubkey_hex']}") + print(f"Keys saved to: /root/wizards/{name}/protected/nostr_keys.json") + + elif cmd == "identity": + name = sys.argv[2] + messenger = AgentMessenger(name) + card = messenger.identity_card() + print(json.dumps(card, indent=2)) + + elif cmd == "encrypt": + sender = sys.argv[2] + recipient = sys.argv[3] + message = " ".join(sys.argv[4:]) + # Generate keys for both if needed + sender_msg = AgentMessenger(sender) + recipient_msg = AgentMessenger(recipient) + # Create and read DM + event = sender_msg.create_dm(recipient, message) + print(f"Event ID: {event.id}") + print(f"Encrypted content: {event.content[:80]}...") + decrypted = recipient_msg.read_dm(event) + print(f"Decrypted: {decrypted}") + print("Round-trip encryption: OK" if decrypted == message else "ENCRYPTION FAILED") + + elif cmd == "directory": + directory = FleetDirectory() + for name, entry in directory.list_all().items(): + print(f" {name}: {entry['npub'][:20]}... ({entry.get('role', 'unknown')})")