Files
the-nexus/server.py
Alexander Whitestone f83e103d27 feat: M6-P0 Foundation — cell spec, lazarus-pit daemon, health endpoint, issue template
Implements all P0 deliverables for the Lazarus Pit epic.

- docs/lazarus-pit/mission-cell-spec.md: Canonical `/var/missions/<uuid>/`
  directory layout, file schemas (cell.json, events.jsonl, heartbeat,
  daemon state), lifecycle state machine, and isolation guarantees.

- bin/lazarus_pit.py: Daemon skeleton with config loading, cell discovery,
  per-agent heartbeat polling, stale detection, resurrection stub (P3 placeholder),
  graceful signal handling, PID file management, and CLI subcommands
  (--status, --list-cells).

- config/lazarus-pit.toml: Documented config file with all tunable knobs:
  cells root, heartbeat thresholds, revive policy, gateway coordinates,
  Gitea notification settings.

- server.py: Added HTTP health heartbeat endpoint on NEXUS_HEALTH_PORT (default
  8766). GET /health returns JSON with status, uptime, connected_clients, ws_port,
  and timestamp. Consumed by lazarus-pit to verify gateway liveness.
  Runs in a background daemon thread — zero impact on existing WS logic.

- .gitea/ISSUE_TEMPLATE/mission-proposal.md: Gitea issue template for
  proposing new mission cells (agents, scope, isolation requirements,
  cell config, success criteria).

- tests/test_lazarus_pit.py: 25 unit tests covering config, cell discovery,
  heartbeat reading, health polling, resurrection stub, and CLI.

- tests/test_gateway_health.py: 6 unit tests covering health endpoint
  schema, 404 for unknown paths, uptime, and client count reflection.

All 31 new tests pass. Pre-existing test failures unchanged.

Fixes #879
Refs #878

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 14:16:45 -04:00

184 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
the body (Evennia/Morrowind), and the visualization surface.
Health heartbeat endpoint (added in #879 — M6-P0 Foundation):
GET http://<host>:<HEALTH_PORT>/health
Returns 200 JSON with gateway status and connected-client count.
Used by the lazarus-pit daemon to verify the gateway is alive.
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from typing import Set
import websockets
# Configuration
PORT = 8765
HOST = "0.0.0.0" # Allow external connections if needed
# Health heartbeat endpoint — monitored by lazarus-pit daemon (#879)
HEALTH_PORT = int(os.environ.get("NEXUS_HEALTH_PORT", "8766"))
HEALTH_HOST = os.environ.get("NEXUS_HEALTH_HOST", "127.0.0.1")
# Gateway start time for uptime reporting
_start_time = time.time()
# Logging setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("nexus-gateway")
# State
clients: Set[websockets.WebSocketServerProtocol] = set()
# ---------------------------------------------------------------------------
# Health heartbeat HTTP endpoint — consumed by lazarus-pit (#879)
# ---------------------------------------------------------------------------
class _HealthHandler(BaseHTTPRequestHandler):
"""Minimal HTTP handler for the /health liveness endpoint."""
def do_GET(self): # noqa: N802
if self.path != "/health":
self.send_response(404)
self.end_headers()
return
payload = {
"status": "ok",
"service": "nexus-gateway",
"uptime_seconds": round(time.time() - _start_time, 1),
"connected_clients": len(clients),
"ws_port": PORT,
"ts": time.time(),
}
body = json.dumps(payload).encode()
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt, *args): # noqa: N802
# Suppress default access log spam; use our own logger for errors only
if args and str(args[1]) not in ("200", "204"):
logger.debug("Health endpoint: " + fmt % args)
def _start_health_server() -> Thread:
"""Start the health HTTP server in a background daemon thread."""
server = HTTPServer((HEALTH_HOST, HEALTH_PORT), _HealthHandler)
thread = Thread(target=server.serve_forever, daemon=True)
thread.start()
logger.info(
"Health heartbeat endpoint listening on http://%s:%d/health",
HEALTH_HOST, HEALTH_PORT,
)
return thread
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"""Handles individual client connections and message broadcasting."""
clients.add(websocket)
addr = websocket.remote_address
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
try:
async for message in websocket:
# Parse for logging/validation if it's JSON
try:
data = json.loads(message)
msg_type = data.get("type", "unknown")
# Optional: log specific important message types
if msg_type in ["agent_register", "thought", "action"]:
logger.debug(f"Received {msg_type} from {addr}")
except (json.JSONDecodeError, TypeError):
pass
# Broadcast to all OTHER clients
if not clients:
continue
disconnected = set()
# Create broadcast tasks for efficiency
tasks = []
for client in clients:
if client != websocket and client.open:
tasks.append(asyncio.create_task(client.send(message)))
if tasks:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
# Find the client that failed
target_client = [c for c in clients if c != websocket][i]
logger.error(f"Failed to send to a client {target_client.remote_address}: {result}")
disconnected.add(target_client)
if disconnected:
clients.difference_update(disconnected)
except websockets.exceptions.ConnectionClosed:
logger.debug(f"Connection closed by client {addr}")
except Exception as e:
logger.error(f"Error handling client {addr}: {e}")
finally:
clients.discard(websocket)
logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")
async def main():
"""Main server loop with graceful shutdown."""
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
_start_health_server()
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
stop = loop.create_future()
def shutdown():
if not stop.done():
stop.set_result(None)
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, shutdown)
except NotImplementedError:
# Signal handlers not supported on Windows
pass
async with websockets.serve(broadcast_handler, HOST, PORT):
logger.info("Gateway is ready and listening.")
await stop
logger.info("Shutting down Nexus WS gateway...")
# Close all client connections
if clients:
logger.info(f"Closing {len(clients)} active connections...")
close_tasks = [client.close() for client in clients]
await asyncio.gather(*close_tasks, return_exceptions=True)
logger.info("Shutdown complete.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
except Exception as e:
logger.critical(f"Fatal server error: {e}")
sys.exit(1)