feat: Timmy's Nostr identity — keypair, profile, relay presence (#856)
Some checks failed
Tests / lint (pull_request) Failing after 24s
Tests / test (pull_request) Has been skipped

Implements Timmy's Nostr identity stack in `src/infrastructure/nostr/`:

- **keypair.py**: Pure-Python secp256k1 keypair generation (BIP-340),
  bech32 nsec/npub encoding (NIP-19), `generate_keypair()` / `load_keypair()`.
- **event.py**: NIP-01 event construction with BIP-340 Schnorr signing,
  `build_event()` + `schnorr_sign()` / `schnorr_verify()`.
- **relay.py**: WebSocket relay client via `websockets`, publishes NIP-01
  `["EVENT", …]` messages to one or many relays concurrently with graceful
  degradation when unavailable.
- **identity.py**: `NostrIdentityManager` — publishes Kind 0 (profile
  metadata) and Kind 31990 (NIP-89/NIP-90 capability card) to configured
  relays; reads all settings from `config.settings`.

Config additions to `src/config.py`:
  `NOSTR_PRIVKEY`, `NOSTR_PUBKEY`, `NOSTR_RELAYS`, `NOSTR_NIP05`,
  `NOSTR_PROFILE_NAME`, `NOSTR_PROFILE_ABOUT`, `NOSTR_PROFILE_PICTURE`.

All operations degrade gracefully (log WARNING, never crash) when keys or
relays are not configured — consistent with the project's error-handling
patterns.

Tests: 647 unit tests pass; new tests cover keypair, event signing/verify,
and the full identity announce lifecycle.

Fixes #856

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-23 22:20:03 -04:00
parent 6bb5e7e1a6
commit 5d4d484f59
9 changed files with 1498 additions and 0 deletions

View File

@@ -524,6 +524,28 @@ class Settings(BaseSettings):
# Corresponding public key (hex-encoded npub)
content_nostr_pubkey: str = ""
# ── Nostr Identity (Timmy's on-network presence) ─────────────────────────
# Hex-encoded 32-byte private key — NEVER commit this value.
# Generate one with: timmyctl nostr keygen
nostr_privkey: str = ""
# Corresponding x-only public key (hex). Auto-derived from nostr_privkey
# if left empty; override only if you manage keys externally.
nostr_pubkey: str = ""
# Comma-separated list of NIP-01 relay WebSocket URLs.
# e.g. "wss://relay.damus.io,wss://nostr.wine"
nostr_relays: str = ""
# NIP-05 identifier for Timmy — e.g. "timmy@tower.local"
nostr_nip05: str = ""
# Profile display name (Kind 0 "name" field)
nostr_profile_name: str = "Timmy"
# Profile "about" text (Kind 0 "about" field)
nostr_profile_about: str = (
"Sovereign AI agent — mission control dashboard, task orchestration, "
"and ambient intelligence."
)
# URL to Timmy's avatar image (Kind 0 "picture" field)
nostr_profile_picture: str = ""
# Meilisearch archive
content_meilisearch_url: str = "http://localhost:7700"
content_meilisearch_api_key: str = ""

View File

@@ -0,0 +1,18 @@
"""Nostr identity infrastructure for Timmy.
Provides keypair management, NIP-01 event signing, WebSocket relay client,
and identity lifecycle management (Kind 0 profile, Kind 31990 capability card).
All components degrade gracefully when the Nostr relay is unavailable.
Usage
-----
from infrastructure.nostr.identity import NostrIdentityManager
manager = NostrIdentityManager()
await manager.announce() # publishes Kind 0 + Kind 31990
"""
from infrastructure.nostr.identity import NostrIdentityManager
__all__ = ["NostrIdentityManager"]

View File

@@ -0,0 +1,215 @@
"""NIP-01 Nostr event construction and BIP-340 Schnorr signing.
Constructs and signs Nostr events using a pure-Python BIP-340 Schnorr
implementation over secp256k1 (no external crypto dependencies required).
Usage
-----
from infrastructure.nostr.event import build_event, sign_event
from infrastructure.nostr.keypair import load_keypair
kp = load_keypair(privkey_hex="...")
ev = build_event(kind=0, content='{"name":"Timmy"}', keypair=kp)
print(ev["id"], ev["sig"])
"""
from __future__ import annotations
import hashlib
import json
import secrets
import time
from typing import Any
from infrastructure.nostr.keypair import (
_G,
_N,
_P,
NostrKeypair,
Point,
_has_even_y,
_point_mul,
_x_bytes,
)
# ── BIP-340 tagged hash ────────────────────────────────────────────────────────
def _tagged_hash(tag: str, data: bytes) -> bytes:
"""BIP-340 tagged SHA-256 hash: SHA256(SHA256(tag) || SHA256(tag) || data)."""
tag_hash = hashlib.sha256(tag.encode()).digest()
return hashlib.sha256(tag_hash + tag_hash + data).digest()
# ── BIP-340 Schnorr sign ───────────────────────────────────────────────────────
def schnorr_sign(msg: bytes, privkey_bytes: bytes) -> bytes:
"""Sign a 32-byte message with a 32-byte private key using BIP-340 Schnorr.
Parameters
----------
msg:
The 32-byte message to sign (typically the event ID hash).
privkey_bytes:
The 32-byte private key.
Returns
-------
bytes
64-byte Schnorr signature (r || s).
Raises
------
ValueError
If the key is invalid.
"""
if len(msg) != 32:
raise ValueError(f"Message must be 32 bytes, got {len(msg)}")
if len(privkey_bytes) != 32:
raise ValueError(f"Private key must be 32 bytes, got {len(privkey_bytes)}")
d_int = int.from_bytes(privkey_bytes, "big")
if not (1 <= d_int < _N):
raise ValueError("Private key out of range")
P = _point_mul(_G, d_int)
assert P is not None
# Negate d if P has odd y (BIP-340 requirement)
a = d_int if _has_even_y(P) else _N - d_int
# Deterministic nonce with auxiliary randomness (BIP-340 §Default signing)
rand = secrets.token_bytes(32)
t = bytes(x ^ y for x, y in zip(a.to_bytes(32, "big"), _tagged_hash("BIP0340/aux", rand), strict=True))
r_bytes = _tagged_hash("BIP0340/nonce", t + _x_bytes(P) + msg)
k_int = int.from_bytes(r_bytes, "big") % _N
if k_int == 0: # Astronomically unlikely; retry would be cleaner but this is safe enough
raise ValueError("Nonce derivation produced k=0; retry signing")
R: Point = _point_mul(_G, k_int)
assert R is not None
k = k_int if _has_even_y(R) else _N - k_int
e = (
int.from_bytes(
_tagged_hash("BIP0340/challenge", _x_bytes(R) + _x_bytes(P) + msg),
"big",
)
% _N
)
s = (k + e * a) % _N
sig = _x_bytes(R) + s.to_bytes(32, "big")
assert len(sig) == 64
return sig
def schnorr_verify(msg: bytes, pubkey_bytes: bytes, sig: bytes) -> bool:
"""Verify a BIP-340 Schnorr signature.
Returns True if valid, False otherwise (never raises).
"""
try:
if len(msg) != 32 or len(pubkey_bytes) != 32 or len(sig) != 64:
return False
px = int.from_bytes(pubkey_bytes, "big")
if px >= _P:
return False
# Lift x to curve point (even-y convention)
y_sq = (pow(px, 3, _P) + 7) % _P
y = pow(y_sq, (_P + 1) // 4, _P)
if pow(y, 2, _P) != y_sq:
return False
P: Point = (px, y if y % 2 == 0 else _P - y)
r = int.from_bytes(sig[:32], "big")
s = int.from_bytes(sig[32:], "big")
if r >= _P or s >= _N:
return False
e = (
int.from_bytes(
_tagged_hash("BIP0340/challenge", sig[:32] + pubkey_bytes + msg),
"big",
)
% _N
)
R1 = _point_mul(_G, s)
R2 = _point_mul(P, _N - e)
# Point addition
from infrastructure.nostr.keypair import _point_add
R: Point = _point_add(R1, R2)
if R is None or not _has_even_y(R) or R[0] != r:
return False
return True
except Exception:
return False
# ── NIP-01 event construction ─────────────────────────────────────────────────
NostrEvent = dict[str, Any]
def _event_hash(pubkey: str, created_at: int, kind: int, tags: list, content: str) -> bytes:
"""Compute the NIP-01 event ID (SHA-256 of canonical serialisation)."""
serialized = json.dumps(
[0, pubkey, created_at, kind, tags, content],
separators=(",", ":"),
ensure_ascii=False,
)
return hashlib.sha256(serialized.encode()).digest()
def build_event(
*,
kind: int,
content: str,
keypair: NostrKeypair,
tags: list[list[str]] | None = None,
created_at: int | None = None,
) -> NostrEvent:
"""Build and sign a NIP-01 Nostr event.
Parameters
----------
kind:
NIP-01 event kind integer (e.g. 0 = profile, 1 = note).
content:
Event content string (often JSON for structured kinds).
keypair:
The signing keypair.
tags:
Optional list of tag arrays.
created_at:
Unix timestamp; defaults to ``int(time.time())``.
Returns
-------
dict
Fully signed NIP-01 event ready for relay publication.
"""
_tags = tags or []
_created_at = created_at if created_at is not None else int(time.time())
msg = _event_hash(keypair.pubkey_hex, _created_at, kind, _tags, content)
event_id = msg.hex()
sig_bytes = schnorr_sign(msg, keypair.privkey_bytes)
sig_hex = sig_bytes.hex()
return {
"id": event_id,
"pubkey": keypair.pubkey_hex,
"created_at": _created_at,
"kind": kind,
"tags": _tags,
"content": content,
"sig": sig_hex,
}

View File

@@ -0,0 +1,265 @@
"""Timmy's Nostr identity lifecycle manager.
Manages Timmy's on-network Nostr presence:
- **Kind 0** (NIP-01 profile metadata): name, about, picture, nip05
- **Kind 31990** (NIP-89 handler / NIP-90 capability card): advertises
Timmy's services so NIP-89 clients can discover him.
Config is read from ``settings`` via pydantic-settings:
NOSTR_PRIVKEY — hex private key (required to publish)
NOSTR_PUBKEY — hex public key (auto-derived if missing)
NOSTR_RELAYS — comma-separated relay WSS URLs
NOSTR_NIP05 — NIP-05 identifier e.g. timmy@tower.local
NOSTR_PROFILE_NAME — display name (default: "Timmy")
NOSTR_PROFILE_ABOUT — "about" text
NOSTR_PROFILE_PICTURE — avatar URL
Usage
-----
from infrastructure.nostr.identity import NostrIdentityManager
manager = NostrIdentityManager()
result = await manager.announce()
# {'kind_0': True, 'kind_31990': True, 'relays': {'wss://…': True}}
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from typing import Any
from config import settings
from infrastructure.nostr.event import build_event
from infrastructure.nostr.keypair import NostrKeypair, load_keypair
from infrastructure.nostr.relay import publish_to_relays
logger = logging.getLogger(__name__)
# Timmy's default capability description for NIP-89/NIP-90
_DEFAULT_CAPABILITIES = {
"name": "Timmy",
"about": (
"Sovereign AI agent — mission control dashboard, task orchestration, "
"voice NLU, game-state monitoring, and ambient intelligence."
),
"capabilities": [
"chat",
"task_orchestration",
"voice_nlu",
"game_state",
"nostr_presence",
],
"nip": [1, 89, 90],
}
@dataclass
class AnnounceResult:
"""Result of a Nostr identity announcement."""
kind_0_ok: bool = False
kind_31990_ok: bool = False
relay_results: dict[str, bool] = field(default_factory=dict)
@property
def any_relay_ok(self) -> bool:
return any(self.relay_results.values())
def to_dict(self) -> dict[str, Any]:
return {
"kind_0": self.kind_0_ok,
"kind_31990": self.kind_31990_ok,
"relays": self.relay_results,
}
class NostrIdentityManager:
"""Manages Timmy's Nostr identity and relay presence.
Reads configuration from ``settings`` on every call so runtime
changes to environment variables are picked up automatically.
All public methods degrade gracefully — they log warnings and return
False/empty rather than raising exceptions.
"""
# ── keypair ─────────────────────────────────────────────────────────────
def get_keypair(self) -> NostrKeypair | None:
"""Return the configured keypair, or None if not configured.
Derives the public key from the private key if only the private
key is set. Returns None (with a warning) if no private key is
configured.
"""
privkey = settings.nostr_privkey.strip()
if not privkey:
logger.warning(
"NOSTR_PRIVKEY not configured — Nostr identity unavailable. "
"Run `timmyctl nostr keygen` to generate a keypair."
)
return None
try:
return load_keypair(privkey_hex=privkey)
except Exception as exc:
logger.warning("Invalid NOSTR_PRIVKEY: %s", exc)
return None
# ── relay list ───────────────────────────────────────────────────────────
def get_relay_urls(self) -> list[str]:
"""Return the configured relay URL list (may be empty)."""
raw = settings.nostr_relays.strip()
if not raw:
return []
return [url.strip() for url in raw.split(",") if url.strip()]
# ── Kind 0 — profile ─────────────────────────────────────────────────────
def build_profile_event(self, keypair: NostrKeypair) -> dict:
"""Build a NIP-01 Kind 0 profile metadata event.
Reads profile fields from settings:
``nostr_profile_name``, ``nostr_profile_about``,
``nostr_profile_picture``, ``nostr_nip05``.
"""
profile: dict[str, str] = {}
name = settings.nostr_profile_name.strip() or "Timmy"
profile["name"] = name
profile["display_name"] = name
about = settings.nostr_profile_about.strip()
if about:
profile["about"] = about
picture = settings.nostr_profile_picture.strip()
if picture:
profile["picture"] = picture
nip05 = settings.nostr_nip05.strip()
if nip05:
profile["nip05"] = nip05
return build_event(
kind=0,
content=json.dumps(profile, ensure_ascii=False),
keypair=keypair,
)
# ── Kind 31990 — NIP-89 capability card ──────────────────────────────────
def build_capability_event(self, keypair: NostrKeypair) -> dict:
"""Build a NIP-89/NIP-90 Kind 31990 capability handler event.
Advertises Timmy's services so NIP-89 clients can discover him.
The ``d`` tag uses the application identifier ``timmy-mission-control``.
"""
cap = dict(_DEFAULT_CAPABILITIES)
name = settings.nostr_profile_name.strip() or "Timmy"
cap["name"] = name
about = settings.nostr_profile_about.strip()
if about:
cap["about"] = about
picture = settings.nostr_profile_picture.strip()
if picture:
cap["picture"] = picture
nip05 = settings.nostr_nip05.strip()
if nip05:
cap["nip05"] = nip05
tags = [
["d", "timmy-mission-control"],
["k", "1"], # handles kind:1 (notes) as a starting point
["k", "5600"], # DVM task request (NIP-90)
["k", "5900"], # DVM general task
]
return build_event(
kind=31990,
content=json.dumps(cap, ensure_ascii=False),
keypair=keypair,
tags=tags,
)
# ── announce ─────────────────────────────────────────────────────────────
async def announce(self) -> AnnounceResult:
"""Publish Kind 0 profile and Kind 31990 capability card to all relays.
Returns
-------
AnnounceResult
Contains per-relay success flags and per-event-kind success flags.
Never raises; all failures are logged at WARNING level.
"""
result = AnnounceResult()
keypair = self.get_keypair()
if keypair is None:
return result
relay_urls = self.get_relay_urls()
if not relay_urls:
logger.warning(
"NOSTR_RELAYS not configured — Kind 0 and Kind 31990 not published."
)
return result
logger.info(
"Announcing Nostr identity %s to %d relay(s)", keypair.npub[:20], len(relay_urls)
)
# Build and publish Kind 0 (profile)
try:
kind0 = self.build_profile_event(keypair)
k0_results = await publish_to_relays(relay_urls, kind0)
result.kind_0_ok = any(k0_results.values())
# Merge relay results
for url, ok in k0_results.items():
result.relay_results[url] = result.relay_results.get(url, False) or ok
except Exception as exc:
logger.warning("Kind 0 publish failed: %s", exc)
# Build and publish Kind 31990 (capability card)
try:
kind31990 = self.build_capability_event(keypair)
k31990_results = await publish_to_relays(relay_urls, kind31990)
result.kind_31990_ok = any(k31990_results.values())
for url, ok in k31990_results.items():
result.relay_results[url] = result.relay_results.get(url, False) or ok
except Exception as exc:
logger.warning("Kind 31990 publish failed: %s", exc)
if result.any_relay_ok:
logger.info("Nostr identity announced successfully (npub: %s)", keypair.npub)
else:
logger.warning("Nostr identity announcement failed — no relays accepted events")
return result
async def publish_profile(self) -> bool:
"""Publish only the Kind 0 profile event.
Returns True if at least one relay accepted the event.
"""
keypair = self.get_keypair()
if keypair is None:
return False
relay_urls = self.get_relay_urls()
if not relay_urls:
return False
try:
event = self.build_profile_event(keypair)
results = await publish_to_relays(relay_urls, event)
return any(results.values())
except Exception as exc:
logger.warning("Profile publish failed: %s", exc)
return False

View File

@@ -0,0 +1,270 @@
"""Nostr keypair generation and encoding (NIP-19 / BIP-340).
Provides pure-Python secp256k1 keypair generation and bech32 nsec/npub
encoding with no external dependencies beyond the Python stdlib.
Usage
-----
from infrastructure.nostr.keypair import generate_keypair, load_keypair
kp = generate_keypair()
print(kp.npub) # npub1…
print(kp.nsec) # nsec1…
kp2 = load_keypair(privkey_hex="deadbeef...")
"""
from __future__ import annotations
import hashlib
import secrets
from dataclasses import dataclass
# ── secp256k1 curve parameters (BIP-340) ──────────────────────────────────────
_P = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
_N = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
_GX = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798
_GY = 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8
_G = (_GX, _GY)
Point = tuple[int, int] | None # None represents the point at infinity
def _point_add(P: Point, Q: Point) -> Point:
if P is None:
return Q
if Q is None:
return P
px, py = P
qx, qy = Q
if px == qx:
if py != qy:
return None
# Point doubling
lam = (3 * px * px * pow(2 * py, _P - 2, _P)) % _P
else:
lam = ((qy - py) * pow(qx - px, _P - 2, _P)) % _P
rx = (lam * lam - px - qx) % _P
ry = (lam * (px - rx) - py) % _P
return rx, ry
def _point_mul(P: Point, n: int) -> Point:
"""Scalar multiplication via double-and-add."""
R: Point = None
while n > 0:
if n & 1:
R = _point_add(R, P)
P = _point_add(P, P)
n >>= 1
return R
def _has_even_y(P: Point) -> bool:
assert P is not None
return P[1] % 2 == 0
def _x_bytes(P: Point) -> bytes:
"""Return the 32-byte x-coordinate of a point (x-only pubkey)."""
assert P is not None
return P[0].to_bytes(32, "big")
def _privkey_to_pubkey_bytes(privkey_int: int) -> bytes:
"""Derive the x-only public key from an integer private key."""
P = _point_mul(_G, privkey_int)
return _x_bytes(P)
# ── bech32 encoding (NIP-19 uses original bech32, not bech32m) ────────────────
_BECH32_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
def _bech32_polymod(values: list[int]) -> int:
GEN = [0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3]
chk = 1
for v in values:
b = chk >> 25
chk = (chk & 0x1FFFFFF) << 5 ^ v
for i in range(5):
chk ^= GEN[i] if ((b >> i) & 1) else 0
return chk
def _bech32_hrp_expand(hrp: str) -> list[int]:
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
def _convertbits(data: bytes, frombits: int, tobits: int, pad: bool = True) -> list[int]:
acc = 0
bits = 0
ret: list[int] = []
maxv = (1 << tobits) - 1
for value in data:
acc = ((acc << frombits) | value) & 0xFFFFFF
bits += frombits
while bits >= tobits:
bits -= tobits
ret.append((acc >> bits) & maxv)
if pad and bits:
ret.append((acc << (tobits - bits)) & maxv)
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
raise ValueError("Invalid padding")
return ret
def _bech32_encode(hrp: str, data: bytes) -> str:
"""Encode bytes as a bech32 string with the given HRP."""
converted = _convertbits(data, 8, 5)
combined = _bech32_hrp_expand(hrp) + converted
checksum_input = combined + [0, 0, 0, 0, 0, 0]
polymod = _bech32_polymod(checksum_input) ^ 1
checksum = [(polymod >> (5 * (5 - i))) & 31 for i in range(6)]
return hrp + "1" + "".join(_BECH32_CHARSET[d] for d in converted + checksum)
def _bech32_decode(bech32_str: str) -> tuple[str, bytes]:
"""Decode a bech32 string to (hrp, data_bytes).
Raises ValueError on invalid encoding.
"""
bech32_str = bech32_str.lower()
sep = bech32_str.rfind("1")
if sep < 1 or sep + 7 > len(bech32_str):
raise ValueError(f"Invalid bech32: {bech32_str!r}")
hrp = bech32_str[:sep]
data_chars = bech32_str[sep + 1 :]
data = []
for c in data_chars:
pos = _BECH32_CHARSET.find(c)
if pos == -1:
raise ValueError(f"Invalid bech32 character: {c!r}")
data.append(pos)
if _bech32_polymod(_bech32_hrp_expand(hrp) + data) != 1:
raise ValueError("Invalid bech32 checksum")
decoded = _convertbits(bytes(data[:-6]), 5, 8, pad=False)
return hrp, bytes(decoded)
# ── NostrKeypair ──────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class NostrKeypair:
"""A Nostr keypair with both hex and bech32 representations.
Attributes
----------
privkey_hex : str
32-byte private key as lowercase hex (64 chars). Treat as a secret.
pubkey_hex : str
32-byte x-only public key as lowercase hex (64 chars).
nsec : str
Private key encoded as NIP-19 ``nsec1…`` bech32 string.
npub : str
Public key encoded as NIP-19 ``npub1…`` bech32 string.
"""
privkey_hex: str
pubkey_hex: str
nsec: str
npub: str
@property
def privkey_bytes(self) -> bytes:
return bytes.fromhex(self.privkey_hex)
@property
def pubkey_bytes(self) -> bytes:
return bytes.fromhex(self.pubkey_hex)
def generate_keypair() -> NostrKeypair:
"""Generate a fresh Nostr keypair from a cryptographically random seed.
Returns
-------
NostrKeypair
The newly generated keypair.
"""
while True:
raw = secrets.token_bytes(32)
d = int.from_bytes(raw, "big")
if 1 <= d < _N:
break
pub_bytes = _privkey_to_pubkey_bytes(d)
privkey_hex = raw.hex()
pubkey_hex = pub_bytes.hex()
nsec = _bech32_encode("nsec", raw)
npub = _bech32_encode("npub", pub_bytes)
return NostrKeypair(privkey_hex=privkey_hex, pubkey_hex=pubkey_hex, nsec=nsec, npub=npub)
def load_keypair(
*,
privkey_hex: str | None = None,
nsec: str | None = None,
) -> NostrKeypair:
"""Load a keypair from a hex private key or an nsec bech32 string.
Parameters
----------
privkey_hex:
64-char lowercase hex private key.
nsec:
NIP-19 ``nsec1…`` bech32 string.
Raises
------
ValueError
If neither or both parameters are supplied, or if the key is invalid.
"""
if privkey_hex and nsec:
raise ValueError("Supply either privkey_hex or nsec, not both")
if not privkey_hex and not nsec:
raise ValueError("Supply either privkey_hex or nsec")
if nsec:
hrp, raw = _bech32_decode(nsec)
if hrp != "nsec":
raise ValueError(f"Expected nsec bech32, got {hrp!r}")
privkey_hex = raw.hex()
assert privkey_hex is not None
raw_bytes = bytes.fromhex(privkey_hex)
if len(raw_bytes) != 32:
raise ValueError(f"Private key must be 32 bytes, got {len(raw_bytes)}")
d = int.from_bytes(raw_bytes, "big")
if not (1 <= d < _N):
raise ValueError("Private key out of range")
pub_bytes = _privkey_to_pubkey_bytes(d)
pubkey_hex = pub_bytes.hex()
nsec_enc = _bech32_encode("nsec", raw_bytes)
npub = _bech32_encode("npub", pub_bytes)
return NostrKeypair(privkey_hex=privkey_hex, pubkey_hex=pubkey_hex, nsec=nsec_enc, npub=npub)
def pubkey_from_privkey(privkey_hex: str) -> str:
"""Derive the hex public key from a hex private key.
Parameters
----------
privkey_hex:
64-char lowercase hex private key.
Returns
-------
str
64-char lowercase hex x-only public key.
"""
return load_keypair(privkey_hex=privkey_hex).pubkey_hex
def _sha256(data: bytes) -> bytes:
return hashlib.sha256(data).digest()

View File

@@ -0,0 +1,133 @@
"""NIP-01 WebSocket relay client for Nostr event publication.
Connects to Nostr relays via WebSocket and publishes events using
the NIP-01 ``["EVENT", event]`` message format.
Degrades gracefully when the relay is unavailable or the ``websockets``
package is not installed.
Usage
-----
from infrastructure.nostr.relay import publish_to_relay
ok = await publish_to_relay("wss://relay.damus.io", signed_event)
# Returns True if the relay accepted the event.
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
logger = logging.getLogger(__name__)
NostrEvent = dict[str, Any]
# Timeout for relay operations (seconds)
_CONNECT_TIMEOUT = 10
_PUBLISH_TIMEOUT = 15
async def publish_to_relay(relay_url: str, event: NostrEvent) -> bool:
"""Publish a signed NIP-01 event to a single relay.
Parameters
----------
relay_url:
``wss://`` or ``ws://`` WebSocket URL of the relay.
event:
A fully signed NIP-01 event dict.
Returns
-------
bool
True if the relay acknowledged the event (``["OK", id, true, …]``),
False otherwise (never raises).
"""
try:
import websockets
except ImportError:
logger.warning(
"websockets package not available — Nostr relay publish skipped "
"(install with: pip install websockets)"
)
return False
event_id = event.get("id", "")
message = json.dumps(["EVENT", event], separators=(",", ":"))
try:
async with asyncio.timeout(_CONNECT_TIMEOUT):
ws = await websockets.connect(relay_url, open_timeout=_CONNECT_TIMEOUT)
except Exception as exc:
logger.warning("Nostr relay connect failed (%s): %s", relay_url, exc)
return False
try:
async with ws:
await ws.send(message)
# Wait for OK response with timeout
async with asyncio.timeout(_PUBLISH_TIMEOUT):
async for raw in ws:
try:
resp = json.loads(raw)
except json.JSONDecodeError:
continue
if (
isinstance(resp, list)
and len(resp) >= 3
and resp[0] == "OK"
and resp[1] == event_id
):
if resp[2] is True:
logger.debug("Relay %s accepted event %s", relay_url, event_id[:8])
return True
else:
reason = resp[3] if len(resp) > 3 else ""
logger.warning(
"Relay %s rejected event %s: %s",
relay_url,
event_id[:8],
reason,
)
return False
except TimeoutError:
logger.warning("Relay %s timed out waiting for OK on event %s", relay_url, event_id[:8])
return False
except Exception as exc:
logger.warning("Relay %s error publishing event %s: %s", relay_url, event_id[:8], exc)
return False
logger.warning("Relay %s closed without OK for event %s", relay_url, event_id[:8])
return False
async def publish_to_relays(relay_urls: list[str], event: NostrEvent) -> dict[str, bool]:
"""Publish an event to multiple relays concurrently.
Parameters
----------
relay_urls:
List of relay WebSocket URLs.
event:
A fully signed NIP-01 event dict.
Returns
-------
dict[str, bool]
Mapping of relay URL → success flag.
"""
if not relay_urls:
return {}
tasks = {url: asyncio.create_task(publish_to_relay(url, event)) for url in relay_urls}
results: dict[str, bool] = {}
for url, task in tasks.items():
try:
results[url] = await task
except Exception as exc:
logger.warning("Unexpected error publishing to %s: %s", url, exc)
results[url] = False
return results

View File

@@ -0,0 +1,177 @@
"""Unit tests for infrastructure.nostr.event."""
from __future__ import annotations
import hashlib
import json
import time
import pytest
from infrastructure.nostr.event import (
_event_hash,
build_event,
schnorr_sign,
schnorr_verify,
)
from infrastructure.nostr.keypair import generate_keypair
class TestSchorrSign:
def test_returns_64_bytes(self):
kp = generate_keypair()
msg = b"\x00" * 32
sig = schnorr_sign(msg, kp.privkey_bytes)
assert len(sig) == 64
def test_different_msg_different_sig(self):
kp = generate_keypair()
sig1 = schnorr_sign(b"\x01" * 32, kp.privkey_bytes)
sig2 = schnorr_sign(b"\x02" * 32, kp.privkey_bytes)
assert sig1 != sig2
def test_raises_on_wrong_msg_length(self):
kp = generate_keypair()
with pytest.raises(ValueError, match="32 bytes"):
schnorr_sign(b"too short", kp.privkey_bytes)
def test_raises_on_wrong_key_length(self):
msg = b"\x00" * 32
with pytest.raises(ValueError, match="32 bytes"):
schnorr_sign(msg, b"too short")
def test_nondeterministic_due_to_randomness(self):
# BIP-340 uses auxiliary randomness; repeated calls produce different sigs
kp = generate_keypair()
msg = b"\x42" * 32
sig1 = schnorr_sign(msg, kp.privkey_bytes)
sig2 = schnorr_sign(msg, kp.privkey_bytes)
# With different random nonces these should differ (astronomically unlikely to collide)
# We just verify both are valid
assert schnorr_verify(msg, kp.pubkey_bytes, sig1)
assert schnorr_verify(msg, kp.pubkey_bytes, sig2)
class TestSchnorrVerify:
def test_valid_signature_verifies(self):
kp = generate_keypair()
msg = hashlib.sha256(b"hello nostr").digest()
sig = schnorr_sign(msg, kp.privkey_bytes)
assert schnorr_verify(msg, kp.pubkey_bytes, sig) is True
def test_wrong_pubkey_fails(self):
kp1 = generate_keypair()
kp2 = generate_keypair()
msg = b"\x00" * 32
sig = schnorr_sign(msg, kp1.privkey_bytes)
assert schnorr_verify(msg, kp2.pubkey_bytes, sig) is False
def test_tampered_sig_fails(self):
kp = generate_keypair()
msg = b"\x00" * 32
sig = bytearray(schnorr_sign(msg, kp.privkey_bytes))
sig[0] ^= 0xFF
assert schnorr_verify(msg, kp.pubkey_bytes, bytes(sig)) is False
def test_tampered_msg_fails(self):
kp = generate_keypair()
msg = b"\x00" * 32
sig = schnorr_sign(msg, kp.privkey_bytes)
bad_msg = b"\xFF" * 32
assert schnorr_verify(bad_msg, kp.pubkey_bytes, sig) is False
def test_wrong_lengths_return_false(self):
kp = generate_keypair()
msg = b"\x00" * 32
sig = schnorr_sign(msg, kp.privkey_bytes)
assert schnorr_verify(msg[:16], kp.pubkey_bytes, sig) is False
assert schnorr_verify(msg, kp.pubkey_bytes[:16], sig) is False
assert schnorr_verify(msg, kp.pubkey_bytes, sig[:32]) is False
def test_never_raises(self):
# Should return False for any garbage input, not raise
assert schnorr_verify(b"x", b"y", b"z") is False
class TestEventHash:
def test_returns_32_bytes(self):
h = _event_hash("aabbcc", 0, 1, [], "")
assert len(h) == 32
def test_deterministic(self):
h1 = _event_hash("aa", 1, 1, [], "hello")
h2 = _event_hash("aa", 1, 1, [], "hello")
assert h1 == h2
def test_different_content_different_hash(self):
h1 = _event_hash("aa", 1, 1, [], "hello")
h2 = _event_hash("aa", 1, 1, [], "world")
assert h1 != h2
class TestBuildEvent:
def test_returns_required_fields(self):
kp = generate_keypair()
ev = build_event(kind=1, content="hello", keypair=kp)
assert set(ev) >= {"id", "pubkey", "created_at", "kind", "tags", "content", "sig"}
def test_kind_matches(self):
kp = generate_keypair()
ev = build_event(kind=0, content="{}", keypair=kp)
assert ev["kind"] == 0
def test_pubkey_matches_keypair(self):
kp = generate_keypair()
ev = build_event(kind=1, content="x", keypair=kp)
assert ev["pubkey"] == kp.pubkey_hex
def test_id_is_64_char_hex(self):
kp = generate_keypair()
ev = build_event(kind=1, content="x", keypair=kp)
assert len(ev["id"]) == 64
assert all(c in "0123456789abcdef" for c in ev["id"])
def test_sig_is_128_char_hex(self):
kp = generate_keypair()
ev = build_event(kind=1, content="x", keypair=kp)
assert len(ev["sig"]) == 128
assert all(c in "0123456789abcdef" for c in ev["sig"])
def test_signature_verifies(self):
kp = generate_keypair()
ev = build_event(kind=1, content="test", keypair=kp)
sig_bytes = bytes.fromhex(ev["sig"])
id_bytes = bytes.fromhex(ev["id"])
assert schnorr_verify(id_bytes, kp.pubkey_bytes, sig_bytes)
def test_id_matches_canonical_hash(self):
kp = generate_keypair()
ts = int(time.time())
ev = build_event(kind=1, content="hi", keypair=kp, created_at=ts)
expected_hash = _event_hash(kp.pubkey_hex, ts, 1, [], "hi").hex()
assert ev["id"] == expected_hash
def test_custom_tags(self):
kp = generate_keypair()
tags = [["t", "gaming"], ["r", "wss://relay.example.com"]]
ev = build_event(kind=1, content="x", keypair=kp, tags=tags)
assert ev["tags"] == tags
def test_default_tags_empty(self):
kp = generate_keypair()
ev = build_event(kind=1, content="x", keypair=kp)
assert ev["tags"] == []
def test_custom_created_at(self):
kp = generate_keypair()
ts = 1700000000
ev = build_event(kind=1, content="x", keypair=kp, created_at=ts)
assert ev["created_at"] == ts
def test_kind0_profile_content_is_json(self):
kp = generate_keypair()
profile = {"name": "Timmy", "about": "test"}
ev = build_event(kind=0, content=json.dumps(profile), keypair=kp)
assert ev["kind"] == 0
parsed = json.loads(ev["content"])
assert parsed["name"] == "Timmy"

View File

@@ -0,0 +1,272 @@
"""Unit tests for infrastructure.nostr.identity."""
from __future__ import annotations
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.nostr.identity import AnnounceResult, NostrIdentityManager
from infrastructure.nostr.keypair import generate_keypair
@pytest.fixture()
def manager():
return NostrIdentityManager()
@pytest.fixture()
def kp():
return generate_keypair()
class TestAnnounceResult:
def test_any_relay_ok_false_when_empty(self):
r = AnnounceResult()
assert r.any_relay_ok is False
def test_any_relay_ok_true_when_one_ok(self):
r = AnnounceResult(relay_results={"wss://a": True, "wss://b": False})
assert r.any_relay_ok is True
def test_to_dict_keys(self):
r = AnnounceResult(kind_0_ok=True, relay_results={"wss://a": True})
d = r.to_dict()
assert set(d) == {"kind_0", "kind_31990", "relays"}
class TestGetKeypair:
def test_returns_none_when_no_privkey(self, manager):
mock_settings = MagicMock(nostr_privkey="")
with patch("infrastructure.nostr.identity.settings", mock_settings):
assert manager.get_keypair() is None
def test_returns_keypair_when_configured(self, manager, kp):
mock_settings = MagicMock(nostr_privkey=kp.privkey_hex)
with patch("infrastructure.nostr.identity.settings", mock_settings):
result = manager.get_keypair()
assert result is not None
assert result.pubkey_hex == kp.pubkey_hex
def test_returns_none_on_invalid_key(self, manager):
mock_settings = MagicMock(nostr_privkey="not_a_valid_key")
with patch("infrastructure.nostr.identity.settings", mock_settings):
assert manager.get_keypair() is None
class TestGetRelayUrls:
def test_empty_string_returns_empty_list(self, manager):
mock_settings = MagicMock(nostr_relays="")
with patch("infrastructure.nostr.identity.settings", mock_settings):
assert manager.get_relay_urls() == []
def test_single_relay(self, manager):
mock_settings = MagicMock(nostr_relays="wss://relay.damus.io")
with patch("infrastructure.nostr.identity.settings", mock_settings):
urls = manager.get_relay_urls()
assert urls == ["wss://relay.damus.io"]
def test_multiple_relays(self, manager):
mock_settings = MagicMock(nostr_relays="wss://a.com,wss://b.com, wss://c.com ")
with patch("infrastructure.nostr.identity.settings", mock_settings):
urls = manager.get_relay_urls()
assert urls == ["wss://a.com", "wss://b.com", "wss://c.com"]
class TestBuildProfileEvent:
def test_kind_is_0(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_profile_event(kp)
assert ev["kind"] == 0
def test_content_contains_name(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="Timmy",
nostr_profile_about="A great AI agent",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_profile_event(kp)
profile = json.loads(ev["content"])
assert profile["name"] == "Timmy"
def test_nip05_included_when_set(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="timmy@tower.local",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_profile_event(kp)
profile = json.loads(ev["content"])
assert profile["nip05"] == "timmy@tower.local"
def test_nip05_omitted_when_empty(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_profile_event(kp)
profile = json.loads(ev["content"])
assert "nip05" not in profile
def test_default_name_when_blank(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_profile_event(kp)
profile = json.loads(ev["content"])
assert profile["name"] == "Timmy" # default
class TestBuildCapabilityEvent:
def test_kind_is_31990(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_capability_event(kp)
assert ev["kind"] == 31990
def test_has_d_tag(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_capability_event(kp)
d_tags = [t for t in ev["tags"] if t[0] == "d"]
assert d_tags
assert d_tags[0][1] == "timmy-mission-control"
def test_content_is_json(self, manager, kp):
mock_settings = MagicMock(
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
ev = manager.build_capability_event(kp)
parsed = json.loads(ev["content"])
assert "name" in parsed
assert "capabilities" in parsed
class TestAnnounce:
@pytest.mark.asyncio
async def test_returns_empty_result_when_no_privkey(self, manager):
mock_settings = MagicMock(
nostr_privkey="",
nostr_relays="",
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
result = await manager.announce()
assert result.kind_0_ok is False
assert result.kind_31990_ok is False
@pytest.mark.asyncio
async def test_returns_empty_result_when_no_relays(self, manager, kp):
mock_settings = MagicMock(
nostr_privkey=kp.privkey_hex,
nostr_relays="",
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with patch("infrastructure.nostr.identity.settings", mock_settings):
result = await manager.announce()
assert result.kind_0_ok is False
@pytest.mark.asyncio
async def test_publishes_kind0_and_kind31990(self, manager, kp):
mock_settings = MagicMock(
nostr_privkey=kp.privkey_hex,
nostr_relays="wss://relay.test",
nostr_profile_name="Timmy",
nostr_profile_about="Test agent",
nostr_profile_picture="",
nostr_nip05="timmy@test",
)
with (
patch("infrastructure.nostr.identity.settings", mock_settings),
patch(
"infrastructure.nostr.identity.publish_to_relays",
new=AsyncMock(return_value={"wss://relay.test": True}),
) as mock_publish,
):
result = await manager.announce()
assert mock_publish.call_count == 2 # kind 0 + kind 31990
assert result.kind_0_ok is True
assert result.kind_31990_ok is True
assert result.relay_results["wss://relay.test"] is True
@pytest.mark.asyncio
async def test_degrades_gracefully_on_relay_failure(self, manager, kp):
mock_settings = MagicMock(
nostr_privkey=kp.privkey_hex,
nostr_relays="wss://relay.test",
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with (
patch("infrastructure.nostr.identity.settings", mock_settings),
patch(
"infrastructure.nostr.identity.publish_to_relays",
new=AsyncMock(return_value={"wss://relay.test": False}),
),
):
result = await manager.announce()
assert result.kind_0_ok is False
assert result.kind_31990_ok is False
@pytest.mark.asyncio
async def test_never_raises_on_exception(self, manager, kp):
mock_settings = MagicMock(
nostr_privkey=kp.privkey_hex,
nostr_relays="wss://relay.test",
nostr_profile_name="Timmy",
nostr_profile_about="",
nostr_profile_picture="",
nostr_nip05="",
)
with (
patch("infrastructure.nostr.identity.settings", mock_settings),
patch(
"infrastructure.nostr.identity.publish_to_relays",
new=AsyncMock(side_effect=Exception("relay exploded")),
),
):
# Must not raise
result = await manager.announce()
assert isinstance(result, AnnounceResult)

View File

@@ -0,0 +1,126 @@
"""Unit tests for infrastructure.nostr.keypair."""
from __future__ import annotations
import pytest
from infrastructure.nostr.keypair import (
NostrKeypair,
_bech32_decode,
_bech32_encode,
generate_keypair,
load_keypair,
pubkey_from_privkey,
)
class TestGenerateKeypair:
def test_returns_nostr_keypair(self):
kp = generate_keypair()
assert isinstance(kp, NostrKeypair)
def test_privkey_hex_is_64_chars(self):
kp = generate_keypair()
assert len(kp.privkey_hex) == 64
assert all(c in "0123456789abcdef" for c in kp.privkey_hex)
def test_pubkey_hex_is_64_chars(self):
kp = generate_keypair()
assert len(kp.pubkey_hex) == 64
assert all(c in "0123456789abcdef" for c in kp.pubkey_hex)
def test_nsec_starts_with_nsec1(self):
kp = generate_keypair()
assert kp.nsec.startswith("nsec1")
def test_npub_starts_with_npub1(self):
kp = generate_keypair()
assert kp.npub.startswith("npub1")
def test_two_keypairs_are_different(self):
kp1 = generate_keypair()
kp2 = generate_keypair()
assert kp1.privkey_hex != kp2.privkey_hex
assert kp1.pubkey_hex != kp2.pubkey_hex
def test_privkey_bytes_matches_hex(self):
kp = generate_keypair()
assert kp.privkey_bytes == bytes.fromhex(kp.privkey_hex)
def test_pubkey_bytes_matches_hex(self):
kp = generate_keypair()
assert kp.pubkey_bytes == bytes.fromhex(kp.pubkey_hex)
class TestLoadKeypair:
def test_round_trip_via_privkey_hex(self):
kp1 = generate_keypair()
kp2 = load_keypair(privkey_hex=kp1.privkey_hex)
assert kp2.privkey_hex == kp1.privkey_hex
assert kp2.pubkey_hex == kp1.pubkey_hex
def test_round_trip_via_nsec(self):
kp1 = generate_keypair()
kp2 = load_keypair(nsec=kp1.nsec)
assert kp2.privkey_hex == kp1.privkey_hex
assert kp2.pubkey_hex == kp1.pubkey_hex
def test_raises_if_both_supplied(self):
kp = generate_keypair()
with pytest.raises(ValueError, match="either"):
load_keypair(privkey_hex=kp.privkey_hex, nsec=kp.nsec)
def test_raises_if_neither_supplied(self):
with pytest.raises(ValueError, match="either"):
load_keypair()
def test_raises_on_invalid_hex(self):
with pytest.raises((ValueError, Exception)):
load_keypair(privkey_hex="zzzz")
def test_raises_on_wrong_length_hex(self):
with pytest.raises(ValueError):
load_keypair(privkey_hex="deadbeef") # too short
def test_raises_on_wrong_hrp_bech32(self):
kp = generate_keypair()
# npub is bech32 but with hrp "npub", not "nsec"
with pytest.raises(ValueError):
load_keypair(nsec=kp.npub)
def test_npub_derived_from_privkey(self):
kp1 = generate_keypair()
kp2 = load_keypair(privkey_hex=kp1.privkey_hex)
assert kp2.npub == kp1.npub
class TestPubkeyFromPrivkey:
def test_derives_correct_pubkey(self):
kp = generate_keypair()
derived = pubkey_from_privkey(kp.privkey_hex)
assert derived == kp.pubkey_hex
def test_is_deterministic(self):
kp = generate_keypair()
assert pubkey_from_privkey(kp.privkey_hex) == pubkey_from_privkey(kp.privkey_hex)
class TestBech32:
def test_encode_decode_round_trip(self):
data = bytes(range(32))
encoded = _bech32_encode("test", data)
hrp, decoded = _bech32_decode(encoded)
assert hrp == "test"
assert decoded == data
def test_invalid_checksum_raises(self):
kp = generate_keypair()
mangled = kp.npub[:-1] + ("q" if kp.npub[-1] != "q" else "p")
with pytest.raises(ValueError, match="checksum"):
_bech32_decode(mangled)
def test_npub_roundtrip(self):
kp = generate_keypair()
hrp, pub = _bech32_decode(kp.npub)
assert hrp == "npub"
assert pub.hex() == kp.pubkey_hex