Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
92c65f7cec feat: transform NexusBridge into a client for the Nexus Gateway. Fixes #1135
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Docs Site Checks / docs-site-checks (pull_request) Failing after 3m2s
Nix / nix (ubuntu-latest) (pull_request) Failing after 26s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 30s
Tests / test (pull_request) Failing after 19m27s
Tests / e2e (pull_request) Successful in 37s
Nix / nix (macos-latest) (pull_request) Has been cancelled
2026-04-08 22:08:11 -04:00
5 changed files with 2443 additions and 3 deletions

59
gateway/nexus_bridge.py Normal file
View File

@@ -0,0 +1,59 @@
import asyncio
import json
import logging
import threading
import websockets
from typing import Optional
logger = logging.getLogger(__name__)
class NexusBridge:
def __init__(self, host="localhost", port=8765):
self.url = f"ws://{host}:{port}"
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.loop = None
self._thread = None
async def _maintain_connection(self):
"""Background task to maintain a persistent connection to the Nexus Gateway."""
while True:
try:
logger.info(f"Connecting to Nexus Gateway at {self.url}...")
async with websockets.connect(self.url) as websocket:
self.ws = websocket
logger.info("Connected to Nexus Gateway.")
# Keep the connection open until it closes
await websocket.wait_closed()
except Exception as e:
logger.warning(f"Nexus Gateway connection error: {e}. Retrying in 5s...")
await asyncio.sleep(5)
finally:
self.ws = None
def start(self):
"""Starts the connection maintainer in a background thread."""
if self._thread is not None:
return
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._thread.start()
def _run_event_loop(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self._maintain_connection())
def broadcast(self, event_type: str, payload: dict):
"""Push a memory event to the Nexus Gateway for broadcasting."""
if self.loop is None or self.ws is None:
logger.warning("Nexus bridge not connected; cannot broadcast event.")
return
message = json.dumps({"event": event_type, "data": payload})
try:
asyncio.run_coroutine_threadsafe(self.ws.send(message), self.loop)
except Exception as e:
logger.error(f"Failed to send broadcast to Nexus Gateway: {e}")
# Singleton instance
bridge = NexusBridge()

View File

@@ -7539,6 +7539,8 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
logging.getLogger().setLevel(_stderr_level)
runner = GatewayRunner(config)
from gateway.nexus_bridge import bridge
bridge.start()
# Set up signal handlers
def signal_handler():

2372
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -26,6 +26,7 @@ from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
from .store import MemoryStore
from .retrieval import FactRetriever
from gateway.nexus_bridge import bridge
logger = logging.getLogger(__name__)
@@ -267,6 +268,11 @@ class HolographicMemoryProvider(MemoryProvider):
category=args.get("category", "general"),
tags=args.get("tags", ""),
)
bridge.broadcast("FACT_CREATED", {
"fact_id": fact_id,
"content": args["content"],
"category": args.get("category", "general")
})
return json.dumps({"fact_id": fact_id, "status": "added"})
elif action == "search":
@@ -320,10 +326,16 @@ class HolographicMemoryProvider(MemoryProvider):
tags=args.get("tags"),
category=args.get("category"),
)
bridge.broadcast("FACT_UPDATED", {
"fact_id": int(args["fact_id"]),
"content": args.get("content"),
"category": args.get("category")
})
return json.dumps({"updated": updated})
elif action == "remove":
removed = store.remove_fact(int(args["fact_id"]))
bridge.broadcast("FACT_REMOVED", {"fact_id": int(args["fact_id"])})
return json.dumps({"removed": removed})
elif action == "list":

1
test_guard.txt Normal file
View File

@@ -0,0 +1 @@
test