From e030dda0193f4e71e1b0916c59e4437b685d77db Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Sun, 12 Apr 2026 20:51:03 -0400 Subject: [PATCH] ThreadingHTTPServer + conversation summaries Fixes single-threaded bottleneck (Experiment 4) Adds GET /bridge/session//summary Auto-saves conversation summaries on session expiry --- multi_user_bridge.py | 138 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 109 insertions(+), 29 deletions(-) diff --git a/multi_user_bridge.py b/multi_user_bridge.py index 5a515968..262e4675 100644 --- a/multi_user_bridge.py +++ b/multi_user_bridge.py @@ -26,7 +26,8 @@ import threading import hashlib import os import sys -from http.server import HTTPServer, BaseHTTPRequestHandler +from http.server import BaseHTTPRequestHandler +from socketserver import ThreadingHTTPServer from pathlib import Path from datetime import datetime from typing import Optional @@ -328,6 +329,27 @@ CRISIS PROTOCOL: record_latency(self.user_id, self.room, elapsed_ms) return f"*The green LED flickers.* (Error: {e})" + def get_summary(self) -> str: + """Generate a brief conversation summary using the LLM.""" + if not self.messages: + return "Empty session — no messages exchanged." + transcript_lines = [] + for m in self.messages[-20:]: # last 20 messages for brevity + role = m.get("role", "?").upper() + content = m.get("content", "") + transcript_lines.append(f"{role}: {content}") + transcript = "\n".join(transcript_lines) + prompt = ( + "Summarize this conversation in 1-3 sentences. " + "Focus on what the user discussed, asked about, or did.\n\n" + f"CONVERSATION:\n{transcript}\n\nSUMMARY:" + ) + try: + summary = self.agent.chat(prompt) + return summary.strip() + except Exception as e: + return f"Summary unavailable (error: {e}). {len(self.messages)} messages exchanged." + def get_context_summary(self) -> dict: """Get session summary for monitoring.""" return { @@ -341,12 +363,13 @@ CRISIS PROTOCOL: class SessionManager: """Manages all user sessions.""" - + def __init__(self, max_sessions: int = 20, session_timeout: int = 3600): self.sessions: dict[str, UserSession] = {} self.max_sessions = max_sessions self.session_timeout = session_timeout self._lock = threading.Lock() + self._summaries_path = WORLD_DIR / 'session_summaries.jsonl' def get_or_create(self, user_id: str, username: str, room: str = "The Threshold") -> UserSession: """Get existing session or create new one.""" @@ -364,26 +387,49 @@ class SessionManager: return session def _cleanup_stale(self): - """Remove sessions that timed out.""" + """Remove sessions that timed out, saving summaries first.""" now = time.time() - stale = [uid for uid, s in self.sessions.items() + stale = [uid for uid, s in self.sessions.items() if now - s.last_active > self.session_timeout] for uid in stale: + session = self.sessions[uid] + self._save_summary(session) del self.sessions[uid] - + def _evict_oldest(self): - """Evict the least recently active session.""" + """Evict the least recently active session, saving summary first.""" if not self.sessions: return oldest = min(self.sessions.items(), key=lambda x: x[1].last_active) + self._save_summary(oldest[1]) del self.sessions[oldest[0]] + + def _save_summary(self, session: UserSession): + """Generate summary via LLM and append to JSONL file.""" + try: + summary_text = session.get_summary() + record = { + "user_id": session.user_id, + "username": session.username, + "room": session.room, + "message_count": len(session.messages), + "created_at": session.created_at, + "ended_at": datetime.now().isoformat(), + "summary": summary_text, + } + with open(self._summaries_path, 'a') as f: + f.write(json.dumps(record) + '\n') + except Exception as e: + print(f"[SessionManager] Failed to save summary for {session.user_id}: {e}") def list_sessions(self) -> list: """List all active sessions.""" - return [s.get_context_summary() for s in self.sessions.values()] - + with self._lock: + return [s.get_context_summary() for s in self.sessions.values()] + def get_session_count(self) -> int: - return len(self.sessions) + with self._lock: + return len(self.sessions) # ── HTTP API ─────────────────────────────────────────────────────────── @@ -473,10 +519,11 @@ class BridgeHandler(BaseHTTPRequestHandler): for room_name, room_data in rooms.items(): players = presence_manager.get_players_in_room(room_name) # Count messages across all sessions in this room - conv_count = sum( - len(s.messages) for s in session_manager.sessions.values() - if s.room == room_name - ) + with session_manager._lock: + conv_count = sum( + len(s.messages) for s in session_manager.sessions.values() + if s.room == room_name + ) room_details[room_name] = { **room_data, "players": players, @@ -508,6 +555,37 @@ class BridgeHandler(BaseHTTPRequestHandler): "uptime_seconds": round(uptime_seconds, 1), "timestamp": datetime.now().isoformat(), }) + elif self.path.startswith('/bridge/session/') and self.path.endswith('/summary'): + # GET /bridge/session//summary + parts = self.path.split('/') + # ['', 'bridge', 'session', '', 'summary'] + user_id = parts[3] + with session_manager._lock: + session = session_manager.sessions.get(user_id) + if session: + summary = session.get_summary() + self._json_response({ + "user_id": user_id, + "username": session.username, + "room": session.room, + "message_count": len(session.messages), + "summary": summary, + }) + else: + # Check saved summaries in JSONL + saved = None + if session_manager._summaries_path.exists(): + for line in session_manager._summaries_path.read_text().splitlines(): + try: + entry = json.loads(line) + if entry.get("user_id") == user_id: + saved = entry + except json.JSONDecodeError: + continue + if saved: + self._json_response(saved) + else: + self._json_response({"error": "no session or saved summary"}, 404) else: self._json_response({"error": "not found"}, 404) @@ -542,21 +620,22 @@ class BridgeHandler(BaseHTTPRequestHandler): elif self.path == '/bridge/move': user_id = body.get('user_id') new_room = body.get('room') - if user_id in session_manager.sessions: - session = session_manager.sessions[user_id] - old_room = session.room - # Leave old room - leave_event = presence_manager.leave_room(user_id, old_room) - # Enter new room - session.room = new_room - enter_event = presence_manager.enter_room(user_id, session.username, new_room) - self._json_response({ - "ok": True, - "room": new_room, - "events": [e for e in [leave_event, enter_event] if e], - }) - else: - self._json_response({"error": "no session"}, 404) + with session_manager._lock: + if user_id in session_manager.sessions: + session = session_manager.sessions[user_id] + old_room = session.room + # Leave old room + leave_event = presence_manager.leave_room(user_id, old_room) + # Enter new room + session.room = new_room + enter_event = presence_manager.enter_room(user_id, session.username, new_room) + self._json_response({ + "ok": True, + "room": new_room, + "events": [e for e in [leave_event, enter_event] if e], + }) + else: + self._json_response({"error": "no session"}, 404) elif self.path == '/bridge/say': # POST /bridge/say — user says something visible to all in room @@ -605,12 +684,13 @@ def main(): print(f" GET /bridge/stats — Aggregate stats (sessions, messages, uptime)") print(f" GET /bridge/room//players — List players in a room") print(f" GET /bridge/room//events — Room events (presence + chat)") + print(f" GET /bridge/session//summary — Get conversation summary") print(f" POST /bridge/chat — Send message to Timmy") print(f" POST /bridge/say — Say something to room (visible to all)") print(f" POST /bridge/move — Move user to room (triggers presence)") print() - server = HTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler) + server = ThreadingHTTPServer((BRIDGE_HOST, BRIDGE_PORT), BridgeHandler) server.serve_forever()