Compare commits
1 Commits
step35/683
...
step35/469
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b30a42d72e |
@@ -1 +1,18 @@
|
||||
# Timmy core module
|
||||
|
||||
"""Sovereign identity and cryptographic key management."""
|
||||
from .identity import (
|
||||
KeyVault,
|
||||
Identity,
|
||||
create_nostr_identity,
|
||||
load_nostr_identity,
|
||||
list_identities,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"KeyVault",
|
||||
"Identity",
|
||||
"create_nostr_identity",
|
||||
"load_nostr_identity",
|
||||
"list_identities",
|
||||
]
|
||||
|
||||
198
src/timmy/identity/__init__.py
Normal file
198
src/timmy/identity/__init__.py
Normal file
@@ -0,0 +1,198 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sovereign Identity Key Management — local-first cryptographic identity.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ed25519
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
|
||||
IDENTITY_ROOT = Path(os.getenv("TIMMY_IDENTITY_DIR", os.path.expanduser("~/.timmy/identity")))
|
||||
SALT_SIZE = 16
|
||||
NONCE_SIZE = 12
|
||||
KEY_DERIVATION_ITERATIONS = 200_000
|
||||
|
||||
|
||||
@dataclass
|
||||
class Identity:
|
||||
name: str
|
||||
public_key: bytes
|
||||
created_at: str
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def npub(self) -> str:
|
||||
return self.public_key.hex()
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
d = asdict(self)
|
||||
d["public_key"] = base64.b64encode(self.public_key).decode()
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict) -> Identity:
|
||||
return cls(
|
||||
name=d["name"],
|
||||
public_key=base64.b64decode(d["public_key"]),
|
||||
created_at=d["created_at"],
|
||||
metadata=d.get("metadata", {}),
|
||||
)
|
||||
|
||||
|
||||
class KeyVault:
|
||||
def __init__(self, root: Optional[Path] = None):
|
||||
self.root = Path(root) if root else IDENTITY_ROOT
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _now_iso(self) -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
def _derive_key(self, passphrase: str, salt: bytes) -> bytes:
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=KEY_DERIVATION_ITERATIONS,
|
||||
)
|
||||
return kdf.derive(passphrase.encode("utf-8"))
|
||||
|
||||
def generate_nostr_identity(self, name: str, passphrase: str) -> Identity:
|
||||
private_key = ed25519.Ed25519PrivateKey.generate()
|
||||
public_key = private_key.public_key().public_bytes(
|
||||
encoding=serialization.Encoding.Raw,
|
||||
format=serialization.PublicFormat.Raw,
|
||||
)
|
||||
private_bytes = private_key.private_bytes(
|
||||
encoding=serialization.Encoding.Raw,
|
||||
format=serialization.PrivateFormat.Raw,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
)
|
||||
salt = secrets.token_bytes(SALT_SIZE)
|
||||
aes_key = self._derive_key(passphrase, salt)
|
||||
aesgcm = AESGCM(aes_key)
|
||||
nonce = secrets.token_bytes(NONCE_SIZE)
|
||||
ct = aesgcm.encrypt(nonce, private_bytes, None)
|
||||
record = {
|
||||
"v": 1,
|
||||
"name": name,
|
||||
"public_key": base64.b64encode(public_key).decode(),
|
||||
"encrypted_private": base64.b64encode(ct).decode(),
|
||||
"salt": base64.b64encode(salt).decode(),
|
||||
"nonce": base64.b64encode(nonce).decode(),
|
||||
"created_at": self._now_iso(),
|
||||
"type": "nostr-ed25519",
|
||||
}
|
||||
path = self.root / f"{name}.json"
|
||||
with open(path, "w") as f:
|
||||
json.dump(record, f, indent=2)
|
||||
path.chmod(0o600)
|
||||
return Identity(name=name, public_key=public_key, created_at=record["created_at"])
|
||||
|
||||
def load_identity(self, name: str, passphrase: str) -> Identity:
|
||||
path = self.root / f"{name}.json"
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Identity '{name}' not found")
|
||||
with open(path) as f:
|
||||
record = json.load(f)
|
||||
salt = base64.b64decode(record["salt"])
|
||||
nonce = base64.b64decode(record["nonce"])
|
||||
ct = base64.b64decode(record["encrypted_private"])
|
||||
aes_key = self._derive_key(passphrase, salt)
|
||||
aesgcm = AESGCM(aes_key)
|
||||
try:
|
||||
aesgcm.decrypt(nonce, ct, None)
|
||||
except Exception as exc:
|
||||
raise ValueError("passphrase incorrect or data corrupted") from exc
|
||||
public_key = base64.b64decode(record["public_key"])
|
||||
return Identity(
|
||||
name=record["name"],
|
||||
public_key=public_key,
|
||||
created_at=record["created_at"],
|
||||
metadata={"type": record.get("type", "unknown")},
|
||||
)
|
||||
|
||||
def list_identities(self) -> list[str]:
|
||||
names = []
|
||||
for p in self.root.glob("*.json"):
|
||||
try:
|
||||
with open(p) as f:
|
||||
record = json.load(f)
|
||||
names.append(record.get("name", p.stem))
|
||||
except Exception:
|
||||
continue
|
||||
return sorted(names)
|
||||
|
||||
def delete_identity(self, name: str, passphrase: str) -> bool:
|
||||
path = self.root / f"{name}.json"
|
||||
if not path.exists():
|
||||
return False
|
||||
self.load_identity(name, passphrase)
|
||||
path.unlink()
|
||||
return True
|
||||
|
||||
def rotate_passphrase(self, name: str, old_pass: str, new_pass: str) -> bool:
|
||||
path = self.root / f"{name}.json"
|
||||
if not path.exists():
|
||||
return False
|
||||
self.load_identity(name, old_pass)
|
||||
with open(path) as f:
|
||||
record = json.load(f)
|
||||
salt_old = base64.b64decode(record["salt"])
|
||||
nonce_old = base64.b64decode(record["nonce"])
|
||||
ct_old = base64.b64decode(record["encrypted_private"])
|
||||
aes_key_old = self._derive_key(old_pass, salt_old)
|
||||
aesgcm_old = AESGCM(aes_key_old)
|
||||
private_bytes = aesgcm_old.decrypt(nonce_old, ct_old, None)
|
||||
salt_new = secrets.token_bytes(SALT_SIZE)
|
||||
aes_key_new = self._derive_key(new_pass, salt_new)
|
||||
aesgcm_new = AESGCM(aes_key_new)
|
||||
nonce_new = secrets.token_bytes(NONCE_SIZE)
|
||||
ct_new = aesgcm_new.encrypt(nonce_new, private_bytes, None)
|
||||
record.update(
|
||||
encrypted_private=base64.b64encode(ct_new).decode(),
|
||||
salt=base64.b64encode(salt_new).decode(),
|
||||
nonce=base64.b64encode(nonce_new).decode(),
|
||||
)
|
||||
with open(path, "w") as f:
|
||||
json.dump(record, f, indent=2)
|
||||
return True
|
||||
|
||||
|
||||
_vault: Optional[KeyVault] = None
|
||||
|
||||
def get_vault() -> KeyVault:
|
||||
global _vault
|
||||
if _vault is None:
|
||||
_vault = KeyVault()
|
||||
return _vault
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Identity",
|
||||
"KeyVault",
|
||||
"get_vault",
|
||||
"create_nostr_identity",
|
||||
"load_nostr_identity",
|
||||
"list_identities",
|
||||
]
|
||||
|
||||
# Convenience wrappers
|
||||
def create_nostr_identity(name: str, passphrase: str, vault: Optional[KeyVault] = None) -> Identity:
|
||||
return (vault or get_vault()).generate_nostr_identity(name, passphrase)
|
||||
|
||||
def load_nostr_identity(name: str, passphrase: str, vault: Optional[KeyVault] = None) -> Identity:
|
||||
return (vault or get_vault()).load_identity(name, passphrase)
|
||||
|
||||
def list_identities(vault: Optional[KeyVault] = None) -> list[str]:
|
||||
return (vault or get_vault()).list_identities()
|
||||
58
tests/timmy/test_identity.py
Normal file
58
tests/timmy/test_identity.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Tests for timmy.identity module."""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.identity import KeyVault, Identity
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_vault(tmp_path: Path) -> KeyVault:
|
||||
return KeyVault(root=tmp_path / "identity")
|
||||
|
||||
|
||||
class TestKeyVault:
|
||||
def test_generate_nostr_identity_creates_files(self, tmp_vault: KeyVault) -> None:
|
||||
identity = tmp_vault.generate_nostr_identity("alice", "s3cr3t")
|
||||
assert identity.name == "alice"
|
||||
assert len(identity.public_key) == 32
|
||||
files = list(tmp_vault.root.glob("*.json"))
|
||||
assert len(files) == 1
|
||||
|
||||
def test_round_trip(self, tmp_vault: KeyVault) -> None:
|
||||
tmp_vault.generate_nostr_identity("bob", "pass")
|
||||
loaded = tmp_vault.load_identity("bob", "pass")
|
||||
assert loaded.public_key == tmp_vault.load_identity("bob", "pass").public_key
|
||||
|
||||
def test_wrong_passphrase_raises(self, tmp_vault: KeyVault) -> None:
|
||||
tmp_vault.generate_nostr_identity("carol", "right-pass")
|
||||
with pytest.raises(ValueError, match="passphrase incorrect"):
|
||||
tmp_vault.load_identity("carol", "wrong-pass")
|
||||
|
||||
def test_list_identities(self, tmp_vault: KeyVault) -> None:
|
||||
tmp_vault.generate_nostr_identity("dave", "p1")
|
||||
tmp_vault.generate_nostr_identity("eve", "p2")
|
||||
names = tmp_vault.list_identities()
|
||||
assert set(names) == {"dave", "eve"}
|
||||
|
||||
def test_delete_identity(self, tmp_vault: KeyVault) -> None:
|
||||
tmp_vault.generate_nostr_identity("frank", "secret")
|
||||
assert tmp_vault.delete_identity("frank", "secret") is True
|
||||
assert not (tmp_vault.root / "frank.json").exists()
|
||||
|
||||
def test_rotate_passphrase(self, tmp_vault: KeyVault) -> None:
|
||||
tmp_vault.generate_nostr_identity("grace", "old")
|
||||
ok = tmp_vault.rotate_passphrase("grace", "old", "new")
|
||||
assert ok is True
|
||||
with pytest.raises(ValueError):
|
||||
tmp_vault.load_identity("grace", "old")
|
||||
loaded = tmp_vault.load_identity("grace", "new")
|
||||
assert loaded.name == "grace"
|
||||
|
||||
def test_npub_format(self, tmp_vault: KeyVault) -> None:
|
||||
identity = tmp_vault.generate_nostr_identity("henry", "pw")
|
||||
npub = identity.npub()
|
||||
assert len(npub) == 64
|
||||
assert all(c in "0123456789abcdef" for c in npub)
|
||||
Reference in New Issue
Block a user