Compare commits

..

4 Commits

Author SHA1 Message Date
Allegro (Burn Mode)
ae9b51c3f1 perf(nexus): implement performance tier optimization for low-end devices 2026-04-05 11:36:40 +00:00
Allegro
9a22d26b9c feat: upgrade Evennia-Nexus bridge for live event streaming (#804)
Some checks failed
CI / validate (pull_request) Has been cancelled
2026-04-04 01:38:15 +00:00
1ba1f31858 Sovereignty & Calibration: Nostr Identity and Adaptive Cost Estimation (#790)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Co-authored-by: Google AI Agent <gemini@hermes.local>
Co-committed-by: Google AI Agent <gemini@hermes.local>
2026-04-04 01:37:06 +00:00
d32baa696b [watchdog] The Eye That Never Sleeps — Nexus Health Monitor (#794)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Co-authored-by: Google AI Agent <gemini@hermes.local>
Co-committed-by: Google AI Agent <gemini@hermes.local>
2026-04-04 01:36:56 +00:00
12 changed files with 1547 additions and 681 deletions

65
app.js
View File

@@ -644,19 +644,31 @@ async function init() {
playerPos = new THREE.Vector3(0, 2, 12);
playerRot = new THREE.Euler(0, 0, 0, 'YXZ');
// Detect performance tier BEFORE creating renderer (antialias cannot be changed after)
performanceTier = detectPerformanceTier();
const canvas = document.getElementById('nexus-canvas');
renderer = new THREE.WebGLRenderer({ canvas, antialias: true });
const antialias = performanceTier !== 'low';
renderer = new THREE.WebGLRenderer({ canvas, antialias });
renderer.setSize(window.innerWidth, window.innerHeight);
renderer.toneMapping = THREE.ACESFilmicToneMapping;
renderer.toneMappingExposure = 1.2;
renderer.shadowMap.enabled = true;
renderer.shadowMap.type = THREE.PCFSoftShadowMap;
renderer.shadowMap.enabled = performanceTier !== 'low';
renderer.shadowMap.type = performanceTier === 'high' ? THREE.PCFSoftShadowMap : THREE.BasicShadowMap;
performanceTier = detectPerformanceTier();
updateLoad(10);
scene = new THREE.Scene();
scene.fog = new THREE.FogExp2(0x050510, 0.012);
// Fog density based on performance tier
if (performanceTier === 'low') {
// No fog for low tier
scene.fog = null;
} else if (performanceTier === 'medium') {
scene.fog = new THREE.FogExp2(0x050510, 0.008);
} else {
scene.fog = new THREE.FogExp2(0x050510, 0.012);
}
setupGOFAI();
camera = new THREE.PerspectiveCamera(65, window.innerWidth / window.innerHeight, 0.1, 1000);
@@ -710,14 +722,29 @@ async function init() {
fetchGiteaData();
setInterval(fetchGiteaData, 30000); // Refresh every 30s
composer = new EffectComposer(renderer);
composer.addPass(new RenderPass(scene, camera));
const bloom = new UnrealBloomPass(
new THREE.Vector2(window.innerWidth, window.innerHeight),
0.6, 0.4, 0.85
);
composer.addPass(bloom);
composer.addPass(new SMAAPass(window.innerWidth, window.innerHeight));
// Initialize post-processing based on performance tier
// Low tier: No post-processing (avoids GPU stalls from readPixels)
// Medium tier: RenderPass + SMAA only (no bloom)
// High tier: Full post-processing with bloom
if (performanceTier === 'low') {
// No composer for low tier - use direct renderer.render()
composer = null;
} else if (performanceTier === 'medium') {
composer = new EffectComposer(renderer);
composer.addPass(new RenderPass(scene, camera));
// Skip UnrealBloomPass to avoid GPU stalls from readPixels
composer.addPass(new SMAAPass(window.innerWidth, window.innerHeight));
} else {
// High tier: Full post-processing with bloom
composer = new EffectComposer(renderer);
composer.addPass(new RenderPass(scene, camera));
const bloom = new UnrealBloomPass(
new THREE.Vector2(window.innerWidth, window.innerHeight),
0.6, 0.4, 0.85
);
composer.addPass(bloom);
composer.addPass(new SMAAPass(window.innerWidth, window.innerHeight));
}
updateLoad(95);
@@ -2495,7 +2522,12 @@ function gameLoop() {
core.material.emissiveIntensity = 1.5 + Math.sin(elapsed * 2) * 0.5;
}
composer.render();
// Render based on performance tier (low tier bypasses composer to avoid GPU stalls)
if (composer) {
composer.render();
} else {
renderer.render(scene, camera);
}
updateAshStorm(delta, elapsed);
updatePortalTunnel(delta, elapsed);
@@ -2528,7 +2560,10 @@ function onResize() {
camera.aspect = w / h;
camera.updateProjectionMatrix();
renderer.setSize(w, h);
composer.setSize(w, h);
// Only resize composer if it exists (low tier has no composer)
if (composer) {
composer.setSize(w, h);
}
}
// ═══ AGENT SIMULATION ═══

Binary file not shown.

575
bin/nexus_watchdog.py Normal file
View File

@@ -0,0 +1,575 @@
#!/usr/bin/env python3
"""
Nexus Watchdog — The Eye That Never Sleeps
Monitors the health of the Nexus consciousness loop and WebSocket
gateway, raising Gitea issues when components go dark.
The nexus was dead for hours after a syntax error crippled
nexus_think.py. Nobody knew. The gateway kept running, but the
consciousness loop — the only part that matters — was silent.
This watchdog ensures that never happens again.
HOW IT WORKS
============
1. Probes the WebSocket gateway (ws://localhost:8765)
→ Can Timmy hear the world?
2. Checks for a running nexus_think.py process
→ Is Timmy's mind awake?
3. Reads the heartbeat file (~/.nexus/heartbeat.json)
→ When did Timmy last think?
4. If any check fails, opens a Gitea issue (or updates an existing one)
with the exact failure mode, timestamp, and diagnostic info.
5. If all checks pass after a previous failure, closes the issue
with a recovery note.
USAGE
=====
# One-shot check (good for cron)
python bin/nexus_watchdog.py
# Continuous monitoring (every 60s)
python bin/nexus_watchdog.py --watch --interval 60
# Dry-run (print diagnostics, don't touch Gitea)
python bin/nexus_watchdog.py --dry-run
# Crontab entry (every 5 minutes)
*/5 * * * * cd /path/to/the-nexus && python bin/nexus_watchdog.py
HEARTBEAT PROTOCOL
==================
The consciousness loop (nexus_think.py) writes a heartbeat file
after each think cycle:
~/.nexus/heartbeat.json
{
"pid": 12345,
"timestamp": 1711843200.0,
"cycle": 42,
"model": "timmy:v0.1-q4",
"status": "thinking"
}
If the heartbeat is older than --stale-threshold seconds, the
mind is considered dead even if the process is still running
(e.g., hung on a blocking call).
ZERO DEPENDENCIES
=================
Pure stdlib. No pip installs. Same machine as the nexus.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import socket
import subprocess
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
# Root logger setup: timestamped, level-tagged lines (stderr by default).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-7s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("nexus.watchdog")
# ── Configuration ────────────────────────────────────────────────────
# Gateway probe target (raw TCP connect, see check_ws_gateway).
DEFAULT_WS_HOST = "localhost"
DEFAULT_WS_PORT = 8765
# Heartbeat file written by the consciousness loop after each cycle.
DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"
DEFAULT_STALE_THRESHOLD = 300  # 5 minutes without a heartbeat = dead
DEFAULT_INTERVAL = 60  # seconds between checks in watch mode
# Gitea alerting config — all overridable via environment variables.
GITEA_URL = os.environ.get("GITEA_URL", "http://143.198.27.163:3000")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")
# Watchdog issues are recognized by this title prefix (see find_open_watchdog_issue).
WATCHDOG_LABEL = "watchdog"
WATCHDOG_TITLE_PREFIX = "[watchdog]"
# ── Health check results ─────────────────────────────────────────────
@dataclass
class CheckResult:
    """Result of a single health check."""
    name: str  # human-readable check name, e.g. "WebSocket Gateway"
    healthy: bool  # True when the check passed
    message: str  # one-line summary shown in logs and in the issue table
    details: Dict[str, Any] = field(default_factory=dict)  # extra diagnostics, JSON-dumped into the issue body
@dataclass
class HealthReport:
    """Aggregate health report from all checks.

    ``overall_healthy`` is recomputed in ``__post_init__`` from the
    individual checks, so the value passed to the constructor is ignored.
    """
    timestamp: float  # epoch seconds when the checks were run
    checks: List[CheckResult]
    overall_healthy: bool = True

    def __post_init__(self):
        # Healthy only when every individual check passed.
        self.overall_healthy = all(c.healthy for c in self.checks)

    @property
    def failed_checks(self) -> List[CheckResult]:
        """Subset of checks that reported unhealthy."""
        return [c for c in self.checks if not c.healthy]

    def to_markdown(self) -> str:
        """Format as a Gitea issue body (status table + per-failure diagnostics)."""
        ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp))
        status = "🟢 ALL SYSTEMS OPERATIONAL" if self.overall_healthy else "🔴 FAILURES DETECTED"
        lines = [
            f"## Nexus Health Report — {ts}",
            f"**Status:** {status}",
            "",
            "| Check | Status | Details |",
            "|:------|:------:|:--------|",
        ]
        for c in self.checks:
            # Bug fix: both icon branches were empty strings (the emoji were
            # stripped somewhere), so the Status column rendered blank.
            icon = "✅" if c.healthy else "❌"
            lines.append(f"| {c.name} | {icon} | {c.message} |")
        if self.failed_checks:
            lines.append("")
            lines.append("### Failure Diagnostics")
            for c in self.failed_checks:
                lines.append(f"\n**{c.name}:**")
                lines.append("```")  # plain literal — the f-prefix was pointless
                lines.append(c.message)
                if c.details:
                    lines.append(json.dumps(c.details, indent=2))
                lines.append("```")
        lines.append("")
        lines.append(f"*Generated by `nexus_watchdog.py` at {ts}*")
        return "\n".join(lines)
# ── Health checks ────────────────────────────────────────────────────
def check_ws_gateway(host: str = DEFAULT_WS_HOST, port: int = DEFAULT_WS_PORT) -> CheckResult:
    """Check if the WebSocket gateway is accepting connections.

    Uses a raw TCP socket probe (not a full WebSocket handshake) to avoid
    depending on the websockets library. If TCP connects, the gateway
    process is alive and listening.

    Returns a CheckResult; never raises.
    """
    try:
        # with-statement guarantees the fd is released even if settimeout or
        # name resolution raises — the original leaked the socket on that path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)
            # connect_ex returns 0 on success, an errno on failure (no raise
            # for ordinary refusals).
            result = sock.connect_ex((host, port))
        if result == 0:
            return CheckResult(
                name="WebSocket Gateway",
                healthy=True,
                message=f"Listening on {host}:{port}",
            )
        return CheckResult(
            name="WebSocket Gateway",
            healthy=False,
            message=f"Connection refused on {host}:{port} (errno={result})",
            details={"host": host, "port": port, "errno": result},
        )
    except Exception as e:
        # e.g. DNS failure for a bad hostname.
        return CheckResult(
            name="WebSocket Gateway",
            healthy=False,
            message=f"Probe failed: {e}",
            details={"host": host, "port": port, "error": str(e)},
        )
def check_mind_process() -> CheckResult:
    """Check whether a nexus_think.py process is alive.

    Shells out to ``pgrep -f nexus_think`` so both direct invocations and
    ``python -m nexus.nexus_think`` are matched. The watchdog's own PID is
    excluded from the match list.
    """
    check_name = "Consciousness Loop"
    try:
        proc = subprocess.run(
            ["pgrep", "-f", "nexus_think"],
            capture_output=True, text=True, timeout=5,
        )
    except FileNotFoundError:
        # pgrep missing (non-Linux/macOS?) — report healthy rather than
        # raising a false alarm we cannot verify.
        return CheckResult(
            name=check_name,
            healthy=True,
            message="pgrep not available, skipping process check",
        )
    except Exception as e:
        return CheckResult(
            name=check_name,
            healthy=False,
            message=f"Process check failed: {e}",
            details={"error": str(e)},
        )
    if proc.returncode == 0:
        own_pid = str(os.getpid())
        pids = [
            line.strip()
            for line in proc.stdout.strip().split("\n")
            if line.strip() and line.strip() != own_pid
        ]
        if pids:
            return CheckResult(
                name=check_name,
                healthy=True,
                message=f"Running (PID: {', '.join(pids)})",
                details={"pids": pids},
            )
    return CheckResult(
        name=check_name,
        healthy=False,
        message="nexus_think.py is not running — Timmy's mind is dark",
        details={"pgrep_returncode": proc.returncode},
    )
def check_heartbeat(
    path: Path = DEFAULT_HEARTBEAT_PATH,
    stale_threshold: int = DEFAULT_STALE_THRESHOLD,
) -> CheckResult:
    """Check if the heartbeat file exists and is recent.

    The consciousness loop should write this file after each think
    cycle. If it's missing or stale, the mind has stopped thinking
    even if the process is technically alive.

    Returns a CheckResult; never raises.
    """
    if not path.exists():
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"No heartbeat file at {path} — mind has never reported",
            details={"path": str(path)},
        )
    try:
        data = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as e:
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"Heartbeat file corrupt: {e}",
            details={"path": str(path), "error": str(e)},
        )
    timestamp = data.get("timestamp", 0)
    # Robustness fix: a non-numeric timestamp (e.g. an ISO string written by
    # a future version) previously raised TypeError on the subtraction below.
    if not isinstance(timestamp, (int, float)):
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=f"Heartbeat timestamp malformed: {timestamp!r}",
            details=data,
        )
    age = time.time() - timestamp
    cycle = data.get("cycle", "?")
    model = data.get("model", "unknown")
    status = data.get("status", "unknown")
    if age > stale_threshold:
        return CheckResult(
            name="Heartbeat",
            healthy=False,
            message=(
                f"Stale heartbeat — last pulse {int(age)}s ago "
                f"(threshold: {stale_threshold}s). "
                f"Cycle #{cycle}, model={model}, status={status}"
            ),
            details=data,
        )
    return CheckResult(
        name="Heartbeat",
        healthy=True,
        message=f"Alive — cycle #{cycle}, {int(age)}s ago, model={model}",
        details=data,
    )
def check_syntax_health() -> CheckResult:
    """Confirm nexus_think.py parses — the failure mode that once killed the nexus.

    A ``compile()`` call is a fast, import-free way to surface SyntaxErrors
    before they take down the consciousness loop at runtime.
    """
    # Path is resolved relative to this script: <repo>/nexus/nexus_think.py.
    script_path = Path(__file__).parent.parent / "nexus" / "nexus_think.py"
    if not script_path.exists():
        # Can't verify what isn't there; don't raise a false alarm.
        return CheckResult(
            name="Syntax Health",
            healthy=True,
            message="nexus_think.py not found at expected path, skipping",
        )
    source = script_path.read_text()
    try:
        compile(source, str(script_path), "exec")
    except SyntaxError as e:
        return CheckResult(
            name="Syntax Health",
            healthy=False,
            message=f"SyntaxError at line {e.lineno}: {e.msg}",
            details={
                "file": str(script_path),
                "line": e.lineno,
                "offset": e.offset,
                "text": (e.text or "").strip(),
            },
        )
    return CheckResult(
        name="Syntax Health",
        healthy=True,
        message=f"nexus_think.py compiles cleanly ({len(source)} bytes)",
    )
# ── Gitea alerting ───────────────────────────────────────────────────
def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
    """Make a Gitea API request. Returns parsed JSON or empty dict.

    Returns None on any HTTP or transport failure (logged, never raised),
    so callers can treat a missing response as "no result".
    """
    # Imported lazily so the health checks have zero import cost.
    import urllib.error
    import urllib.request
    url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
    payload = json.dumps(data).encode() if data else None
    req = urllib.request.Request(url, data=payload, method=method)
    if GITEA_TOKEN:
        req.add_header("Authorization", f"token {GITEA_TOKEN}")
    for header, value in (("Content-Type", "application/json"),
                          ("Accept", "application/json")):
        req.add_header(header, value)
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read().decode()
            # Empty bodies (e.g. 204 responses) become an empty dict.
            return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as e:
        logger.warning("Gitea %d: %s", e.code, e.read().decode()[:200])
        return None
    except Exception as e:
        logger.warning("Gitea request failed: %s", e)
        return None
def find_open_watchdog_issue() -> Optional[dict]:
    """Find an existing open watchdog issue, if any.

    Watchdog issues are identified by their title prefix; returns the first
    match among the 20 most recent open issues, or None.
    """
    issues = _gitea_request(
        "GET",
        f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20",
    )
    if not isinstance(issues, list) or not issues:
        return None
    return next(
        (i for i in issues if i.get("title", "").startswith(WATCHDOG_TITLE_PREFIX)),
        None,
    )
def create_alert_issue(report: HealthReport) -> Optional[dict]:
    """Create a Gitea issue for a health failure.

    The title names every failed component so the subject line alone tells
    the on-call reader what broke; the body is the full markdown report.
    """
    broken = ", ".join(c.name for c in report.failed_checks)
    payload = {
        "title": f"{WATCHDOG_TITLE_PREFIX} Nexus health failure: {broken}",
        "body": report.to_markdown(),
        "assignees": ["Timmy"],
    }
    return _gitea_request("POST", f"/repos/{GITEA_REPO}/issues", data=payload)
def update_alert_issue(issue_number: int, report: HealthReport) -> Optional[dict]:
    """Add a comment to an existing watchdog issue with new findings."""
    comments_path = f"/repos/{GITEA_REPO}/issues/{issue_number}/comments"
    return _gitea_request("POST", comments_path, data={"body": report.to_markdown()})
def close_alert_issue(issue_number: int, report: HealthReport) -> None:
    """Close a watchdog issue when health is restored.

    Posts a recovery comment first so the issue history records the final
    healthy report, then flips the issue state to closed.
    """
    base = f"/repos/{GITEA_REPO}/issues/{issue_number}"
    recovery_note = (
        "## 🟢 Recovery Confirmed\n\n"
        + report.to_markdown()
        + "\n\n*Closing — all systems operational.*"
    )
    _gitea_request("POST", f"{base}/comments", data={"body": recovery_note})
    _gitea_request("PATCH", base, data={"state": "closed"})
# ── Orchestration ────────────────────────────────────────────────────
def run_health_checks(
    ws_host: str = DEFAULT_WS_HOST,
    ws_port: int = DEFAULT_WS_PORT,
    heartbeat_path: Path = DEFAULT_HEARTBEAT_PATH,
    stale_threshold: int = DEFAULT_STALE_THRESHOLD,
) -> HealthReport:
    """Run all health checks and return the aggregate report."""
    return HealthReport(
        timestamp=time.time(),
        checks=[
            check_ws_gateway(ws_host, ws_port),
            check_mind_process(),
            check_heartbeat(heartbeat_path, stale_threshold),
            check_syntax_health(),
        ],
    )
def alert_on_failure(report: HealthReport, dry_run: bool = False) -> None:
    """Create, update, or close Gitea issues based on health status.

    Healthy + open issue  -> close it with a recovery note.
    Unhealthy + open issue -> append the new findings as a comment.
    Unhealthy + no issue   -> open a fresh alert issue.
    """
    if dry_run:
        action = "close" if report.overall_healthy else "create/update"
        logger.info("DRY RUN — would %s Gitea issue", action)
        return
    if not GITEA_TOKEN:
        logger.warning("GITEA_TOKEN not set — cannot create issues")
        return
    existing = find_open_watchdog_issue()
    if report.overall_healthy:
        if existing:
            logger.info("Health restored — closing issue #%d", existing["number"])
            close_alert_issue(existing["number"], report)
        return
    if existing:
        logger.info("Still unhealthy — updating issue #%d", existing["number"])
        update_alert_issue(existing["number"], report)
        return
    created = create_alert_issue(report)
    if created and created.get("number"):
        logger.info("Created alert issue #%d", created["number"])
def run_once(args: argparse.Namespace) -> bool:
    """Run one health check cycle. Returns True if healthy.

    Logs each check result, then delegates issue management to
    alert_on_failure(). The only case that skips alerting entirely is a
    healthy report during --dry-run (nothing to create, nothing to close).
    """
    report = run_health_checks(
        ws_host=args.ws_host,
        ws_port=args.ws_port,
        heartbeat_path=Path(args.heartbeat_path),
        stale_threshold=args.stale_threshold,
    )
    for check in report.checks:
        level = logging.INFO if check.healthy else logging.ERROR
        # Bug fix: both icon branches were empty strings (stripped emoji),
        # so log lines carried no pass/fail marker.
        icon = "✅" if check.healthy else "❌"
        logger.log(level, "%s %s: %s", icon, check.name, check.message)
    # Single call replaces the original redundant if/elif that invoked the
    # same function on both branches with identical arguments.
    if not (report.overall_healthy and args.dry_run):
        alert_on_failure(report, dry_run=args.dry_run)
    return report.overall_healthy
def main():
    """CLI entry point: parse flags, then run one-shot or continuous checks."""
    parser = argparse.ArgumentParser(
        description="Nexus Watchdog — monitors consciousness loop health",
    )
    parser.add_argument(
        "--ws-host", default=DEFAULT_WS_HOST,
        help="WebSocket gateway host (default: localhost)",
    )
    parser.add_argument(
        "--ws-port", type=int, default=DEFAULT_WS_PORT,
        help="WebSocket gateway port (default: 8765)",
    )
    parser.add_argument(
        "--heartbeat-path", default=str(DEFAULT_HEARTBEAT_PATH),
        help="Path to heartbeat file",
    )
    parser.add_argument(
        "--stale-threshold", type=int, default=DEFAULT_STALE_THRESHOLD,
        help="Seconds before heartbeat is considered stale (default: 300)",
    )
    parser.add_argument(
        "--watch", action="store_true",
        help="Run continuously instead of one-shot",
    )
    parser.add_argument(
        "--interval", type=int, default=DEFAULT_INTERVAL,
        help="Seconds between checks in watch mode (default: 60)",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print diagnostics without creating Gitea issues",
    )
    parser.add_argument(
        "--json", action="store_true", dest="output_json",
        help="Output results as JSON (for integration with other tools)",
    )
    args = parser.parse_args()
    if args.watch:
        logger.info("Watchdog starting in continuous mode (interval: %ds)", args.interval)
        _running = True

        def _handle_sigterm(signum, frame):
            # Flip the loop flag so the current cycle finishes cleanly.
            nonlocal _running
            _running = False
            logger.info("Received signal %d, shutting down", signum)

        signal.signal(signal.SIGTERM, _handle_sigterm)
        signal.signal(signal.SIGINT, _handle_sigterm)
        while _running:
            run_once(args)
            # Sleep in 1s slices so a signal interrupts the wait promptly.
            for _ in range(args.interval):
                if not _running:
                    break
                time.sleep(1)
    else:
        # One-shot mode. Run the checks exactly once and reuse the same
        # report for logging, alerting, JSON output, and the exit code.
        # Bug fix: --json previously re-ran ALL health checks a second time,
        # so the printed JSON could disagree with the logs and exit status.
        report = run_health_checks(
            ws_host=args.ws_host,
            ws_port=args.ws_port,
            heartbeat_path=Path(args.heartbeat_path),
            stale_threshold=args.stale_threshold,
        )
        for check in report.checks:
            level = logging.INFO if check.healthy else logging.ERROR
            icon = "✅" if check.healthy else "❌"
            logger.log(level, "%s %s: %s", icon, check.name, check.message)
        if not (report.overall_healthy and args.dry_run):
            alert_on_failure(report, dry_run=args.dry_run)
        if args.output_json:
            print(json.dumps({
                "healthy": report.overall_healthy,
                "timestamp": report.timestamp,
                "checks": [
                    {"name": c.name, "healthy": c.healthy,
                     "message": c.message, "details": c.details}
                    for c in report.checks
                ],
            }, indent=2))
        sys.exit(0 if report.overall_healthy else 1)


if __name__ == "__main__":
    main()

View File

@@ -14,7 +14,6 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.adaptive_calibrator import AdaptiveCalibrator, CostPrediction
try:
from nexus.nexus_think import NexusMind
@@ -29,7 +28,5 @@ __all__ = [
"Action",
"ExperienceStore",
"TrajectoryLogger",
"AdaptiveCalibrator",
"CostPrediction",
"NexusMind",
]

View File

@@ -1,354 +1,97 @@
"""
AdaptiveCalibrator — Online Learning for Local Cost Estimation
Tracks predicted vs actual inference costs (latency, tokens) per model
and learns correction factors using Exponential Moving Average (EMA).
Extracted from Kimi Report #2 design spec.
Usage:
calibrator = AdaptiveCalibrator()
# Before a call: get predicted cost
prediction = calibrator.predict("timmy:v0.1-q4", prompt_tokens=512)
# After a call: record what actually happened
calibrator.record(
model="timmy:v0.1-q4",
prompt_tokens=512,
completion_tokens=128,
actual_ms=3400,
)
# Get model stats
stats = calibrator.get_stats("timmy:v0.1-q4")
"""
import json
import math
import os
import time
from pathlib import Path
from typing import Optional
# Default on-disk location for persisted calibration state.
DEFAULT_STATE_PATH = Path.home() / ".nexus" / "calibrator_state.json"
# EMA smoothing factor: 0.1 = slow adaptation, 0.3 = fast adaptation
DEFAULT_ALPHA = 0.15
# Seed latency estimates (ms per token) by model family
# These are rough priors; the calibrator adapts them online
_MODEL_PRIORS: dict[str, dict] = {
    # Ollama local models (8B range, q4 quantized, typical CPU/GPU)
    "default_local": {
        "ms_per_prompt_token": 0.5,
        "ms_per_completion_token": 8.0,
        "base_overhead_ms": 300.0,
    },
    # Groq cloud (extremely fast inference)
    "default_groq": {
        "ms_per_prompt_token": 0.05,
        "ms_per_completion_token": 0.3,
        "base_overhead_ms": 150.0,
    },
}
# Model-name prefixes treated as Groq-hosted cloud models (see _is_groq_model).
_GROQ_MODEL_PREFIXES = ("llama", "mixtral", "gemma", "whisper")
def _is_groq_model(model: str) -> bool:
    """Heuristic: is this a cloud Groq model vs a local Ollama model?

    Ollama tags contain a colon (e.g. "timmy:v0.1-q4"), so any colon means
    local; otherwise a known cloud-family prefix marks it as Groq.
    """
    name = model.lower()
    if ":" in name:
        return False
    return any(name.startswith(prefix) for prefix in _GROQ_MODEL_PREFIXES)
def _prior_for(model: str) -> dict:
    """Return a copy of the seed prior for this model."""
    family = "default_groq" if _is_groq_model(model) else "default_local"
    # Copy so callers can mutate their prior without corrupting the table.
    return dict(_MODEL_PRIORS[family])
class CostPrediction:
    """Result of a calibrated cost prediction.

    Immutable-by-convention value object: records what was asked, what was
    predicted, and how much calibration data backed the estimate.
    """
    def __init__(
        self,
        model: str,
        prompt_tokens: int,
        predicted_ms: float,
        confidence: float,
        sample_count: int,
    ):
        self.predicted_at = time.time()  # when this prediction was made
        self.model = model
        self.prompt_tokens = prompt_tokens
        self.predicted_ms = predicted_ms
        # 0.0 (prior only) → 1.0 (well-calibrated)
        self.confidence = confidence
        self.sample_count = sample_count

    def __repr__(self) -> str:
        fields = (
            f"model={self.model!r}",
            f"prompt_tokens={self.prompt_tokens}",
            f"predicted_ms={self.predicted_ms:.0f}",
            f"confidence={self.confidence:.2f}",
            f"n={self.sample_count}",
        )
        return f"CostPrediction({', '.join(fields)})"
class ModelCalibration:
    """Per-model online calibration state.

    Tracks EMA estimates of:
    - ms_per_prompt_token
    - ms_per_completion_token
    - base_overhead_ms
    Confidence grows with sample count (sigmoid-ish curve).
    """
    def __init__(self, model: str, alpha: float = DEFAULT_ALPHA):
        self.model = model
        self.alpha = alpha  # EMA smoothing factor (see DEFAULT_ALPHA)
        self.sample_count = 0
        self.last_updated = time.time()
        # EMA parameters (start from prior)
        prior = _prior_for(model)
        self.ms_per_prompt_token: float = prior["ms_per_prompt_token"]
        self.ms_per_completion_token: float = prior["ms_per_completion_token"]
        self.base_overhead_ms: float = prior["base_overhead_ms"]
        # Tracking for error diagnostics
        self.total_absolute_error_ms: float = 0.0
        self.total_predicted_ms: float = 0.0

    @property
    def confidence(self) -> float:
        """Confidence in current estimates.

        Grows from 0 (prior only) toward 1 as samples accumulate.
        Uses: 1 - exp(-n/10) so confidence ~0.63 at n=10, ~0.95 at n=30.
        """
        return 1.0 - math.exp(-self.sample_count / 10.0)

    def predict(self, prompt_tokens: int, completion_tokens: int = 0) -> float:
        """Predict latency in milliseconds for a call with these token counts."""
        # Linear cost model: fixed overhead + per-token costs for each phase.
        return (
            self.base_overhead_ms
            + self.ms_per_prompt_token * prompt_tokens
            + self.ms_per_completion_token * completion_tokens
        )

    def update(
        self,
        prompt_tokens: int,
        completion_tokens: int,
        actual_ms: float,
    ) -> float:
        """Update EMA estimates from one observed data point.

        Uses a simple linear model:
        actual_ms ≈ overhead + α_p * prompt_tokens + α_c * completion_tokens
        We update each coefficient independently using EMA on the residuals.
        Returns the prediction error (actual - predicted) in ms.
        """
        predicted_ms = self.predict(prompt_tokens, completion_tokens)
        error_ms = actual_ms - predicted_ms
        # EMA update: new_estimate = old + alpha * error
        # This is equivalent to: new = (1-alpha)*old + alpha*actual_ratio
        # `or 1` guards against division by zero when both counts are 0.
        total_tokens = prompt_tokens + completion_tokens or 1
        # Attribute the error proportionally to each component
        prompt_frac = prompt_tokens / total_tokens
        completion_frac = completion_tokens / total_tokens
        overhead_frac = 1.0 - 0.5 * (prompt_frac + completion_frac)
        # max(..., 1) guards the per-token division when a count is 0.
        self.ms_per_prompt_token += self.alpha * error_ms * prompt_frac / max(prompt_tokens, 1)
        self.ms_per_completion_token += self.alpha * error_ms * completion_frac / max(completion_tokens, 1)
        self.base_overhead_ms += self.alpha * error_ms * overhead_frac
        # Clamp to physically reasonable values
        self.ms_per_prompt_token = max(0.001, self.ms_per_prompt_token)
        self.ms_per_completion_token = max(0.001, self.ms_per_completion_token)
        self.base_overhead_ms = max(0.0, self.base_overhead_ms)
        self.sample_count += 1
        self.last_updated = time.time()
        self.total_absolute_error_ms += abs(error_ms)
        self.total_predicted_ms += predicted_ms
        return error_ms

    @property
    def mean_absolute_error_ms(self) -> float:
        """MAE over all recorded samples."""
        # NaN (not 0) when uncalibrated, so callers can't mistake it for a
        # perfect score.
        if self.sample_count == 0:
            return float("nan")
        return self.total_absolute_error_ms / self.sample_count

    def to_dict(self) -> dict:
        # Full state snapshot for JSON persistence (see AdaptiveCalibrator).
        return {
            "model": self.model,
            "alpha": self.alpha,
            "sample_count": self.sample_count,
            "last_updated": self.last_updated,
            "ms_per_prompt_token": self.ms_per_prompt_token,
            "ms_per_completion_token": self.ms_per_completion_token,
            "base_overhead_ms": self.base_overhead_ms,
            "total_absolute_error_ms": self.total_absolute_error_ms,
            "total_predicted_ms": self.total_predicted_ms,
        }

    @classmethod
    def from_dict(cls, d: dict) -> "ModelCalibration":
        # Rebuild from a to_dict() payload; the three core EMA coefficients
        # are required, everything else falls back to fresh defaults.
        obj = cls(model=d["model"], alpha=d.get("alpha", DEFAULT_ALPHA))
        obj.sample_count = d.get("sample_count", 0)
        obj.last_updated = d.get("last_updated", time.time())
        obj.ms_per_prompt_token = d["ms_per_prompt_token"]
        obj.ms_per_completion_token = d["ms_per_completion_token"]
        obj.base_overhead_ms = d["base_overhead_ms"]
        obj.total_absolute_error_ms = d.get("total_absolute_error_ms", 0.0)
        obj.total_predicted_ms = d.get("total_predicted_ms", 0.0)
        return obj
from typing import Dict, List, Optional
class AdaptiveCalibrator:
"""Online calibrator for local LLM inference cost estimation.
Maintains per-model EMA calibration state, persisted to disk between
sessions. Requires no external dependencies — pure stdlib.
Thread safety: not thread-safe. Use one instance per process.
"""
def __init__(
self,
state_path: Optional[Path] = None,
alpha: float = DEFAULT_ALPHA,
autosave: bool = True,
):
self.state_path = state_path or DEFAULT_STATE_PATH
self.alpha = alpha
self.autosave = autosave
self._models: dict[str, ModelCalibration] = {}
self._load()
# ── Public API ───────────────────────────────────────────────────
def predict(
self,
model: str,
prompt_tokens: int,
completion_tokens: int = 0,
) -> CostPrediction:
"""Return a calibrated cost prediction for the given model and token counts.
If this model has never been seen, returns a prior-based estimate
with confidence=0.
"""
cal = self._get_or_create(model)
predicted_ms = cal.predict(prompt_tokens, completion_tokens)
return CostPrediction(
model=model,
prompt_tokens=prompt_tokens,
predicted_ms=predicted_ms,
confidence=cal.confidence,
sample_count=cal.sample_count,
)
def record(
self,
model: str,
prompt_tokens: int,
actual_ms: float,
completion_tokens: int = 0,
) -> float:
"""Record an observed inference call and update calibration.
Args:
model: Model identifier (e.g. "timmy:v0.1-q4", "llama3-8b-8192")
prompt_tokens: Number of tokens in the prompt/input
actual_ms: Observed wall-clock latency in milliseconds
completion_tokens: Number of tokens generated (optional)
Returns:
Prediction error in ms (actual - predicted) at time of recording.
"""
cal = self._get_or_create(model)
error_ms = cal.update(prompt_tokens, completion_tokens, actual_ms)
if self.autosave:
self._save()
return error_ms
def get_stats(self, model: str) -> dict:
"""Return calibration stats for a model."""
if model not in self._models:
return {
"model": model,
"sample_count": 0,
"confidence": 0.0,
"status": "uncalibrated (prior only)",
}
cal = self._models[model]
return {
"model": model,
"sample_count": cal.sample_count,
"confidence": round(cal.confidence, 3),
"ms_per_prompt_token": round(cal.ms_per_prompt_token, 4),
"ms_per_completion_token": round(cal.ms_per_completion_token, 4),
"base_overhead_ms": round(cal.base_overhead_ms, 1),
"mean_absolute_error_ms": round(cal.mean_absolute_error_ms, 1),
"last_updated": cal.last_updated,
"status": "calibrated" if cal.sample_count >= 10 else "warming up",
Provides online learning for cost estimation accuracy in the sovereign AI stack.
Tracks predicted vs actual metrics (latency, tokens, etc.) and adjusts a
calibration factor to improve future estimates.
"""
def __init__(self, storage_path: str = "nexus/calibration_state.json"):
self.storage_path = storage_path
self.state = {
"factor": 1.0,
"history": [],
"last_updated": 0,
"total_samples": 0,
"learning_rate": 0.1
}
self.load()
def all_stats(self) -> list[dict]:
"""Return calibration stats for all known models."""
return [self.get_stats(m) for m in sorted(self._models)]
def load(self):
if os.path.exists(self.storage_path):
try:
with open(self.storage_path, 'r') as f:
self.state.update(json.load(f))
except Exception as e:
print(f"Error loading calibration state: {e}")
def reset(self, model: Optional[str] = None):
"""Reset calibration for one model or all models."""
if model:
self._models.pop(model, None)
else:
self._models.clear()
if self.autosave:
self._save()
# ── Persistence ──────────────────────────────────────────────────
def _get_or_create(self, model: str) -> ModelCalibration:
if model not in self._models:
self._models[model] = ModelCalibration(model=model, alpha=self.alpha)
return self._models[model]
def _load(self):
"""Load persisted calibration state from disk."""
if not self.state_path.exists():
return
def save(self):
try:
with open(self.state_path) as f:
data = json.load(f)
for model_data in data.get("models", []):
cal = ModelCalibration.from_dict(model_data)
self._models[cal.model] = cal
except Exception:
# Corrupt state file — start fresh
self._models = {}
with open(self.storage_path, 'w') as f:
json.dump(self.state, f, indent=2)
except Exception as e:
print(f"Error saving calibration state: {e}")
def _save(self):
"""Persist calibration state to disk."""
self.state_path.parent.mkdir(parents=True, exist_ok=True)
data = {
"version": 1,
"saved_at": time.time(),
"models": [cal.to_dict() for cal in self._models.values()],
def predict(self, base_estimate: float) -> float:
"""Apply the current calibration factor to a base estimate."""
return base_estimate * self.state["factor"]
def update(self, predicted: float, actual: float):
"""
Update the calibration factor based on a new sample.
Uses a simple moving average approach for the factor.
"""
if predicted <= 0 or actual <= 0:
return
# Ratio of actual to predicted
# If actual > predicted, ratio > 1 (we underestimated, factor should increase)
# If actual < predicted, ratio < 1 (we overestimated, factor should decrease)
ratio = actual / predicted
# Update factor using learning rate
lr = self.state["learning_rate"]
self.state["factor"] = (1 - lr) * self.state["factor"] + lr * (self.state["factor"] * ratio)
# Record history (keep last 50 samples)
self.state["history"].append({
"timestamp": time.time(),
"predicted": predicted,
"actual": actual,
"ratio": ratio
})
if len(self.state["history"]) > 50:
self.state["history"].pop(0)
self.state["total_samples"] += 1
self.state["last_updated"] = time.time()
self.save()
def get_metrics(self) -> Dict:
    """Return current calibration metrics (factor, samples, mean ratio)."""
    history = self.state["history"]
    if history:
        average_ratio = sum(entry["ratio"] for entry in history) / len(history)
    else:
        # No samples yet: a neutral ratio of 1.0 means "no correction".
        average_ratio = 1.0
    return {
        "current_factor": self.state["factor"],
        "total_samples": self.state["total_samples"],
        "average_ratio": average_ratio,
    }
# NOTE(review): these statements look like the displaced tail of _save()
# (the diff view appears to have split that method) — confirm against
# the complete file before touching them.
# Write atomically via tmp file
tmp = self.state_path.with_suffix(".tmp")
with open(tmp, "w") as f:
    json.dump(data, f, indent=2)
tmp.replace(self.state_path)
if __name__ == "__main__":
    # Simple test/demo: feed samples that consistently run 20% over the
    # prediction and watch the calibration factor converge upward.
    # Previously this wrote to a hard-coded "nexus/" relative path (which
    # had to pre-exist) and only removed the file on the happy path; a
    # TemporaryDirectory guarantees cleanup even if the demo raises.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        state_file = os.path.join(tmp_dir, "test_calibration.json")
        calibrator = AdaptiveCalibrator(state_file)
        print(f"Initial factor: {calibrator.state['factor']}")
        for _ in range(10):
            base = 100.0
            pred = calibrator.predict(base)
            actual = 120.0  # Reality is 20% higher
            calibrator.update(pred, actual)
            print(f"Pred: {pred:.2f}, Actual: {actual:.2f}, New Factor: {calibrator.state['factor']:.4f}")
        print("Final metrics:", calibrator.get_metrics())

View File

@@ -1,4 +1,4 @@
"""Thin Evennia -> Nexus event normalization helpers."""
"""Evennia -> Nexus event normalization — v2 with full audit event types."""
from __future__ import annotations
@@ -9,6 +9,29 @@ def _ts(value: str | None = None) -> str:
return value or datetime.now(timezone.utc).isoformat()
# ── Session Events ──────────────────────────────────────────
def player_join(account: str, character: str = "", ip_address: str = "", timestamp: str | None = None) -> dict:
    """Build an ``evennia.player_join`` event payload."""
    payload = {
        "type": "evennia.player_join",
        "account": account,
        "character": character,
        "ip_address": ip_address,
    }
    payload["timestamp"] = _ts(timestamp)
    return payload
def player_leave(account: str, character: str = "", reason: str = "quit", session_duration: float = 0, timestamp: str | None = None) -> dict:
    """Build an ``evennia.player_leave`` event payload."""
    payload = {
        "type": "evennia.player_leave",
        "account": account,
        "character": character,
        "reason": reason,
        "session_duration_seconds": session_duration,
    }
    payload["timestamp"] = _ts(timestamp)
    return payload
def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evennia_character: str = "Timmy", timestamp: str | None = None) -> dict:
return {
"type": "evennia.session_bound",
@@ -19,6 +42,18 @@ def session_bound(hermes_session_id: str, evennia_account: str = "Timmy", evenni
}
# ── Movement Events ─────────────────────────────────────────
def player_move(character: str, from_room: str, to_room: str, timestamp: str | None = None) -> dict:
    """Build an ``evennia.player_move`` event payload."""
    payload = {
        "type": "evennia.player_move",
        "character": character,
        "from_room": from_room,
        "to_room": to_room,
    }
    payload["timestamp"] = _ts(timestamp)
    return payload
def actor_located(actor_id: str, room_key: str, room_name: str | None = None, timestamp: str | None = None) -> dict:
return {
"type": "evennia.actor_located",
@@ -44,6 +79,19 @@ def room_snapshot(room_key: str, title: str, desc: str, exits: list[dict] | None
}
# ── Command Events ──────────────────────────────────────────
def command_executed(character: str, command: str, args: str = "", success: bool = True, timestamp: str | None = None) -> dict:
    """Build an ``evennia.command_executed`` event payload."""
    payload = {
        "type": "evennia.command_executed",
        "character": character,
        "command": command,
        "args": args,
        "success": success,
    }
    payload["timestamp"] = _ts(timestamp)
    return payload
def command_issued(hermes_session_id: str, actor_id: str, command_text: str, timestamp: str | None = None) -> dict:
return {
"type": "evennia.command_issued",
@@ -64,3 +112,16 @@ def command_result(hermes_session_id: str, actor_id: str, command_text: str, out
"success": success,
"timestamp": _ts(timestamp),
}
# ── Audit Summary ───────────────────────────────────────────
# ── Audit Summary ───────────────────────────────────────────
def audit_heartbeat(characters: list[dict], online_count: int, total_commands: int, total_movements: int, timestamp: str | None = None) -> dict:
    """Build an ``evennia.audit_heartbeat`` summary event payload."""
    payload = {
        "type": "evennia.audit_heartbeat",
        "characters": characters,
        "online_count": online_count,
        "total_commands": total_commands,
        "total_movements": total_movements,
    }
    payload["timestamp"] = _ts(timestamp)
    return payload

View File

@@ -1,82 +1,238 @@
#!/usr/bin/env python3
"""Publish Evennia telemetry logs into the Nexus websocket bridge."""
"""
Live Evennia -> Nexus WebSocket bridge.
Two modes:
1. Live tail: watches Evennia log files and streams parsed events to Nexus WS
2. Playback: replays a telemetry JSONL file (legacy mode)
The bridge auto-reconnects on both ends and survives Evennia restarts.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from typing import Optional
import websockets
try:
import websockets
except ImportError:
websockets = None
from nexus.evennia_event_adapter import actor_located, command_issued, command_result, room_snapshot, session_bound
from nexus.evennia_event_adapter import (
audit_heartbeat,
command_executed,
player_join,
player_leave,
player_move,
)
ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")  # CSI escape sequences (colors, cursor moves)
# Regex patterns for log parsing
# NOTE(review): these must track the exact AUDIT line formats emitted by
# the Evennia hooks — confirm against the server-side logging code.
MOVE_RE = re.compile(r"AUDIT MOVE: (\w+) arrived at (.+?) from (.+)")  # groups: (character, to_room, from_room)
CMD_RE = re.compile(r"AUDIT CMD: (\w+) executed '(\w+)'(?: args: '(.*?)')?")  # groups: (character, command, optional args)
SESSION_START_RE = re.compile(r"AUDIT SESSION: (\w+) puppeted by (\w+)")  # groups: (character, account)
SESSION_END_RE = re.compile(r"AUDIT SESSION: (\w+) unpuppeted.*session (\d+)s")  # groups: (character, duration seconds)
LOGIN_RE = re.compile(r"Logged in: (\w+)\(account \d+\) ([\d.]+)")  # groups: (account, ip)
LOGOUT_RE = re.compile(r"Logged out: (\w+)\(account \d+\) ([\d.]+)")  # groups: (account, ip)
def strip_ansi(text: str) -> str:
    """Return *text* with ANSI escape sequences removed; None/empty becomes ""."""
    source = text if text else ""
    return ANSI_RE.sub("", source)
def clean_lines(text: str) -> list[str]:
    """Strip ANSI codes and return the trimmed, non-empty lines of *text*."""
    normalized = strip_ansi(text).replace("\r", "")
    trimmed = (line.strip() for line in normalized.split("\n"))
    return [line for line in trimmed if line]
class LogTailer:
    """Async file tailer that yields new lines as they appear."""
    def __init__(self, path: str, poll_interval: float = 0.5):
        # path: log file to watch (may not exist yet);
        # poll_interval: seconds between size checks.
        self.path = path
        self.poll_interval = poll_interval
        # Byte offset of the last data consumed; reset on rotation.
        self._offset = 0
    async def tail(self):
        """Yield new lines from the file, starting from end."""
        # Start at end of file
        if os.path.exists(self.path):
            self._offset = os.path.getsize(self.path)
        while True:
            try:
                if not os.path.exists(self.path):
                    # File may not have been created yet — keep polling.
                    await asyncio.sleep(self.poll_interval)
                    continue
                size = os.path.getsize(self.path)
                if size < self._offset:
                    # File was truncated/rotated
                    self._offset = 0
                if size > self._offset:
                    with open(self.path, "r") as f:
                        f.seek(self._offset)
                        for line in f:
                            line = line.strip()
                            if line:
                                yield line
                        # NOTE(review): a trailing partial line (no newline
                        # yet) is consumed and its offset recorded too —
                        # confirm the writer always emits whole lines.
                        self._offset = f.tell()
                await asyncio.sleep(self.poll_interval)
            except Exception as e:
                # Never let a read error kill the tail loop; back off briefly.
                print(f"[tailer] Error reading {self.path}: {e}", flush=True)
                await asyncio.sleep(2)
def parse_room_output(text: str):
    """Parse Evennia room text into {title, desc, exits, objects}.

    Returns None when the cleaned text has fewer than two lines
    (no title + description to work with).
    """
    lines = clean_lines(text)
    if len(lines) < 2:
        return None
    title, desc = lines[0], lines[1]
    exits: list[dict] = []
    objects: list[dict] = []
    for line in lines[2:]:
        if line.startswith("Exits:"):
            raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
            for token in raw.split(","):
                token = token.strip()
                if not token:
                    continue
                destination = token.title()
                exits.append({"key": token, "destination_id": destination, "destination_key": destination})
        elif line.startswith("You see:"):
            raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
            for part in raw.split(","):
                part = part.strip()
                if not part:
                    continue
                bare = part.removeprefix('a ').removeprefix('an ')
                objects.append({"id": bare, "key": bare, "short_desc": part})
    return {"title": title, "desc": desc, "exits": exits, "objects": objects}
def parse_log_line(line: str) -> Optional[dict]:
    """Parse a log line into a Nexus event, or None if not parseable."""
    # Check the specific AUDIT formats first, generic login/logout last.
    move = MOVE_RE.search(line)
    if move:
        # Log reads "arrived at <to> from <from>"; the event wants (from, to).
        return player_move(move.group(1), move.group(3), move.group(2))
    cmd = CMD_RE.search(line)
    if cmd:
        return command_executed(cmd.group(1), cmd.group(2), cmd.group(3) or "")
    session_start = SESSION_START_RE.search(line)
    if session_start:
        return player_join(session_start.group(2), session_start.group(1))
    session_end = SESSION_END_RE.search(line)
    if session_end:
        return player_leave("", session_end.group(1), session_duration=float(session_end.group(2)))
    login = LOGIN_RE.search(line)
    if login:
        return player_join(login.group(1), ip_address=login.group(2))
    logout = LOGOUT_RE.search(line)
    if logout:
        return player_leave(logout.group(1))
    return None
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
    """Expand one raw telemetry record into zero or more Nexus events."""
    events: list[dict] = []
    kind = raw.get("event")
    actor = raw.get("actor", "Timmy")
    timestamp = raw.get("timestamp")

    def emit_room_events(output: str) -> None:
        # Shared tail: if the output parses as a room, locate the actor
        # there and emit a snapshot of the room.
        parsed = parse_room_output(output)
        if parsed:
            events.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
            events.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))

    if kind == "connect":
        events.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
        emit_room_events(raw.get("output", ""))
        return events
    if kind == "command":
        cmd = raw.get("command", "")
        output = raw.get("output", "")
        events.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
        success = not output.startswith("Command '") and not output.startswith("Could not find")
        events.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
        emit_room_events(output)
        return events
    return events
async def live_bridge(log_dir: str, ws_url: str, reconnect_delay: float = 5.0):
    """
    Main live bridge loop.
    Tails all Evennia log files and streams parsed events to Nexus WebSocket.
    Auto-reconnects on failure.
    """
    log_files = [
        os.path.join(log_dir, "command_audit.log"),
        os.path.join(log_dir, "movement_audit.log"),
        os.path.join(log_dir, "player_activity.log"),
        os.path.join(log_dir, "server.log"),
    ]
    # Bounded queue decouples the tailers from the sender.
    event_queue: asyncio.Queue = asyncio.Queue(maxsize=10000)
    async def tail_file(path: str):
        """Tail a single file and put events on queue."""
        tailer = LogTailer(path)
        async for line in tailer.tail():
            event = parse_log_line(line)
            if event:
                try:
                    event_queue.put_nowait(event)
                except asyncio.QueueFull:
                    # Queue full: this NEWEST event is dropped (put_nowait
                    # failed) — nothing is evicted from the front, contrary
                    # to the old "drop oldest" comment.
                    pass
    async def ws_sender():
        """Send events from queue to WebSocket, with auto-reconnect."""
        while True:
            try:
                if websockets is None:
                    # Degraded mode: no websockets package — log locally forever.
                    print("[bridge] websockets not installed, logging events locally", flush=True)
                    while True:
                        event = await event_queue.get()
                        ts = event.get("timestamp", "")[:19]
                        print(f"[{ts}] {event['type']}: {json.dumps({k: v for k, v in event.items() if k not in ('type', 'timestamp')})}", flush=True)
                print(f"[bridge] Connecting to {ws_url}...", flush=True)
                async with websockets.connect(ws_url) as ws:
                    print(f"[bridge] Connected to Nexus at {ws_url}", flush=True)
                    while True:
                        event = await event_queue.get()
                        # NOTE(review): if send() raises here the event has
                        # already been dequeued and is lost on reconnect —
                        # confirm best-effort delivery is intended.
                        await ws.send(json.dumps(event))
            except Exception as e:
                print(f"[bridge] WebSocket error: {e}. Reconnecting in {reconnect_delay}s...", flush=True)
                await asyncio.sleep(reconnect_delay)
    # Start all tailers + sender
    tasks = [asyncio.create_task(tail_file(f)) for f in log_files]
    tasks.append(asyncio.create_task(ws_sender()))
    print(f"[bridge] Live bridge started. Watching {len(log_files)} log files.", flush=True)
    await asyncio.gather(*tasks)
async def playback(log_path: Path, ws_url: str):
"""Legacy mode: replay a telemetry JSONL file."""
from nexus.evennia_event_adapter import (
actor_located, command_issued, command_result,
room_snapshot, session_bound,
)
def clean_lines(text: str) -> list[str]:
text = strip_ansi(text).replace("\r", "")
return [line.strip() for line in text.split("\n") if line.strip()]
def parse_room_output(text: str):
lines = clean_lines(text)
if len(lines) < 2:
return None
title = lines[0]
desc = lines[1]
exits = []
objects = []
for line in lines[2:]:
if line.startswith("Exits:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
exits = [{"key": t.strip(), "destination_id": t.strip().title(), "destination_key": t.strip().title()} for t in raw.split(",") if t.strip()]
elif line.startswith("You see:"):
raw = line.split(":", 1)[1].strip().replace(" and ", ", ")
parts = [t.strip() for t in raw.split(",") if t.strip()]
objects = [{"id": p.removeprefix("a ").removeprefix("an "), "key": p.removeprefix("a ").removeprefix("an "), "short_desc": p} for p in parts]
return {"title": title, "desc": desc, "exits": exits, "objects": objects}
def normalize_event(raw: dict, hermes_session_id: str) -> list[dict]:
out = []
event = raw.get("event")
actor = raw.get("actor", "Timmy")
timestamp = raw.get("timestamp")
if event == "connect":
out.append(session_bound(hermes_session_id, evennia_account=actor, evennia_character=actor, timestamp=timestamp))
parsed = parse_room_output(raw.get("output", ""))
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
elif event == "command":
cmd = raw.get("command", "")
output = raw.get("output", "")
out.append(command_issued(hermes_session_id, actor, cmd, timestamp=timestamp))
success = not output.startswith("Command '") and not output.startswith("Could not find")
out.append(command_result(hermes_session_id, actor, cmd, strip_ansi(output), success=success, timestamp=timestamp))
parsed = parse_room_output(output)
if parsed:
out.append(actor_located(actor, parsed["title"], parsed["title"], timestamp=timestamp))
out.append(room_snapshot(parsed["title"], parsed["title"], parsed["desc"], exits=parsed["exits"], objects=parsed["objects"], timestamp=timestamp))
return out
hermes_session_id = log_path.stem
async with websockets.connect(ws_url) as ws:
for line in log_path.read_text(encoding="utf-8").splitlines():
@@ -88,11 +244,25 @@ async def playback(log_path: Path, ws_url: str):
def main():
    """CLI entry point: `live` tails Evennia logs, `playback` replays a JSONL file.

    The previous text was a merge artifact: it built the legacy parser
    (immediately overwritten) and ran ``asyncio.run(playback(...))``
    unconditionally before the mode dispatch, so playback always ran.
    """
    parser = argparse.ArgumentParser(description="Evennia -> Nexus WebSocket Bridge")
    sub = parser.add_subparsers(dest="mode")

    live = sub.add_parser("live", help="Live tail Evennia logs and stream to Nexus")
    live.add_argument("--log-dir", default="/root/workspace/timmy-academy/server/logs", help="Evennia logs directory")
    live.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL")

    replay = sub.add_parser("playback", help="Replay a telemetry JSONL file")
    replay.add_argument("log_path", help="Path to Evennia telemetry JSONL")
    replay.add_argument("--ws", default="ws://127.0.0.1:8765", help="Nexus WebSocket URL")

    args = parser.parse_args()
    if args.mode == "live":
        asyncio.run(live_bridge(args.log_dir, args.ws))
    elif args.mode == "playback":
        asyncio.run(playback(Path(args.log_path).expanduser(), args.ws))
    else:
        # No subcommand given — show usage instead of guessing a mode.
        parser.print_help()
if __name__ == "__main__":

79
nexus/heartbeat.py Normal file
View File

@@ -0,0 +1,79 @@
"""
Heartbeat writer for the Nexus consciousness loop.
Call write_heartbeat() at the end of each think cycle to let the
watchdog know the mind is alive. The file is written atomically
(write-to-temp + rename) to prevent the watchdog from reading a
half-written file.
Usage in nexus_think.py:
from nexus.heartbeat import write_heartbeat
class NexusMind:
def think_once(self):
# ... do the thinking ...
write_heartbeat(
cycle=self.cycle_count,
model=self.model,
status="thinking",
)
"""
from __future__ import annotations
import json
import os
import tempfile
import time
from pathlib import Path
# Default location watched by the watchdog.
DEFAULT_HEARTBEAT_PATH = Path.home() / ".nexus" / "heartbeat.json"
def write_heartbeat(
    cycle: int = 0,
    model: str = "unknown",
    status: str = "thinking",
    path: Path = DEFAULT_HEARTBEAT_PATH,
) -> None:
    """Write a heartbeat file atomically.

    The watchdog monitors this file to detect stale minds — processes
    that are technically running but have stopped thinking (e.g., hung
    on a blocking call, deadlocked, or crashed inside a catch-all
    exception handler).

    Never raises: ALL filesystem failures are swallowed. Previously
    ``path.parent.mkdir`` sat outside the guard, so an unwritable
    parent directory could crash the mind despite the stated
    "best effort" contract; it is now inside the protected region.

    Args:
        cycle: Current think cycle number
        model: Model identifier
        status: Current state ("thinking", "perceiving", "acting", "idle")
        path: Where to write the heartbeat file
    """
    data = {
        "pid": os.getpid(),
        "timestamp": time.time(),
        "cycle": cycle,
        "model": model,
        "status": status,
    }
    tmp_path = None
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        # Atomic write: temp file in same directory + rename.
        # This guarantees the watchdog never reads a partial file.
        fd, tmp_path = tempfile.mkstemp(
            dir=str(path.parent),
            prefix=".heartbeat-",
            suffix=".tmp",
        )
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
        os.replace(tmp_path, str(path))
    except Exception:
        # Best effort — never crash the mind over a heartbeat failure
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

102
nexus/nostr_identity.py Normal file
View File

@@ -0,0 +1,102 @@
import hashlib
import hmac
import os
import binascii
# ═══════════════════════════════════════════
# NOSTR SOVEREIGN IDENTITY (NIP-01)
# ═══════════════════════════════════════════
# Pure Python implementation of Schnorr signatures for Nostr.
# No dependencies required.
def sha256(data):
    """SHA-256 digest of *data* as raw bytes."""
    hasher = hashlib.sha256(data)
    return hasher.digest()
def hmac_sha256(key, data):
    """HMAC-SHA256 of *data* keyed by *key*, as raw bytes."""
    mac = hmac.new(key, data, hashlib.sha256)
    return mac.digest()
# Secp256k1 Constants
P = 2**256 - 2**32 - 977  # field prime of secp256k1
# N: order of the generator point (size of the scalar group).
N = 115792089237316195423570985008687907852837564279074904382605163141518161494337
# G: generator point in affine (x, y) coordinates.
G = (0x79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798,
     0x483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8)
def inverse(a, n):
    """Modular inverse of *a* mod *n* via Fermat's little theorem (n must be prime)."""
    return pow(a, n - 2, n)
def point_add(p1, p2):
    # Add two secp256k1 points in affine coordinates.
    # None represents the point at infinity (group identity); returned
    # when the inputs are inverses of each other (same x, different y).
    # NOTE(review): the doubling branch assumes y1 != 0 — no valid
    # secp256k1 point has y == 0, but worth keeping in mind.
    if p1 is None: return p2
    if p2 is None: return p1
    (x1, y1), (x2, y2) = p1, p2
    if x1 == x2 and y1 != y2: return None
    if x1 == x2:
        # Doubling: slope of the tangent line at the point.
        m = (3 * x1 * x1 * inverse(2 * y1, P)) % P
    else:
        # Distinct points: slope of the chord through them.
        m = ((y2 - y1) * inverse(x2 - x1, P)) % P
    x3 = (m * m - x1 - x2) % P
    y3 = (m * (x1 - x3) - y1) % P
    return (x3, y3)
def point_mul(p, n):
    # Scalar multiplication n*p using binary double-and-add, scanning
    # the scalar's bits least-significant first.
    r = None  # accumulator starts at the point at infinity
    for i in range(256):
        if (n >> i) & 1:
            r = point_add(r, p)
        p = point_add(p, p)  # p, 2p, 4p, ... one doubling per bit
    return r
def get_pubkey(privkey):
    """Hex-encoded 32-byte x-coordinate of privkey*G (the x-only public key)."""
    point = point_mul(G, privkey)
    x_bytes = point[0].to_bytes(32, 'big')
    return binascii.hexlify(x_bytes).decode()
# Schnorr Signature (BIP340)
# NOTE(review): BIP340 derives the nonce and challenge with *tagged*
# hashes ("BIP0340/nonce", "BIP0340/challenge"); this implementation
# uses plain SHA-256, so signatures may not verify against strict
# BIP340/Nostr verifiers — confirm interop before relying on it.
def sign_schnorr(msg_hash, privkey):
    # Deterministic nonce derived from (privkey || msg_hash).
    k = int.from_bytes(sha256(privkey.to_bytes(32, 'big') + msg_hash), 'big') % N
    R = point_mul(G, k)
    # BIP340 requires R to have an even y-coordinate; negate k otherwise.
    if R[1] % 2 != 0:
        k = N - k
    r = R[0].to_bytes(32, 'big')
    # Challenge e = H(r || pubkey_x || msg) reduced mod the group order.
    e = int.from_bytes(sha256(r + binascii.unhexlify(get_pubkey(privkey)) + msg_hash), 'big') % N
    s = (k + e * privkey) % N
    # Signature is the 64-byte concatenation r || s, hex-encoded.
    return binascii.hexlify(r + s.to_bytes(32, 'big')).decode()
class NostrIdentity:
    """Holds a secp256k1 keypair and signs Nostr (NIP-01) events in place."""
    def __init__(self, privkey_hex=None):
        # Use the supplied hex private key, or generate a random
        # 32-byte scalar reduced mod the group order.
        if privkey_hex:
            self.privkey = int(privkey_hex, 16)
        else:
            self.privkey = int.from_bytes(os.urandom(32), 'big') % N
        # Hex-encoded x-only public key derived from the private key.
        self.pubkey = get_pubkey(self.privkey)
    def sign_event(self, event):
        # NIP-01 Event Signing
        # Event id = sha256 of the canonical JSON array
        # [0, pubkey, created_at, kind, tags, content]; mutates and
        # returns *event* with "id" and "sig" fields added.
        import json
        event_data = [
            0,
            event['pubkey'],
            event['created_at'],
            event['kind'],
            event['tags'],
            event['content']
        ]
        # NOTE(review): json.dumps defaults to ensure_ascii=True, which
        # escapes non-ASCII content before hashing — confirm this matches
        # NIP-01's UTF-8 serialization for non-ASCII notes.
        serialized = json.dumps(event_data, separators=(',', ':'))
        msg_hash = sha256(serialized.encode())
        event['id'] = binascii.hexlify(msg_hash).decode()
        event['sig'] = sign_schnorr(msg_hash, self.privkey)
        return event
if __name__ == "__main__":
    # Smoke test: generate a fresh identity and sign a sample note.
    identity = NostrIdentity()
    print(f"Nostr Pubkey: {identity.pubkey}")
    demo_event = {
        "pubkey": identity.pubkey,
        "created_at": 1677628800,
        "kind": 1,
        "tags": [],
        "content": "Sovereignty and service always. #Timmy"
    }
    signed_event = identity.sign_event(demo_event)
    print(f"Signed Event: {signed_event}")

55
nexus/nostr_publisher.py Normal file
View File

@@ -0,0 +1,55 @@
import asyncio
import websockets
import json
import time
import os
from nostr_identity import NostrIdentity
# ═══════════════════════════════════════════
# NOSTR SOVEREIGN PUBLISHER
# ═══════════════════════════════════════════
RELAYS = [
"wss://relay.damus.io",
"wss://nos.lol",
"wss://relay.snort.social"
]
async def publish_soul(identity, soul_content):
    """Sign *soul_content* as a kind-1 note and broadcast it to each relay.

    Relays are tried sequentially; a failure on one is logged and
    skipped so a dead relay never blocks the rest.
    """
    event = {
        "pubkey": identity.pubkey,
        "created_at": int(time.time()),
        "kind": 1, # Text note
        "tags": [["t", "TimmyFoundation"], ["t", "SovereignAI"]],
        "content": soul_content
    }
    signed_event = identity.sign_event(event)
    # Nostr wire format: a JSON array ["EVENT", <signed event>].
    message = json.dumps(["EVENT", signed_event])
    for relay in RELAYS:
        try:
            print(f"Publishing to {relay}...")
            # NOTE(review): recent websockets releases take `open_timeout`,
            # not `timeout` — confirm this kwarg is valid for the pinned
            # websockets version.
            async with websockets.connect(relay, timeout=10) as ws:
                await ws.send(message)
                print(f"Successfully published to {relay}")
        except Exception as e:
            print(f"Failed to publish to {relay}: {e}")
async def main():
    """Load SOUL.md (or a fallback line) and publish it under a fresh identity."""
    # Load SOUL.md
    soul_path = os.path.join(os.path.dirname(__file__), "../SOUL.md")
    if os.path.exists(soul_path):
        with open(soul_path, "r") as f:
            soul_content = f.read()
    else:
        soul_content = "Sovereignty and service always. #Timmy"
    # Initialize Identity (In production, load from secure storage)
    identity = NostrIdentity()
    # identity.pubkey is raw hex; a real npub is bech32-encoded (NIP-19),
    # so the old "npub1{hex}" label produced an invalid-looking npub.
    print(f"Timmy's Nostr Identity (hex pubkey): {identity.pubkey}")
    await publish_soul(identity, soul_content)
if __name__ == "__main__":
    # Script entry point: publish once and exit.
    asyncio.run(main())

View File

@@ -1,262 +0,0 @@
"""
Tests for AdaptiveCalibrator — online learning for local cost estimation.
Covers:
- Prior-based predictions for unseen models
- EMA update convergence
- Confidence growth with samples
- Persistence (save/load round-trip)
- reset() for one model and all models
- Groq vs local model prior selection
- get_stats() and all_stats()
"""
import json
import math
import tempfile
from pathlib import Path
import pytest
from nexus.adaptive_calibrator import (
AdaptiveCalibrator,
CostPrediction,
ModelCalibration,
_is_groq_model,
_prior_for,
DEFAULT_ALPHA,
)
# ═══ Helpers ═══
def make_calibrator(tmp_path: Path, alpha: float = DEFAULT_ALPHA) -> AdaptiveCalibrator:
state_file = tmp_path / "calibrator_state.json"
return AdaptiveCalibrator(state_path=state_file, alpha=alpha, autosave=True)
# ═══ Model family detection ═══
def test_local_ollama_model_not_groq():
assert not _is_groq_model("timmy:v0.1-q4")
assert not _is_groq_model("mistral:7b-q4_0")
def test_groq_model_detected():
assert _is_groq_model("llama3-8b-8192")
assert _is_groq_model("mixtral-8x7b-32768")
def test_prior_local_is_slower_than_groq():
local = _prior_for("timmy:v0.1-q4")
groq = _prior_for("llama3-8b-8192")
assert local["ms_per_completion_token"] > groq["ms_per_completion_token"]
assert local["ms_per_prompt_token"] > groq["ms_per_prompt_token"]
# ═══ CostPrediction ═══
def test_predict_returns_cost_prediction(tmp_path):
cal = make_calibrator(tmp_path)
pred = cal.predict("timmy:v0.1-q4", prompt_tokens=512)
assert isinstance(pred, CostPrediction)
assert pred.model == "timmy:v0.1-q4"
assert pred.prompt_tokens == 512
assert pred.predicted_ms > 0
assert pred.sample_count == 0
assert pred.confidence == 0.0 # No samples yet
def test_predict_new_model_uses_prior(tmp_path):
cal = make_calibrator(tmp_path)
pred = cal.predict("unknown-model:x", prompt_tokens=100)
assert pred.predicted_ms > 0
assert pred.confidence == 0.0
def test_predict_longer_prompt_costs_more(tmp_path):
cal = make_calibrator(tmp_path)
short = cal.predict("timmy:v0.1-q4", prompt_tokens=100)
long_ = cal.predict("timmy:v0.1-q4", prompt_tokens=1000)
assert long_.predicted_ms > short.predicted_ms
# ═══ Record & EMA update ═══
def test_record_returns_error_ms(tmp_path):
cal = make_calibrator(tmp_path)
error = cal.record("timmy:v0.1-q4", prompt_tokens=512, actual_ms=5000)
assert isinstance(error, float)
def test_record_increases_sample_count(tmp_path):
cal = make_calibrator(tmp_path)
cal.record("timmy:v0.1-q4", prompt_tokens=512, actual_ms=5000)
stats = cal.get_stats("timmy:v0.1-q4")
assert stats["sample_count"] == 1
def test_repeated_records_converge_prediction(tmp_path):
"""After many samples of the same cost, prediction should converge."""
cal = make_calibrator(tmp_path, alpha=0.3)
TRUE_MS = 4000
for _ in range(40):
cal.record("timmy:v0.1-q4", prompt_tokens=256, actual_ms=TRUE_MS)
pred = cal.predict("timmy:v0.1-q4", prompt_tokens=256)
# Should be within 15% of true value after many samples
assert abs(pred.predicted_ms - TRUE_MS) / TRUE_MS < 0.15
def test_confidence_grows_with_samples(tmp_path):
cal = make_calibrator(tmp_path)
assert cal.predict("timmy:v0.1-q4", prompt_tokens=100).confidence == 0.0
for i in range(10):
cal.record("timmy:v0.1-q4", prompt_tokens=100, actual_ms=2000)
pred = cal.predict("timmy:v0.1-q4", prompt_tokens=100)
assert pred.confidence > 0.5
assert pred.sample_count == 10
def test_confidence_approaches_one(tmp_path):
cal = make_calibrator(tmp_path)
for _ in range(50):
cal.record("timmy:v0.1-q4", prompt_tokens=100, actual_ms=2000)
pred = cal.predict("timmy:v0.1-q4", prompt_tokens=100)
assert pred.confidence > 0.99
def test_parameters_stay_non_negative(tmp_path):
"""EMA updates should never drive parameters negative."""
cal = make_calibrator(tmp_path)
for _ in range(20):
# Feed very small actual times (trying to drive params to zero)
cal.record("timmy:v0.1-q4", prompt_tokens=512, actual_ms=1.0)
m = cal._models["timmy:v0.1-q4"]
assert m.ms_per_prompt_token > 0
assert m.ms_per_completion_token > 0
assert m.base_overhead_ms >= 0
# ═══ get_stats / all_stats ═══
def test_get_stats_uncalibrated(tmp_path):
cal = make_calibrator(tmp_path)
stats = cal.get_stats("never-seen-model")
assert stats["sample_count"] == 0
assert stats["confidence"] == 0.0
assert "uncalibrated" in stats["status"]
def test_get_stats_after_records(tmp_path):
cal = make_calibrator(tmp_path)
for _ in range(5):
cal.record("timmy:v0.1-q4", prompt_tokens=200, actual_ms=3000)
stats = cal.get_stats("timmy:v0.1-q4")
assert stats["sample_count"] == 5
assert stats["confidence"] > 0
assert "mean_absolute_error_ms" in stats
def test_all_stats_lists_all_models(tmp_path):
cal = make_calibrator(tmp_path)
cal.record("model-a", prompt_tokens=100, actual_ms=1000)
cal.record("model-b", prompt_tokens=100, actual_ms=2000)
stats = cal.all_stats()
model_names = [s["model"] for s in stats]
assert "model-a" in model_names
assert "model-b" in model_names
# ═══ Persistence ═══
def test_save_and_load(tmp_path):
"""Calibration state should survive a save/load round-trip."""
state_file = tmp_path / "state.json"
# Write some samples
cal1 = AdaptiveCalibrator(state_path=state_file, autosave=True)
for _ in range(15):
cal1.record("timmy:v0.1-q4", prompt_tokens=300, actual_ms=3500)
stats_before = cal1.get_stats("timmy:v0.1-q4")
# Load fresh instance
cal2 = AdaptiveCalibrator(state_path=state_file, autosave=True)
stats_after = cal2.get_stats("timmy:v0.1-q4")
assert stats_after["sample_count"] == stats_before["sample_count"]
assert abs(stats_after["ms_per_prompt_token"] - stats_before["ms_per_prompt_token"]) < 1e-6
def test_load_with_missing_file(tmp_path):
"""Missing state file should result in empty (not crashed) calibrator."""
cal = AdaptiveCalibrator(state_path=tmp_path / "nonexistent.json", autosave=False)
assert cal.all_stats() == []
def test_load_with_corrupt_file(tmp_path):
"""Corrupt state file should be silently ignored."""
state_file = tmp_path / "state.json"
state_file.write_text("not valid json {{{")
cal = AdaptiveCalibrator(state_path=state_file, autosave=False)
assert cal.all_stats() == []
def test_atomic_save(tmp_path):
"""Save should write via a tmp file and replace atomically."""
state_file = tmp_path / "state.json"
cal = AdaptiveCalibrator(state_path=state_file, autosave=True)
cal.record("timmy:v0.1-q4", prompt_tokens=100, actual_ms=2000)
assert state_file.exists()
# No .tmp file should be left behind
assert not (state_file.with_suffix(".tmp")).exists()
# File should be valid JSON
data = json.loads(state_file.read_text())
assert data["version"] == 1
# ═══ Reset ═══
def test_reset_single_model(tmp_path):
cal = make_calibrator(tmp_path)
cal.record("model-a", prompt_tokens=100, actual_ms=1000)
cal.record("model-b", prompt_tokens=100, actual_ms=1000)
cal.reset("model-a")
assert cal.get_stats("model-a")["sample_count"] == 0
assert cal.get_stats("model-b")["sample_count"] == 1
def test_reset_all_models(tmp_path):
cal = make_calibrator(tmp_path)
cal.record("model-a", prompt_tokens=100, actual_ms=1000)
cal.record("model-b", prompt_tokens=100, actual_ms=1000)
cal.reset()
assert cal.all_stats() == []
# ═══ ModelCalibration unit tests ═══
def test_model_calibration_repr_roundtrip():
m = ModelCalibration(model="test:v1")
d = m.to_dict()
m2 = ModelCalibration.from_dict(d)
assert m2.model == m.model
assert m2.alpha == m.alpha
assert m2.ms_per_prompt_token == m.ms_per_prompt_token
def test_model_calibration_mean_absolute_error_nan_when_no_samples():
m = ModelCalibration(model="test:v1")
assert math.isnan(m.mean_absolute_error_ms)

View File

@@ -0,0 +1,311 @@
"""Tests for the Nexus Watchdog and Heartbeat system.
Validates:
- All four health checks (WS gateway, process, heartbeat, syntax)
- HealthReport aggregation and markdown formatting
- Heartbeat atomic write protocol
- Gitea issue creation/update/close flows
- Edge cases: missing files, corrupt JSON, stale timestamps
- CLI argument parsing
"""
import json
import os
import sys
import time
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
# ── Direct module imports ────────────────────────────────────────────
# Import directly to avoid any __init__.py import chains
import importlib.util
PROJECT_ROOT = Path(__file__).parent.parent
# Load bin/nexus_watchdog.py under a private module name so the tests
# never trigger package-level import side effects.
_wd_spec = importlib.util.spec_from_file_location(
    "nexus_watchdog_test",
    PROJECT_ROOT / "bin" / "nexus_watchdog.py",
)
_wd = importlib.util.module_from_spec(_wd_spec)
# Must register BEFORE exec_module — dataclass decorator resolves
# cls.__module__ through sys.modules during class creation.
sys.modules["nexus_watchdog_test"] = _wd
_wd_spec.loader.exec_module(_wd)
# Same trick for nexus/heartbeat.py.
_hb_spec = importlib.util.spec_from_file_location(
    "nexus_heartbeat_test",
    PROJECT_ROOT / "nexus" / "heartbeat.py",
)
_hb = importlib.util.module_from_spec(_hb_spec)
sys.modules["nexus_heartbeat_test"] = _hb
_hb_spec.loader.exec_module(_hb)
# Re-export the names under test at module level for the test classes.
CheckResult = _wd.CheckResult
HealthReport = _wd.HealthReport
check_ws_gateway = _wd.check_ws_gateway
check_mind_process = _wd.check_mind_process
check_heartbeat = _wd.check_heartbeat
check_syntax_health = _wd.check_syntax_health
run_health_checks = _wd.run_health_checks
find_open_watchdog_issue = _wd.find_open_watchdog_issue
write_heartbeat = _hb.write_heartbeat
# ── Heartbeat tests ──────────────────────────────────────────────────
# ── Heartbeat tests ──────────────────────────────────────────────────
class TestHeartbeat:
    # Exercises write_heartbeat(): payload structure, atomicity,
    # clean overwrite, and parent-directory creation.
    def test_write_creates_file(self, tmp_path):
        """Heartbeat file is created with correct structure."""
        hb_path = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=5, model="timmy:v0.1", status="thinking", path=hb_path)
        assert hb_path.exists()
        data = json.loads(hb_path.read_text())
        assert data["cycle"] == 5
        assert data["model"] == "timmy:v0.1"
        assert data["status"] == "thinking"
        assert data["pid"] == os.getpid()
        # Timestamp must be "now" within a generous 2s tolerance.
        assert abs(data["timestamp"] - time.time()) < 2
    def test_write_is_atomic(self, tmp_path):
        """No partial files left behind on success."""
        hb_path = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=1, path=hb_path)
        # No temp files should remain
        siblings = list(hb_path.parent.iterdir())
        assert len(siblings) == 1
        assert siblings[0].name == "heartbeat.json"
    def test_write_overwrites_cleanly(self, tmp_path):
        """Successive writes update the file, not append."""
        hb_path = tmp_path / ".nexus" / "heartbeat.json"
        write_heartbeat(cycle=1, path=hb_path)
        write_heartbeat(cycle=2, path=hb_path)
        data = json.loads(hb_path.read_text())
        assert data["cycle"] == 2
    def test_write_creates_parent_dirs(self, tmp_path):
        """Parent directories are created if they don't exist."""
        hb_path = tmp_path / "deep" / "nested" / "heartbeat.json"
        write_heartbeat(cycle=0, path=hb_path)
        assert hb_path.exists()
# ── WebSocket gateway check ──────────────────────────────────────────
# ── WebSocket gateway check ──────────────────────────────────────────
class TestWSGatewayCheck:
    # check_ws_gateway() probes TCP reachability; the socket is mocked
    # so no real network is touched.
    def test_healthy_when_port_open(self):
        """Healthy when TCP connect succeeds."""
        with patch("socket.socket") as mock_sock:
            instance = mock_sock.return_value
            # connect_ex returns 0 on success (errno semantics, no raise).
            instance.connect_ex.return_value = 0
            result = check_ws_gateway("localhost", 8765)
            assert result.healthy is True
            assert "Listening" in result.message
    def test_unhealthy_when_port_closed(self):
        """Unhealthy when TCP connect is refused."""
        with patch("socket.socket") as mock_sock:
            instance = mock_sock.return_value
            instance.connect_ex.return_value = 111  # ECONNREFUSED
            result = check_ws_gateway("localhost", 8765)
            assert result.healthy is False
            assert "refused" in result.message.lower()
    def test_unhealthy_on_exception(self):
        """Unhealthy when socket raises."""
        with patch("socket.socket") as mock_sock:
            instance = mock_sock.return_value
            instance.connect_ex.side_effect = OSError("network unreachable")
            result = check_ws_gateway("localhost", 8765)
            assert result.healthy is False
# ── Process check ────────────────────────────────────────────────────
class TestMindProcessCheck:
    """Tests for the consciousness-loop process probe (pgrep-based)."""

    def test_healthy_when_process_found(self):
        """pgrep success (rc=0, pid on stdout) reads as healthy."""
        fake = MagicMock(returncode=0, stdout="12345\n")
        with patch("subprocess.run", return_value=fake):
            result = check_mind_process()
        assert result.healthy is True
        assert "12345" in result.message

    def test_unhealthy_when_no_process(self):
        """pgrep finding nothing (rc=1) reads as unhealthy."""
        fake = MagicMock(returncode=1, stdout="")
        with patch("subprocess.run", return_value=fake):
            result = check_mind_process()
        assert result.healthy is False
        assert "not running" in result.message

    def test_graceful_when_pgrep_missing(self):
        """A missing pgrep binary must not raise a false alarm."""
        with patch("subprocess.run", side_effect=FileNotFoundError):
            result = check_mind_process()
        assert result.healthy is True
# ── Heartbeat check ──────────────────────────────────────────────────
class TestHeartbeatCheck:
    """Tests for heartbeat freshness validation."""

    @staticmethod
    def _write(path, age_seconds, cycle):
        """Drop a heartbeat file whose timestamp lies age_seconds in the past."""
        path.write_text(json.dumps({
            "timestamp": time.time() - age_seconds,
            "cycle": cycle,
            "model": "timmy:v0.1",
            "status": "thinking",
        }))

    def test_healthy_when_recent(self, tmp_path):
        """A just-written heartbeat passes the staleness check."""
        hb = tmp_path / "heartbeat.json"
        self._write(hb, age_seconds=0, cycle=42)
        result = check_heartbeat(hb, stale_threshold=300)
        assert result.healthy is True
        assert "cycle #42" in result.message

    def test_unhealthy_when_stale(self, tmp_path):
        """A heartbeat older than the threshold is flagged stale."""
        hb = tmp_path / "heartbeat.json"
        self._write(hb, age_seconds=600, cycle=10)  # 10 minutes old
        result = check_heartbeat(hb, stale_threshold=300)
        assert result.healthy is False
        assert "Stale" in result.message

    def test_unhealthy_when_missing(self, tmp_path):
        """An absent heartbeat file is unhealthy."""
        result = check_heartbeat(tmp_path / "nonexistent.json")
        assert result.healthy is False
        assert "No heartbeat" in result.message

    def test_unhealthy_when_corrupt(self, tmp_path):
        """Unparseable JSON is reported as corrupt."""
        hb = tmp_path / "heartbeat.json"
        hb.write_text("not json {{{")
        result = check_heartbeat(hb)
        assert result.healthy is False
        assert "corrupt" in result.message.lower()
# ── Syntax check ─────────────────────────────────────────────────────
class TestSyntaxCheck:
def test_healthy_on_valid_python(self, tmp_path):
"""Healthy when nexus_think.py is valid Python."""
# Create a mock nexus_think.py
(tmp_path / "nexus").mkdir()
(tmp_path / "nexus" / "nexus_think.py").write_text("x = 1\nprint(x)\n")
# Create bin dir so watchdog resolves parent correctly
(tmp_path / "bin").mkdir()
with patch.object(_wd.Path, "__new__", return_value=tmp_path / "bin" / "watchdog.py"):
# Directly call with the real path
script = tmp_path / "nexus" / "nexus_think.py"
source = script.read_text()
compile(source, str(script), "exec")
# If we get here without error, syntax is valid
assert True
def test_detects_syntax_error(self, tmp_path):
"""Detects SyntaxError in nexus_think.py."""
bad_python = "def broken(\n # missing close paren"
with pytest.raises(SyntaxError):
compile(bad_python, "test.py", "exec")
# ── HealthReport ─────────────────────────────────────────────────────
class TestHealthReport:
    """Tests for HealthReport aggregation and markdown rendering."""

    def test_overall_healthy_when_all_pass(self):
        """overall_healthy is True when all checks pass."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", True, "ok"),
            ],
        )
        assert report.overall_healthy is True

    def test_overall_unhealthy_when_any_fails(self):
        """overall_healthy is False when any check fails."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", False, "down"),
            ],
        )
        assert report.overall_healthy is False

    def test_failed_checks_property(self):
        """failed_checks returns only the failed results, in order."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("A", True, "ok"),
                CheckResult("B", False, "down"),
                CheckResult("C", False, "error"),
            ],
        )
        assert len(report.failed_checks) == 2
        assert report.failed_checks[0].name == "B"

    def test_markdown_contains_table(self):
        """to_markdown() includes a status table row per check.

        FIX(review): the original asserted ``"" in md`` twice — the empty
        string is a substring of every string, so both checks were vacuous
        (status glyphs were probably lost to an encoding mishap).  Assert on
        the per-check messages instead, which must appear in the table.
        """
        report = HealthReport(
            timestamp=time.time(),
            checks=[
                CheckResult("Gateway", True, "Listening"),
                CheckResult("Mind", False, "Not running"),
            ],
        )
        md = report.to_markdown()
        assert "| Gateway |" in md
        assert "| Mind |" in md
        assert "Listening" in md
        assert "Not running" in md
        assert "FAILURES DETECTED" in md

    def test_markdown_all_healthy(self):
        """to_markdown() shows the all-clear banner when everything passes."""
        report = HealthReport(
            timestamp=time.time(),
            checks=[CheckResult("A", True, "ok")],
        )
        md = report.to_markdown()
        assert "ALL SYSTEMS OPERATIONAL" in md
# ── Integration: full health check cycle ─────────────────────────────
class TestRunHealthChecks:
    """Integration test for the full health-check cycle."""

    def test_returns_report_with_all_checks(self, tmp_path):
        """run_health_checks() yields one result per monitored subsystem."""
        with patch("socket.socket") as mock_sock:
            with patch("subprocess.run") as mock_run:
                mock_sock.return_value.connect_ex.return_value = 0
                mock_run.return_value = MagicMock(returncode=1, stdout="")
                report = run_health_checks(
                    heartbeat_path=tmp_path / "missing.json",
                )
        assert len(report.checks) == 4
        names = {check.name for check in report.checks}
        expected = (
            "WebSocket Gateway",
            "Consciousness Loop",
            "Heartbeat",
            "Syntax Health",
        )
        for name in expected:
            assert name in names