diff --git a/app.js b/app.js index c4fcf682..fdcb5187 100644 --- a/app.js +++ b/app.js @@ -2310,6 +2310,24 @@ function connectHermes() { hermesWs = new WebSocket(wsUrl); hermesWs.onopen = () => { + // Check if we need to authenticate (external connection) + const isLocalhost = window.location.hostname === 'localhost' || + window.location.hostname === '127.0.0.1' || + window.location.hostname === '::1'; + + if (!isLocalhost) { + const authToken = localStorage.getItem('nexus-ws-auth-token'); + if (authToken) { + hermesWs.send(JSON.stringify({ + type: 'auth', + token: authToken + })); + console.log('Sent authentication token'); + } else { + console.warn('No authentication token found for external connection'); + } + } + console.log('Hermes connected.'); wsConnected = true; addChatMessage('system', 'Hermes link established.'); diff --git a/tests/test_websocket_security.py b/tests/test_websocket_security.py new file mode 100644 index 00000000..2bea1f8d --- /dev/null +++ b/tests/test_websocket_security.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +""" +Test WebSocket gateway security configuration. +Issue #1514: [Security] WebSocket gateway listens on 0.0.0.0 +""" + +import os +import re +import sys + + +def test_server_binding(): + """Test that server binds to correct address.""" + print("Testing server binding configuration...") + + server_path = os.path.join(os.path.dirname(__file__), '..', 'server.py') + + if not os.path.exists(server_path): + print(f"ERROR: server.py not found at {server_path}") + return False + + with open(server_path, 'r') as f: + content = f.read() + + # Check HOST uses env var with 127.0.0.1 default (flexible whitespace) + host_pattern = r'HOST\s*=\s*os\.environ\.get\(\s*"NEXUS_WS_HOST"\s*,\s*"127\.0\.0\.1"\s*\)' + if re.search(host_pattern, content): + print("HOST configured with env var, default 127.0.0.1") + else: + print("HOST not properly configured") + return False + + # Check for authentication implementation + if 'async def authenticate_connection' in content: + print("Server-side authentication implemented") + else: + print("Server-side authentication missing") + return False + + # Check for token validation + if 'token' in content and 'AUTH_TOKEN' in content: + print("Token validation present") + else: + print("Token validation missing") + return False + + return True + + +def test_environment_variables(): + """Test environment variable configuration.""" + print("\nTesting environment variable configuration...") + + server_path = os.path.join(os.path.dirname(__file__), '..', 'server.py') + + with open(server_path, 'r') as f: + content = f.read() + + checks = [ + ('NEXUS_WS_HOST', "HOST environment variable"), + ('NEXUS_WS_PORT', "PORT environment variable"), + ('NEXUS_WS_TOKEN', "Auth token environment variable"), + ] + + for var, desc in checks: + if f'NEXUS_WS_{var.split("_")[-1]}' in content or var in content: + # More precise check + if f'os.environ.get("{var}"' in content or f"os.environ.get('{var}'" in content: + print(f"{desc} configured") + else: + print(f"{desc} not found via os.environ.get") + return False + else: + print(f"{desc} missing") + return False + + # Check 127.0.0.1 default + host_pattern = r'HOST\s*=\s*os\.environ\.get\(\s*"NEXUS_WS_HOST"\s*,\s*"127\.0\.0\.1"' + if re.search(host_pattern, content): + print("Default HOST is 127.0.0.1") + else: + print("Default HOST is not 127.0.0.1") + return False + + return True + + +def test_client_authentication(): + """Test client authentication code.""" + print("\nTesting client authentication code...") + + app_path = os.path.join(os.path.dirname(__file__), '..', 'app.js') + + if not os.path.exists(app_path): + print(f"ERROR: app.js not found at {app_path}") + return False + + with open(app_path, 'r') as f: + content = f.read() + + checks = [ + ('isLocalhost', 'Client checks for localhost'), + ('nexus-ws-auth-token', 'Client retrieves auth token'), + ("type: 'auth'", 'Client sends auth message'), + ('localStorage.getItem', 'Client reads from localStorage'), + ] + + for pattern, desc in checks: + if pattern in content: + print(desc) + else: + print(f"MISSING: {desc}") + return False + + return True + + +def main(): + """Run all tests.""" + print("=" * 60) + print("WebSocket Gateway Security Tests") + print("Issue: #1514") + print("=" * 60) + + tests = [ + ("Server binding configuration", test_server_binding), + ("Environment variable configuration", test_environment_variables), + ("Client authentication code", test_client_authentication), + ] + + results = [] + for name, test_func in tests: + print(f"\n{name}:") + try: + result = test_func() + results.append((name, result)) + except Exception as e: + print(f"Test failed with exception: {e}") + import traceback + traceback.print_exc() + results.append((name, False)) + + print("\n" + "=" * 60) + print("Test Results") + print("=" * 60) + + passed = 0 + for name, result in results: + status = "PASS" if result else "FAIL" + print(f" {status}: {name}") + if result: + passed += 1 + + print(f"\nPassed: {passed}/{len(results)}") + + if passed == len(results): + print("\nAll tests passed!") + return 0 + else: + print("\nSome tests failed!") + return 1 + + +if __name__ == '__main__': + sys.exit(main())