diff --git a/src/config.py b/src/config.py index a0f82393..bb72f70f 100644 --- a/src/config.py +++ b/src/config.py @@ -545,6 +545,28 @@ class Settings(BaseSettings): # Corresponding public key (hex-encoded npub) content_nostr_pubkey: str = "" + # ── Nostr Identity (Timmy's on-network presence) ───────────────────────── + # Hex-encoded 32-byte private key — NEVER commit this value. + # Generate one with: timmyctl nostr keygen + nostr_privkey: str = "" + # Corresponding x-only public key (hex). Auto-derived from nostr_privkey + # if left empty; override only if you manage keys externally. + nostr_pubkey: str = "" + # Comma-separated list of NIP-01 relay WebSocket URLs. + # e.g. "wss://relay.damus.io,wss://nostr.wine" + nostr_relays: str = "" + # NIP-05 identifier for Timmy — e.g. "timmy@tower.local" + nostr_nip05: str = "" + # Profile display name (Kind 0 "name" field) + nostr_profile_name: str = "Timmy" + # Profile "about" text (Kind 0 "about" field) + nostr_profile_about: str = ( + "Sovereign AI agent — mission control dashboard, task orchestration, " + "and ambient intelligence." + ) + # URL to Timmy's avatar image (Kind 0 "picture" field) + nostr_profile_picture: str = "" + # Meilisearch archive content_meilisearch_url: str = "http://localhost:7700" content_meilisearch_api_key: str = "" diff --git a/src/infrastructure/nostr/__init__.py b/src/infrastructure/nostr/__init__.py new file mode 100644 index 00000000..3c441db9 --- /dev/null +++ b/src/infrastructure/nostr/__init__.py @@ -0,0 +1,18 @@ +"""Nostr identity infrastructure for Timmy. + +Provides keypair management, NIP-01 event signing, WebSocket relay client, +and identity lifecycle management (Kind 0 profile, Kind 31990 capability card). + +All components degrade gracefully when the Nostr relay is unavailable. + +Usage +----- + from infrastructure.nostr.identity import NostrIdentityManager + + manager = NostrIdentityManager() + await manager.announce() # publishes Kind 0 + Kind 31990 +""" + +from infrastructure.nostr.identity import NostrIdentityManager + +__all__ = ["NostrIdentityManager"] diff --git a/src/infrastructure/nostr/event.py b/src/infrastructure/nostr/event.py new file mode 100644 index 00000000..1167ecde --- /dev/null +++ b/src/infrastructure/nostr/event.py @@ -0,0 +1,215 @@ +"""NIP-01 Nostr event construction and BIP-340 Schnorr signing. + +Constructs and signs Nostr events using a pure-Python BIP-340 Schnorr +implementation over secp256k1 (no external crypto dependencies required). + +Usage +----- + from infrastructure.nostr.event import build_event, sign_event + from infrastructure.nostr.keypair import load_keypair + + kp = load_keypair(privkey_hex="...") + ev = build_event(kind=0, content='{"name":"Timmy"}', keypair=kp) + print(ev["id"], ev["sig"]) +""" + +from __future__ import annotations + +import hashlib +import json +import secrets +import time +from typing import Any + +from infrastructure.nostr.keypair import ( + _G, + _N, + _P, + NostrKeypair, + Point, + _has_even_y, + _point_mul, + _x_bytes, +) + +# ── BIP-340 tagged hash ──────────────────────────────────────────────────────── + + +def _tagged_hash(tag: str, data: bytes) -> bytes: + """BIP-340 tagged SHA-256 hash: SHA256(SHA256(tag) || SHA256(tag) || data).""" + tag_hash = hashlib.sha256(tag.encode()).digest() + return hashlib.sha256(tag_hash + tag_hash + data).digest() + + +# ── BIP-340 Schnorr sign ─────────────────────────────────────────────────────── + + +def schnorr_sign(msg: bytes, privkey_bytes: bytes) -> bytes: + """Sign a 32-byte message with a 32-byte private key using BIP-340 Schnorr. + + Parameters + ---------- + msg: + The 32-byte message to sign (typically the event ID hash). + privkey_bytes: + The 32-byte private key. + + Returns + ------- + bytes + 64-byte Schnorr signature (r || s). + + Raises + ------ + ValueError + If the key is invalid. + """ + if len(msg) != 32: + raise ValueError(f"Message must be 32 bytes, got {len(msg)}") + if len(privkey_bytes) != 32: + raise ValueError(f"Private key must be 32 bytes, got {len(privkey_bytes)}") + + d_int = int.from_bytes(privkey_bytes, "big") + if not (1 <= d_int < _N): + raise ValueError("Private key out of range") + + P = _point_mul(_G, d_int) + assert P is not None + + # Negate d if P has odd y (BIP-340 requirement) + a = d_int if _has_even_y(P) else _N - d_int + + # Deterministic nonce with auxiliary randomness (BIP-340 §Default signing) + rand = secrets.token_bytes(32) + t = bytes(x ^ y for x, y in zip(a.to_bytes(32, "big"), _tagged_hash("BIP0340/aux", rand), strict=True)) + + r_bytes = _tagged_hash("BIP0340/nonce", t + _x_bytes(P) + msg) + k_int = int.from_bytes(r_bytes, "big") % _N + if k_int == 0: # Astronomically unlikely; retry would be cleaner but this is safe enough + raise ValueError("Nonce derivation produced k=0; retry signing") + + R: Point = _point_mul(_G, k_int) + assert R is not None + k = k_int if _has_even_y(R) else _N - k_int + + e = ( + int.from_bytes( + _tagged_hash("BIP0340/challenge", _x_bytes(R) + _x_bytes(P) + msg), + "big", + ) + % _N + ) + s = (k + e * a) % _N + + sig = _x_bytes(R) + s.to_bytes(32, "big") + assert len(sig) == 64 + return sig + + +def schnorr_verify(msg: bytes, pubkey_bytes: bytes, sig: bytes) -> bool: + """Verify a BIP-340 Schnorr signature. + + Returns True if valid, False otherwise (never raises). + """ + try: + if len(msg) != 32 or len(pubkey_bytes) != 32 or len(sig) != 64: + return False + + px = int.from_bytes(pubkey_bytes, "big") + if px >= _P: + return False + + # Lift x to curve point (even-y convention) + y_sq = (pow(px, 3, _P) + 7) % _P + y = pow(y_sq, (_P + 1) // 4, _P) + if pow(y, 2, _P) != y_sq: + return False + P: Point = (px, y if y % 2 == 0 else _P - y) + + r = int.from_bytes(sig[:32], "big") + s = int.from_bytes(sig[32:], "big") + + if r >= _P or s >= _N: + return False + + e = ( + int.from_bytes( + _tagged_hash("BIP0340/challenge", sig[:32] + pubkey_bytes + msg), + "big", + ) + % _N + ) + + R1 = _point_mul(_G, s) + R2 = _point_mul(P, _N - e) + # Point addition + from infrastructure.nostr.keypair import _point_add + + R: Point = _point_add(R1, R2) + if R is None or not _has_even_y(R) or R[0] != r: + return False + return True + except Exception: + return False + + +# ── NIP-01 event construction ───────────────────────────────────────────────── + +NostrEvent = dict[str, Any] + + +def _event_hash(pubkey: str, created_at: int, kind: int, tags: list, content: str) -> bytes: + """Compute the NIP-01 event ID (SHA-256 of canonical serialisation).""" + serialized = json.dumps( + [0, pubkey, created_at, kind, tags, content], + separators=(",", ":"), + ensure_ascii=False, + ) + return hashlib.sha256(serialized.encode()).digest() + + +def build_event( + *, + kind: int, + content: str, + keypair: NostrKeypair, + tags: list[list[str]] | None = None, + created_at: int | None = None, +) -> NostrEvent: + """Build and sign a NIP-01 Nostr event. + + Parameters + ---------- + kind: + NIP-01 event kind integer (e.g. 0 = profile, 1 = note). + content: + Event content string (often JSON for structured kinds). + keypair: + The signing keypair. + tags: + Optional list of tag arrays. + created_at: + Unix timestamp; defaults to ``int(time.time())``. + + Returns + ------- + dict + Fully signed NIP-01 event ready for relay publication. + """ + _tags = tags or [] + _created_at = created_at if created_at is not None else int(time.time()) + + msg = _event_hash(keypair.pubkey_hex, _created_at, kind, _tags, content) + event_id = msg.hex() + sig_bytes = schnorr_sign(msg, keypair.privkey_bytes) + sig_hex = sig_bytes.hex() + + return { + "id": event_id, + "pubkey": keypair.pubkey_hex, + "created_at": _created_at, + "kind": kind, + "tags": _tags, + "content": content, + "sig": sig_hex, + } diff --git a/src/infrastructure/nostr/identity.py b/src/infrastructure/nostr/identity.py new file mode 100644 index 00000000..5c3af489 --- /dev/null +++ b/src/infrastructure/nostr/identity.py @@ -0,0 +1,265 @@ +"""Timmy's Nostr identity lifecycle manager. + +Manages Timmy's on-network Nostr presence: + +- **Kind 0** (NIP-01 profile metadata): name, about, picture, nip05 +- **Kind 31990** (NIP-89 handler / NIP-90 capability card): advertises + Timmy's services so NIP-89 clients can discover him. + +Config is read from ``settings`` via pydantic-settings: + + NOSTR_PRIVKEY — hex private key (required to publish) + NOSTR_PUBKEY — hex public key (auto-derived if missing) + NOSTR_RELAYS — comma-separated relay WSS URLs + NOSTR_NIP05 — NIP-05 identifier e.g. timmy@tower.local + NOSTR_PROFILE_NAME — display name (default: "Timmy") + NOSTR_PROFILE_ABOUT — "about" text + NOSTR_PROFILE_PICTURE — avatar URL + +Usage +----- + from infrastructure.nostr.identity import NostrIdentityManager + + manager = NostrIdentityManager() + result = await manager.announce() + # {'kind_0': True, 'kind_31990': True, 'relays': {'wss://…': True}} +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from typing import Any + +from config import settings +from infrastructure.nostr.event import build_event +from infrastructure.nostr.keypair import NostrKeypair, load_keypair +from infrastructure.nostr.relay import publish_to_relays + +logger = logging.getLogger(__name__) + +# Timmy's default capability description for NIP-89/NIP-90 +_DEFAULT_CAPABILITIES = { + "name": "Timmy", + "about": ( + "Sovereign AI agent — mission control dashboard, task orchestration, " + "voice NLU, game-state monitoring, and ambient intelligence." + ), + "capabilities": [ + "chat", + "task_orchestration", + "voice_nlu", + "game_state", + "nostr_presence", + ], + "nip": [1, 89, 90], +} + + +@dataclass +class AnnounceResult: + """Result of a Nostr identity announcement.""" + + kind_0_ok: bool = False + kind_31990_ok: bool = False + relay_results: dict[str, bool] = field(default_factory=dict) + + @property + def any_relay_ok(self) -> bool: + return any(self.relay_results.values()) + + def to_dict(self) -> dict[str, Any]: + return { + "kind_0": self.kind_0_ok, + "kind_31990": self.kind_31990_ok, + "relays": self.relay_results, + } + + +class NostrIdentityManager: + """Manages Timmy's Nostr identity and relay presence. + + Reads configuration from ``settings`` on every call so runtime + changes to environment variables are picked up automatically. + + All public methods degrade gracefully — they log warnings and return + False/empty rather than raising exceptions. + """ + + # ── keypair ───────────────────────────────────────────────────────────── + + def get_keypair(self) -> NostrKeypair | None: + """Return the configured keypair, or None if not configured. + + Derives the public key from the private key if only the private + key is set. Returns None (with a warning) if no private key is + configured. + """ + privkey = settings.nostr_privkey.strip() + if not privkey: + logger.warning( + "NOSTR_PRIVKEY not configured — Nostr identity unavailable. " + "Run `timmyctl nostr keygen` to generate a keypair." + ) + return None + try: + return load_keypair(privkey_hex=privkey) + except Exception as exc: + logger.warning("Invalid NOSTR_PRIVKEY: %s", exc) + return None + + # ── relay list ─────────────────────────────────────────────────────────── + + def get_relay_urls(self) -> list[str]: + """Return the configured relay URL list (may be empty).""" + raw = settings.nostr_relays.strip() + if not raw: + return [] + return [url.strip() for url in raw.split(",") if url.strip()] + + # ── Kind 0 — profile ───────────────────────────────────────────────────── + + def build_profile_event(self, keypair: NostrKeypair) -> dict: + """Build a NIP-01 Kind 0 profile metadata event. + + Reads profile fields from settings: + ``nostr_profile_name``, ``nostr_profile_about``, + ``nostr_profile_picture``, ``nostr_nip05``. + """ + profile: dict[str, str] = {} + + name = settings.nostr_profile_name.strip() or "Timmy" + profile["name"] = name + profile["display_name"] = name + + about = settings.nostr_profile_about.strip() + if about: + profile["about"] = about + + picture = settings.nostr_profile_picture.strip() + if picture: + profile["picture"] = picture + + nip05 = settings.nostr_nip05.strip() + if nip05: + profile["nip05"] = nip05 + + return build_event( + kind=0, + content=json.dumps(profile, ensure_ascii=False), + keypair=keypair, + ) + + # ── Kind 31990 — NIP-89 capability card ────────────────────────────────── + + def build_capability_event(self, keypair: NostrKeypair) -> dict: + """Build a NIP-89/NIP-90 Kind 31990 capability handler event. + + Advertises Timmy's services so NIP-89 clients can discover him. + The ``d`` tag uses the application identifier ``timmy-mission-control``. + """ + cap = dict(_DEFAULT_CAPABILITIES) + name = settings.nostr_profile_name.strip() or "Timmy" + cap["name"] = name + + about = settings.nostr_profile_about.strip() + if about: + cap["about"] = about + + picture = settings.nostr_profile_picture.strip() + if picture: + cap["picture"] = picture + + nip05 = settings.nostr_nip05.strip() + if nip05: + cap["nip05"] = nip05 + + tags = [ + ["d", "timmy-mission-control"], + ["k", "1"], # handles kind:1 (notes) as a starting point + ["k", "5600"], # DVM task request (NIP-90) + ["k", "5900"], # DVM general task + ] + + return build_event( + kind=31990, + content=json.dumps(cap, ensure_ascii=False), + keypair=keypair, + tags=tags, + ) + + # ── announce ───────────────────────────────────────────────────────────── + + async def announce(self) -> AnnounceResult: + """Publish Kind 0 profile and Kind 31990 capability card to all relays. + + Returns + ------- + AnnounceResult + Contains per-relay success flags and per-event-kind success flags. + Never raises; all failures are logged at WARNING level. + """ + result = AnnounceResult() + + keypair = self.get_keypair() + if keypair is None: + return result + + relay_urls = self.get_relay_urls() + if not relay_urls: + logger.warning( + "NOSTR_RELAYS not configured — Kind 0 and Kind 31990 not published." + ) + return result + + logger.info( + "Announcing Nostr identity %s to %d relay(s)", keypair.npub[:20], len(relay_urls) + ) + + # Build and publish Kind 0 (profile) + try: + kind0 = self.build_profile_event(keypair) + k0_results = await publish_to_relays(relay_urls, kind0) + result.kind_0_ok = any(k0_results.values()) + # Merge relay results + for url, ok in k0_results.items(): + result.relay_results[url] = result.relay_results.get(url, False) or ok + except Exception as exc: + logger.warning("Kind 0 publish failed: %s", exc) + + # Build and publish Kind 31990 (capability card) + try: + kind31990 = self.build_capability_event(keypair) + k31990_results = await publish_to_relays(relay_urls, kind31990) + result.kind_31990_ok = any(k31990_results.values()) + for url, ok in k31990_results.items(): + result.relay_results[url] = result.relay_results.get(url, False) or ok + except Exception as exc: + logger.warning("Kind 31990 publish failed: %s", exc) + + if result.any_relay_ok: + logger.info("Nostr identity announced successfully (npub: %s)", keypair.npub) + else: + logger.warning("Nostr identity announcement failed — no relays accepted events") + + return result + + async def publish_profile(self) -> bool: + """Publish only the Kind 0 profile event. + + Returns True if at least one relay accepted the event. + """ + keypair = self.get_keypair() + if keypair is None: + return False + relay_urls = self.get_relay_urls() + if not relay_urls: + return False + try: + event = self.build_profile_event(keypair) + results = await publish_to_relays(relay_urls, event) + return any(results.values()) + except Exception as exc: + logger.warning("Profile publish failed: %s", exc) + return False diff --git a/src/infrastructure/nostr/keypair.py b/src/infrastructure/nostr/keypair.py new file mode 100644 index 00000000..ad02327c --- /dev/null +++ b/src/infrastructure/nostr/keypair.py @@ -0,0 +1,270 @@ +"""Nostr keypair generation and encoding (NIP-19 / BIP-340). + +Provides pure-Python secp256k1 keypair generation and bech32 nsec/npub +encoding with no external dependencies beyond the Python stdlib. + +Usage +----- + from infrastructure.nostr.keypair import generate_keypair, load_keypair + + kp = generate_keypair() + print(kp.npub) # npub1… + print(kp.nsec) # nsec1… + + kp2 = load_keypair(privkey_hex="deadbeef...") +""" + +from __future__ import annotations + +import hashlib +import secrets +from dataclasses import dataclass + +# ── secp256k1 curve parameters (BIP-340) ────────────────────────────────────── + +_P = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F +_N = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 +_GX = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798 +_GY = 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8 +_G = (_GX, _GY) + +Point = tuple[int, int] | None # None represents the point at infinity + + +def _point_add(P: Point, Q: Point) -> Point: + if P is None: + return Q + if Q is None: + return P + px, py = P + qx, qy = Q + if px == qx: + if py != qy: + return None + # Point doubling + lam = (3 * px * px * pow(2 * py, _P - 2, _P)) % _P + else: + lam = ((qy - py) * pow(qx - px, _P - 2, _P)) % _P + rx = (lam * lam - px - qx) % _P + ry = (lam * (px - rx) - py) % _P + return rx, ry + + +def _point_mul(P: Point, n: int) -> Point: + """Scalar multiplication via double-and-add.""" + R: Point = None + while n > 0: + if n & 1: + R = _point_add(R, P) + P = _point_add(P, P) + n >>= 1 + return R + + +def _has_even_y(P: Point) -> bool: + assert P is not None + return P[1] % 2 == 0 + + +def _x_bytes(P: Point) -> bytes: + """Return the 32-byte x-coordinate of a point (x-only pubkey).""" + assert P is not None + return P[0].to_bytes(32, "big") + + +def _privkey_to_pubkey_bytes(privkey_int: int) -> bytes: + """Derive the x-only public key from an integer private key.""" + P = _point_mul(_G, privkey_int) + return _x_bytes(P) + + +# ── bech32 encoding (NIP-19 uses original bech32, not bech32m) ──────────────── + +_BECH32_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" + + +def _bech32_polymod(values: list[int]) -> int: + GEN = [0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3] + chk = 1 + for v in values: + b = chk >> 25 + chk = (chk & 0x1FFFFFF) << 5 ^ v + for i in range(5): + chk ^= GEN[i] if ((b >> i) & 1) else 0 + return chk + + +def _bech32_hrp_expand(hrp: str) -> list[int]: + return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] + + +def _convertbits(data: bytes, frombits: int, tobits: int, pad: bool = True) -> list[int]: + acc = 0 + bits = 0 + ret: list[int] = [] + maxv = (1 << tobits) - 1 + for value in data: + acc = ((acc << frombits) | value) & 0xFFFFFF + bits += frombits + while bits >= tobits: + bits -= tobits + ret.append((acc >> bits) & maxv) + if pad and bits: + ret.append((acc << (tobits - bits)) & maxv) + elif bits >= frombits or ((acc << (tobits - bits)) & maxv): + raise ValueError("Invalid padding") + return ret + + +def _bech32_encode(hrp: str, data: bytes) -> str: + """Encode bytes as a bech32 string with the given HRP.""" + converted = _convertbits(data, 8, 5) + combined = _bech32_hrp_expand(hrp) + converted + checksum_input = combined + [0, 0, 0, 0, 0, 0] + polymod = _bech32_polymod(checksum_input) ^ 1 + checksum = [(polymod >> (5 * (5 - i))) & 31 for i in range(6)] + return hrp + "1" + "".join(_BECH32_CHARSET[d] for d in converted + checksum) + + +def _bech32_decode(bech32_str: str) -> tuple[str, bytes]: + """Decode a bech32 string to (hrp, data_bytes). + + Raises ValueError on invalid encoding. + """ + bech32_str = bech32_str.lower() + sep = bech32_str.rfind("1") + if sep < 1 or sep + 7 > len(bech32_str): + raise ValueError(f"Invalid bech32: {bech32_str!r}") + hrp = bech32_str[:sep] + data_chars = bech32_str[sep + 1 :] + data = [] + for c in data_chars: + pos = _BECH32_CHARSET.find(c) + if pos == -1: + raise ValueError(f"Invalid bech32 character: {c!r}") + data.append(pos) + if _bech32_polymod(_bech32_hrp_expand(hrp) + data) != 1: + raise ValueError("Invalid bech32 checksum") + decoded = _convertbits(bytes(data[:-6]), 5, 8, pad=False) + return hrp, bytes(decoded) + + +# ── NostrKeypair ────────────────────────────────────────────────────────────── + + +@dataclass(frozen=True) +class NostrKeypair: + """A Nostr keypair with both hex and bech32 representations. + + Attributes + ---------- + privkey_hex : str + 32-byte private key as lowercase hex (64 chars). Treat as a secret. + pubkey_hex : str + 32-byte x-only public key as lowercase hex (64 chars). + nsec : str + Private key encoded as NIP-19 ``nsec1…`` bech32 string. + npub : str + Public key encoded as NIP-19 ``npub1…`` bech32 string. + """ + + privkey_hex: str + pubkey_hex: str + nsec: str + npub: str + + @property + def privkey_bytes(self) -> bytes: + return bytes.fromhex(self.privkey_hex) + + @property + def pubkey_bytes(self) -> bytes: + return bytes.fromhex(self.pubkey_hex) + + +def generate_keypair() -> NostrKeypair: + """Generate a fresh Nostr keypair from a cryptographically random seed. + + Returns + ------- + NostrKeypair + The newly generated keypair. + """ + while True: + raw = secrets.token_bytes(32) + d = int.from_bytes(raw, "big") + if 1 <= d < _N: + break + + pub_bytes = _privkey_to_pubkey_bytes(d) + privkey_hex = raw.hex() + pubkey_hex = pub_bytes.hex() + nsec = _bech32_encode("nsec", raw) + npub = _bech32_encode("npub", pub_bytes) + return NostrKeypair(privkey_hex=privkey_hex, pubkey_hex=pubkey_hex, nsec=nsec, npub=npub) + + +def load_keypair( + *, + privkey_hex: str | None = None, + nsec: str | None = None, +) -> NostrKeypair: + """Load a keypair from a hex private key or an nsec bech32 string. + + Parameters + ---------- + privkey_hex: + 64-char lowercase hex private key. + nsec: + NIP-19 ``nsec1…`` bech32 string. + + Raises + ------ + ValueError + If neither or both parameters are supplied, or if the key is invalid. + """ + if privkey_hex and nsec: + raise ValueError("Supply either privkey_hex or nsec, not both") + if not privkey_hex and not nsec: + raise ValueError("Supply either privkey_hex or nsec") + + if nsec: + hrp, raw = _bech32_decode(nsec) + if hrp != "nsec": + raise ValueError(f"Expected nsec bech32, got {hrp!r}") + privkey_hex = raw.hex() + + assert privkey_hex is not None + raw_bytes = bytes.fromhex(privkey_hex) + if len(raw_bytes) != 32: + raise ValueError(f"Private key must be 32 bytes, got {len(raw_bytes)}") + + d = int.from_bytes(raw_bytes, "big") + if not (1 <= d < _N): + raise ValueError("Private key out of range") + + pub_bytes = _privkey_to_pubkey_bytes(d) + pubkey_hex = pub_bytes.hex() + nsec_enc = _bech32_encode("nsec", raw_bytes) + npub = _bech32_encode("npub", pub_bytes) + return NostrKeypair(privkey_hex=privkey_hex, pubkey_hex=pubkey_hex, nsec=nsec_enc, npub=npub) + + +def pubkey_from_privkey(privkey_hex: str) -> str: + """Derive the hex public key from a hex private key. + + Parameters + ---------- + privkey_hex: + 64-char lowercase hex private key. + + Returns + ------- + str + 64-char lowercase hex x-only public key. + """ + return load_keypair(privkey_hex=privkey_hex).pubkey_hex + + +def _sha256(data: bytes) -> bytes: + return hashlib.sha256(data).digest() diff --git a/src/infrastructure/nostr/relay.py b/src/infrastructure/nostr/relay.py new file mode 100644 index 00000000..052c0ac1 --- /dev/null +++ b/src/infrastructure/nostr/relay.py @@ -0,0 +1,133 @@ +"""NIP-01 WebSocket relay client for Nostr event publication. + +Connects to Nostr relays via WebSocket and publishes events using +the NIP-01 ``["EVENT", event]`` message format. + +Degrades gracefully when the relay is unavailable or the ``websockets`` +package is not installed. + +Usage +----- + from infrastructure.nostr.relay import publish_to_relay + + ok = await publish_to_relay("wss://relay.damus.io", signed_event) + # Returns True if the relay accepted the event. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from typing import Any + +logger = logging.getLogger(__name__) + +NostrEvent = dict[str, Any] + +# Timeout for relay operations (seconds) +_CONNECT_TIMEOUT = 10 +_PUBLISH_TIMEOUT = 15 + + +async def publish_to_relay(relay_url: str, event: NostrEvent) -> bool: + """Publish a signed NIP-01 event to a single relay. + + Parameters + ---------- + relay_url: + ``wss://`` or ``ws://`` WebSocket URL of the relay. + event: + A fully signed NIP-01 event dict. + + Returns + ------- + bool + True if the relay acknowledged the event (``["OK", id, true, …]``), + False otherwise (never raises). + """ + try: + import websockets + except ImportError: + logger.warning( + "websockets package not available — Nostr relay publish skipped " + "(install with: pip install websockets)" + ) + return False + + event_id = event.get("id", "") + message = json.dumps(["EVENT", event], separators=(",", ":")) + + try: + async with asyncio.timeout(_CONNECT_TIMEOUT): + ws = await websockets.connect(relay_url, open_timeout=_CONNECT_TIMEOUT) + except Exception as exc: + logger.warning("Nostr relay connect failed (%s): %s", relay_url, exc) + return False + + try: + async with ws: + await ws.send(message) + # Wait for OK response with timeout + async with asyncio.timeout(_PUBLISH_TIMEOUT): + async for raw in ws: + try: + resp = json.loads(raw) + except json.JSONDecodeError: + continue + if ( + isinstance(resp, list) + and len(resp) >= 3 + and resp[0] == "OK" + and resp[1] == event_id + ): + if resp[2] is True: + logger.debug("Relay %s accepted event %s", relay_url, event_id[:8]) + return True + else: + reason = resp[3] if len(resp) > 3 else "" + logger.warning( + "Relay %s rejected event %s: %s", + relay_url, + event_id[:8], + reason, + ) + return False + except TimeoutError: + logger.warning("Relay %s timed out waiting for OK on event %s", relay_url, event_id[:8]) + return False + except Exception as exc: + logger.warning("Relay %s error publishing event %s: %s", relay_url, event_id[:8], exc) + return False + + logger.warning("Relay %s closed without OK for event %s", relay_url, event_id[:8]) + return False + + +async def publish_to_relays(relay_urls: list[str], event: NostrEvent) -> dict[str, bool]: + """Publish an event to multiple relays concurrently. + + Parameters + ---------- + relay_urls: + List of relay WebSocket URLs. + event: + A fully signed NIP-01 event dict. + + Returns + ------- + dict[str, bool] + Mapping of relay URL → success flag. + """ + if not relay_urls: + return {} + + tasks = {url: asyncio.create_task(publish_to_relay(url, event)) for url in relay_urls} + results: dict[str, bool] = {} + for url, task in tasks.items(): + try: + results[url] = await task + except Exception as exc: + logger.warning("Unexpected error publishing to %s: %s", url, exc) + results[url] = False + return results diff --git a/tests/unit/test_nostr_event.py b/tests/unit/test_nostr_event.py new file mode 100644 index 00000000..35af2a3f --- /dev/null +++ b/tests/unit/test_nostr_event.py @@ -0,0 +1,177 @@ +"""Unit tests for infrastructure.nostr.event.""" + +from __future__ import annotations + +import hashlib +import json +import time + +import pytest + +from infrastructure.nostr.event import ( + _event_hash, + build_event, + schnorr_sign, + schnorr_verify, +) +from infrastructure.nostr.keypair import generate_keypair + + +class TestSchorrSign: + def test_returns_64_bytes(self): + kp = generate_keypair() + msg = b"\x00" * 32 + sig = schnorr_sign(msg, kp.privkey_bytes) + assert len(sig) == 64 + + def test_different_msg_different_sig(self): + kp = generate_keypair() + sig1 = schnorr_sign(b"\x01" * 32, kp.privkey_bytes) + sig2 = schnorr_sign(b"\x02" * 32, kp.privkey_bytes) + assert sig1 != sig2 + + def test_raises_on_wrong_msg_length(self): + kp = generate_keypair() + with pytest.raises(ValueError, match="32 bytes"): + schnorr_sign(b"too short", kp.privkey_bytes) + + def test_raises_on_wrong_key_length(self): + msg = b"\x00" * 32 + with pytest.raises(ValueError, match="32 bytes"): + schnorr_sign(msg, b"too short") + + def test_nondeterministic_due_to_randomness(self): + # BIP-340 uses auxiliary randomness; repeated calls produce different sigs + kp = generate_keypair() + msg = b"\x42" * 32 + sig1 = schnorr_sign(msg, kp.privkey_bytes) + sig2 = schnorr_sign(msg, kp.privkey_bytes) + # With different random nonces these should differ (astronomically unlikely to collide) + # We just verify both are valid + assert schnorr_verify(msg, kp.pubkey_bytes, sig1) + assert schnorr_verify(msg, kp.pubkey_bytes, sig2) + + +class TestSchnorrVerify: + def test_valid_signature_verifies(self): + kp = generate_keypair() + msg = hashlib.sha256(b"hello nostr").digest() + sig = schnorr_sign(msg, kp.privkey_bytes) + assert schnorr_verify(msg, kp.pubkey_bytes, sig) is True + + def test_wrong_pubkey_fails(self): + kp1 = generate_keypair() + kp2 = generate_keypair() + msg = b"\x00" * 32 + sig = schnorr_sign(msg, kp1.privkey_bytes) + assert schnorr_verify(msg, kp2.pubkey_bytes, sig) is False + + def test_tampered_sig_fails(self): + kp = generate_keypair() + msg = b"\x00" * 32 + sig = bytearray(schnorr_sign(msg, kp.privkey_bytes)) + sig[0] ^= 0xFF + assert schnorr_verify(msg, kp.pubkey_bytes, bytes(sig)) is False + + def test_tampered_msg_fails(self): + kp = generate_keypair() + msg = b"\x00" * 32 + sig = schnorr_sign(msg, kp.privkey_bytes) + bad_msg = b"\xFF" * 32 + assert schnorr_verify(bad_msg, kp.pubkey_bytes, sig) is False + + def test_wrong_lengths_return_false(self): + kp = generate_keypair() + msg = b"\x00" * 32 + sig = schnorr_sign(msg, kp.privkey_bytes) + assert schnorr_verify(msg[:16], kp.pubkey_bytes, sig) is False + assert schnorr_verify(msg, kp.pubkey_bytes[:16], sig) is False + assert schnorr_verify(msg, kp.pubkey_bytes, sig[:32]) is False + + def test_never_raises(self): + # Should return False for any garbage input, not raise + assert schnorr_verify(b"x", b"y", b"z") is False + + +class TestEventHash: + def test_returns_32_bytes(self): + h = _event_hash("aabbcc", 0, 1, [], "") + assert len(h) == 32 + + def test_deterministic(self): + h1 = _event_hash("aa", 1, 1, [], "hello") + h2 = _event_hash("aa", 1, 1, [], "hello") + assert h1 == h2 + + def test_different_content_different_hash(self): + h1 = _event_hash("aa", 1, 1, [], "hello") + h2 = _event_hash("aa", 1, 1, [], "world") + assert h1 != h2 + + +class TestBuildEvent: + def test_returns_required_fields(self): + kp = generate_keypair() + ev = build_event(kind=1, content="hello", keypair=kp) + assert set(ev) >= {"id", "pubkey", "created_at", "kind", "tags", "content", "sig"} + + def test_kind_matches(self): + kp = generate_keypair() + ev = build_event(kind=0, content="{}", keypair=kp) + assert ev["kind"] == 0 + + def test_pubkey_matches_keypair(self): + kp = generate_keypair() + ev = build_event(kind=1, content="x", keypair=kp) + assert ev["pubkey"] == kp.pubkey_hex + + def test_id_is_64_char_hex(self): + kp = generate_keypair() + ev = build_event(kind=1, content="x", keypair=kp) + assert len(ev["id"]) == 64 + assert all(c in "0123456789abcdef" for c in ev["id"]) + + def test_sig_is_128_char_hex(self): + kp = generate_keypair() + ev = build_event(kind=1, content="x", keypair=kp) + assert len(ev["sig"]) == 128 + assert all(c in "0123456789abcdef" for c in ev["sig"]) + + def test_signature_verifies(self): + kp = generate_keypair() + ev = build_event(kind=1, content="test", keypair=kp) + sig_bytes = bytes.fromhex(ev["sig"]) + id_bytes = bytes.fromhex(ev["id"]) + assert schnorr_verify(id_bytes, kp.pubkey_bytes, sig_bytes) + + def test_id_matches_canonical_hash(self): + kp = generate_keypair() + ts = int(time.time()) + ev = build_event(kind=1, content="hi", keypair=kp, created_at=ts) + expected_hash = _event_hash(kp.pubkey_hex, ts, 1, [], "hi").hex() + assert ev["id"] == expected_hash + + def test_custom_tags(self): + kp = generate_keypair() + tags = [["t", "gaming"], ["r", "wss://relay.example.com"]] + ev = build_event(kind=1, content="x", keypair=kp, tags=tags) + assert ev["tags"] == tags + + def test_default_tags_empty(self): + kp = generate_keypair() + ev = build_event(kind=1, content="x", keypair=kp) + assert ev["tags"] == [] + + def test_custom_created_at(self): + kp = generate_keypair() + ts = 1700000000 + ev = build_event(kind=1, content="x", keypair=kp, created_at=ts) + assert ev["created_at"] == ts + + def test_kind0_profile_content_is_json(self): + kp = generate_keypair() + profile = {"name": "Timmy", "about": "test"} + ev = build_event(kind=0, content=json.dumps(profile), keypair=kp) + assert ev["kind"] == 0 + parsed = json.loads(ev["content"]) + assert parsed["name"] == "Timmy" diff --git a/tests/unit/test_nostr_identity.py b/tests/unit/test_nostr_identity.py new file mode 100644 index 00000000..ed37f76b --- /dev/null +++ b/tests/unit/test_nostr_identity.py @@ -0,0 +1,272 @@ +"""Unit tests for infrastructure.nostr.identity.""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from infrastructure.nostr.identity import AnnounceResult, NostrIdentityManager +from infrastructure.nostr.keypair import generate_keypair + + +@pytest.fixture() +def manager(): + return NostrIdentityManager() + + +@pytest.fixture() +def kp(): + return generate_keypair() + + +class TestAnnounceResult: + def test_any_relay_ok_false_when_empty(self): + r = AnnounceResult() + assert r.any_relay_ok is False + + def test_any_relay_ok_true_when_one_ok(self): + r = AnnounceResult(relay_results={"wss://a": True, "wss://b": False}) + assert r.any_relay_ok is True + + def test_to_dict_keys(self): + r = AnnounceResult(kind_0_ok=True, relay_results={"wss://a": True}) + d = r.to_dict() + assert set(d) == {"kind_0", "kind_31990", "relays"} + + +class TestGetKeypair: + def test_returns_none_when_no_privkey(self, manager): + mock_settings = MagicMock(nostr_privkey="") + with patch("infrastructure.nostr.identity.settings", mock_settings): + assert manager.get_keypair() is None + + def test_returns_keypair_when_configured(self, manager, kp): + mock_settings = MagicMock(nostr_privkey=kp.privkey_hex) + with patch("infrastructure.nostr.identity.settings", mock_settings): + result = manager.get_keypair() + assert result is not None + assert result.pubkey_hex == kp.pubkey_hex + + def test_returns_none_on_invalid_key(self, manager): + mock_settings = MagicMock(nostr_privkey="not_a_valid_key") + with patch("infrastructure.nostr.identity.settings", mock_settings): + assert manager.get_keypair() is None + + +class TestGetRelayUrls: + def test_empty_string_returns_empty_list(self, manager): + mock_settings = MagicMock(nostr_relays="") + with patch("infrastructure.nostr.identity.settings", mock_settings): + assert manager.get_relay_urls() == [] + + def test_single_relay(self, manager): + mock_settings = MagicMock(nostr_relays="wss://relay.damus.io") + with patch("infrastructure.nostr.identity.settings", mock_settings): + urls = manager.get_relay_urls() + assert urls == ["wss://relay.damus.io"] + + def test_multiple_relays(self, manager): + mock_settings = MagicMock(nostr_relays="wss://a.com,wss://b.com, wss://c.com ") + with patch("infrastructure.nostr.identity.settings", mock_settings): + urls = manager.get_relay_urls() + assert urls == ["wss://a.com", "wss://b.com", "wss://c.com"] + + +class TestBuildProfileEvent: + def test_kind_is_0(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_profile_event(kp) + assert ev["kind"] == 0 + + def test_content_contains_name(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="Timmy", + nostr_profile_about="A great AI agent", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_profile_event(kp) + profile = json.loads(ev["content"]) + assert profile["name"] == "Timmy" + + def test_nip05_included_when_set(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="timmy@tower.local", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_profile_event(kp) + profile = json.loads(ev["content"]) + assert profile["nip05"] == "timmy@tower.local" + + def test_nip05_omitted_when_empty(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_profile_event(kp) + profile = json.loads(ev["content"]) + assert "nip05" not in profile + + def test_default_name_when_blank(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_profile_event(kp) + profile = json.loads(ev["content"]) + assert profile["name"] == "Timmy" # default + + +class TestBuildCapabilityEvent: + def test_kind_is_31990(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_capability_event(kp) + assert ev["kind"] == 31990 + + def test_has_d_tag(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_capability_event(kp) + d_tags = [t for t in ev["tags"] if t[0] == "d"] + assert d_tags + assert d_tags[0][1] == "timmy-mission-control" + + def test_content_is_json(self, manager, kp): + mock_settings = MagicMock( + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + ev = manager.build_capability_event(kp) + parsed = json.loads(ev["content"]) + assert "name" in parsed + assert "capabilities" in parsed + + +class TestAnnounce: + @pytest.mark.asyncio + async def test_returns_empty_result_when_no_privkey(self, manager): + mock_settings = MagicMock( + nostr_privkey="", + nostr_relays="", + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + result = await manager.announce() + assert result.kind_0_ok is False + assert result.kind_31990_ok is False + + @pytest.mark.asyncio + async def test_returns_empty_result_when_no_relays(self, manager, kp): + mock_settings = MagicMock( + nostr_privkey=kp.privkey_hex, + nostr_relays="", + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with patch("infrastructure.nostr.identity.settings", mock_settings): + result = await manager.announce() + assert result.kind_0_ok is False + + @pytest.mark.asyncio + async def test_publishes_kind0_and_kind31990(self, manager, kp): + mock_settings = MagicMock( + nostr_privkey=kp.privkey_hex, + nostr_relays="wss://relay.test", + nostr_profile_name="Timmy", + nostr_profile_about="Test agent", + nostr_profile_picture="", + nostr_nip05="timmy@test", + ) + with ( + patch("infrastructure.nostr.identity.settings", mock_settings), + patch( + "infrastructure.nostr.identity.publish_to_relays", + new=AsyncMock(return_value={"wss://relay.test": True}), + ) as mock_publish, + ): + result = await manager.announce() + + assert mock_publish.call_count == 2 # kind 0 + kind 31990 + assert result.kind_0_ok is True + assert result.kind_31990_ok is True + assert result.relay_results["wss://relay.test"] is True + + @pytest.mark.asyncio + async def test_degrades_gracefully_on_relay_failure(self, manager, kp): + mock_settings = MagicMock( + nostr_privkey=kp.privkey_hex, + nostr_relays="wss://relay.test", + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with ( + patch("infrastructure.nostr.identity.settings", mock_settings), + patch( + "infrastructure.nostr.identity.publish_to_relays", + new=AsyncMock(return_value={"wss://relay.test": False}), + ), + ): + result = await manager.announce() + + assert result.kind_0_ok is False + assert result.kind_31990_ok is False + + @pytest.mark.asyncio + async def test_never_raises_on_exception(self, manager, kp): + mock_settings = MagicMock( + nostr_privkey=kp.privkey_hex, + nostr_relays="wss://relay.test", + nostr_profile_name="Timmy", + nostr_profile_about="", + nostr_profile_picture="", + nostr_nip05="", + ) + with ( + patch("infrastructure.nostr.identity.settings", mock_settings), + patch( + "infrastructure.nostr.identity.publish_to_relays", + new=AsyncMock(side_effect=Exception("relay exploded")), + ), + ): + # Must not raise + result = await manager.announce() + assert isinstance(result, AnnounceResult) diff --git a/tests/unit/test_nostr_keypair.py b/tests/unit/test_nostr_keypair.py new file mode 100644 index 00000000..d050ede6 --- /dev/null +++ b/tests/unit/test_nostr_keypair.py @@ -0,0 +1,126 @@ +"""Unit tests for infrastructure.nostr.keypair.""" + +from __future__ import annotations + +import pytest + +from infrastructure.nostr.keypair import ( + NostrKeypair, + _bech32_decode, + _bech32_encode, + generate_keypair, + load_keypair, + pubkey_from_privkey, +) + + +class TestGenerateKeypair: + def test_returns_nostr_keypair(self): + kp = generate_keypair() + assert isinstance(kp, NostrKeypair) + + def test_privkey_hex_is_64_chars(self): + kp = generate_keypair() + assert len(kp.privkey_hex) == 64 + assert all(c in "0123456789abcdef" for c in kp.privkey_hex) + + def test_pubkey_hex_is_64_chars(self): + kp = generate_keypair() + assert len(kp.pubkey_hex) == 64 + assert all(c in "0123456789abcdef" for c in kp.pubkey_hex) + + def test_nsec_starts_with_nsec1(self): + kp = generate_keypair() + assert kp.nsec.startswith("nsec1") + + def test_npub_starts_with_npub1(self): + kp = generate_keypair() + assert kp.npub.startswith("npub1") + + def test_two_keypairs_are_different(self): + kp1 = generate_keypair() + kp2 = generate_keypair() + assert kp1.privkey_hex != kp2.privkey_hex + assert kp1.pubkey_hex != kp2.pubkey_hex + + def test_privkey_bytes_matches_hex(self): + kp = generate_keypair() + assert kp.privkey_bytes == bytes.fromhex(kp.privkey_hex) + + def test_pubkey_bytes_matches_hex(self): + kp = generate_keypair() + assert kp.pubkey_bytes == bytes.fromhex(kp.pubkey_hex) + + +class TestLoadKeypair: + def test_round_trip_via_privkey_hex(self): + kp1 = generate_keypair() + kp2 = load_keypair(privkey_hex=kp1.privkey_hex) + assert kp2.privkey_hex == kp1.privkey_hex + assert kp2.pubkey_hex == kp1.pubkey_hex + + def test_round_trip_via_nsec(self): + kp1 = generate_keypair() + kp2 = load_keypair(nsec=kp1.nsec) + assert kp2.privkey_hex == kp1.privkey_hex + assert kp2.pubkey_hex == kp1.pubkey_hex + + def test_raises_if_both_supplied(self): + kp = generate_keypair() + with pytest.raises(ValueError, match="either"): + load_keypair(privkey_hex=kp.privkey_hex, nsec=kp.nsec) + + def test_raises_if_neither_supplied(self): + with pytest.raises(ValueError, match="either"): + load_keypair() + + def test_raises_on_invalid_hex(self): + with pytest.raises((ValueError, Exception)): + load_keypair(privkey_hex="zzzz") + + def test_raises_on_wrong_length_hex(self): + with pytest.raises(ValueError): + load_keypair(privkey_hex="deadbeef") # too short + + def test_raises_on_wrong_hrp_bech32(self): + kp = generate_keypair() + # npub is bech32 but with hrp "npub", not "nsec" + with pytest.raises(ValueError): + load_keypair(nsec=kp.npub) + + def test_npub_derived_from_privkey(self): + kp1 = generate_keypair() + kp2 = load_keypair(privkey_hex=kp1.privkey_hex) + assert kp2.npub == kp1.npub + + +class TestPubkeyFromPrivkey: + def test_derives_correct_pubkey(self): + kp = generate_keypair() + derived = pubkey_from_privkey(kp.privkey_hex) + assert derived == kp.pubkey_hex + + def test_is_deterministic(self): + kp = generate_keypair() + assert pubkey_from_privkey(kp.privkey_hex) == pubkey_from_privkey(kp.privkey_hex) + + +class TestBech32: + def test_encode_decode_round_trip(self): + data = bytes(range(32)) + encoded = _bech32_encode("test", data) + hrp, decoded = _bech32_decode(encoded) + assert hrp == "test" + assert decoded == data + + def test_invalid_checksum_raises(self): + kp = generate_keypair() + mangled = kp.npub[:-1] + ("q" if kp.npub[-1] != "q" else "p") + with pytest.raises(ValueError, match="checksum"): + _bech32_decode(mangled) + + def test_npub_roundtrip(self): + kp = generate_keypair() + hrp, pub = _bech32_decode(kp.npub) + assert hrp == "npub" + assert pub.hex() == kp.pubkey_hex