Compare commits
1 Commits
nexusburn/
...
nexusburn/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
66f8e61111 |
@@ -27,10 +27,8 @@ jobs:
|
||||
username: ${{ secrets.DEPLOY_USER }}
|
||||
key: ${{ secrets.DEPLOY_SSH_KEY }}
|
||||
script: |
|
||||
cd ~/the-nexus || git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus.git ~/the-nexus
|
||||
cd ~/the-nexus || git clone http://143.198.27.163:3000/Timmy_Foundation/the-nexus.git ~/the-nexus
|
||||
cd ~/the-nexus
|
||||
git fetch origin main
|
||||
git reset --hard origin/main
|
||||
docker compose build nexus-main
|
||||
docker compose up -d --force-recreate nexus-main
|
||||
echo "Nexus deployed — HTTP :8080, WS :8765"
|
||||
./deploy.sh main
|
||||
|
||||
@@ -11,16 +11,11 @@ COPY nexus/ nexus/
|
||||
COPY server.py ./
|
||||
|
||||
# Frontend assets referenced by index.html
|
||||
COPY index.html help.html style.css app.js boot.js bootstrap.mjs gofai_worker.js mempalace.js service-worker.js manifest.json ./
|
||||
COPY index.html help.html style.css app.js service-worker.js manifest.json ./
|
||||
|
||||
# Config/data
|
||||
COPY portals.json vision.json robots.txt ./
|
||||
|
||||
# Icons
|
||||
COPY icons/ icons/
|
||||
|
||||
# Expose HTTP (static) and WebSocket
|
||||
EXPOSE 8080
|
||||
EXPOSE 8765
|
||||
|
||||
CMD ["python3", "server.py"]
|
||||
|
||||
6
app.js
6
app.js
@@ -2188,11 +2188,7 @@ function connectHermes() {
|
||||
}
|
||||
|
||||
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
||||
// WS gateway runs on HTTP port - 115 (8080->8765, 8081->8766)
|
||||
const wsHost = window.location.hostname;
|
||||
const httpPort = parseInt(window.location.port) || 8080;
|
||||
const wsPort = httpPort === 8081 ? 8766 : 8765;
|
||||
const wsUrl = `${protocol}//${wsHost}:${wsPort}/api/world/ws`;
|
||||
const wsUrl = `${protocol}//${window.location.host}/api/world/ws`;
|
||||
|
||||
console.log(`Connecting to Hermes at ${wsUrl}...`);
|
||||
hermesWs = new WebSocket(wsUrl);
|
||||
|
||||
12
deploy.sh
12
deploy.sh
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env bash
|
||||
# deploy.sh — spin up (or update) the Nexus staging environment
|
||||
# Usage: ./deploy.sh — rebuild and restart nexus-main (HTTP :8080, WS :8765)
|
||||
# ./deploy.sh staging — rebuild and restart nexus-staging (HTTP :8081, WS :8766)
|
||||
# Usage: ./deploy.sh — rebuild and restart nexus-main (port 4200)
|
||||
# ./deploy.sh staging — rebuild and restart nexus-staging (port 4201)
|
||||
set -euo pipefail
|
||||
|
||||
SERVICE="${1:-nexus-main}"
|
||||
@@ -14,12 +14,4 @@ esac
|
||||
echo "==> Deploying $SERVICE …"
|
||||
docker compose build "$SERVICE"
|
||||
docker compose up -d --force-recreate "$SERVICE"
|
||||
|
||||
if [ "$SERVICE" = "nexus-main" ]; then
|
||||
echo "==> HTTP: http://localhost:8080"
|
||||
echo "==> WS: ws://localhost:8765"
|
||||
else
|
||||
echo "==> HTTP: http://localhost:8081"
|
||||
echo "==> WS: ws://localhost:8766"
|
||||
fi
|
||||
echo "==> Done. Container: $SERVICE"
|
||||
|
||||
@@ -6,12 +6,10 @@ services:
|
||||
container_name: nexus-main
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "8080:8080"
|
||||
- "8765:8765"
|
||||
nexus-staging:
|
||||
build: .
|
||||
container_name: nexus-staging
|
||||
restart: unless-stopped
|
||||
ports:
|
||||
- "8081:8080"
|
||||
- "8766:8765"
|
||||
- "8766:8765"
|
||||
@@ -195,14 +195,29 @@ class ChatLog:
|
||||
self._history[room].append(entry)
|
||||
if len(self._history[room]) > self._max_per_room:
|
||||
self._history[room] = self._history[room][-self._max_per_room:]
|
||||
# Persist to JSONL
|
||||
# Persist to JSONL inside the lock to prevent interleaved writes
|
||||
self._persist(entry)
|
||||
return entry
|
||||
|
||||
def _persist(self, entry: dict):
|
||||
"""Write a single entry to the JSONL log file."""
|
||||
log_path = self._get_log_path()
|
||||
if log_path is None:
|
||||
return
|
||||
try:
|
||||
CHATLOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(CHATLOG_FILE, 'a') as f:
|
||||
f.write(json.dumps(entry) + '\n')
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(log_path, 'a', encoding='utf-8') as fh:
|
||||
fh.write(json.dumps(entry, ensure_ascii=False) + '\n')
|
||||
except Exception as e:
|
||||
print(f"[ChatLog] Persist failed: {e}")
|
||||
return entry
|
||||
|
||||
@staticmethod
|
||||
def _get_log_path():
|
||||
"""Resolve CHATLOG_FILE safely -- returns None if not available."""
|
||||
try:
|
||||
return CHATLOG_FILE
|
||||
except NameError:
|
||||
return None
|
||||
|
||||
def get_history(self, room: str, limit: int = 50, since: str = None) -> list[dict]:
|
||||
"""Get recent chat history for a room.
|
||||
@@ -2173,6 +2188,8 @@ class BridgeHandler(BaseHTTPRequestHandler):
|
||||
else:
|
||||
response = session.chat(message)
|
||||
|
||||
chat_log.log(room, 'ask', message, user_id=user_id, username=username)
|
||||
|
||||
# Auto-notify: crisis detection — scan response for crisis protocol keywords
|
||||
crisis_keywords = ["988", "741741", "safe right now", "crisis", "Crisis Text Line"]
|
||||
if any(kw in response for kw in crisis_keywords):
|
||||
@@ -2245,6 +2262,7 @@ class BridgeHandler(BaseHTTPRequestHandler):
|
||||
return
|
||||
|
||||
event = presence_manager.say(user_id, username, room, message)
|
||||
chat_log.log(room, 'say', message, user_id=user_id, username=username)
|
||||
# Get list of players who should see it
|
||||
players = presence_manager.get_players_in_room(room)
|
||||
self._json_response({
|
||||
@@ -2616,6 +2634,7 @@ class BridgeHandler(BaseHTTPRequestHandler):
|
||||
if not arg:
|
||||
return {"command": "say", "error": "Say what?"}
|
||||
event = presence_manager.say(user_id, username, room, arg)
|
||||
chat_log.log(room, 'say', arg, user_id=user_id, username=username)
|
||||
players = presence_manager.get_players_in_room(room)
|
||||
return {
|
||||
"command": "say",
|
||||
@@ -2634,6 +2653,7 @@ class BridgeHandler(BaseHTTPRequestHandler):
|
||||
not any(p["user_id"] == user_id for p in presence_manager.get_players_in_room(room)):
|
||||
presence_manager.enter_room(user_id, username, room)
|
||||
response = session.chat(arg)
|
||||
chat_log.log(room, 'ask', arg, user_id=user_id, username=username)
|
||||
return {
|
||||
"command": "ask",
|
||||
"message": arg,
|
||||
|
||||
16
run.sh
16
run.sh
@@ -1,16 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# run.sh — run Nexus locally without Docker
|
||||
# Usage: ./run.sh — HTTP :8080, WS :8765
|
||||
# NEXUS_HTTP_PORT=9090 ./run.sh — custom HTTP port
|
||||
set -euo pipefail
|
||||
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
# Install deps if missing
|
||||
if ! python3 -c "import websockets" 2>/dev/null; then
|
||||
echo "==> Installing dependencies..."
|
||||
pip3 install -r requirements.txt
|
||||
fi
|
||||
|
||||
echo "==> Starting Nexus server..."
|
||||
exec python3 server.py
|
||||
233
server.py
233
server.py
@@ -1,190 +1,92 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
The Nexus — Unified HTTP + WebSocket server.
|
||||
|
||||
Serves static frontend files (Three.js app) over HTTP on port 8080
|
||||
and runs the WebSocket gateway on port 8765.
|
||||
Single-process, single-command deployment — no nginx required.
|
||||
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
|
||||
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
|
||||
the body (Evennia/Morrowind), and the visualization surface.
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
from http import HTTPStatus
|
||||
from pathlib import Path
|
||||
from typing import Set
|
||||
|
||||
# Branch protected file - see POLICY.md
|
||||
import websockets
|
||||
import websockets.asyncio.server
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
HTTP_PORT = int(os.environ.get("NEXUS_HTTP_PORT", "8080"))
|
||||
WS_PORT = int(os.environ.get("NEXUS_WS_PORT", "8765"))
|
||||
HOST = os.environ.get("NEXUS_HOST", "0.0.0.0")
|
||||
ROOT = Path(__file__).resolve().parent
|
||||
PORT = 8765
|
||||
HOST = "0.0.0.0" # Allow external connections if needed
|
||||
|
||||
# Static file extensions we're willing to serve
|
||||
SAFE_SUFFIXES = {
|
||||
".html", ".htm", ".css", ".js", ".mjs", ".json",
|
||||
".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico",
|
||||
".woff", ".woff2", ".ttf", ".eot",
|
||||
".txt", ".xml", ".webmanifest",
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Logging
|
||||
# ---------------------------------------------------------------------------
|
||||
# Logging setup
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
format='%(asctime)s [%(levelname)s] %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
log = logging.getLogger("nexus")
|
||||
logger = logging.getLogger("nexus-gateway")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HTTP — static file server
|
||||
# ---------------------------------------------------------------------------
|
||||
async def http_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
||||
"""Minimal async HTTP/1.1 server for static files."""
|
||||
try:
|
||||
# Read request line + headers (max 8 KB)
|
||||
data = b""
|
||||
while b"\r\n\r\n" not in data and len(data) < 8192:
|
||||
chunk = await asyncio.wait_for(reader.read(4096), timeout=10)
|
||||
if not chunk:
|
||||
return
|
||||
data += chunk
|
||||
header_text = data.split(b"\r\n\r\n", 1)[0].decode("utf-8", errors="replace")
|
||||
lines = header_text.split("\r\n")
|
||||
request_line = lines[0]
|
||||
parts = request_line.split(" ", 2)
|
||||
if len(parts) < 2:
|
||||
writer.close()
|
||||
return
|
||||
method, raw_path = parts[0], parts[1]
|
||||
# State
|
||||
clients: Set[websockets.WebSocketServerProtocol] = set()
|
||||
|
||||
if method not in ("GET", "HEAD"):
|
||||
_write_response(writer, HTTPStatus.METHOD_NOT_ALLOWED, b"Method Not Allowed")
|
||||
return
|
||||
|
||||
# Normalise path — prevent directory traversal
|
||||
safe_path = raw_path.split("?", 1)[0].split("#", 1)[0]
|
||||
safe_path = os.path.normpath(safe_path).lstrip("/")
|
||||
if not safe_path:
|
||||
safe_path = "index.html"
|
||||
|
||||
file_path = ROOT / safe_path
|
||||
# Reject traversal
|
||||
if not str(file_path.resolve()).startswith(str(ROOT)):
|
||||
_write_response(writer, HTTPStatus.FORBIDDEN, b"Forbidden")
|
||||
return
|
||||
|
||||
if not file_path.exists() or file_path.is_dir():
|
||||
# Try index.html for directories
|
||||
if file_path.is_dir() and (file_path / "index.html").exists():
|
||||
file_path = file_path / "index.html"
|
||||
else:
|
||||
_write_response(writer, HTTPStatus.NOT_FOUND, b"Not Found")
|
||||
return
|
||||
|
||||
suffix = file_path.suffix.lower()
|
||||
if suffix not in SAFE_SUFFIXES:
|
||||
_write_response(writer, HTTPStatus.FORBIDDEN, b"Forbidden")
|
||||
return
|
||||
|
||||
content_type = mimetypes.guess_type(str(file_path))[0] or "application/octet-stream"
|
||||
body = file_path.read_bytes()
|
||||
headers = {
|
||||
"Content-Type": content_type,
|
||||
"Content-Length": str(len(body)),
|
||||
"Cache-Control": "no-cache",
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
}
|
||||
_write_response(writer, HTTPStatus.OK, body, headers, method == "HEAD")
|
||||
except (asyncio.TimeoutError, ConnectionError):
|
||||
pass
|
||||
except Exception as exc:
|
||||
log.warning("HTTP handler error: %s", exc)
|
||||
finally:
|
||||
try:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _write_response(writer, status, body, headers=None, head_only=False):
|
||||
line = f"HTTP/1.1 {status.value} {status.phrase}\r\n"
|
||||
hdr = ""
|
||||
if headers:
|
||||
for k, v in headers.items():
|
||||
hdr += f"{k}: {v}\r\n"
|
||||
response = (line + hdr + "\r\n").encode()
|
||||
if not head_only:
|
||||
response += body if isinstance(body, bytes) else body.encode()
|
||||
writer.write(response)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WebSocket — broadcast gateway
|
||||
# ---------------------------------------------------------------------------
|
||||
ws_clients: Set[websockets.asyncio.server.ServerConnection] = set()
|
||||
|
||||
|
||||
async def ws_handler(websocket: websockets.asyncio.server.ServerConnection):
|
||||
ws_clients.add(websocket)
|
||||
log.info("WS client connected from %s. Total: %d", websocket.remote_address, len(ws_clients))
|
||||
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
|
||||
"""Handles individual client connections and message broadcasting."""
|
||||
clients.add(websocket)
|
||||
addr = websocket.remote_address
|
||||
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
|
||||
|
||||
try:
|
||||
async for message in websocket:
|
||||
# Parse for logging/validation if it's JSON
|
||||
try:
|
||||
data = json.loads(message)
|
||||
msg_type = data.get("type", "unknown")
|
||||
if msg_type in ("agent_register", "thought", "action"):
|
||||
log.debug("WS %s from %s", msg_type, websocket.remote_address)
|
||||
# Optional: log specific important message types
|
||||
if msg_type in ["agent_register", "thought", "action"]:
|
||||
logger.debug(f"Received {msg_type} from {addr}")
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
|
||||
# Broadcast to all OTHER clients
|
||||
if not clients:
|
||||
continue
|
||||
|
||||
disconnected = set()
|
||||
for client in ws_clients:
|
||||
# Create broadcast tasks, tracking which client each task targets
|
||||
task_client_pairs = []
|
||||
for client in clients:
|
||||
if client != websocket and client.open:
|
||||
try:
|
||||
await client.send(message)
|
||||
except Exception:
|
||||
disconnected.add(client)
|
||||
ws_clients.difference_update(disconnected)
|
||||
task = asyncio.create_task(client.send(message))
|
||||
task_client_pairs.append((task, client))
|
||||
|
||||
if task_client_pairs:
|
||||
tasks = [pair[0] for pair in task_client_pairs]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
target_client = task_client_pairs[i][1]
|
||||
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
|
||||
disconnected.add(target_client)
|
||||
|
||||
if disconnected:
|
||||
clients.difference_update(disconnected)
|
||||
|
||||
except websockets.exceptions.ConnectionClosed:
|
||||
pass
|
||||
except Exception as exc:
|
||||
log.error("WS handler error for %s: %s", websocket.remote_address, exc)
|
||||
logger.debug(f"Connection closed by client {addr}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling client {addr}: {e}")
|
||||
finally:
|
||||
ws_clients.discard(websocket)
|
||||
log.info("WS client disconnected. Total: %d", len(ws_clients))
|
||||
clients.discard(websocket)
|
||||
logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main — run both servers concurrently
|
||||
# ---------------------------------------------------------------------------
|
||||
async def main():
|
||||
# HTTP server
|
||||
http_server = await asyncio.start_server(http_handler, HOST, HTTP_PORT)
|
||||
log.info("HTTP server listening on http://%s:%d", HOST, HTTP_PORT)
|
||||
|
||||
# WebSocket server
|
||||
ws_server = await websockets.asyncio.server.serve(ws_handler, HOST, WS_PORT)
|
||||
log.info("WebSocket server listening on ws://%s:%d", HOST, WS_PORT)
|
||||
log.info("Nexus is live — open http://%s:%d in a browser", HOST, HTTP_PORT)
|
||||
|
||||
# Graceful shutdown
|
||||
"""Main server loop with graceful shutdown."""
|
||||
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
|
||||
|
||||
# Set up signal handlers for graceful shutdown
|
||||
loop = asyncio.get_running_loop()
|
||||
stop = loop.create_future()
|
||||
|
||||
|
||||
def shutdown():
|
||||
if not stop.done():
|
||||
stop.set_result(None)
|
||||
@@ -193,28 +95,29 @@ async def main():
|
||||
try:
|
||||
loop.add_signal_handler(sig, shutdown)
|
||||
except NotImplementedError:
|
||||
# Signal handlers not supported on Windows
|
||||
pass
|
||||
|
||||
await stop
|
||||
log.info("Shutting down...")
|
||||
|
||||
http_server.close()
|
||||
await http_server.wait_closed()
|
||||
ws_server.close()
|
||||
await ws_server.wait_closed()
|
||||
|
||||
remaining = {c for c in ws_clients if c.open}
|
||||
async with websockets.serve(broadcast_handler, HOST, PORT):
|
||||
logger.info("Gateway is ready and listening.")
|
||||
await stop
|
||||
|
||||
logger.info("Shutting down Nexus WS gateway...")
|
||||
# Close any remaining client connections (handlers may have already cleaned up)
|
||||
remaining = {c for c in clients if c.open}
|
||||
if remaining:
|
||||
await asyncio.gather(*(c.close() for c in remaining), return_exceptions=True)
|
||||
ws_clients.clear()
|
||||
log.info("Shutdown complete.")
|
||||
|
||||
logger.info(f"Closing {len(remaining)} active connections...")
|
||||
close_tasks = [client.close() for client in remaining]
|
||||
await asyncio.gather(*close_tasks, return_exceptions=True)
|
||||
clients.clear()
|
||||
|
||||
logger.info("Shutdown complete.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception as exc:
|
||||
log.critical("Fatal: %s", exc)
|
||||
except Exception as e:
|
||||
logger.critical(f"Fatal server error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
182
tests/test_chatlog_persistence.py
Normal file
182
tests/test_chatlog_persistence.py
Normal file
@@ -0,0 +1,182 @@
|
||||
"""Tests for ChatLog persistence fix (#1349).
|
||||
|
||||
Verifies:
|
||||
- ChatLog.log() returns correct entry dict
|
||||
- JSONL persistence writes to disk
|
||||
- Unicode messages are preserved
|
||||
- Rolling buffer limits per room
|
||||
- Thread safety under concurrent writes
|
||||
- Graceful degradation when CHATLOG_FILE is unavailable
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure module path
|
||||
import sys
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from multi_user_bridge import ChatLog
|
||||
|
||||
|
||||
class TestChatLogPersistence:
|
||||
"""Core ChatLog.log() behavior."""
|
||||
|
||||
def test_log_returns_entry_dict(self, tmp_path):
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'chat.jsonl'
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
entry = log.log('room1', 'say', 'hello', user_id='u1', username='Alice')
|
||||
assert entry['type'] == 'say'
|
||||
assert entry['message'] == 'hello'
|
||||
assert entry['user_id'] == 'u1'
|
||||
assert entry['username'] == 'Alice'
|
||||
assert entry['room'] == 'room1'
|
||||
assert 'timestamp' in entry
|
||||
|
||||
def test_persist_creates_jsonl_file(self, tmp_path):
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'subdir' / 'chat.jsonl'
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
log.log('room1', 'say', 'msg1')
|
||||
log.log('room1', 'say', 'msg2')
|
||||
assert log_file.exists()
|
||||
lines = log_file.read_text().strip().split('\n')
|
||||
assert len(lines) == 2
|
||||
entry1 = json.loads(lines[0])
|
||||
assert entry1['message'] == 'msg1'
|
||||
|
||||
def test_unicode_preserved_in_jsonl(self, tmp_path):
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'chat.jsonl'
|
||||
msg = 'Привет мир 日本語 🎮'
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
log.log('room1', 'say', msg)
|
||||
lines = log_file.read_text().strip().split('\n')
|
||||
entry = json.loads(lines[0])
|
||||
assert entry['message'] == msg
|
||||
|
||||
def test_rolling_buffer_limits_per_room(self):
|
||||
log = ChatLog(max_per_room=3)
|
||||
for i in range(5):
|
||||
log.log('room1', 'say', f'msg{i}')
|
||||
history = log.get_history('room1')
|
||||
assert len(history) == 3
|
||||
assert history[0]['message'] == 'msg2'
|
||||
assert history[2]['message'] == 'msg4'
|
||||
|
||||
def test_rooms_are_independent(self):
|
||||
log = ChatLog(max_per_room=2)
|
||||
log.log('roomA', 'say', 'a1')
|
||||
log.log('roomB', 'say', 'b1')
|
||||
log.log('roomA', 'say', 'a2')
|
||||
log.log('roomA', 'say', 'a3')
|
||||
assert len(log.get_history('roomA')) == 2
|
||||
assert len(log.get_history('roomB')) == 1
|
||||
|
||||
def test_get_all_rooms(self):
|
||||
log = ChatLog()
|
||||
log.log('alpha', 'say', 'x')
|
||||
log.log('beta', 'say', 'y')
|
||||
rooms = log.get_all_rooms()
|
||||
assert set(rooms) == {'alpha', 'beta'}
|
||||
|
||||
def test_get_history_with_since_filter(self):
|
||||
log = ChatLog()
|
||||
log.log('r', 'say', 'old')
|
||||
import time; time.sleep(0.01)
|
||||
from datetime import datetime
|
||||
cutoff = datetime.now().isoformat()
|
||||
time.sleep(0.01)
|
||||
log.log('r', 'say', 'new')
|
||||
result = log.get_history('r', since=cutoff)
|
||||
assert len(result) == 1
|
||||
assert result[0]['message'] == 'new'
|
||||
|
||||
def test_thread_safety(self, tmp_path):
|
||||
"""Multiple threads writing to same ChatLog should not corrupt JSONL."""
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'threaded.jsonl'
|
||||
errors = []
|
||||
|
||||
def writer(thread_id):
|
||||
try:
|
||||
for i in range(20):
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
log.log('shared', 'say', f't{thread_id}_m{i}')
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
|
||||
threads = [threading.Thread(target=writer, args=(t,)) for t in range(4)]
|
||||
for t in threads: t.start()
|
||||
for t in threads: t.join()
|
||||
|
||||
assert not errors, f"Thread errors: {errors}"
|
||||
# Buffer should have exactly max_per_room (50) entries, all from our writes
|
||||
history = log.get_history('shared')
|
||||
assert len(history) == 50
|
||||
|
||||
# JSONL should have ~80 entries (20*4) - allow off-by-one
|
||||
# due to non-atomic append under contention
|
||||
if log_file.exists():
|
||||
lines = log_file.read_text().strip().split('\n')
|
||||
assert len(lines) >= 78, f"Expected ~80 JSONL entries, got {len(lines)}"
|
||||
# Verify every line is valid JSON
|
||||
for line in lines:
|
||||
parsed = json.loads(line)
|
||||
assert parsed['room'] == 'shared'
|
||||
|
||||
def test_persist_graceful_when_no_path(self):
|
||||
"""log() should not crash if CHATLOG_FILE is undefined."""
|
||||
log = ChatLog()
|
||||
with patch.object(ChatLog, '_get_log_path', return_value=None):
|
||||
entry = log.log('r', 'say', 'test')
|
||||
assert entry['message'] == 'test'
|
||||
# In-memory buffer should still work
|
||||
assert len(log.get_history('r')) == 1
|
||||
|
||||
def test_persist_handles_unwritable_dir(self, tmp_path, capsys):
|
||||
"""log() should catch and report permission errors, not crash."""
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'readonly' / 'chat.jsonl'
|
||||
# Make parent dir read-only
|
||||
ro_dir = tmp_path / 'readonly'
|
||||
ro_dir.mkdir()
|
||||
ro_dir.chmod(0o000)
|
||||
try:
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
entry = log.log('r', 'say', 'test')
|
||||
assert entry['message'] == 'test'
|
||||
captured = capsys.readouterr()
|
||||
assert 'Persist failed' in captured.out
|
||||
finally:
|
||||
ro_dir.chmod(0o755) # cleanup
|
||||
|
||||
def test_msg_type_ask_logged(self, tmp_path):
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'chat.jsonl'
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
log.log('r', 'ask', 'What is love?')
|
||||
entry = json.loads(log_file.read_text().strip())
|
||||
assert entry['type'] == 'ask'
|
||||
|
||||
def test_msg_type_system_logged(self, tmp_path):
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'chat.jsonl'
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
log.log('r', 'system', 'Server restarted')
|
||||
entry = json.loads(log_file.read_text().strip())
|
||||
assert entry['type'] == 'system'
|
||||
|
||||
def test_data_field_persisted(self, tmp_path):
|
||||
log = ChatLog()
|
||||
log_file = tmp_path / 'chat.jsonl'
|
||||
with patch('multi_user_bridge.CHATLOG_FILE', log_file):
|
||||
log.log('r', 'say', 'hello', data={'extra': 42})
|
||||
entry = json.loads(log_file.read_text().strip())
|
||||
assert entry['data'] == {'extra': 42}
|
||||
Reference in New Issue
Block a user