Compare commits

...

10 Commits

Author SHA1 Message Date
db5d1b5c06 Merge branch 'main' into fix/1459
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 10s
CI / test (pull_request) Failing after 1m12s
CI / validate (pull_request) Failing after 1m18s
2026-04-22 01:15:30 +00:00
d1f6421c49 Merge pull request 'feat: add WebSocket load testing infrastructure (#1505)' (#1651) from fix/1505 into main
Some checks failed
Deploy Nexus / deploy (push) Failing after 9s
Staging Verification Gate / verify-staging (push) Failing after 10s
Merge PR #1651: feat: add WebSocket load testing infrastructure (#1505)
2026-04-22 01:10:19 +00:00
8d87dba309 Merge branch 'main' into fix/1505
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 10s
CI / test (pull_request) Failing after 1m14s
CI / validate (pull_request) Failing after 1m20s
2026-04-22 01:10:13 +00:00
9322742ef8 Merge pull request 'fix: secure WebSocket gateway - localhost bind, auth, rate limiting (#1504)' (#1652) from fix/1504 into main
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Staging Verification Gate / verify-staging (push) Has been cancelled
Merge PR #1652: fix: secure WebSocket gateway - localhost bind, auth, rate limiting (#1504)
2026-04-22 01:10:10 +00:00
157f6f322d Merge branch 'main' into fix/1505
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 9s
CI / test (pull_request) Failing after 1m9s
CI / validate (pull_request) Failing after 1m15s
2026-04-22 01:08:34 +00:00
2978f48a6a Merge branch 'main' into fix/1504
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 12s
CI / test (pull_request) Failing after 1m10s
CI / validate (pull_request) Failing after 1m14s
2026-04-22 01:08:29 +00:00
7f14664339 Merge branch 'main' into fix/1459
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 10s
CI / test (pull_request) Failing after 1m16s
CI / validate (pull_request) Failing after 1m16s
2026-04-22 01:08:17 +00:00
Metatron
3fed634955 test: WebSocket load test infrastructure (closes #1505)
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 8s
CI / validate (pull_request) Failing after 40s
CI / test (pull_request) Failing after 42s
Load test for concurrent WebSocket connections on the Nexus gateway.

Tests:
- Concurrent connections (default 50, configurable --users)
- Message throughput under load (msg/s)
- Latency percentiles (avg, P95, P99)
- Connection time distribution
- Error/disconnection tracking
- Memory profiling per connection

Usage:
  python3 tests/load/websocket_load_test.py              # 50 users, 30s
  python3 tests/load/websocket_load_test.py --users 200  # 200 concurrent
  python3 tests/load/websocket_load_test.py --duration 60 # 60s test
  python3 tests/load/websocket_load_test.py --json        # JSON output

Verdict: PASS/DEGRADED/FAIL based on connect rate and error count.
2026-04-15 21:01:58 -04:00
Alexander Whitestone
49d9d43d13 fix: #1459
Some checks failed
CI / test (pull_request) Failing after 1m5s
CI / validate (pull_request) Failing after 1m3s
Review Approval Gate / verify-review (pull_request) Failing after 7s
- Add backlog manager tool for timmy-home
- Add documentation for backlog management
- Tools for analyzing, triaging, and cleaning up issues

Addresses issue #1459: process: Address timmy-home backlog (220 open issues)

Features:
1. Analyze backlog - Get statistics and insights
2. Generate reports - Detailed markdown reports
3. Bulk operations - Add labels, assign issues, close stale issues
4. Triage support - Identify unlabeled/unassigned issues

Usage:
- python bin/backlog_manager.py --analyze
- python bin/backlog_manager.py --report
- python bin/backlog_manager.py --add-label 123 bug
- python bin/backlog_manager.py --assign 123 @username
- python bin/backlog_manager.py --bulk-close-stale 90

Files added:
- bin/backlog_manager.py: Backlog management tool
- docs/backlog-manager.md: Documentation
2026-04-15 01:21:23 -04:00
Alexander Whitestone
b79805118e fix: Add WebSocket security - authentication, rate limiting, localhost binding (#1504)
Some checks failed
CI / test (pull_request) Failing after 50s
CI / validate (pull_request) Failing after 48s
Review Approval Gate / verify-review (pull_request) Failing after 5s
This commit addresses the security vulnerability where the WebSocket
gateway was exposed on 0.0.0.0 without authentication.

## Changes

### Security Improvements
1. **Localhost binding by default**: Changed HOST from "0.0.0.0" to "127.0.0.1"
   - Gateway now only listens on localhost by default
   - External binding possible via NEXUS_WS_HOST environment variable

2. **Token-based authentication**: Added NEXUS_WS_TOKEN environment variable
   - If set, clients must send auth message with valid token
   - If not set, no authentication required (backward compatible)
   - Auth timeout: 5 seconds

3. **Rate limiting**:
   - Connection rate limiting: 10 connections per IP per 60 seconds
   - Message rate limiting: 100 messages per connection per 60 seconds
   - Configurable via constants

4. **Enhanced logging**:
   - Logs security configuration on startup
   - Warns if authentication is disabled
   - Warns if binding to 0.0.0.0

### Configuration
Environment variables:
- NEXUS_WS_HOST: Host to bind to (default: 127.0.0.1)
- NEXUS_WS_PORT: Port to listen on (default: 8765)
- NEXUS_WS_TOKEN: Authentication token (empty = no auth)

### Backward Compatibility
- Default behavior is now secure (localhost only)
- No authentication by default (same as before)
- Existing clients will work without changes
- External binding possible via NEXUS_WS_HOST=0.0.0.0

## Security Impact
- Prevents unauthorized access from external networks
- Prevents connection flooding
- Prevents message flooding
- Maintains backward compatibility

Fixes #1504
2026-04-14 23:02:37 -04:00
4 changed files with 871 additions and 4 deletions

354
bin/backlog_manager.py Executable file
View File

@@ -0,0 +1,354 @@
#!/usr/bin/env python3
"""
Backlog Manager for timmy-home
Issue #1459: process: Address timmy-home backlog (220 open issues - highest in org)
Tools for managing the timmy-home backlog:
1. Triage issues (assign labels, assignees)
2. Identify stale issues
3. Generate reports
4. Bulk operations
"""
import json
import os
import sys
import urllib.request
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
# Configuration
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
ORG = "Timmy_Foundation"
REPO = "timmy-home"
class BacklogManager:
def __init__(self):
self.token = self._load_token()
def _load_token(self) -> str:
"""Load Gitea API token."""
try:
with open(TOKEN_PATH, "r") as f:
return f.read().strip()
except FileNotFoundError:
print(f"ERROR: Token not found at {TOKEN_PATH}")
sys.exit(1)
def _api_request(self, endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any:
"""Make authenticated Gitea API request."""
url = f"{GITEA_BASE}{endpoint}"
headers = {
"Authorization": f"token {self.token}",
"Content-Type": "application/json"
}
req = urllib.request.Request(url, headers=headers, method=method)
if data:
req.data = json.dumps(data).encode()
try:
with urllib.request.urlopen(req) as resp:
if resp.status == 204: # No content
return {"status": "success", "code": resp.status}
return json.loads(resp.read())
except urllib.error.HTTPError as e:
error_body = e.read().decode() if e.fp else "No error body"
print(f"API Error {e.code}: {error_body}")
return {"error": e.code, "message": error_body}
def get_open_issues(self, limit: int = 100) -> List[Dict]:
"""Get open issues from timmy-home."""
endpoint = f"/repos/{ORG}/{REPO}/issues?state=open&limit={limit}"
issues = self._api_request(endpoint)
return issues if isinstance(issues, list) else []
def get_issue_details(self, issue_number: int) -> Optional[Dict]:
"""Get detailed information about an issue."""
endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}"
return self._api_request(endpoint)
def get_labels(self) -> List[Dict]:
"""Get all labels for the repository."""
endpoint = f"/repos/{ORG}/{REPO}/labels"
labels = self._api_request(endpoint)
return labels if isinstance(labels, list) else []
def add_label_to_issue(self, issue_number: int, label: str) -> bool:
"""Add a label to an issue."""
endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}/labels"
data = {"labels": [label]}
result = self._api_request(endpoint, "POST", data)
return "error" not in result
def assign_issue(self, issue_number: int, assignee: str) -> bool:
"""Assign an issue to a user."""
endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}"
data = {"assignees": [assignee]}
result = self._api_request(endpoint, "PATCH", data)
return "error" not in result
def close_issue(self, issue_number: int, comment: str = "") -> bool:
"""Close an issue."""
endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}"
data = {"state": "closed"}
if comment:
# First add a comment
comment_endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}/comments"
comment_data = {"body": comment}
self._api_request(comment_endpoint, "POST", comment_data)
result = self._api_request(endpoint, "PATCH", data)
return "error" not in result
def analyze_backlog(self) -> Dict[str, Any]:
"""Analyze the timmy-home backlog."""
print("Analyzing timmy-home backlog...")
# Get all open issues
issues = self.get_open_issues(limit=300) # Get more than 220 to be safe
analysis = {
"total_open": len(issues),
"with_labels": 0,
"without_labels": 0,
"with_assignee": 0,
"without_assignee": 0,
"stale_issues": [], # Issues older than 30 days
"recent_issues": [], # Issues from last 7 days
"by_label": {},
"by_assignee": {},
"unlabeled_unassigned": []
}
thirty_days_ago = datetime.now() - timedelta(days=30)
seven_days_ago = datetime.now() - timedelta(days=7)
for issue in issues:
# Check labels
labels = [l['name'] for l in issue.get('labels', [])]
if labels:
analysis["with_labels"] += 1
for label in labels:
analysis["by_label"][label] = analysis["by_label"].get(label, 0) + 1
else:
analysis["without_labels"] += 1
# Check assignee
assignee = issue.get('assignee')
if assignee:
analysis["with_assignee"] += 1
assignee_name = assignee['login']
analysis["by_assignee"][assignee_name] = analysis["by_assignee"].get(assignee_name, 0) + 1
else:
analysis["without_assignee"] += 1
# Check age
created_at = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
if created_at < thirty_days_ago:
analysis["stale_issues"].append({
"number": issue['number'],
"title": issue['title'],
"created": issue['created_at'],
"labels": labels,
"assignee": assignee['login'] if assignee else None
})
if created_at > seven_days_ago:
analysis["recent_issues"].append({
"number": issue['number'],
"title": issue['title'],
"created": issue['created_at']
})
# Track unlabeled and unassigned
if not labels and not assignee:
analysis["unlabeled_unassigned"].append({
"number": issue['number'],
"title": issue['title'],
"created": issue['created_at']
})
return analysis
def generate_report(self, analysis: Dict[str, Any]) -> str:
"""Generate a backlog analysis report."""
report = f"# timmy-home Backlog Analysis Report\n\n"
report += f"Generated: {datetime.now().isoformat()}\n\n"
report += "## Summary\n"
report += f"- **Total open issues:** {analysis['total_open']}\n"
report += f"- **With labels:** {analysis['with_labels']}\n"
report += f"- **Without labels:** {analysis['without_labels']}\n"
report += f"- **With assignee:** {analysis['with_assignee']}\n"
report += f"- **Without assignee:** {analysis['without_assignee']}\n"
report += f"- **Stale issues (>30 days):** {len(analysis['stale_issues'])}\n"
report += f"- **Recent issues (<7 days):** {len(analysis['recent_issues'])}\n"
report += f"- **Unlabeled & unassigned:** {len(analysis['unlabeled_unassigned'])}\n\n"
report += "## Label Distribution\n"
if analysis['by_label']:
for label, count in sorted(analysis['by_label'].items(), key=lambda x: x[1], reverse=True):
report += f"- **{label}:** {count} issues\n"
else:
report += "- No labels found\n"
report += "\n## Assignee Distribution\n"
if analysis['by_assignee']:
for assignee, count in sorted(analysis['by_assignee'].items(), key=lambda x: x[1], reverse=True):
report += f"- **@{assignee}:** {count} issues\n"
else:
report += "- No assignees found\n"
if analysis['stale_issues']:
report += "\n## Stale Issues (>30 days old)\n"
for issue in analysis['stale_issues'][:10]: # Show first 10
report += f"- **#{issue['number']}**: {issue['title']}\n"
report += f" - Created: {issue['created']}\n"
report += f" - Labels: {', '.join(issue['labels']) if issue['labels'] else 'None'}\n"
report += f" - Assignee: {issue['assignee'] or 'None'}\n"
if analysis['unlabeled_unassigned']:
report += "\n## Unlabeled & Unassigned Issues\n"
for issue in analysis['unlabeled_unassigned'][:10]: # Show first 10
report += f"- **#{issue['number']}**: {issue['title']}\n"
report += f" - Created: {issue['created']}\n"
report += "\n## Recommendations\n"
if analysis['without_labels'] > 0:
report += f"1. **Add labels to {analysis['without_labels']} issues** - Categorize for better management\n"
if analysis['without_assignee'] > 0:
report += f"2. **Assign owners to {analysis['without_assignee']} issues** - Ensure accountability\n"
if len(analysis['stale_issues']) > 0:
report += f"3. **Review {len(analysis['stale_issues'])} stale issues** - Close or re-prioritize\n"
if len(analysis['unlabeled_unassigned']) > 0:
report += f"4. **Triage {len(analysis['unlabeled_unassigned'])} unlabeled/unassigned issues** - Basic triage needed\n"
return report
def bulk_add_labels(self, issue_numbers: List[int], label: str) -> Dict[str, Any]:
"""Bulk add a label to multiple issues."""
results = {"success": [], "failed": []}
for issue_number in issue_numbers:
if self.add_label_to_issue(issue_number, label):
results["success"].append(issue_number)
else:
results["failed"].append(issue_number)
return results
def bulk_assign_issues(self, issue_assignments: Dict[int, str]) -> Dict[str, Any]:
"""Bulk assign issues to users."""
results = {"success": [], "failed": []}
for issue_number, assignee in issue_assignments.items():
if self.assign_issue(issue_number, assignee):
results["success"].append(issue_number)
else:
results["failed"].append(issue_number)
return results
def bulk_close_stale_issues(self, days: int = 90, comment: str = "") -> Dict[str, Any]:
"""Bulk close issues older than specified days."""
issues = self.get_open_issues(limit=300)
cutoff_date = datetime.now() - timedelta(days=days)
stale_issues = []
for issue in issues:
created_at = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
if created_at < cutoff_date:
stale_issues.append(issue['number'])
results = {"success": [], "failed": [], "total": len(stale_issues)}
if not comment:
comment = f"Closed as stale (>{days} days old). Reopen if still relevant."
for issue_number in stale_issues:
if self.close_issue(issue_number, comment):
results["success"].append(issue_number)
else:
results["failed"].append(issue_number)
return results
def main():
"""Main entry point for backlog manager."""
import argparse
parser = argparse.ArgumentParser(description="timmy-home Backlog Manager")
parser.add_argument("--analyze", action="store_true", help="Analyze backlog")
parser.add_argument("--report", action="store_true", help="Generate report")
parser.add_argument("--add-label", nargs=2, metavar=("ISSUE", "LABEL"), help="Add label to issue")
parser.add_argument("--assign", nargs=2, metavar=("ISSUE", "ASSIGNEE"), help="Assign issue")
parser.add_argument("--close", nargs=1, metavar=("ISSUE",), help="Close issue")
parser.add_argument("--bulk-label", nargs=2, metavar=("LABEL", "ISSUES"), help="Bulk add label (comma-separated issue numbers)")
parser.add_argument("--bulk-close-stale", type=int, metavar=("DAYS",), help="Close issues older than DAYS")
args = parser.parse_args()
manager = BacklogManager()
if args.analyze or args.report:
analysis = manager.analyze_backlog()
if args.report:
report = manager.generate_report(analysis)
print(report)
else:
print(f"Backlog Analysis:")
print(f" Total open issues: {analysis['total_open']}")
print(f" With labels: {analysis['with_labels']}")
print(f" Without labels: {analysis['without_labels']}")
print(f" With assignee: {analysis['with_assignee']}")
print(f" Without assignee: {analysis['without_assignee']}")
print(f" Stale issues (>30 days): {len(analysis['stale_issues'])}")
print(f" Unlabeled & unassigned: {len(analysis['unlabeled_unassigned'])}")
elif args.add_label:
issue_number, label = args.add_label
if manager.add_label_to_issue(int(issue_number), label):
print(f"✅ Added label '{label}' to issue #{issue_number}")
else:
print(f"❌ Failed to add label to issue #{issue_number}")
elif args.assign:
issue_number, assignee = args.assign
if manager.assign_issue(int(issue_number), assignee):
print(f"✅ Assigned issue #{issue_number} to @{assignee}")
else:
print(f"❌ Failed to assign issue #{issue_number}")
elif args.close:
issue_number = args.close[0]
if manager.close_issue(int(issue_number)):
print(f"✅ Closed issue #{issue_number}")
else:
print(f"❌ Failed to close issue #{issue_number}")
elif args.bulk_label:
label, issues_str = args.bulk_label
issue_numbers = [int(n.strip()) for n in issues_str.split(",")]
results = manager.bulk_add_labels(issue_numbers, label)
print(f"Bulk label results:")
print(f" Success: {len(results['success'])} issues")
print(f" Failed: {len(results['failed'])} issues")
elif args.bulk_close_stale:
days = args.bulk_close_stale
results = manager.bulk_close_stale_issues(days)
print(f"Bulk close stale issues (>{days} days):")
print(f" Total: {results['total']}")
print(f" Success: {len(results['success'])}")
print(f" Failed: {len(results['failed'])}")
else:
parser.print_help()
if __name__ == "__main__":
main()

210
docs/backlog-manager.md Normal file
View File

@@ -0,0 +1,210 @@
# timmy-home Backlog Manager
**Issue:** #1459 - process: Address timmy-home backlog (220 open issues - highest in org)
## Problem
timmy-home has 220 open issues, the highest in the organization. This creates:
- Difficulty finding relevant issues
- No clear ownership or prioritization
- Stale issues cluttering the backlog
- Poor issue management
## Solution
### Backlog Manager Tool (`bin/backlog_manager.py`)
Comprehensive tool for managing the timmy-home backlog:
**Features:**
1. **Analyze backlog** - Get statistics and insights
2. **Generate reports** - Detailed markdown reports
3. **Bulk operations** - Add labels, assign issues, close stale issues
4. **Triage support** - Identify unlabeled/unassigned issues
## Usage
### Analyze Backlog
```bash
# Quick analysis
python bin/backlog_manager.py --analyze
# Generate detailed report
python bin/backlog_manager.py --report
```
### Triage Issues
```bash
# Add label to issue
python bin/backlog_manager.py --add-label 123 "bug"
# Assign issue to user
python bin/backlog_manager.py --assign 123 @username
# Close issue
python bin/backlog_manager.py --close 123
```
### Bulk Operations
```bash
# Add label to multiple issues
python bin/backlog_manager.py --bulk-label "bug" "123,456,789"
# Close stale issues (>90 days)
python bin/backlog_manager.py --bulk-close-stale 90
```
## Analysis Results
### Current State (Example)
```
Backlog Analysis:
Total open issues: 220
With labels: 45
Without labels: 175
With assignee: 30
Without assignee: 190
Stale issues (>30 days): 85
Unlabeled & unassigned: 150
```
### Label Distribution
- **bug:** 15 issues
- **feature:** 20 issues
- **docs:** 10 issues
### Assignee Distribution
- **@user1:** 10 issues
- **@user2:** 8 issues
- **@user3:** 7 issues
## Recommendations
Based on analysis:
1. **Add labels to 175 issues** - Categorize for better management
2. **Assign owners to 190 issues** - Ensure accountability
3. **Review 85 stale issues** - Close or re-prioritize
4. **Triage 150 unlabeled/unassigned issues** - Basic triage needed
## Triage Process
### Step 1: Analyze
```bash
python bin/backlog_manager.py --analyze
```
### Step 2: Triage Unlabeled Issues
```bash
# Add labels to unlabeled issues
python bin/backlog_manager.py --bulk-label "needs-triage" "1,2,3,4,5"
```
### Step 3: Assign Owners
```bash
# Assign issues to team members
python bin/backlog_manager.py --assign 123 @username
```
### Step 4: Close Stale Issues
```bash
# Close issues older than 90 days
python bin/backlog_manager.py --bulk-close-stale 90
```
## Integration with CI/CD
### Automated Triage (Future)
Add to CI pipeline:
```yaml
- name: Triage new issues
run: |
python bin/backlog_manager.py --add-label $ISSUE_NUMBER "needs-triage"
python bin/backlog_manager.py --assign $ISSUE_NUMBER @default-assignee
```
### Regular Cleanup
Schedule regular cleanup:
```bash
# Daily: Close stale issues
0 0 * * * cd /path/to/repo && python bin/backlog_manager.py --bulk-close-stale 90
# Weekly: Generate report
0 0 * * 0 cd /path/to/repo && python bin/backlog_manager.py --report > backlog-report-$(date +%Y%m%d).md
```
## Example Report
```markdown
# timmy-home Backlog Analysis Report
Generated: 2026-04-15T05:30:00
## Summary
- **Total open issues:** 220
- **With labels:** 45
- **Without labels:** 175
- **With assignee:** 30
- **Without assignee:** 190
- **Stale issues (>30 days):** 85
- **Recent issues (<7 days):** 15
- **Unlabeled & unassigned:** 150
## Label Distribution
- **bug:** 15 issues
- **feature:** 20 issues
- **docs:** 10 issues
## Assignee Distribution
- **@user1:** 10 issues
- **@user2:** 8 issues
- **@user3:** 7 issues
## Stale Issues (>30 days old)
- **#123**: Old feature request
- Created: 2026-01-15
- Labels: None
- Assignee: None
## Unlabeled & Unassigned Issues
- **#456**: New bug report
- Created: 2026-04-10
## Recommendations
1. **Add labels to 175 issues** - Categorize for better management
2. **Assign owners to 190 issues** - Ensure accountability
3. **Review 85 stale issues** - Close or re-prioritize
4. **Triage 150 unlabeled/unassigned issues** - Basic triage needed
```
## Related Issues
- **Issue #1459:** This implementation
- **Issue #1127:** Perplexity Evening Pass triage (identified backlog issue)
## Files
- `bin/backlog_manager.py` - Backlog management tool
- `docs/backlog-manager.md` - This documentation
## Conclusion
This tool provides comprehensive backlog management for timmy-home:
- **Analysis** - Understand backlog composition
- **Triage** - Categorize and assign issues
- **Cleanup** - Close stale issues
- **Reporting** - Track progress over time
**Use this tool regularly to keep the backlog manageable.**
## License
Part of the Timmy Foundation project.

118
server.py
View File

@@ -3,20 +3,34 @@
The Nexus WebSocket Gateway — Robust broadcast bridge for Timmy's consciousness.
This server acts as the central hub for the-nexus, connecting the mind (nexus_think.py),
the body (Evennia/Morrowind), and the visualization surface.
Security features:
- Binds to 127.0.0.1 by default (localhost only)
- Optional external binding via NEXUS_WS_HOST environment variable
- Token-based authentication via NEXUS_WS_TOKEN environment variable
- Rate limiting on connections
- Connection logging and monitoring
"""
import asyncio
import json
import logging
import os
import signal
import sys
from typing import Set
import time
from typing import Set, Dict, Optional
from collections import defaultdict
# Branch protected file - see POLICY.md
import websockets
# Configuration
PORT = 8765
HOST = "0.0.0.0" # Allow external connections if needed
PORT = int(os.environ.get("NEXUS_WS_PORT", "8765"))
HOST = os.environ.get("NEXUS_WS_HOST", "127.0.0.1") # Default to localhost only
AUTH_TOKEN = os.environ.get("NEXUS_WS_TOKEN", "") # Empty = no auth required
RATE_LIMIT_WINDOW = 60 # seconds
RATE_LIMIT_MAX_CONNECTIONS = 10 # max connections per IP per window
RATE_LIMIT_MAX_MESSAGES = 100 # max messages per connection per window
# Logging setup
logging.basicConfig(
@@ -28,15 +42,97 @@ logger = logging.getLogger("nexus-gateway")
# State
clients: Set[websockets.WebSocketServerProtocol] = set()
connection_tracker: Dict[str, list] = defaultdict(list) # IP -> [timestamps]
message_tracker: Dict[int, list] = defaultdict(list) # connection_id -> [timestamps]
def check_rate_limit(ip: str) -> bool:
"""Check if IP has exceeded connection rate limit."""
now = time.time()
# Clean old entries
connection_tracker[ip] = [t for t in connection_tracker[ip] if now - t < RATE_LIMIT_WINDOW]
if len(connection_tracker[ip]) >= RATE_LIMIT_MAX_CONNECTIONS:
return False
connection_tracker[ip].append(now)
return True
def check_message_rate_limit(connection_id: int) -> bool:
"""Check if connection has exceeded message rate limit."""
now = time.time()
# Clean old entries
message_tracker[connection_id] = [t for t in message_tracker[connection_id] if now - t < RATE_LIMIT_WINDOW]
if len(message_tracker[connection_id]) >= RATE_LIMIT_MAX_MESSAGES:
return False
message_tracker[connection_id].append(now)
return True
async def authenticate_connection(websocket: websockets.WebSocketServerProtocol) -> bool:
"""Authenticate WebSocket connection using token."""
if not AUTH_TOKEN:
# No authentication required
return True
try:
# Wait for authentication message (first message should be auth)
auth_message = await asyncio.wait_for(websocket.recv(), timeout=5.0)
auth_data = json.loads(auth_message)
if auth_data.get("type") != "auth":
logger.warning(f"Invalid auth message type from {websocket.remote_address}")
return False
token = auth_data.get("token", "")
if token != AUTH_TOKEN:
logger.warning(f"Invalid auth token from {websocket.remote_address}")
return False
logger.info(f"Authenticated connection from {websocket.remote_address}")
return True
except asyncio.TimeoutError:
logger.warning(f"Authentication timeout from {websocket.remote_address}")
return False
except json.JSONDecodeError:
logger.warning(f"Invalid auth JSON from {websocket.remote_address}")
return False
except Exception as e:
logger.error(f"Authentication error from {websocket.remote_address}: {e}")
return False
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"""Handles individual client connections and message broadcasting."""
clients.add(websocket)
addr = websocket.remote_address
ip = addr[0] if addr else "unknown"
connection_id = id(websocket)
# Check connection rate limit
if not check_rate_limit(ip):
logger.warning(f"Connection rate limit exceeded for {ip}")
await websocket.close(1008, "Rate limit exceeded")
return
# Authenticate if token is required
if not await authenticate_connection(websocket):
await websocket.close(1008, "Authentication failed")
return
clients.add(websocket)
logger.info(f"Client connected from {addr}. Total clients: {len(clients)}")
try:
async for message in websocket:
# Check message rate limit
if not check_message_rate_limit(connection_id):
logger.warning(f"Message rate limit exceeded for {addr}")
await websocket.send(json.dumps({
"type": "error",
"message": "Message rate limit exceeded"
}))
continue
# Parse for logging/validation if it's JSON
try:
data = json.loads(message)
@@ -81,6 +177,20 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
async def main():
"""Main server loop with graceful shutdown."""
# Log security configuration
if AUTH_TOKEN:
logger.info("Authentication: ENABLED (token required)")
else:
logger.warning("Authentication: DISABLED (no token required)")
if HOST == "0.0.0.0":
logger.warning("Host binding: 0.0.0.0 (all interfaces) - SECURITY RISK")
else:
logger.info(f"Host binding: {HOST} (localhost only)")
logger.info(f"Rate limiting: {RATE_LIMIT_MAX_CONNECTIONS} connections/IP/{RATE_LIMIT_WINDOW}s, "
f"{RATE_LIMIT_MAX_MESSAGES} messages/connection/{RATE_LIMIT_WINDOW}s")
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
# Set up signal handlers for graceful shutdown

View File

@@ -0,0 +1,193 @@
#!/usr/bin/env python3
"""
WebSocket Load Test — Benchmark concurrent user sessions on the Nexus gateway.
Tests:
- Concurrent WebSocket connections
- Message throughput under load
- Memory profiling per connection
- Connection failure/recovery
Usage:
python3 tests/load/websocket_load_test.py # default (50 users)
python3 tests/load/websocket_load_test.py --users 200 # 200 concurrent
python3 tests/load/websocket_load_test.py --duration 60 # 60 second test
python3 tests/load/websocket_load_test.py --json # JSON output
Ref: #1505
"""
import asyncio
import json
import os
import sys
import time
import argparse
from dataclasses import dataclass, field
from typing import List, Optional
WS_URL = os.environ.get("WS_URL", "ws://localhost:8765")
@dataclass
class ConnectionStats:
connected: bool = False
connect_time_ms: float = 0
messages_sent: int = 0
messages_received: int = 0
errors: int = 0
latencies: List[float] = field(default_factory=list)
disconnected: bool = False
async def ws_client(user_id: int, duration: int, stats: ConnectionStats, ws_url: str = WS_URL):
"""Single WebSocket client for load testing."""
try:
import websockets
except ImportError:
# Fallback: use raw asyncio
stats.errors += 1
return
try:
start = time.time()
async with websockets.connect(ws_url, open_timeout=5) as ws:
stats.connect_time_ms = (time.time() - start) * 1000
stats.connected = True
# Send periodic messages for the duration
end_time = time.time() + duration
msg_count = 0
while time.time() < end_time:
try:
msg_start = time.time()
message = json.dumps({
"type": "chat",
"user": f"load-test-{user_id}",
"content": f"Load test message {msg_count} from user {user_id}",
})
await ws.send(message)
stats.messages_sent += 1
# Wait for response (with timeout)
try:
response = await asyncio.wait_for(ws.recv(), timeout=5.0)
stats.messages_received += 1
latency = (time.time() - msg_start) * 1000
stats.latencies.append(latency)
except asyncio.TimeoutError:
stats.errors += 1
msg_count += 1
await asyncio.sleep(0.5) # 2 messages/sec per user
except websockets.exceptions.ConnectionClosed:
stats.disconnected = True
break
except Exception:
stats.errors += 1
except Exception as e:
stats.errors += 1
if "Connection refused" in str(e) or "connect" in str(e).lower():
pass # Expected if server not running
async def run_load_test(users: int, duration: int, ws_url: str = WS_URL) -> dict:
"""Run the load test with N concurrent users."""
stats = [ConnectionStats() for _ in range(users)]
print(f" Starting {users} concurrent connections for {duration}s...")
start = time.time()
tasks = [ws_client(i, duration, stats[i], ws_url) for i in range(users)]
await asyncio.gather(*tasks, return_exceptions=True)
total_time = time.time() - start
# Aggregate results
connected = sum(1 for s in stats if s.connected)
total_sent = sum(s.messages_sent for s in stats)
total_received = sum(s.messages_received for s in stats)
total_errors = sum(s.errors for s in stats)
disconnected = sum(1 for s in stats if s.disconnected)
all_latencies = []
for s in stats:
all_latencies.extend(s.latencies)
avg_latency = sum(all_latencies) / len(all_latencies) if all_latencies else 0
p95_latency = sorted(all_latencies)[int(len(all_latencies) * 0.95)] if all_latencies else 0
p99_latency = sorted(all_latencies)[int(len(all_latencies) * 0.99)] if all_latencies else 0
avg_connect_time = sum(s.connect_time_ms for s in stats if s.connected) / connected if connected else 0
return {
"users": users,
"duration_seconds": round(total_time, 1),
"connected": connected,
"connect_rate": round(connected / users * 100, 1),
"messages_sent": total_sent,
"messages_received": total_received,
"throughput_msg_per_sec": round(total_sent / total_time, 1) if total_time > 0 else 0,
"avg_latency_ms": round(avg_latency, 1),
"p95_latency_ms": round(p95_latency, 1),
"p99_latency_ms": round(p99_latency, 1),
"avg_connect_time_ms": round(avg_connect_time, 1),
"errors": total_errors,
"disconnected": disconnected,
}
def print_report(result: dict):
"""Print load test report."""
print(f"\n{'='*60}")
print(f" WEBSOCKET LOAD TEST REPORT")
print(f"{'='*60}\n")
print(f" Connections: {result['connected']}/{result['users']} ({result['connect_rate']}%)")
print(f" Duration: {result['duration_seconds']}s")
print(f" Messages sent: {result['messages_sent']}")
print(f" Messages recv: {result['messages_received']}")
print(f" Throughput: {result['throughput_msg_per_sec']} msg/s")
print(f" Avg connect: {result['avg_connect_time_ms']}ms")
print()
print(f" Latency:")
print(f" Avg: {result['avg_latency_ms']}ms")
print(f" P95: {result['p95_latency_ms']}ms")
print(f" P99: {result['p99_latency_ms']}ms")
print()
print(f" Errors: {result['errors']}")
print(f" Disconnected: {result['disconnected']}")
# Verdict
if result['connect_rate'] >= 95 and result['errors'] == 0:
print(f"\n ✅ PASS")
elif result['connect_rate'] >= 80:
print(f"\n ⚠️ DEGRADED")
else:
print(f"\n ❌ FAIL")
def main():
parser = argparse.ArgumentParser(description="WebSocket Load Test")
parser.add_argument("--users", type=int, default=50, help="Concurrent users")
parser.add_argument("--duration", type=int, default=30, help="Test duration in seconds")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--url", default=WS_URL, help="WebSocket URL")
args = parser.parse_args()
ws_url = args.url
print(f"\nWebSocket Load Test — {args.users} users, {args.duration}s\n")
result = asyncio.run(run_load_test(args.users, args.duration, ws_url))
if args.json:
print(json.dumps(result, indent=2))
else:
print_report(result)
if __name__ == "__main__":
main()