"""Human Confirmation Daemon — HTTP server for two-factor action approval. Implements Vitalik's Pattern 1: "The new 'two-factor confirmation' is that the two factors are the human and the LLM." This daemon runs on localhost:6000 and provides a simple HTTP API for the agent to request human approval before executing high-risk actions. Threat model: - LLM jailbreaks: Remote content "hacking" the LLM to perform malicious actions - LLM accidents: LLM accidentally performing dangerous operations - The human acts as the second factor — the agent proposes, the human disposes Architecture: - Agent detects high-risk action → POST /confirm with action details - Daemon stores pending request, sends notification to user - User approves/denies via POST /respond (Telegram, CLI, or direct HTTP) - Agent receives decision and proceeds or aborts Usage: # Start daemon (usually managed by gateway) from tools.confirmation_daemon import ConfirmationDaemon daemon = ConfirmationDaemon(port=6000) daemon.start() # Request approval (from agent code) from tools.confirmation_daemon import request_confirmation approved = request_confirmation( action="send_email", description="Send email to alice@example.com", risk_level="high", payload={"to": "alice@example.com", "subject": "Meeting notes"}, timeout=300, ) """ from __future__ import annotations import asyncio import json import logging import os import threading import time import uuid from dataclasses import dataclass, field, asdict from enum import Enum, auto from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) class RiskLevel(Enum): """Risk classification for actions requiring confirmation.""" LOW = "low" # Log only, no confirmation needed MEDIUM = "medium" # Confirm for non-whitelisted targets HIGH = "high" # Always confirm CRITICAL = "critical" # Always confirm + require explicit reason class ConfirmationStatus(Enum): """Status of a pending confirmation request.""" PENDING = "pending" APPROVED = "approved" DENIED = "denied" EXPIRED = "expired" AUTO_APPROVED = "auto_approved" @dataclass class ConfirmationRequest: """A request for human confirmation of a high-risk action.""" request_id: str action: str # Action type: send_email, send_message, crypto_tx, etc. description: str # Human-readable description of what will happen risk_level: str # low, medium, high, critical payload: Dict[str, Any] # Action-specific data (sanitized) session_key: str = "" # Session that initiated the request created_at: float = 0.0 expires_at: float = 0.0 status: str = ConfirmationStatus.PENDING.value decided_at: float = 0.0 decided_by: str = "" # "human", "auto", "whitelist" reason: str = "" # Optional reason for denial def __post_init__(self): if not self.created_at: self.created_at = time.time() if not self.expires_at: self.expires_at = self.created_at + 300 # 5 min default if not self.request_id: self.request_id = str(uuid.uuid4())[:12] @property def is_expired(self) -> bool: return time.time() > self.expires_at @property def is_pending(self) -> bool: return self.status == ConfirmationStatus.PENDING.value and not self.is_expired def to_dict(self) -> Dict[str, Any]: d = asdict(self) d["is_expired"] = self.is_expired d["is_pending"] = self.is_pending return d # ========================================================================= # Action categories (Vitalik's threat model) # ========================================================================= ACTION_CATEGORIES = { # Messaging — outbound communication to external parties "send_email": RiskLevel.HIGH, "send_message": RiskLevel.MEDIUM, # Depends on recipient "send_signal": RiskLevel.HIGH, "send_telegram": RiskLevel.MEDIUM, "send_discord": RiskLevel.MEDIUM, "post_social": RiskLevel.HIGH, # Financial / crypto "crypto_tx": RiskLevel.CRITICAL, "sign_transaction": RiskLevel.CRITICAL, "access_wallet": RiskLevel.CRITICAL, "modify_balance": RiskLevel.CRITICAL, # System modification "install_software": RiskLevel.HIGH, "modify_system_config": RiskLevel.HIGH, "modify_firewall": RiskLevel.CRITICAL, "add_ssh_key": RiskLevel.CRITICAL, "create_user": RiskLevel.CRITICAL, # Data access "access_contacts": RiskLevel.MEDIUM, "access_calendar": RiskLevel.LOW, "read_private_files": RiskLevel.MEDIUM, "upload_data": RiskLevel.HIGH, "share_credentials": RiskLevel.CRITICAL, # Network "open_port": RiskLevel.HIGH, "modify_dns": RiskLevel.HIGH, "expose_service": RiskLevel.CRITICAL, } # Default: any unrecognized action is MEDIUM risk DEFAULT_RISK_LEVEL = RiskLevel.MEDIUM def classify_action(action: str) -> RiskLevel: """Classify an action by its risk level.""" return ACTION_CATEGORIES.get(action, DEFAULT_RISK_LEVEL) # ========================================================================= # Whitelist configuration # ========================================================================= _DEFAULT_WHITELIST = { "send_message": { "targets": [], # Contact names/IDs that don't need confirmation }, "send_email": { "targets": [], # Email addresses that don't need confirmation "self_only": True, # send-to-self always allowed }, } def _load_whitelist() -> Dict[str, Any]: """Load action whitelist from config.""" config_path = Path.home() / ".hermes" / "approval_whitelist.json" if config_path.exists(): try: with open(config_path) as f: return json.load(f) except Exception as e: logger.warning("Failed to load approval whitelist: %s", e) return dict(_DEFAULT_WHITELIST) def _is_whitelisted(action: str, payload: Dict[str, Any], whitelist: Dict) -> bool: """Check if an action is pre-approved by the whitelist.""" action_config = whitelist.get(action, {}) if not action_config: return False # Check target-based whitelist targets = action_config.get("targets", []) target = payload.get("to") or payload.get("recipient") or payload.get("target", "") if target and target in targets: return True # Self-only email if action_config.get("self_only") and action == "send_email": sender = payload.get("from", "") recipient = payload.get("to", "") if sender and recipient and sender.lower() == recipient.lower(): return True return False # ========================================================================= # Confirmation daemon # ========================================================================= class ConfirmationDaemon: """HTTP daemon for human confirmation of high-risk actions. Runs on localhost:PORT (default 6000). Provides: - POST /confirm — agent requests human approval - POST /respond — human approves/denies - GET /pending — list pending requests - GET /health — health check """ def __init__( self, host: str = "127.0.0.1", port: int = 6000, default_timeout: int = 300, notify_callback: Optional[Callable] = None, ): self.host = host self.port = port self.default_timeout = default_timeout self.notify_callback = notify_callback self._pending: Dict[str, ConfirmationRequest] = {} self._history: List[ConfirmationRequest] = [] self._lock = threading.Lock() self._whitelist = _load_whitelist() self._app = None self._runner = None def request( self, action: str, description: str, payload: Optional[Dict[str, Any]] = None, risk_level: Optional[str] = None, session_key: str = "", timeout: Optional[int] = None, ) -> ConfirmationRequest: """Create a confirmation request. Returns the request. Check .status to see if it was immediately auto-approved (whitelisted) or is pending human review. """ payload = payload or {} # Classify risk if not specified if risk_level is None: risk_level = classify_action(action).value # Check whitelist if risk_level in ("low",) or _is_whitelisted(action, payload, self._whitelist): req = ConfirmationRequest( request_id=str(uuid.uuid4())[:12], action=action, description=description, risk_level=risk_level, payload=payload, session_key=session_key, expires_at=time.time() + (timeout or self.default_timeout), status=ConfirmationStatus.AUTO_APPROVED.value, decided_at=time.time(), decided_by="whitelist", ) with self._lock: self._history.append(req) logger.info("Auto-approved whitelisted action: %s", action) return req # Create pending request req = ConfirmationRequest( request_id=str(uuid.uuid4())[:12], action=action, description=description, risk_level=risk_level, payload=payload, session_key=session_key, expires_at=time.time() + (timeout or self.default_timeout), ) with self._lock: self._pending[req.request_id] = req # Notify human if self.notify_callback: try: self.notify_callback(req.to_dict()) except Exception as e: logger.warning("Confirmation notify callback failed: %s", e) logger.info( "Confirmation request %s: %s (%s risk) — waiting for human", req.request_id, action, risk_level, ) return req def respond( self, request_id: str, approved: bool, decided_by: str = "human", reason: str = "", ) -> Optional[ConfirmationRequest]: """Record a human decision on a pending request.""" with self._lock: req = self._pending.get(request_id) if not req: logger.warning("Confirmation respond: unknown request %s", request_id) return None if not req.is_pending: logger.warning("Confirmation respond: request %s already decided", request_id) return req req.status = ( ConfirmationStatus.APPROVED.value if approved else ConfirmationStatus.DENIED.value ) req.decided_at = time.time() req.decided_by = decided_by req.reason = reason # Move to history del self._pending[request_id] self._history.append(req) logger.info( "Confirmation %s: %s by %s", request_id, "APPROVED" if approved else "DENIED", decided_by, ) return req def wait_for_decision( self, request_id: str, timeout: Optional[float] = None ) -> ConfirmationRequest: """Block until a decision is made or timeout expires.""" deadline = time.time() + (timeout or self.default_timeout) while time.time() < deadline: with self._lock: req = self._pending.get(request_id) if req and not req.is_pending: return req if req and req.is_expired: req.status = ConfirmationStatus.EXPIRED.value del self._pending[request_id] self._history.append(req) return req time.sleep(0.5) # Timeout with self._lock: req = self._pending.pop(request_id, None) if req: req.status = ConfirmationStatus.EXPIRED.value self._history.append(req) return req # Shouldn't reach here return ConfirmationRequest( request_id=request_id, action="unknown", description="Request not found", risk_level="high", payload={}, status=ConfirmationStatus.EXPIRED.value, ) def get_pending(self) -> List[Dict[str, Any]]: """Return list of pending confirmation requests.""" self._expire_old() with self._lock: return [r.to_dict() for r in self._pending.values() if r.is_pending] def get_history(self, limit: int = 50) -> List[Dict[str, Any]]: """Return recent confirmation history.""" with self._lock: return [r.to_dict() for r in self._history[-limit:]] def _expire_old(self) -> None: """Move expired requests to history.""" now = time.time() with self._lock: expired = [ rid for rid, req in self._pending.items() if now > req.expires_at ] for rid in expired: req = self._pending.pop(rid) req.status = ConfirmationStatus.EXPIRED.value self._history.append(req) # --- aiohttp HTTP API --- async def _handle_health(self, request): from aiohttp import web return web.json_response({ "status": "ok", "service": "hermes-confirmation-daemon", "pending": len(self._pending), }) async def _handle_confirm(self, request): from aiohttp import web try: body = await request.json() except Exception: return web.json_response({"error": "invalid JSON"}, status=400) action = body.get("action", "") description = body.get("description", "") if not action or not description: return web.json_response( {"error": "action and description required"}, status=400 ) req = self.request( action=action, description=description, payload=body.get("payload", {}), risk_level=body.get("risk_level"), session_key=body.get("session_key", ""), timeout=body.get("timeout"), ) # If auto-approved, return immediately if req.status != ConfirmationStatus.PENDING.value: return web.json_response({ "request_id": req.request_id, "status": req.status, "decided_by": req.decided_by, }) # Otherwise, wait for human decision (with timeout) timeout = min(body.get("timeout", self.default_timeout), 600) result = self.wait_for_decision(req.request_id, timeout=timeout) return web.json_response({ "request_id": result.request_id, "status": result.status, "decided_by": result.decided_by, "reason": result.reason, }) async def _handle_respond(self, request): from aiohttp import web try: body = await request.json() except Exception: return web.json_response({"error": "invalid JSON"}, status=400) request_id = body.get("request_id", "") approved = body.get("approved") if not request_id or approved is None: return web.json_response( {"error": "request_id and approved required"}, status=400 ) result = self.respond( request_id=request_id, approved=bool(approved), decided_by=body.get("decided_by", "human"), reason=body.get("reason", ""), ) if not result: return web.json_response({"error": "unknown request"}, status=404) return web.json_response({ "request_id": result.request_id, "status": result.status, }) async def _handle_pending(self, request): from aiohttp import web return web.json_response({"pending": self.get_pending()}) def _build_app(self): """Build the aiohttp application.""" from aiohttp import web app = web.Application() app.router.add_get("/health", self._handle_health) app.router.add_post("/confirm", self._handle_confirm) app.router.add_post("/respond", self._handle_respond) app.router.add_get("/pending", self._handle_pending) self._app = app return app async def start_async(self) -> None: """Start the daemon as an async server.""" from aiohttp import web app = self._build_app() self._runner = web.AppRunner(app) await self._runner.setup() site = web.TCPSite(self._runner, self.host, self.port) await site.start() logger.info("Confirmation daemon listening on %s:%d", self.host, self.port) async def stop_async(self) -> None: """Stop the daemon.""" if self._runner: await self._runner.cleanup() self._runner = None def start(self) -> None: """Start daemon in a background thread (blocking caller).""" def _run(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self.start_async()) loop.run_forever() t = threading.Thread(target=_run, daemon=True, name="confirmation-daemon") t.start() logger.info("Confirmation daemon started in background thread") def start_blocking(self) -> None: """Start daemon and block (for standalone use).""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self.start_async()) try: loop.run_forever() except KeyboardInterrupt: pass finally: loop.run_until_complete(self.stop_async()) # ========================================================================= # Convenience API for agent integration # ========================================================================= # Global singleton — initialized by gateway or CLI at startup _daemon: Optional[ConfirmationDaemon] = None def get_daemon() -> Optional[ConfirmationDaemon]: """Get the global confirmation daemon instance.""" return _daemon def init_daemon( host: str = "127.0.0.1", port: int = 6000, notify_callback: Optional[Callable] = None, ) -> ConfirmationDaemon: """Initialize the global confirmation daemon.""" global _daemon _daemon = ConfirmationDaemon( host=host, port=port, notify_callback=notify_callback ) return _daemon def request_confirmation( action: str, description: str, payload: Optional[Dict[str, Any]] = None, risk_level: Optional[str] = None, session_key: str = "", timeout: int = 300, ) -> bool: """Request human confirmation for a high-risk action. This is the primary integration point for agent code. It: 1. Classifies the action risk level 2. Checks the whitelist 3. If confirmation needed, blocks until human responds 4. Returns True if approved, False if denied/expired Args: action: Action type (send_email, crypto_tx, etc.) description: Human-readable description payload: Action-specific data risk_level: Override auto-classification session_key: Session requesting approval timeout: Seconds to wait for human response Returns: True if approved, False if denied or expired. """ daemon = get_daemon() if not daemon: logger.warning( "No confirmation daemon running — DENYING action %s by default. " "Start daemon with init_daemon() or --confirmation-daemon flag.", action, ) return False req = daemon.request( action=action, description=description, payload=payload, risk_level=risk_level, session_key=session_key, timeout=timeout, ) # Auto-approved (whitelisted) if req.status == ConfirmationStatus.AUTO_APPROVED.value: return True # Wait for human result = daemon.wait_for_decision(req.request_id, timeout=timeout) return result.status == ConfirmationStatus.APPROVED.value