Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
17317aaf64 feat: bridge Nexus chat with Telegram polling relay (#1537)
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 9s
CI / test (pull_request) Failing after 52s
CI / validate (pull_request) Failing after 52s
2026-04-22 12:18:32 -04:00
Alexander Whitestone
da258e772c test: add red coverage for Nexus Telegram bridge (#1537) 2026-04-22 12:10:21 -04:00
9 changed files with 333 additions and 412 deletions

165
app.js
View File

@@ -104,13 +104,7 @@ const orbitState = {
let flyY = 2;
// ══ POV CAMERA SYSTEM ══
let povMode = false; // true when viewing through agent's eyes
let povAgentIdx = -1; // index into agents[] for POV target (-1 = none)
let savedCameraState = null; // { position: Vector3, rotation: Euler } to restore on exit
const DEFAULT_AGENT_FOV = 75; // default field-of-view for agent POV cameras
// ╡══ INIT ══╡
// ═══ INIT ══
import {
SymbolicEngine, AgentFSM, KnowledgeGraph, Blackboard,
@@ -1340,10 +1334,10 @@ function updateNexusCommand(state) {
// ═══ AGENT PRESENCE SYSTEM ═══
function createAgentPresences() {
const agentData = [
{ id: 'timmy', name: 'TIMMY', color: NEXUS.colors.primary, pos: { x: -4, z: -4 }, station: { x: -4, z: -4 }, fov: 70 },
{ id: 'kimi', name: 'KIMI', color: NEXUS.colors.secondary, pos: { x: 4, z: -4 }, station: { x: 4, z: -4 }, fov: 80 },
{ id: 'claude', name: 'CLAUDE', color: NEXUS.colors.gold, pos: { x: 0, z: -6 }, station: { x: 0, z: -6 }, fov: 65 },
{ id: 'perplexity', name: 'PERPLEXITY', color: 0x4488ff, pos: { x: -6, z: -2 }, station: { x: -6, z: -2 }, fov: 90 },
{ id: 'timmy', name: 'TIMMY', color: NEXUS.colors.primary, pos: { x: -4, z: -4 }, station: { x: -4, z: -4 } },
{ id: 'kimi', name: 'KIMI', color: NEXUS.colors.secondary, pos: { x: 4, z: -4 }, station: { x: 4, z: -4 } },
{ id: 'claude', name: 'CLAUDE', color: NEXUS.colors.gold, pos: { x: 0, z: -6 }, station: { x: 0, z: -6 } },
{ id: 'perplexity', name: 'PERPLEXITY', color: 0x4488ff, pos: { x: -6, z: -2 }, station: { x: -6, z: -2 } },
];
agentData.forEach(data => {
@@ -1399,8 +1393,7 @@ function createAgentPresences() {
color,
station: data.station,
targetPos: new THREE.Vector3(data.pos.x, 0, data.pos.z),
wanderTimer: 0,
fov: data.fov || DEFAULT_AGENT_FOV,
wanderTimer: 0
});
});
}
@@ -1964,97 +1957,7 @@ function updateNavModeUI(mode) {
if (el) el.textContent = mode.toUpperCase();
}
// ══ AGENT POV CAMERA TOGGLE ══
function toggleAgentPOV() {
if (!agents.length) {
addChatMessage('system', 'No agents present to observe.');
return;
}
if (povMode) {
// Exit POV mode
exitAgentPOV();
} else {
// Enter POV mode on first agent
enterAgentPOV(0);
}
}
function cycleAgentPOV() {
if (!agents.length) return;
if (!povMode) {
enterAgentPOV(0);
return;
}
const nextIdx = (povAgentIdx + 1) % agents.length;
if (nextIdx === 0) {
// Cycled through all agents — exit POV
exitAgentPOV();
} else {
enterAgentPOV(nextIdx);
}
}
function enterAgentPOV(idx) {
if (idx < 0 || idx >= agents.length) return;
// Save current camera state before switching
if (!povMode) {
savedCameraState = {
position: camera.position.clone(),
rotation: camera.rotation.clone(),
fov: camera.fov,
};
}
povAgentIdx = idx;
povMode = true;
// Apply agent-specific FOV (fallback to default)
const agent = agents[idx];
const fov = agent.fov || DEFAULT_AGENT_FOV;
camera.fov = fov;
camera.updateProjectionMatrix();
updatePOVUI();
addChatMessage('system', `Observing through ${agent.id.toUpperCase()}'s eyes. FOV: ${fov}°`);
}
function exitAgentPOV() {
if (!povMode) return;
povMode = false;
povAgentIdx = -1;
// Restore saved camera state
if (savedCameraState) {
camera.position.copy(savedCameraState.position);
camera.rotation.copy(savedCameraState.rotation);
camera.fov = savedCameraState.fov;
camera.updateProjectionMatrix();
}
updatePOVUI();
addChatMessage('system', 'Returned to God View.');
}
function updatePOVUI() {
const label = document.getElementById('pov-label');
const btn = document.getElementById('pov-toggle-btn');
if (!label || !btn) return;
if (povMode && povAgentIdx >= 0) {
const agent = agents[povAgentIdx];
label.textContent = agent.id.toUpperCase();
btn.classList.add('pov-active');
} else {
label.textContent = 'AGENT POV';
btn.classList.remove('pov-active');
}
}
// ╡══ CONTROLS ══╡
// ══ CONTROLS ══
function setupControls() {
document.addEventListener('keydown', (e) => {
keys[e.key.toLowerCase()] = true;
@@ -2081,9 +1984,6 @@ function setupControls() {
if (e.key.toLowerCase() === 'v' && document.activeElement !== document.getElementById('chat-input')) {
cycleNavMode();
}
if (e.key.toLowerCase() === 'p' && document.activeElement !== document.getElementById('chat-input')) {
cycleAgentPOV();
}
if (e.key.toLowerCase() === 'f' && activePortal && !portalOverlayActive) {
activatePortal(activePortal);
}
@@ -2233,7 +2133,6 @@ function setupControls() {
document.getElementById('vision-close-btn').addEventListener('click', closeVisionOverlay);
document.getElementById('mode-toggle-btn').addEventListener('click', toggleUIMode);
document.getElementById('pov-toggle-btn').addEventListener('click', cycleAgentPOV);
document.getElementById('atlas-toggle-btn').addEventListener('click', openPortalAtlas);
document.getElementById('atlas-close-btn').addEventListener('click', closePortalAtlas);
initAtlasControls();
@@ -2249,7 +2148,7 @@ function setupControls() {
function sendChatMessage(overrideText = null) {
// Mine chat message to MemPalace
if (overrideText) {
if (overrideText && window.electronAPI?.execPython) {
window.electronAPI.execPython(`mempalace add_drawer "${this.wing}" "chat" "${overrideText}"`);
}
const input = document.getElementById('chat-input');
@@ -2257,19 +2156,25 @@ function sendChatMessage(overrideText = null) {
if (!text) return;
addChatMessage('user', text);
if (!overrideText) input.value = '';
setTimeout(() => {
const responses = [
'Processing your request through the harness...',
'I have noted this in my thought stream.',
'Acknowledged. Routing to appropriate agent loop.',
'The sovereign space recognizes your command.',
'Running analysis. Results will appear on the main terminal.',
'My crystal ball says... yes. Implementing.',
'Understood, Alexander. Adjusting priorities.',
];
const resp = responses[Math.floor(Math.random() * responses.length)];
addChatMessage('timmy', resp);
}, 500 + Math.random() * 1000);
if (!wsConnected || !hermesWs || hermesWs.readyState !== WebSocket.OPEN) {
addChatMessage('error', 'Message not sent. Nexus gateway offline.');
input.blur();
return;
}
try {
hermesWs.send(JSON.stringify({
type: 'chat',
text,
agent: 'nexus',
source: 'nexus_chat',
timestamp: Date.now(),
}));
} catch (err) {
console.error('Failed to send Nexus chat message:', err);
addChatMessage('error', 'Message not sent. Nexus gateway offline.');
}
input.blur();
}
@@ -3471,21 +3376,7 @@ function gameLoop() {
const mode = NAV_MODES[navModeIdx];
const chatActive = document.activeElement === document.getElementById('chat-input');
// Agent POV mode overrides other camera modes
if (povMode && povAgentIdx >= 0 && agents[povAgentIdx]) {
const agent = agents[povAgentIdx];
const orbPos = agent.orb.getWorldPosition(new THREE.Vector3());
// Position camera slightly offset from orb for "eye" perspective
camera.position.copy(orbPos);
camera.position.y += 0.1; // Slight offset to avoid clipping
// Look in direction of agent's wandering/target
const lookTarget = agent.targetPos.clone();
lookTarget.y = camera.position.y;
camera.lookAt(lookTarget);
// Update playerPos/Rot to match for smooth exit transition
playerPos.copy(camera.position);
playerRot.y = Math.atan2(lookTarget.x - camera.position.x, lookTarget.z - camera.position.z);
} else if (mode === 'walk') {
if (mode === 'walk') {
if (!chatActive && !portalOverlayActive) {
const speed = 6 * delta;
const dir = new THREE.Vector3();

View File

@@ -1,98 +0,0 @@
# Nostr Migration Consolidation Plan
> Issue #862 | Canonical Epic: the-nexus #819
> Consolidated From: the-nexus #819 + timmy-config #138
---
## Problem
Two epics tracked the same Telegram -> Nostr migration with overlapping scope:
| Epic | Repo | Focus | Status |
|------|------|-------|--------|
| #819 | the-nexus | Client fork (Nostur), UI/UX, agent presence | **CANONICAL** |
| #138 | timmy-config | Relay/infrastructure, deployment, ops | Tracked child |
Neither was the parent. Work risked duplication and drift.
---
## Resolution
**#819 is the canonical parent epic.** All Nostr migration work rolls up here.
### Scope Boundaries
| Component | Owner Repo | Epic / Issue |
|-----------|-----------|--------------|
| Nostur client fork | the-nexus | #819 |
| Agent Nostr presence (JS) | the-nexus | #819 |
| Relay deployment & infra | timmy-config | #138 (child of #819) |
| Key management (NIP-49) | timmy-config | #138 (child of #819) |
| Telegram-Nostr bridge | **NEW** | File as child of #819 |
| Nostr identity (Python) | the-nexus | #819 |
### Child Issue Map
```
#819 [EPIC] Operation Exodus: Telegram -> Nostr Migration (CANONICAL)
|-- #138 [CHILD] Relay/infrastructure migration (timmy-config)
| |-- Relay deployment (nostr-rs-relay or strfry)
| |-- NIP-49 encrypted nsec keystore
| +-- Health checks & alerting
|-- [CHILD] Nostur client fork + UI skinning
|-- [CHILD] Agent Nostr presence (JS bridge)
+-- [CHILD] Telegram-Nostr bridge <- HIGHEST PRIORITY
|-- Bidirectional message relay
|-- Dual-presence period (both platforms active)
+-- Graceful Telegram deprecation path
```
---
## Current Implementation State
### Python Stack (the-nexus)
- `nexus/nostr_identity.py` - Pure-Python BIP340 Schnorr signatures
- WARNING **Timing side-channel vulnerabilities** (see FINDINGS-issue-801.md)
- Suitable for prototyping; production needs `coincurve` or constant-time rewrite
- `nexus/nostr_publisher.py` - Async WebSocket publisher to public relays
### Browser Stack (the-nexus)
- `app.js:NostrAgent` - Browser-side agent presence
- WARNING Uses **mock signatures** (`mock_id`, `mock_sig`)
- Needs real crypto integration or delegation to Python backend
### Infrastructure (timmy-config)
- `nostr-bridge.service` - Running but source file deleted, only `.pyc` remains
- `/root/nostr-relay/keystore.json` - NIP-49 encrypted nsec storage
---
## Highest Priority: Telegram-Nostr Bridge
The bridge is the critical path. Without it, migration strands users on Telegram.
**Requirements:**
1. Bidirectional message relay (Telegram <-> Nostr)
2. Dual-presence period: both platforms active during transition
3. Graceful deprecation: Telegram bot stays online until 90% of active users have Nostr handles
4. Channel/topic mapping: preserve conversation structure
**File this as a new child issue under #819.**
---
## Action Items
- [ ] Close #138 in timmy-config with comment: "Consolidated into the-nexus #819. Relay/infrastructure work tracked as child of canonical epic."
- [ ] Update #819 title/body to reference this consolidation plan
- [ ] File child issue: Telegram-Nostr bridge (bidirectional, dual-presence)
- [ ] File child issue: Fix timing side-channel in `nostr_identity.py` (or replace with `coincurve`)
- [ ] File child issue: Replace mock signatures in `app.js:NostrAgent` with real crypto
- [ ] Assign owners to each child issue
---
*Sovereignty and service always.*

View File

@@ -1,140 +0,0 @@
# Telegram-Nostr Bridge Specification
> Child of Epic #819 (Operation Exodus: Telegram -> Nostr Migration)
> Priority: HIGHEST
---
## Overview
Bidirectional message relay between Telegram and Nostr during the migration period.
Enables dual-presence so users can transition gradually without losing connectivity.
---
## Requirements
### Functional
1. **Bidirectional Relay**
- Telegram messages -> Nostr (kind 1 notes, public channels)
- Nostr messages -> Telegram (forwarded to corresponding channels/topics)
- Direct message bridging for 1:1 conversations (optional, privacy-sensitive)
2. **Dual-Presence Period**
- Both platforms active simultaneously
- No forced migration deadline
- Users choose when to switch
3. **Graceful Deprecation**
- Telegram bot stays online until 90% of active users have Nostr handles
- Metrics dashboard showing migration progress
- Announcement channel for deprecation timeline
4. **Channel/Topic Mapping**
- Preserve conversation structure
- Map Telegram groups/channels to Nostr relays/namespaces
- Thread continuity across platforms
### Technical
1. **Nostr Side**
- Publish to configured relays (damus.io, nos.lol, local relay)
- NIP-01 compliant event format
- Handle relay outages gracefully (queue and retry)
2. **Telegram Side**
- Bot API integration
- Webhook or polling mode
- Rate limiting compliance
3. **Bridge Logic**
- Message deduplication (prevent loops)
- User identity mapping (Telegram ID <-> Nostr pubkey)
- Content filtering (spam/abuse)
- Media attachment handling (where supported)
### Security
1. **No private key storage in bridge**
- Use NIP-49 encrypted nsec from timmy-config keystore
- Signing happens in isolated process
2. **Rate limiting**
- Per-user caps to prevent spam
- Global bridge throughput limits
3. **Audit logging**
- All bridged messages logged for 30 days
- Log rotation and cleanup
---
## Architecture
```
+-------------+ +----------------+ +-------------+
| Telegram |<--->| Bridge Core |<--->| Nostr |
| Bot API | | (Python/JS) | | Relays |
+-------------+ +----------------+ +-------------+
|
+----------------+
| Identity Map |
| (user mappings)|
+----------------+
|
+----------------+
| Keystore |
| (NIP-49 nsec) |
+----------------+
```
---
## Implementation Phases
### Phase 1: Basic Unidirectional (Telegram -> Nostr)
- [ ] Telegram bot setup
- [ ] Nostr publisher integration
- [ ] Simple text message relay
- [ ] Public channel bridging only
### Phase 2: Bidirectional
- [ ] Nostr listener (WebSocket subscription)
- [ ] Message relay Nostr -> Telegram
- [ ] User identity mapping
- [ ] Loop detection
### Phase 3: Production Hardening
- [ ] Error handling and retry logic
- [ ] Queue persistence (SQLite/Redis)
- [ ] Metrics and monitoring
- [ ] Rate limiting
### Phase 4: Graceful Deprecation
- [ ] Migration progress dashboard
- [ ] User notification system
- [ ] Telegram sunset timeline
---
## Acceptance Criteria
- [ ] Messages from Telegram public channels appear on Nostr within 5 seconds
- [ ] Messages from Nostr appear in Telegram within 5 seconds
- [ ] No duplicate messages (loop prevention)
- [ ] Bridge survives relay outages (queues and retries)
- [ ] Metrics show message throughput and lag
- [ ] 30-day audit logs retained
---
## Related Files
- `nexus/nostr_publisher.py` - Nostr publishing (reusable)
- `nexus/nostr_identity.py` - Signing (needs hardening)
- `docs/nostr-migration/CONSOLIDATION.md` - Parent epic context
---
*Part of Operation Exodus.*

View File

@@ -173,10 +173,6 @@
<span class="hud-icon">👁</span>
<span class="hud-btn-label" id="mode-label">VISITOR</span>
</button>
<button id="pov-toggle-btn" class="hud-icon-btn" title="Agent POV Camera">
<span class="hud-icon">👁</span>
<span class="hud-btn-label" id="pov-label">AGENT POV</span>
</button>
<button id="atlas-toggle-btn" class="hud-icon-btn" title="Portal Atlas">
<span class="hud-icon">🌐</span>
<span class="hud-btn-label">WORLDS</span>
@@ -233,7 +229,6 @@
<span>WASD</span> move &nbsp; <span>Mouse</span> look &nbsp; <span>Enter</span> chat &nbsp;
<span>V</span> mode: <span id="nav-mode-label">WALK</span>
<span id="nav-mode-hint" class="nav-mode-hint"></span>
&nbsp; <span>P</span> agent POV &nbsp;
&nbsp; <span>H</span> archive &nbsp;
<span class="ws-hud-status">HERMES: <span id="ws-status-dot" class="chat-status-dot"></span></span>
</div>

113
nexus/telegram_bridge.py Normal file
View File

@@ -0,0 +1,113 @@
from __future__ import annotations
import asyncio
import json
import os
import urllib.parse
import urllib.request
from typing import Any, Callable, Optional
class TelegramBridgeConfig:
def __init__(self, bot_token: str, chat_id: str, poll_timeout: int = 4):
self.bot_token = bot_token
self.chat_id = chat_id
self.poll_timeout = poll_timeout
@property
def api_base(self) -> str:
return f"https://api.telegram.org/bot{self.bot_token}"
def config_from_env(env: Optional[dict[str, str]] = None) -> Optional[TelegramBridgeConfig]:
env = env or os.environ
bot_token = env.get("NEXUS_TELEGRAM_BOT_TOKEN") or env.get("TELEGRAM_BOT_TOKEN")
chat_id = (
env.get("NEXUS_TELEGRAM_CHAT_ID")
or env.get("TELEGRAM_HOME_CHANNEL")
or env.get("TELEGRAM_CHAT_ID")
)
if not bot_token or not chat_id:
return None
return TelegramBridgeConfig(bot_token=bot_token, chat_id=str(chat_id), poll_timeout=int(env.get("NEXUS_TELEGRAM_POLL_TIMEOUT", "4")))
def format_outbound_text(sender: str, text: str) -> str:
sender = (sender or "Nexus").strip() or "Nexus"
return f"[{sender}] {text}"
def extract_update_message(update: dict[str, Any], expected_chat_id: str) -> Optional[dict[str, Any]]:
message = update.get("message") or update.get("edited_message")
if not isinstance(message, dict):
return None
if str((message.get("chat") or {}).get("id")) != str(expected_chat_id):
return None
text = message.get("text")
if not text:
return None
sender_info = message.get("from") or {}
sender = sender_info.get("first_name") or sender_info.get("username") or "Telegram"
return {
"update_id": update.get("update_id", 0),
"sender": sender,
"text": text,
"timestamp": message.get("date"),
"source": "telegram",
}
def _default_get_json(url: str) -> dict[str, Any]:
with urllib.request.urlopen(url, timeout=15) as resp:
return json.loads(resp.read().decode())
def _default_post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]:
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST")
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode())
class TelegramBridge:
def __init__(
self,
config: TelegramBridgeConfig,
*,
get_json: Optional[Callable[[str], dict[str, Any]]] = None,
post_json: Optional[Callable[[str, dict[str, Any]], dict[str, Any]]] = None,
logger: Any = None,
) -> None:
self.config = config
self.get_json = get_json or _default_get_json
self.post_json = post_json or _default_post_json
self.logger = logger
async def send_chat(self, text: str, sender: str = "Nexus") -> dict[str, Any]:
payload = {
"chat_id": self.config.chat_id,
"text": format_outbound_text(sender, text),
}
result = await asyncio.to_thread(self.post_json, f"{self.config.api_base}/sendMessage", payload)
if self.logger and not result.get("ok", True):
self.logger.error("Telegram sendMessage failed: %s", result)
return result
async def poll_once(self, last_update_id: int) -> tuple[int, list[dict[str, Any]]]:
params = urllib.parse.urlencode({
"offset": last_update_id + 1,
"timeout": self.config.poll_timeout,
})
result = await asyncio.to_thread(self.get_json, f"{self.config.api_base}/getUpdates?{params}")
if not result.get("ok", False):
if self.logger:
self.logger.error("Telegram getUpdates failed: %s", result)
return last_update_id, []
messages: list[dict[str, Any]] = []
new_last_update_id = last_update_id
for update in result.get("result", []):
new_last_update_id = max(new_last_update_id, int(update.get("update_id", last_update_id)))
msg = extract_update_message(update, self.config.chat_id)
if msg:
messages.append(msg)
return new_last_update_id, messages

108
server.py
View File

@@ -23,6 +23,8 @@ import time
from typing import Set, Dict, Optional
from collections import defaultdict
from nexus.telegram_bridge import TelegramBridge, config_from_env
# Branch protected file - see POLICY.md
import websockets
@@ -47,6 +49,34 @@ logger = logging.getLogger("nexus-gateway")
clients: Set[websockets.WebSocketServerProtocol] = set()
connection_tracker: Dict[str, list] = defaultdict(list) # IP -> [timestamps]
message_tracker: Dict[int, list] = defaultdict(list) # connection_id -> [timestamps]
telegram_bridge: Optional[TelegramBridge] = None
async def broadcast_json(payload, exclude: Optional[websockets.WebSocketServerProtocol] = None):
"""Broadcast a payload to all connected clients except *exclude*."""
if not clients:
return
disconnected = set()
message = payload if isinstance(payload, str) else json.dumps(payload)
task_client_pairs = []
for client in clients:
if client != exclude and client.open:
task = asyncio.create_task(client.send(message))
task_client_pairs.append((task, client))
if not task_client_pairs:
return
tasks = [pair[0] for pair in task_client_pairs]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
target_client = task_client_pairs[i][1]
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
disconnected.add(target_client)
if disconnected:
clients.difference_update(disconnected)
def check_rate_limit(ip: str) -> bool:
"""Check if IP has exceeded connection rate limit."""
@@ -215,6 +245,31 @@ async def pty_handler(websocket: websockets.WebSocketServerProtocol):
logger.info(f"[PTY] Shell session ended for {addr}")
async def telegram_bridge_loop() -> None:
"""Poll Telegram and rebroadcast inbound chat into the Nexus websocket fabric."""
if telegram_bridge is None:
return
last_update_id = 0
logger.info("Telegram bridge polling started")
while True:
try:
last_update_id, messages = await telegram_bridge.poll_once(last_update_id)
for msg in messages:
await broadcast_json({
"type": "chat",
"text": msg["text"],
"agent": msg["sender"],
"source": "telegram",
"timestamp": msg.get("timestamp"),
})
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"Telegram bridge polling error: {e}")
await asyncio.sleep(1.0)
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"""Handles individual client connections and message broadcasting."""
addr = websocket.remote_address
@@ -245,46 +300,33 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"message": "Message rate limit exceeded"
}))
continue
data = None
msg_type = None
# Parse for logging/validation if it's JSON
try:
data = json.loads(message)
msg_type = data.get("type", "unknown")
# Optional: log specific important message types
if msg_type in ["agent_register", "thought", "action"]:
if msg_type in ["agent_register", "thought", "action", "chat"]:
logger.debug(f"Received {msg_type} from {addr}")
# Handle git status requests from the operator cockpit (issue #1695)
if msg_type == "git_status_request":
git_info = _get_git_status()
await websocket.send(json.dumps(git_info))
continue
if msg_type == "chat" and telegram_bridge and data.get("source") != "telegram":
text = str(data.get("text", "")).strip()
if text:
sender = data.get("agent") or data.get("user") or "Nexus"
await telegram_bridge.send_chat(text, sender=str(sender))
except (json.JSONDecodeError, TypeError):
pass
data = None
msg_type = None
# Broadcast to all OTHER clients
if not clients:
continue
disconnected = set()
# Create broadcast tasks, tracking which client each task targets
task_client_pairs = []
for client in clients:
if client != websocket and client.open:
task = asyncio.create_task(client.send(message))
task_client_pairs.append((task, client))
await broadcast_json(data if data is not None else message, exclude=websocket)
if task_client_pairs:
tasks = [pair[0] for pair in task_client_pairs]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
target_client = task_client_pairs[i][1]
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
disconnected.add(target_client)
if disconnected:
clients.difference_update(disconnected)
except websockets.exceptions.ConnectionClosed:
logger.debug(f"Connection closed by client {addr}")
except Exception as e:
@@ -295,6 +337,8 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
async def main():
"""Main server loop with graceful shutdown."""
global telegram_bridge
# Log security configuration
if AUTH_TOKEN:
logger.info("Authentication: ENABLED (token required)")
@@ -314,6 +358,15 @@ async def main():
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
stop = loop.create_future()
telegram_task = None
bridge_config = config_from_env(os.environ)
if bridge_config:
telegram_bridge = TelegramBridge(bridge_config, logger=logger)
telegram_task = asyncio.create_task(telegram_bridge_loop())
logger.info(f"Telegram bridge enabled for chat {bridge_config.chat_id}")
else:
logger.info("Telegram bridge disabled — set NEXUS_TELEGRAM_BOT_TOKEN/NEXUS_TELEGRAM_CHAT_ID to enable")
def shutdown():
if not stop.done():
@@ -332,6 +385,11 @@ async def main():
async with websockets.serve(pty_handler, "127.0.0.1", PTY_PORT):
logger.info(f"PTY shell gateway listening on ws://127.0.0.1:{PTY_PORT}/pty")
await stop
if telegram_task:
telegram_task.cancel()
await asyncio.gather(telegram_task, return_exceptions=True)
telegram_bridge = None
logger.info("Shutting down Nexus WS gateway...")
# Close any remaining client connections (handlers may have already cleaned up)

View File

@@ -200,13 +200,6 @@ canvas#nexus-canvas {
box-shadow: 0 0 20px var(--color-primary);
}
.hud-icon-btn.pov-active {
background: var(--color-gold);
border-color: var(--color-gold);
color: var(--color-bg);
box-shadow: 0 0 20px var(--color-gold);
}
.hud-status-item {
display: flex;
align-items: center;

View File

@@ -0,0 +1,12 @@
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
APP_JS = ROOT / "app.js"
def test_send_chat_message_uses_gateway_not_mock_replies():
source = APP_JS.read_text(encoding="utf-8")
assert "Processing your request through the harness..." not in source
assert "Message not sent. Nexus gateway offline." in source
assert "hermesWs.send(JSON.stringify({" in source
assert "source: 'nexus_chat'" in source

View File

@@ -0,0 +1,97 @@
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
import asyncio
ROOT = Path(__file__).resolve().parent.parent
MODULE_PATH = ROOT / "nexus" / "telegram_bridge.py"
def load_module():
spec = spec_from_file_location("nexus_telegram_bridge", MODULE_PATH)
module = module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_config_from_env_prefers_nexus_specific_values():
module = load_module()
cfg = module.config_from_env({
"NEXUS_TELEGRAM_BOT_TOKEN": "bridge-token",
"NEXUS_TELEGRAM_CHAT_ID": "-100123",
"TELEGRAM_BOT_TOKEN": "shared-token",
"TELEGRAM_HOME_CHANNEL": "-100999",
})
assert cfg.bot_token == "bridge-token"
assert cfg.chat_id == "-100123"
def test_extract_update_message_filters_wrong_chat_and_formats_sender():
module = load_module()
update = {
"update_id": 7,
"message": {
"chat": {"id": -100123},
"from": {"first_name": "Alex", "username": "alex"},
"text": "hello from telegram",
"date": 1710000000,
},
}
msg = module.extract_update_message(update, "-100123")
assert msg["sender"] == "Alex"
assert msg["text"] == "hello from telegram"
assert module.extract_update_message(update, "-100999") is None
def test_poll_once_normalizes_updates_and_advances_offset():
module = load_module()
cfg = module.TelegramBridgeConfig(bot_token="token", chat_id="-100123", poll_timeout=1)
async def run_test():
bridge = module.TelegramBridge(
cfg,
get_json=lambda url: {
"ok": True,
"result": [
{
"update_id": 11,
"message": {
"chat": {"id": -100123},
"from": {"username": "timmy-time"},
"text": "nexus ping",
"date": 1710000001,
},
}
],
},
post_json=lambda url, payload: {"ok": True},
)
last_id, messages = await bridge.poll_once(0)
assert last_id == 11
assert messages == [{
"update_id": 11,
"sender": "timmy-time",
"text": "nexus ping",
"timestamp": 1710000001,
"source": "telegram",
}]
asyncio.run(run_test())
def test_send_chat_formats_sender_prefix():
module = load_module()
sent = {}
cfg = module.TelegramBridgeConfig(bot_token="token", chat_id="-100123")
async def run_test():
bridge = module.TelegramBridge(
cfg,
get_json=lambda url: {"ok": True, "result": []},
post_json=lambda url, payload: sent.setdefault("payload", payload) or {"ok": True},
)
await bridge.send_chat("hello nexus", sender="Nexus")
asyncio.run(run_test())
assert sent["payload"]["chat_id"] == "-100123"
assert sent["payload"]["text"] == "[Nexus] hello nexus"