Compare commits

...

2 Commits

Author SHA1 Message Date
Alexander Whitestone
17317aaf64 feat: bridge Nexus chat with Telegram polling relay (#1537)
Some checks failed
Review Approval Gate / verify-review (pull_request) Failing after 9s
CI / test (pull_request) Failing after 52s
CI / validate (pull_request) Failing after 52s
2026-04-22 12:18:32 -04:00
Alexander Whitestone
da258e772c test: add red coverage for Nexus Telegram bridge (#1537) 2026-04-22 12:10:21 -04:00
5 changed files with 325 additions and 39 deletions

34
app.js
View File

@@ -2148,7 +2148,7 @@ function setupControls() {
function sendChatMessage(overrideText = null) {
// Mine chat message to MemPalace
if (overrideText) {
if (overrideText && window.electronAPI?.execPython) {
window.electronAPI.execPython(`mempalace add_drawer "${this.wing}" "chat" "${overrideText}"`);
}
const input = document.getElementById('chat-input');
@@ -2156,19 +2156,25 @@ function sendChatMessage(overrideText = null) {
if (!text) return;
addChatMessage('user', text);
if (!overrideText) input.value = '';
setTimeout(() => {
const responses = [
'Processing your request through the harness...',
'I have noted this in my thought stream.',
'Acknowledged. Routing to appropriate agent loop.',
'The sovereign space recognizes your command.',
'Running analysis. Results will appear on the main terminal.',
'My crystal ball says... yes. Implementing.',
'Understood, Alexander. Adjusting priorities.',
];
const resp = responses[Math.floor(Math.random() * responses.length)];
addChatMessage('timmy', resp);
}, 500 + Math.random() * 1000);
if (!wsConnected || !hermesWs || hermesWs.readyState !== WebSocket.OPEN) {
addChatMessage('error', 'Message not sent. Nexus gateway offline.');
input.blur();
return;
}
try {
hermesWs.send(JSON.stringify({
type: 'chat',
text,
agent: 'nexus',
source: 'nexus_chat',
timestamp: Date.now(),
}));
} catch (err) {
console.error('Failed to send Nexus chat message:', err);
addChatMessage('error', 'Message not sent. Nexus gateway offline.');
}
input.blur();
}

113
nexus/telegram_bridge.py Normal file
View File

@@ -0,0 +1,113 @@
from __future__ import annotations
import asyncio
import json
import os
import urllib.parse
import urllib.request
from typing import Any, Callable, Optional
class TelegramBridgeConfig:
def __init__(self, bot_token: str, chat_id: str, poll_timeout: int = 4):
self.bot_token = bot_token
self.chat_id = chat_id
self.poll_timeout = poll_timeout
@property
def api_base(self) -> str:
return f"https://api.telegram.org/bot{self.bot_token}"
def config_from_env(env: Optional[dict[str, str]] = None) -> Optional[TelegramBridgeConfig]:
env = env or os.environ
bot_token = env.get("NEXUS_TELEGRAM_BOT_TOKEN") or env.get("TELEGRAM_BOT_TOKEN")
chat_id = (
env.get("NEXUS_TELEGRAM_CHAT_ID")
or env.get("TELEGRAM_HOME_CHANNEL")
or env.get("TELEGRAM_CHAT_ID")
)
if not bot_token or not chat_id:
return None
return TelegramBridgeConfig(bot_token=bot_token, chat_id=str(chat_id), poll_timeout=int(env.get("NEXUS_TELEGRAM_POLL_TIMEOUT", "4")))
def format_outbound_text(sender: str, text: str) -> str:
sender = (sender or "Nexus").strip() or "Nexus"
return f"[{sender}] {text}"
def extract_update_message(update: dict[str, Any], expected_chat_id: str) -> Optional[dict[str, Any]]:
message = update.get("message") or update.get("edited_message")
if not isinstance(message, dict):
return None
if str((message.get("chat") or {}).get("id")) != str(expected_chat_id):
return None
text = message.get("text")
if not text:
return None
sender_info = message.get("from") or {}
sender = sender_info.get("first_name") or sender_info.get("username") or "Telegram"
return {
"update_id": update.get("update_id", 0),
"sender": sender,
"text": text,
"timestamp": message.get("date"),
"source": "telegram",
}
def _default_get_json(url: str) -> dict[str, Any]:
with urllib.request.urlopen(url, timeout=15) as resp:
return json.loads(resp.read().decode())
def _default_post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]:
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST")
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode())
class TelegramBridge:
def __init__(
self,
config: TelegramBridgeConfig,
*,
get_json: Optional[Callable[[str], dict[str, Any]]] = None,
post_json: Optional[Callable[[str, dict[str, Any]], dict[str, Any]]] = None,
logger: Any = None,
) -> None:
self.config = config
self.get_json = get_json or _default_get_json
self.post_json = post_json or _default_post_json
self.logger = logger
async def send_chat(self, text: str, sender: str = "Nexus") -> dict[str, Any]:
payload = {
"chat_id": self.config.chat_id,
"text": format_outbound_text(sender, text),
}
result = await asyncio.to_thread(self.post_json, f"{self.config.api_base}/sendMessage", payload)
if self.logger and not result.get("ok", True):
self.logger.error("Telegram sendMessage failed: %s", result)
return result
async def poll_once(self, last_update_id: int) -> tuple[int, list[dict[str, Any]]]:
params = urllib.parse.urlencode({
"offset": last_update_id + 1,
"timeout": self.config.poll_timeout,
})
result = await asyncio.to_thread(self.get_json, f"{self.config.api_base}/getUpdates?{params}")
if not result.get("ok", False):
if self.logger:
self.logger.error("Telegram getUpdates failed: %s", result)
return last_update_id, []
messages: list[dict[str, Any]] = []
new_last_update_id = last_update_id
for update in result.get("result", []):
new_last_update_id = max(new_last_update_id, int(update.get("update_id", last_update_id)))
msg = extract_update_message(update, self.config.chat_id)
if msg:
messages.append(msg)
return new_last_update_id, messages

108
server.py
View File

@@ -23,6 +23,8 @@ import time
from typing import Set, Dict, Optional
from collections import defaultdict
from nexus.telegram_bridge import TelegramBridge, config_from_env
# Branch protected file - see POLICY.md
import websockets
@@ -47,6 +49,34 @@ logger = logging.getLogger("nexus-gateway")
clients: Set[websockets.WebSocketServerProtocol] = set()
connection_tracker: Dict[str, list] = defaultdict(list) # IP -> [timestamps]
message_tracker: Dict[int, list] = defaultdict(list) # connection_id -> [timestamps]
telegram_bridge: Optional[TelegramBridge] = None
async def broadcast_json(payload, exclude: Optional[websockets.WebSocketServerProtocol] = None):
"""Broadcast a payload to all connected clients except *exclude*."""
if not clients:
return
disconnected = set()
message = payload if isinstance(payload, str) else json.dumps(payload)
task_client_pairs = []
for client in clients:
if client != exclude and client.open:
task = asyncio.create_task(client.send(message))
task_client_pairs.append((task, client))
if not task_client_pairs:
return
tasks = [pair[0] for pair in task_client_pairs]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
target_client = task_client_pairs[i][1]
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
disconnected.add(target_client)
if disconnected:
clients.difference_update(disconnected)
def check_rate_limit(ip: str) -> bool:
"""Check if IP has exceeded connection rate limit."""
@@ -215,6 +245,31 @@ async def pty_handler(websocket: websockets.WebSocketServerProtocol):
logger.info(f"[PTY] Shell session ended for {addr}")
async def telegram_bridge_loop() -> None:
"""Poll Telegram and rebroadcast inbound chat into the Nexus websocket fabric."""
if telegram_bridge is None:
return
last_update_id = 0
logger.info("Telegram bridge polling started")
while True:
try:
last_update_id, messages = await telegram_bridge.poll_once(last_update_id)
for msg in messages:
await broadcast_json({
"type": "chat",
"text": msg["text"],
"agent": msg["sender"],
"source": "telegram",
"timestamp": msg.get("timestamp"),
})
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"Telegram bridge polling error: {e}")
await asyncio.sleep(1.0)
async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"""Handles individual client connections and message broadcasting."""
addr = websocket.remote_address
@@ -245,46 +300,33 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
"message": "Message rate limit exceeded"
}))
continue
data = None
msg_type = None
# Parse for logging/validation if it's JSON
try:
data = json.loads(message)
msg_type = data.get("type", "unknown")
# Optional: log specific important message types
if msg_type in ["agent_register", "thought", "action"]:
if msg_type in ["agent_register", "thought", "action", "chat"]:
logger.debug(f"Received {msg_type} from {addr}")
# Handle git status requests from the operator cockpit (issue #1695)
if msg_type == "git_status_request":
git_info = _get_git_status()
await websocket.send(json.dumps(git_info))
continue
if msg_type == "chat" and telegram_bridge and data.get("source") != "telegram":
text = str(data.get("text", "")).strip()
if text:
sender = data.get("agent") or data.get("user") or "Nexus"
await telegram_bridge.send_chat(text, sender=str(sender))
except (json.JSONDecodeError, TypeError):
pass
data = None
msg_type = None
# Broadcast to all OTHER clients
if not clients:
continue
disconnected = set()
# Create broadcast tasks, tracking which client each task targets
task_client_pairs = []
for client in clients:
if client != websocket and client.open:
task = asyncio.create_task(client.send(message))
task_client_pairs.append((task, client))
await broadcast_json(data if data is not None else message, exclude=websocket)
if task_client_pairs:
tasks = [pair[0] for pair in task_client_pairs]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
target_client = task_client_pairs[i][1]
logger.error(f"Failed to send to client {target_client.remote_address}: {result}")
disconnected.add(target_client)
if disconnected:
clients.difference_update(disconnected)
except websockets.exceptions.ConnectionClosed:
logger.debug(f"Connection closed by client {addr}")
except Exception as e:
@@ -295,6 +337,8 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
async def main():
"""Main server loop with graceful shutdown."""
global telegram_bridge
# Log security configuration
if AUTH_TOKEN:
logger.info("Authentication: ENABLED (token required)")
@@ -314,6 +358,15 @@ async def main():
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
stop = loop.create_future()
telegram_task = None
bridge_config = config_from_env(os.environ)
if bridge_config:
telegram_bridge = TelegramBridge(bridge_config, logger=logger)
telegram_task = asyncio.create_task(telegram_bridge_loop())
logger.info(f"Telegram bridge enabled for chat {bridge_config.chat_id}")
else:
logger.info("Telegram bridge disabled — set NEXUS_TELEGRAM_BOT_TOKEN/NEXUS_TELEGRAM_CHAT_ID to enable")
def shutdown():
if not stop.done():
@@ -332,6 +385,11 @@ async def main():
async with websockets.serve(pty_handler, "127.0.0.1", PTY_PORT):
logger.info(f"PTY shell gateway listening on ws://127.0.0.1:{PTY_PORT}/pty")
await stop
if telegram_task:
telegram_task.cancel()
await asyncio.gather(telegram_task, return_exceptions=True)
telegram_bridge = None
logger.info("Shutting down Nexus WS gateway...")
# Close any remaining client connections (handlers may have already cleaned up)

View File

@@ -0,0 +1,12 @@
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
APP_JS = ROOT / "app.js"
def test_send_chat_message_uses_gateway_not_mock_replies():
source = APP_JS.read_text(encoding="utf-8")
assert "Processing your request through the harness..." not in source
assert "Message not sent. Nexus gateway offline." in source
assert "hermesWs.send(JSON.stringify({" in source
assert "source: 'nexus_chat'" in source

View File

@@ -0,0 +1,97 @@
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
import asyncio
ROOT = Path(__file__).resolve().parent.parent
MODULE_PATH = ROOT / "nexus" / "telegram_bridge.py"
def load_module():
spec = spec_from_file_location("nexus_telegram_bridge", MODULE_PATH)
module = module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_config_from_env_prefers_nexus_specific_values():
module = load_module()
cfg = module.config_from_env({
"NEXUS_TELEGRAM_BOT_TOKEN": "bridge-token",
"NEXUS_TELEGRAM_CHAT_ID": "-100123",
"TELEGRAM_BOT_TOKEN": "shared-token",
"TELEGRAM_HOME_CHANNEL": "-100999",
})
assert cfg.bot_token == "bridge-token"
assert cfg.chat_id == "-100123"
def test_extract_update_message_filters_wrong_chat_and_formats_sender():
module = load_module()
update = {
"update_id": 7,
"message": {
"chat": {"id": -100123},
"from": {"first_name": "Alex", "username": "alex"},
"text": "hello from telegram",
"date": 1710000000,
},
}
msg = module.extract_update_message(update, "-100123")
assert msg["sender"] == "Alex"
assert msg["text"] == "hello from telegram"
assert module.extract_update_message(update, "-100999") is None
def test_poll_once_normalizes_updates_and_advances_offset():
module = load_module()
cfg = module.TelegramBridgeConfig(bot_token="token", chat_id="-100123", poll_timeout=1)
async def run_test():
bridge = module.TelegramBridge(
cfg,
get_json=lambda url: {
"ok": True,
"result": [
{
"update_id": 11,
"message": {
"chat": {"id": -100123},
"from": {"username": "timmy-time"},
"text": "nexus ping",
"date": 1710000001,
},
}
],
},
post_json=lambda url, payload: {"ok": True},
)
last_id, messages = await bridge.poll_once(0)
assert last_id == 11
assert messages == [{
"update_id": 11,
"sender": "timmy-time",
"text": "nexus ping",
"timestamp": 1710000001,
"source": "telegram",
}]
asyncio.run(run_test())
def test_send_chat_formats_sender_prefix():
module = load_module()
sent = {}
cfg = module.TelegramBridgeConfig(bot_token="token", chat_id="-100123")
async def run_test():
bridge = module.TelegramBridge(
cfg,
get_json=lambda url: {"ok": True, "result": []},
post_json=lambda url, payload: sent.setdefault("payload", payload) or {"ok": True},
)
await bridge.send_chat("hello nexus", sender="Nexus")
asyncio.run(run_test())
assert sent["payload"]["chat_id"] == "-100123"
assert sent["payload"]["text"] == "[Nexus] hello nexus"