Files
the-nexus/tests/test_websocket_security.py
Rockachopa eb5f17917c
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 9s
CI / test (pull_request) Failing after 1m1s
CI / validate (pull_request) Failing after 1m2s
fix(#1514): add client-side auth and security tests for WebSocket gateway
- Add client-side token authentication in app.js for external connections
- Skip auth for localhost (127.0.0.1, ::1, localhost)
- Token read from localStorage ('nexus-ws-auth-token')
- Add comprehensive security tests (tests/test_websocket_security.py)
- Tests verify server binding, env vars, and client auth implementation

Server-side fix already present on main via PR #1504:
- HOST defaults to 127.0.0.1
- AUTH_TOKEN support via server.py authenticate_connection()
- Rate limiting already implemented

Closes #1514
2026-04-26 01:22:43 -04:00

166 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
Test WebSocket gateway security configuration.
Issue #1514: [Security] WebSocket gateway listens on 0.0.0.0
"""
import os
import re
import sys
def test_server_binding():
    """Statically check server.py for a safe bind address and auth support.

    Reads ``server.py`` from the directory above this file and verifies, in
    order, that:

    * ``HOST`` is assigned from ``os.environ.get`` with the key
      ``NEXUS_WS_HOST`` and the default ``127.0.0.1`` (single- or
      double-quoted literals, flexible whitespace);
    * the text ``async def authenticate_connection`` is present;
    * both ``token`` and ``AUTH_TOKEN`` occur somewhere in the file.

    Progress and failure messages are printed to stdout.

    Returns:
        bool: True when every check passes; False when ``server.py`` is
        missing or at the first check that fails.

    NOTE(review): the outcome is reported through the return value, which
    pytest does not use to decide pass/fail -- confirm how CI runs this file.
    """
    print("Testing server binding configuration...")
    server_path = os.path.join(os.path.dirname(__file__), '..', 'server.py')
    if not os.path.exists(server_path):
        print(f"ERROR: server.py not found at {server_path}")
        return False

    # Decode explicitly as UTF-8 (the default encoding of Python source) so
    # the result does not depend on the platform's locale encoding.
    with open(server_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Accept either quote style; the backreferences (\1, \2) require each
    # literal to be closed by the same quote character that opened it.
    host_pattern = (
        r'''HOST\s*=\s*os\.environ\.get\(\s*(["'])NEXUS_WS_HOST\1\s*,'''
        r'''\s*(["'])127\.0\.0\.1\2\s*\)'''
    )
    if not re.search(host_pattern, content):
        print("HOST not properly configured")
        return False
    print("HOST configured with env var, default 127.0.0.1")

    # The server must define the authentication coroutine.
    if 'async def authenticate_connection' not in content:
        print("Server-side authentication missing")
        return False
    print("Server-side authentication implemented")

    # Coarse presence check for token handling (case-sensitive substrings).
    if 'token' not in content or 'AUTH_TOKEN' not in content:
        print("Token validation missing")
        return False
    print("Token validation present")
    return True
def test_environment_variables():
"""Test environment variable configuration."""
print("\nTesting environment variable configuration...")
server_path = os.path.join(os.path.dirname(__file__), '..', 'server.py')
with open(server_path, 'r') as f:
content = f.read()
checks = [
('NEXUS_WS_HOST', "HOST environment variable"),
('NEXUS_WS_PORT', "PORT environment variable"),
('NEXUS_WS_TOKEN', "Auth token environment variable"),
]
for var, desc in checks:
if f'NEXUS_WS_{var.split("_")[-1]}' in content or var in content:
# More precise check
if f'os.environ.get("{var}"' in content or f"os.environ.get('{var}'" in content:
print(f"{desc} configured")
else:
print(f"{desc} not found via os.environ.get")
return False
else:
print(f"{desc} missing")
return False
# Check 127.0.0.1 default
host_pattern = r'HOST\s*=\s*os\.environ\.get\(\s*"NEXUS_WS_HOST"\s*,\s*"127\.0\.0\.1"'
if re.search(host_pattern, content):
print("Default HOST is 127.0.0.1")
else:
print("Default HOST is not 127.0.0.1")
return False
return True
def test_client_authentication():
    """Statically check app.js for the client-side WebSocket auth code.

    Reads ``app.js`` from the directory above this file and verifies, in
    order, that it contains:

    * the identifier ``isLocalhost``;
    * the storage key ``nexus-ws-auth-token``;
    * an auth message field, i.e. ``type: 'auth'`` or ``type: "auth"``
      (optionally with a quoted key and flexible whitespace);
    * a call to ``localStorage.getItem``.

    A line is printed to stdout for each check.

    Returns:
        bool: True when every pattern is found; False when ``app.js`` is
        missing or at the first pattern that is not found.

    NOTE(review): the outcome is reported through the return value, which
    pytest does not use to decide pass/fail -- confirm how CI runs this file.
    """
    print("\nTesting client authentication code...")
    app_path = os.path.join(os.path.dirname(__file__), '..', 'app.js')
    if not os.path.exists(app_path):
        print(f"ERROR: app.js not found at {app_path}")
        return False

    # Decode explicitly as UTF-8 so non-ASCII text in the script cannot break
    # the read on platforms whose locale encoding is something else.
    with open(app_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Each entry is (regular expression, description). Literal substrings are
    # escaped; the auth-message pattern tolerates either quote style, with the
    # backreference (\1) requiring the value's quotes to match.
    checks = [
        (re.escape('isLocalhost'), 'Client checks for localhost'),
        (re.escape('nexus-ws-auth-token'), 'Client retrieves auth token'),
        (r'''type["']?\s*:\s*(["'])auth\1''', 'Client sends auth message'),
        (re.escape('localStorage.getItem'), 'Client reads from localStorage'),
    ]
    for pattern, desc in checks:
        if not re.search(pattern, content):
            print(f"MISSING: {desc}")
            return False
        print(desc)
    return True
def main():
    """Execute each security check in turn and print a pass/fail summary.

    A check that raises is reported (message plus traceback) and counted as a
    failure rather than aborting the run.

    Returns:
        int: 0 when every check returned a truthy value, 1 otherwise.
    """
    banner = "=" * 60
    print(banner)
    print("WebSocket Gateway Security Tests")
    print("Issue: #1514")
    print(banner)

    suite = (
        ("Server binding configuration", test_server_binding),
        ("Environment variable configuration", test_environment_variables),
        ("Client authentication code", test_client_authentication),
    )

    outcomes = []
    for label, check in suite:
        print(f"\n{label}:")
        try:
            outcome = check()
        except Exception as exc:
            # Keep going: one crashing check must not hide the others.
            print(f"Test failed with exception: {exc}")
            import traceback
            traceback.print_exc()
            outcome = False
        outcomes.append((label, outcome))

    print("\n" + banner)
    print("Test Results")
    print(banner)

    for label, outcome in outcomes:
        print(f" {'PASS' if outcome else 'FAIL'}: {label}")
    passed = sum(1 for _, outcome in outcomes if outcome)
    total = len(outcomes)
    print(f"\nPassed: {passed}/{total}")

    if passed != total:
        print("\nSome tests failed!")
        return 1
    print("\nAll tests passed!")
    return 0
# Script entry point: run the suite and use main()'s return value as the
# process exit status.
if __name__ == '__main__':
    sys.exit(main())