Compare commits

..

1 Commits

Author SHA1 Message Date
72195ea957 fix: extract clean_lines/parse_room_output/normalize_event to module level (#1509)
Some checks failed
CI / test (pull_request) Failing after 1m21s
CI / validate (pull_request) Failing after 1m17s
Review Approval Gate / verify-review (pull_request) Successful in 12s
Test file imports clean_lines, parse_room_output, normalize_event from
nexus.evennia_ws_bridge, but they were nested inside playback() function.
Moved all three to module level so they're importable.

Also moved actor_located/command_issued/command_result/room_snapshot/session_bound
imports from nested playback() to module-level import block.

playback() now calls module-level functions directly.
2026-04-15 03:01:37 +00:00
4 changed files with 78 additions and 173 deletions

View File

@@ -37,8 +37,6 @@ import shutil
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
@@ -121,6 +119,8 @@ def _check_memory(threshold_pct: int = 90) -> tuple[str, str]:
def _check_gitea_reachability(gitea_url: str = "https://forge.alexanderwhitestone.com") -> tuple[str, str]:
"""Return (status, detail) for Gitea HTTPS reachability."""
import urllib.request
import urllib.error
try:
with urllib.request.urlopen(gitea_url, timeout=10) as resp:
code = resp.status
@@ -131,21 +131,6 @@ def _check_gitea_reachability(gitea_url: str = "https://forge.alexanderwhiteston
return "WARN", f"Gitea unreachable: {exc}"
def _check_llama_server(endpoint: str = "http://127.0.0.1:11435") -> tuple[str, str]:
"""Return (status, detail) for the local llama.cpp server health endpoint."""
health_url = f"{endpoint.rstrip('/')}/health"
try:
req = urllib.request.Request(health_url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode())
if data.get("status") == "ok":
model_name = Path(str(data.get("model_path", ""))).name or data.get("model", "unknown-model")
return "OK", f"llama-server healthy at {endpoint} ({model_name})"
return "WARN", f"llama-server unhealthy at {endpoint}: {data}"
except Exception as exc:
return "WARN", f"llama-server unreachable at {endpoint}: {exc}"
def _check_world_readable_secrets() -> tuple[str, str]:
"""Return (status, detail) for world-readable sensitive files."""
sensitive_patterns = ["*.key", "*.pem", "*.secret", ".env", "*.token"]
@@ -187,9 +172,6 @@ def generate_report(date_str: str, checker_mod) -> str:
gitea_status, gitea_detail = _check_gitea_reachability()
rows.append(("Alpha VPS", gitea_status, gitea_detail))
llama_status, llama_detail = _check_llama_server()
rows.append(("Local LLM", llama_status, llama_detail))
sec_status, sec_detail = _check_world_readable_secrets()
rows.append(("Security", sec_status, sec_detail))

View File

@@ -40,9 +40,6 @@ Standardizes local LLM inference across the fleet using llama.cpp.
curl -sf http://localhost:11435/health
curl -s http://localhost:11435/v1/models
Night Watch integration:
- `bin/night_watch.py` probes the local llama.cpp `/health` endpoint and surfaces failures in the nightly report.
## Troubleshooting
- Won't start → smaller model / lower quant

View File

@@ -28,11 +28,16 @@ except ImportError:
websockets = None
from nexus.evennia_event_adapter import (
actor_located,
audit_heartbeat,
command_executed,
command_issued,
command_result,
player_join,
player_leave,
player_move,
room_snapshot,
session_bound,
)
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
@@ -49,31 +54,82 @@ def strip_ansi(text: str) -> str:
return ANSI_RE.sub("", text or "")
def clean_lines(text: str) -> list[str]:
"""Strip ANSI codes and split into non-empty lines."""
text = strip_ansi(text).replace("\r", "")
return [line.strip() for line in text.split("\n") if line.strip()]
def parse_room_output(text: str) -> dict | None:
"""Parse Evennia room output into structured data with title, desc, exits, objects."""
lines = clean_lines(text)
if len(lines) < 2:
return None
title = lines[0]
desc = lines[1]
exits = []
objects = []
for line in lines[2:]:
if line.startswith("Exits:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()]
elif line.startswith("You see:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
parts = [t.strip() for t in raw.split(",") if t.strip()]
objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts]
return {"title": title, "desc": desc, "exits": exits, "objects": objects}
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
"""Normalize a raw Evennia event dict into a list of Nexus event dicts."""
out = []
event = raw.get("event")
actor = raw.get("actor", "Timmy")
timestamp = raw.get("timestamp")
if event == "connect":
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
parsed = parse_room_output(raw.get("output", ""))
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
elif event == "command":
cmd = raw.get("command", "")
output = raw.get("output", "")
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
success = not output.startswith("Command '") and not output.startswith("Could not find")
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
parsed = parse_room_output(output)
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
return out
class LogTailer:
"""Async file tailer that yields new lines as they appear."""
def __init__(self, path: str, poll_interval: float = 0.5):
self.path = path
self.poll_interval = poll_interval
self._offset = 0
async def tail(self):
"""Yield new lines from the file, starting from end."""
# Start at end of file
if os.path.exists(self.path):
self._offset = os.path.getsize(self.path)
while True:
try:
if not os.path.exists(self.path):
await asyncio.sleep(self.poll_interval)
continue
size = os.path.getsize(self.path)
if size < self._offset:
# File was truncated/rotated
self._offset = 0
if size > self._offset:
with open(self.path, "r") as f:
f.seek(self._offset)
@@ -82,7 +138,7 @@ class LogTailer:
if line:
yield line
self._offset = f.tell()
await asyncio.sleep(self.poll_interval)
except Exception as e:
print(f"[tailer] Error reading {self.path}: {e}", flush=True)
@@ -91,44 +147,44 @@ class LogTailer:
def parse_log_line(line: str) -> Optional[dict]:
"""Parse a log line into a Nexus event, or None if not parseable."""
# Movement events
m = MOVE_RE.search(line)
if m:
return player_move(m.group(1), m.group(3), m.group(2))
# Command events
m = CMD_RE.search(line)
if m:
return command_executed(m.group(1), m.group(2), m.group(3) or "")
# Session start
m = SESSION_START_RE.search(line)
if m:
return player_join(m.group(2), m.group(1))
# Session end
m = SESSION_END_RE.search(line)
if m:
return player_leave("", m.group(1), session_duration=float(m.group(2)))
# Server login
m = LOGIN_RE.search(line)
if m:
return player_join(m.group(1), ip_address=m.group(2))
# Server logout
m = LOGOUT_RE.search(line)
if m:
return player_leave(m.group(1))
return None
async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
"""
Main live bridge loop.
Tails all Evennia log files and streams parsed events to Nexus WebSocket.
Auto-reconnects on failure.
"""
@@ -138,9 +194,9 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
os.path.join(log_dir, "player_activity.log"),
os.path.join(log_dir, "server.log"),
]
event_queue: asyncio.Queue = asyncio.Queue(maxsize=10000)
async def tail_file(path: str):
"""Tail a single file and put events on queue."""
tailer = LogTailer(path)
@@ -151,7 +207,7 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
event_queue.put_nowait(event)
except asyncio.QueueFull:
pass # Drop oldest if queue full
async def ws_sender():
"""Send events from queue to WebSocket, with auto-reconnect."""
while True:
@@ -162,7 +218,7 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
event = await event_queue.get()
ts = event.get("timestamp", "")[:19]
print(f"[{ts}] {event['type']}: {json.dumps({k: v for k, v in event.items() if k not in ('type', 'timestamp')})}", flush=True)
print(f"[bridge] Connecting to {ws_url}...", flush=True)
async with websockets.connect(ws_url) as ws:
print(f"[bridge] Connected to Nexus at {ws_url}", flush=True)
@@ -172,67 +228,17 @@ async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
except Exception as e:
print(f"[bridge] WebSocket error: {e}. Reconnecting in {reconnect_delay}s...", flush=True)
await asyncio.sleep(reconnect_delay)
# Start all tailers + sender
tasks = [asyncio.create_task(tail_file(f)) for f in log_files]
tasks.append(asyncio.create_task(ws_sender()))
print(f"[bridge] Live bridge started. Watching {len(log_files)} log files.", flush=True)
await asyncio.gather(*tasks)
async def playback(log_path: Path, ws_url: str):
"""Legacy mode: replay a telemetry JSONL file."""
from nexus.evennia_event_adapter import (
actor_located, command_issued, command_result,
room_snapshot, session_bound,
)
def clean_lines(text: str) -> list[str]:
text = strip_ansi(text).replace("\r", "")
return [line.strip() for line in text.split("\n") if line.strip()]
def parse_room_output(text: str):
lines = clean_lines(text)
if len(lines) < 2:
return None
title = lines[0]
desc = lines[1]
exits = []
objects = []
for line in lines[2:]:
if line.startswith("Exits:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()]
elif line.startswith("You see:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
parts = [t.strip() for t in raw.split(",") if t.strip()]
objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts]
return {"title": title, "desc": desc, "exits": exits, "objects": objects}
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
out = []
event = raw.get("event")
actor = raw.get("actor", "Timmy")
timestamp = raw.get("timestamp")
if event == "connect":
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
parsed = parse_room_output(raw.get("output", ""))
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
elif event == "command":
cmd = raw.get("command", "")
output = raw.get("output", "")
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
success = not output.startswith("Command '") and not output.startswith("Could not find")
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
parsed = parse_room_output(output)
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
return out
hermes_session_id = log_path.stem
async with websockets.connect(ws_url) as ws:
for line in log_path.read_text(encoding="utf-8").splitlines():
@@ -245,11 +251,6 @@ async def playback(log_path: Path, ws_url: str):
async def inject_event(event_type: str, ws_url: str, **kwargs):
"""Inject a single Evennia event into the Nexus WS gateway. Dev/test use."""
from nexus.evennia_event_adapter import (
actor_located, command_issued, command_result,
room_snapshot, session_bound,
)
builders = {
"room_snapshot": lambda: room_snapshot(
kwargs.get("room_key", "Gate"),

View File

@@ -1,75 +0,0 @@
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import patch
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
class _FakeResponse:
def __init__(self, payload: dict):
self.payload = json.dumps(payload).encode()
def read(self):
return self.payload
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class _FakeHeartbeatReport:
def to_panel_markdown(self):
return "## Heartbeat Panel\n\nAll jobs healthy."
class _FakeChecker:
@staticmethod
def build_report():
return _FakeHeartbeatReport()
def test_check_llama_server_reports_healthy_model():
import bin.night_watch as nw
with patch(
"bin.night_watch.urllib.request.urlopen",
return_value=_FakeResponse({"status": "ok", "model_path": "/opt/models/llama/Qwen2.5-7B-Instruct-Q4_K_M.gguf"}),
):
status, detail = nw._check_llama_server("http://127.0.0.1:11435")
assert status == "OK"
assert "127.0.0.1:11435" in detail
assert "Qwen2.5-7B-Instruct-Q4_K_M.gguf" in detail
def test_check_llama_server_reports_warning_on_failure():
import bin.night_watch as nw
with patch(
"bin.night_watch.urllib.request.urlopen",
side_effect=OSError("connection refused"),
):
status, detail = nw._check_llama_server("http://127.0.0.1:11435")
assert status == "WARN"
assert "connection refused" in detail
def test_generate_report_includes_local_llm_row():
import bin.night_watch as nw
with patch("bin.night_watch._check_service", return_value=("OK", "hermes-bezalel is active")), \
patch("bin.night_watch._check_disk", return_value=("OK", "disk usage 23%")), \
patch("bin.night_watch._check_memory", return_value=("OK", "memory usage 30%")), \
patch("bin.night_watch._check_gitea_reachability", return_value=("OK", "Gitea HTTPS is responding (200)")), \
patch("bin.night_watch._check_world_readable_secrets", return_value=("OK", "no sensitive recently-modified world-readable files found")), \
patch("bin.night_watch._check_llama_server", return_value=("OK", "llama-server healthy at http://127.0.0.1:11435 (Qwen2.5-7B-Instruct-Q4_K_M.gguf)")):
report = nw.generate_report("2026-04-15", _FakeChecker())
assert "| Local LLM | OK | llama-server healthy at http://127.0.0.1:11435 (Qwen2.5-7B-Instruct-Q4_K_M.gguf) |" in report
assert "## Heartbeat Panel" in report