#!/usr/bin/env python3
"""
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
the body (Evennia/Morrowind), and the visualization surface.

Serves HTTP alongside WebSocket:
  GET /api/briefing — heartbeat + morning report data for the HUD briefing panel
  GET /api/health   — static {"status": "ok"} liveness probe
"""

import asyncio
import json
import logging
import os
import signal
import sys
from datetime import datetime, timezone
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from threading import Thread
from typing import Any, Dict, Set

# Branch protected file - see POLICY.md
import websockets

# ── HTTP Briefing Endpoint ─────────────────────────────────────────────

HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"
REPORTS_DIR = Path.home() / ".local" / "timmy" / "reports"
CRON_HEARTBEAT_DIR_PRIMARY = Path("/var/run/bezalel/heartbeats")
CRON_HEARTBEAT_DIR_FALLBACK = Path.home() / ".bezalel" / "heartbeats"


def _read_json_file(path: Path) -> Any:
    """Read and parse a JSON file. Returns None on failure."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately best-effort: a missing, unreadable or malformed file
        # must never take the briefing endpoint down.
        return None


def _resolve_cron_dir() -> Path:
    """Return the first existing, readable cron heartbeat directory.

    Falls back to CRON_HEARTBEAT_DIR_FALLBACK (which may not exist) when
    neither candidate is readable.
    """
    for candidate in (CRON_HEARTBEAT_DIR_PRIMARY, CRON_HEARTBEAT_DIR_FALLBACK):
        if candidate.exists() and os.access(str(candidate), os.R_OK):
            return candidate
    return CRON_HEARTBEAT_DIR_FALLBACK


def _describe_cron_job(job: str, data: Any, now: float) -> Dict[str, Any]:
    """Turn one parsed heartbeat file into a status entry.

    Args:
        job: Job name (the heartbeat file's stem).
        data: Parsed file content, or None if it could not be read.
        now: Current time as a UTC epoch timestamp.

    Returns:
        A status dict. Content that is not a JSON object, or whose
        ``timestamp`` / ``interval`` cannot be interpreted, is reported as
        ``{"job": ..., "healthy": False, "message": "corrupt"}``.
    """
    corrupt = {"job": job, "healthy": False, "message": "corrupt"}
    if not isinstance(data, dict):
        return corrupt
    try:
        ts = float(data.get("timestamp", 0))
        interval = int(data.get("interval", 3600))
        last_seen = (
            datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() if ts else None
        )
    except (TypeError, ValueError, OverflowError, OSError):
        # Non-numeric or out-of-range values: treat like an unparsable file
        # instead of failing the whole briefing.
        return corrupt

    age = now - ts
    # A job is considered stale once it has missed two of its intervals.
    is_stale = age > (2 * interval)
    if ts:
        message = f"{'STALE' if is_stale else 'OK'} ({age:.0f}s / {interval}s)"
    else:
        message = "no timestamp"
    return {
        "job": job,
        "healthy": not is_stale,
        "age_secs": round(age, 1),
        "interval": interval,
        "last_seen": last_seen,
        "message": message,
    }


def _read_cron_heartbeats() -> list:
    """Read all .last files from the cron heartbeat directory.

    Returns:
        One status dict per ``*.last`` file, ordered by path; an empty list
        when the heartbeat directory does not exist.
    """
    hb_dir = _resolve_cron_dir()
    if not hb_dir.exists():
        return []

    now = datetime.now(timezone.utc).timestamp()
    return [
        _describe_cron_job(path.stem, _read_json_file(path), now)
        for path in sorted(hb_dir.glob("*.last"))
    ]


def _latest_morning_report() -> Dict[str, Any] | None:
    """Return the parsed morning report whose filename sorts last, if any."""
    if not REPORTS_DIR.exists():
        return None

    # Reverse lexicographic order of the file names decides which is "latest".
    reports = sorted(REPORTS_DIR.glob("morning-*.json"), reverse=True)
    if not reports:
        return None

    return _read_json_file(reports[0])


def _build_briefing() -> Dict[str, Any]:
    """Assemble the full briefing payload from real files.

    Returns:
        A dict with ``generated_at``, ``core_heartbeat``, ``cron_heartbeat``
        (jobs plus healthy/stale counts) and ``morning_report``.
    """
    now = datetime.now(timezone.utc)

    # Core heartbeat: annotate with its age only when it is a non-empty JSON
    # object; anything else is passed through untouched.
    core_hb = _read_json_file(HEARTBEAT_PATH)
    if isinstance(core_hb, dict) and core_hb:
        try:
            beat_ts = float(core_hb.get("timestamp", 0))
        except (TypeError, ValueError):
            beat_ts = 0.0
        core_hb["age_secs"] = round(now.timestamp() - beat_ts, 1) if beat_ts else None

    # Cron heartbeats
    cron_jobs = _read_cron_heartbeats()
    healthy_count = sum(1 for job in cron_jobs if job.get("healthy"))
    stale_count = len(cron_jobs) - healthy_count

    # Morning report
    report = _latest_morning_report()

    return {
        "generated_at": now.isoformat(),
        "core_heartbeat": core_hb,
        "cron_heartbeat": {
            "jobs": cron_jobs,
            "healthy_count": healthy_count,
            "stale_count": stale_count,
        },
        "morning_report": report,
    }


class BriefingHandler(SimpleHTTPRequestHandler):
    """Minimal HTTP handler serving /api/briefing and /api/health.

    Every other path answers 404; no files are served from disk.
    """

    def _send_json(self, body: bytes, status: int = 200) -> None:
        """Write a complete JSON response with CORS and length headers."""
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Route GET requests; a query string, if present, is ignored."""
        route = self.path.partition("?")[0]

        if route == "/api/briefing":
            # Only payload construction is guarded: once headers have been
            # written, a second (error) response must not be attempted.
            try:
                body = json.dumps(_build_briefing()).encode("utf-8")
            except Exception as e:
                logging.getLogger("nexus-gateway").exception(
                    "Failed to build briefing payload"
                )
                self.send_error(500, str(e))
                return
            self._send_json(body)
        elif route == "/api/health":
            self._send_json(json.dumps({"status": "ok"}).encode("utf-8"))
        else:
            self.send_error(404)

    def log_message(self, fmt, *args):
        pass  # Suppress HTTP access logs — WS gateway logs are enough


def start_http_server(port: int = 8766):
    """Run the HTTP server in a daemon thread.

    Args:
        port: TCP port to bind on all interfaces.

    Returns:
        The running HTTPServer instance (call ``shutdown()`` to stop it).
    """
    server = HTTPServer(("0.0.0.0", port), BriefingHandler)
    thread = Thread(target=server.serve_forever, daemon=True)
    thread.start()
    logger = logging.getLogger("nexus-gateway")
    logger.info(f"Briefing HTTP server started on http://0.0.0.0:{port}")
    return server


# Configuration
PORT = 8765
HOST = "0.0.0.0"  # Allow external connections if needed

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("nexus-gateway")

# State
clients: Set[websockets.WebSocketServerProtocol] = set()


def _log_message_type(message: Any, addr: Any) -> None:
    """Best-effort debug logging of notable JSON message types.

    Never raises for payloads that are not JSON objects (arrays, scalars,
    non-UTF-8 binary frames): such messages are simply not inspected.
    """
    try:
        data = json.loads(message)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return
    if not isinstance(data, dict):
        return
    msg_type = data.get("type", "unknown")
    # Optional: log specific important message types
    if msg_type in ("agent_register", "thought", "action"):
        logger.debug(f"Received {msg_type} from {addr}")


async def _broadcast(sender: websockets.WebSocketServerProtocol, message: Any) -> None:
    """Send ``message`` to every open client except ``sender``.

    Clients whose send fails are logged and removed from ``clients``.
    """
    # Snapshot the recipients so each result below pairs with the client it
    # was sent to, even if `clients` changes while the sends are awaited.
    # NOTE(review): `.open` is read here exactly as in the original code —
    # confirm the attribute exists on the installed websockets version.
    targets = [c for c in clients if c != sender and c.open]
    if not targets:
        return

    results = await asyncio.gather(
        *(target.send(message) for target in targets),
        return_exceptions=True,
    )

    failed = set()
    for target, result in zip(targets, results):
        if isinstance(result, Exception):
            logger.error(f"Failed to send to a client {target.remote_address}: {result}")
            failed.add(target)

    if failed:
        clients.difference_update(failed)


async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
    """Handles individual client connections and message broadcasting.

    Registers the connection, relays each received message to all other
    clients, and unregisters the connection when it ends for any reason.
    """
    clients.add(websocket)
    addr = websocket.remote_address
    logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")

    try:
        async for message in websocket:
            # Inspection is purely informational and must not block relaying.
            _log_message_type(message, addr)
            # Broadcast to all OTHER clients
            await _broadcast(websocket, message)
    except websockets.exceptions.ConnectionClosed:
        logger.debug(f"Connection closed by client {addr}")
    except Exception as e:
        logger.error(f"Error handling client {addr}: {e}")
    finally:
        clients.discard(websocket)
        logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")


async def main():
    """Main server loop with graceful shutdown.

    Starts the HTTP briefing server, serves WebSocket clients until SIGINT or
    SIGTERM arrives, then closes remaining connections and stops the HTTP
    server.
    """
    # Start HTTP briefing endpoint alongside WS
    http_server = start_http_server(port=8766)

    try:
        logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")

        # Set up signal handlers for graceful shutdown
        loop = asyncio.get_running_loop()
        stop = loop.create_future()

        def shutdown():
            if not stop.done():
                stop.set_result(None)

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, shutdown)
            except NotImplementedError:
                # Signal handlers not supported on Windows
                pass

        async with websockets.serve(broadcast_handler, HOST, PORT):
            logger.info("Gateway is ready and listening.")
            await stop

        logger.info("Shutting down Nexus WS gateway...")
        # Close all client connections
        if clients:
            logger.info(f"Closing {len(clients)} active connections...")
            close_tasks = [client.close() for client in clients]
            await asyncio.gather(*close_tasks, return_exceptions=True)
    finally:
        # HTTPServer.shutdown() blocks until serve_forever() returns, so run
        # it off the event loop; then release the listening socket.
        await asyncio.get_running_loop().run_in_executor(None, http_server.shutdown)
        http_server.server_close()

    logger.info("Shutdown complete.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.critical(f"Fatal server error: {e}")
        sys.exit(1)