Implements NIP-01 (events), NIP-04/44 (encrypted DMs), NIP-19 (bech32). Zero new deps except websockets. Keys generated for 4 VPS wizards. Fleet directory with auto-registration. ChaCha20-Poly1305 encryption.
296 lines
10 KiB
Python
296 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Tests for Nostr agent messaging layer."""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
from tools.nostr_agent import (
|
|
NostrKeypair,
|
|
NostrEvent,
|
|
FleetDirectory,
|
|
AgentMessenger,
|
|
bech32_encode,
|
|
bech32_decode,
|
|
encrypt_dm,
|
|
decrypt_dm,
|
|
)
|
|
|
|
|
|
class TestBech32(unittest.TestCase):
|
|
"""Test NIP-19 bech32 encoding."""
|
|
|
|
def test_encode_decode_roundtrip(self):
|
|
data = os.urandom(32)
|
|
encoded = bech32_encode("npub", data)
|
|
self.assertTrue(encoded.startswith("npub1"))
|
|
hrp, decoded = bech32_decode(encoded)
|
|
self.assertEqual(hrp, "npub")
|
|
self.assertEqual(decoded, data)
|
|
|
|
def test_nsec_encode_decode(self):
|
|
data = os.urandom(32)
|
|
encoded = bech32_encode("nsec", data)
|
|
self.assertTrue(encoded.startswith("nsec1"))
|
|
hrp, decoded = bech32_decode(encoded)
|
|
self.assertEqual(hrp, "nsec")
|
|
self.assertEqual(decoded, data)
|
|
|
|
|
|
class TestNostrKeypair(unittest.TestCase):
|
|
"""Test key generation and management."""
|
|
|
|
def test_generate(self):
|
|
kp = NostrKeypair.generate()
|
|
self.assertEqual(len(kp.public_key), 32)
|
|
self.assertEqual(len(kp.secret_key), 32)
|
|
self.assertTrue(kp.npub.startswith("npub1"))
|
|
self.assertTrue(kp.nsec.startswith("nsec1"))
|
|
|
|
def test_pubkey_hex(self):
|
|
kp = NostrKeypair.generate()
|
|
self.assertEqual(len(kp.pubkey_hex), 64)
|
|
self.assertEqual(len(kp.seckey_hex), 64)
|
|
|
|
def test_deterministic_from_nsec(self):
|
|
kp1 = NostrKeypair.generate()
|
|
kp2 = NostrKeypair.from_nsec(kp1.nsec)
|
|
self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex)
|
|
self.assertEqual(kp1.seckey_hex, kp2.seckey_hex)
|
|
|
|
def test_from_hex(self):
|
|
kp1 = NostrKeypair.generate()
|
|
kp2 = NostrKeypair.from_hex(kp1.seckey_hex)
|
|
self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex)
|
|
|
|
def test_save_load(self):
|
|
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
|
|
path = f.name
|
|
try:
|
|
kp1 = NostrKeypair.generate()
|
|
kp1.save(path)
|
|
kp2 = NostrKeypair.load(path)
|
|
self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex)
|
|
self.assertEqual(kp1.seckey_hex, kp2.seckey_hex)
|
|
# Verify file permissions
|
|
mode = oct(os.stat(path).st_mode)[-3:]
|
|
self.assertEqual(mode, "600")
|
|
finally:
|
|
os.unlink(path)
|
|
|
|
def test_load_or_create_new(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
path = os.path.join(tmp, "keys.json")
|
|
kp = NostrKeypair.load_or_create(path)
|
|
self.assertTrue(os.path.exists(path))
|
|
self.assertTrue(kp.npub.startswith("npub1"))
|
|
|
|
def test_load_or_create_existing(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
path = os.path.join(tmp, "keys.json")
|
|
kp1 = NostrKeypair.load_or_create(path)
|
|
kp2 = NostrKeypair.load_or_create(path)
|
|
self.assertEqual(kp1.pubkey_hex, kp2.pubkey_hex)
|
|
|
|
def test_two_keys_differ(self):
|
|
kp1 = NostrKeypair.generate()
|
|
kp2 = NostrKeypair.generate()
|
|
self.assertNotEqual(kp1.pubkey_hex, kp2.pubkey_hex)
|
|
|
|
|
|
class TestNostrEvent(unittest.TestCase):
|
|
"""Test event creation and signing."""
|
|
|
|
def test_create_event(self):
|
|
event = NostrEvent(kind=1, content="Hello Nostr!")
|
|
self.assertEqual(event.kind, 1)
|
|
self.assertEqual(event.content, "Hello Nostr!")
|
|
|
|
def test_compute_id(self):
|
|
kp = NostrKeypair.generate()
|
|
event = NostrEvent(kind=1, content="test", created_at=1234567890)
|
|
event.pubkey = kp.pubkey_hex
|
|
event_id = event.compute_id()
|
|
self.assertEqual(len(event_id), 64) # sha256 hex
|
|
# Deterministic
|
|
self.assertEqual(event.compute_id(), event_id)
|
|
|
|
def test_sign_event(self):
|
|
kp = NostrKeypair.generate()
|
|
event = NostrEvent(kind=1, content="signed message")
|
|
event.sign(kp)
|
|
self.assertEqual(event.pubkey, kp.pubkey_hex)
|
|
self.assertEqual(len(event.id), 64)
|
|
self.assertEqual(len(event.sig), 128) # 64 bytes hex
|
|
|
|
def test_to_dict(self):
|
|
kp = NostrKeypair.generate()
|
|
event = NostrEvent(kind=1, content="test")
|
|
event.sign(kp)
|
|
d = event.to_dict()
|
|
self.assertIn("id", d)
|
|
self.assertIn("pubkey", d)
|
|
self.assertIn("sig", d)
|
|
self.assertIn("content", d)
|
|
self.assertEqual(d["kind"], 1)
|
|
|
|
def test_to_relay_message(self):
|
|
kp = NostrKeypair.generate()
|
|
event = NostrEvent(kind=1, content="test")
|
|
event.sign(kp)
|
|
msg = event.to_relay_message()
|
|
parsed = json.loads(msg)
|
|
self.assertEqual(parsed[0], "EVENT")
|
|
self.assertEqual(parsed[1]["content"], "test")
|
|
|
|
def test_from_dict(self):
|
|
kp = NostrKeypair.generate()
|
|
event = NostrEvent(kind=1, content="roundtrip")
|
|
event.sign(kp)
|
|
d = event.to_dict()
|
|
event2 = NostrEvent.from_dict(d)
|
|
self.assertEqual(event2.id, event.id)
|
|
self.assertEqual(event2.content, event.content)
|
|
|
|
def test_tags(self):
|
|
event = NostrEvent(kind=1, content="tagged", tags=[["p", "abc123"], ["t", "fleet"]])
|
|
self.assertEqual(len(event.tags), 2)
|
|
self.assertEqual(event.tags[0][1], "abc123")
|
|
|
|
|
|
class TestEncryptedDM(unittest.TestCase):
|
|
"""Test encrypted direct messaging."""
|
|
|
|
def test_encrypt_decrypt_roundtrip(self):
|
|
alice = NostrKeypair.generate()
|
|
bob = NostrKeypair.generate()
|
|
message = "Hello Bob, this is Alice!"
|
|
encrypted = encrypt_dm(alice, bob.pubkey_hex, message)
|
|
self.assertNotEqual(encrypted, message)
|
|
self.assertIn("ciphertext", encrypted)
|
|
decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted)
|
|
self.assertEqual(decrypted, message)
|
|
|
|
def test_wrong_key_fails(self):
|
|
alice = NostrKeypair.generate()
|
|
bob = NostrKeypair.generate()
|
|
eve = NostrKeypair.generate()
|
|
encrypted = encrypt_dm(alice, bob.pubkey_hex, "secret")
|
|
with self.assertRaises(Exception):
|
|
decrypt_dm(eve, alice.pubkey_hex, encrypted)
|
|
|
|
def test_unicode_message(self):
|
|
alice = NostrKeypair.generate()
|
|
bob = NostrKeypair.generate()
|
|
message = "Hello 🧙♂️ こんにちは العربية"
|
|
encrypted = encrypt_dm(alice, bob.pubkey_hex, message)
|
|
decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted)
|
|
self.assertEqual(decrypted, message)
|
|
|
|
def test_empty_message(self):
|
|
alice = NostrKeypair.generate()
|
|
bob = NostrKeypair.generate()
|
|
encrypted = encrypt_dm(alice, bob.pubkey_hex, "")
|
|
decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted)
|
|
self.assertEqual(decrypted, "")
|
|
|
|
def test_long_message(self):
|
|
alice = NostrKeypair.generate()
|
|
bob = NostrKeypair.generate()
|
|
message = "x" * 10000
|
|
encrypted = encrypt_dm(alice, bob.pubkey_hex, message)
|
|
decrypted = decrypt_dm(bob, alice.pubkey_hex, encrypted)
|
|
self.assertEqual(decrypted, message)
|
|
|
|
|
|
class TestFleetDirectory(unittest.TestCase):
|
|
"""Test fleet directory management."""
|
|
|
|
def setUp(self):
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
self.dir_path = os.path.join(self.tmp_dir, "directory.json")
|
|
self.directory = FleetDirectory(self.dir_path)
|
|
|
|
def tearDown(self):
|
|
import shutil
|
|
shutil.rmtree(self.tmp_dir, ignore_errors=True)
|
|
|
|
def test_register_and_get(self):
|
|
self.directory.register("ezra", "npub1test...", "abcdef", "scribe")
|
|
entry = self.directory.get("ezra")
|
|
self.assertIsNotNone(entry)
|
|
self.assertEqual(entry["npub"], "npub1test...")
|
|
self.assertEqual(entry["role"], "scribe")
|
|
|
|
def test_get_unknown(self):
|
|
self.assertIsNone(self.directory.get("unknown"))
|
|
|
|
def test_get_pubkey(self):
|
|
self.directory.register("bilbo", "npub1...", "deadbeef", "hobbit")
|
|
self.assertEqual(self.directory.get_pubkey("bilbo"), "deadbeef")
|
|
self.assertIsNone(self.directory.get_pubkey("gandalf"))
|
|
|
|
def test_persistence(self):
|
|
self.directory.register("ezra", "npub1...", "abc", "scribe")
|
|
dir2 = FleetDirectory(self.dir_path)
|
|
self.assertIsNotNone(dir2.get("ezra"))
|
|
|
|
def test_list_all(self):
|
|
self.directory.register("ezra", "npub1...", "abc", "scribe")
|
|
self.directory.register("bilbo", "npub2...", "def", "hobbit")
|
|
all_entries = self.directory.list_all()
|
|
self.assertEqual(len(all_entries), 2)
|
|
|
|
|
|
class TestAgentMessenger(unittest.TestCase):
|
|
"""Test high-level agent messaging."""
|
|
|
|
def setUp(self):
|
|
self.tmp_dir = tempfile.mkdtemp()
|
|
|
|
def tearDown(self):
|
|
import shutil
|
|
shutil.rmtree(self.tmp_dir, ignore_errors=True)
|
|
|
|
def test_create_messenger(self):
|
|
msg = AgentMessenger("test-wizard", keys_dir=self.tmp_dir)
|
|
card = msg.identity_card()
|
|
self.assertEqual(card["wizard"], "test-wizard")
|
|
self.assertTrue(card["npub"].startswith("npub1"))
|
|
|
|
def test_dm_roundtrip(self):
|
|
ezra = AgentMessenger("ezra-test", keys_dir=os.path.join(self.tmp_dir, "ezra"))
|
|
bilbo = AgentMessenger("bilbo-test", keys_dir=os.path.join(self.tmp_dir, "bilbo"))
|
|
|
|
# Register each other
|
|
ezra.directory = bilbo.directory # Share directory for test
|
|
|
|
event = ezra.create_dm("bilbo-test", "Good morning from Ezra!")
|
|
self.assertEqual(event.kind, 4)
|
|
self.assertIn(bilbo.keypair.pubkey_hex, [t[1] for t in event.tags if t[0] == "p"])
|
|
|
|
decrypted = bilbo.read_dm(event)
|
|
self.assertEqual(decrypted, "Good morning from Ezra!")
|
|
|
|
def test_broadcast(self):
|
|
ezra = AgentMessenger("ezra-test", keys_dir=os.path.join(self.tmp_dir, "ezra"))
|
|
event = ezra.create_broadcast("Fleet status: all systems nominal")
|
|
self.assertEqual(event.kind, 30078)
|
|
self.assertEqual(event.content, "Fleet status: all systems nominal")
|
|
self.assertTrue(any(t[0] == "t" and t[1] == "timmy-fleet" for t in event.tags))
|
|
|
|
def test_identity_card(self):
|
|
msg = AgentMessenger("test", keys_dir=self.tmp_dir)
|
|
card = msg.identity_card()
|
|
self.assertIn("wizard", card)
|
|
self.assertIn("npub", card)
|
|
self.assertIn("pubkey_hex", card)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|