Compare commits
7 Commits
groq/issue
...
groq/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b445c04037 | ||
| 60bd9a05ff | |||
| c7468a3c6a | |||
| 07a4be3bb9 | |||
| 804536a3f2 | |||
|
|
a0ee7858ff | ||
| 34ec13bc29 |
21
.gitea/workflows/review_gate.yml
Normal file
21
.gitea/workflows/review_gate.yml
Normal file
@@ -0,0 +1,21 @@
|
||||
name: Review Approval Gate
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
verify-review:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Verify PR has approving review
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||
PR_NUMBER: ${{ gitea.event.pull_request.number }}
|
||||
run: |
|
||||
python3 scripts/review_gate.py
|
||||
20
.gitea/workflows/staging_gate.yml
Normal file
20
.gitea/workflows/staging_gate.yml
Normal file
@@ -0,0 +1,20 @@
|
||||
name: Staging Verification Gate
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
verify-staging:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Verify staging label on merge PR
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||
run: |
|
||||
python3 scripts/staging_gate.py
|
||||
28
.gitea/workflows/weekly-audit.yml
Normal file
28
.gitea/workflows/weekly-audit.yml
Normal file
@@ -0,0 +1,28 @@
|
||||
name: Weekly Privacy Audit
|
||||
|
||||
# Runs every Monday at 05:00 UTC against a CI test fixture.
|
||||
# On production wizards this same script should be run via cron:
|
||||
# 0 5 * * 1 python /opt/nexus/mempalace/audit_privacy.py /var/lib/mempalace/fleet
|
||||
#
|
||||
# Refs: #1083, #1075
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 5 * * 1" # Monday 05:00 UTC
|
||||
workflow_dispatch: {} # allow manual trigger
|
||||
|
||||
jobs:
|
||||
privacy-audit:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: "3.x"
|
||||
|
||||
- name: Run privacy audit against CI fixture
|
||||
run: |
|
||||
python mempalace/audit_privacy.py tests/fixtures/fleet_palace
|
||||
326
bin/bezalel_heartbeat_check.py
Executable file
326
bin/bezalel_heartbeat_check.py
Executable file
@@ -0,0 +1,326 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Bezalel Meta-Heartbeat Checker — stale cron detection (poka-yoke #1096)
|
||||
|
||||
Monitors all cron job heartbeat files and alerts P1 when any job has been
|
||||
silent for more than 2× its declared interval.
|
||||
|
||||
POKA-YOKE design:
|
||||
Prevention — cron-heartbeat-write.sh writes a .last file atomically after
|
||||
every successful cron job completion, stamping its interval.
|
||||
Detection — this script runs every 15 minutes (via systemd timer) and
|
||||
raises P1 on stderr + writes an alert file for any stale job.
|
||||
Correction — alerts are loud enough (P1 stderr + alert files) for
|
||||
monitoring/humans to intervene before the next run window.
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
=================
|
||||
Pure stdlib. No pip installs.
|
||||
|
||||
USAGE
|
||||
=====
|
||||
# One-shot check (default dir)
|
||||
python bin/bezalel_heartbeat_check.py
|
||||
|
||||
# Override heartbeat dir
|
||||
python bin/bezalel_heartbeat_check.py --heartbeat-dir /tmp/test-beats
|
||||
|
||||
# Dry-run (check + report, don't write alert files)
|
||||
python bin/bezalel_heartbeat_check.py --dry-run
|
||||
|
||||
# JSON output (for piping into other tools)
|
||||
python bin/bezalel_heartbeat_check.py --json
|
||||
|
||||
EXIT CODES
|
||||
==========
|
||||
0 — all jobs healthy (or no .last files found yet)
|
||||
1 — one or more stale beats detected
|
||||
2 — heartbeat dir unreadable
|
||||
|
||||
IMPORTABLE API
|
||||
==============
|
||||
from bin.bezalel_heartbeat_check import check_cron_heartbeats
|
||||
|
||||
result = check_cron_heartbeats("/var/run/bezalel/heartbeats")
|
||||
# Returns dict with keys: checked_at, jobs, stale_count, healthy_count
|
||||
|
||||
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("bezalel.heartbeat")
|
||||
|
||||
# ── Configuration ────────────────────────────────────────────────────
|
||||
|
||||
DEFAULT_HEARTBEAT_DIR = "/var/run/bezalel/heartbeats"
|
||||
|
||||
|
||||
# ── Core checker ─────────────────────────────────────────────────────
|
||||
|
||||
def check_cron_heartbeats(heartbeat_dir: str = DEFAULT_HEARTBEAT_DIR) -> Dict[str, Any]:
|
||||
"""
|
||||
Scan all .last files in heartbeat_dir and determine which jobs are stale.
|
||||
|
||||
Returns a dict:
|
||||
{
|
||||
"checked_at": "<ISO 8601 timestamp>",
|
||||
"jobs": [
|
||||
{
|
||||
"job": str,
|
||||
"healthy": bool,
|
||||
"age_secs": float,
|
||||
"interval": int,
|
||||
"last_seen": str or None, # ISO timestamp of last heartbeat
|
||||
"message": str,
|
||||
},
|
||||
...
|
||||
],
|
||||
"stale_count": int,
|
||||
"healthy_count": int,
|
||||
}
|
||||
|
||||
On empty dir (no .last files), returns jobs=[] with stale_count=0.
|
||||
On corrupt .last file, reports that job as stale with an error message.
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
now_ts = time.time()
|
||||
checked_at = datetime.fromtimestamp(now_ts, tz=timezone.utc).isoformat()
|
||||
|
||||
hb_path = Path(heartbeat_dir)
|
||||
jobs: List[Dict[str, Any]] = []
|
||||
|
||||
if not hb_path.exists():
|
||||
return {
|
||||
"checked_at": checked_at,
|
||||
"jobs": [],
|
||||
"stale_count": 0,
|
||||
"healthy_count": 0,
|
||||
}
|
||||
|
||||
last_files = sorted(hb_path.glob("*.last"))
|
||||
|
||||
for last_file in last_files:
|
||||
job_name = last_file.stem # filename without .last extension
|
||||
|
||||
# Read and parse the heartbeat file
|
||||
try:
|
||||
raw = last_file.read_text(encoding="utf-8")
|
||||
data = json.loads(raw)
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
jobs.append({
|
||||
"job": job_name,
|
||||
"healthy": False,
|
||||
"age_secs": float("inf"),
|
||||
"interval": 3600,
|
||||
"last_seen": None,
|
||||
"message": f"CORRUPT: cannot read/parse heartbeat file: {exc}",
|
||||
})
|
||||
continue
|
||||
|
||||
# Extract fields with safe defaults
|
||||
beat_timestamp = float(data.get("timestamp", 0))
|
||||
interval = int(data.get("interval", 3600))
|
||||
pid = data.get("pid", "?")
|
||||
|
||||
age_secs = now_ts - beat_timestamp
|
||||
|
||||
# Convert beat_timestamp to a readable ISO string
|
||||
try:
|
||||
last_seen = datetime.fromtimestamp(beat_timestamp, tz=timezone.utc).isoformat()
|
||||
except (OSError, OverflowError, ValueError):
|
||||
last_seen = None
|
||||
|
||||
# Stale = silent for more than 2× the declared interval
|
||||
threshold = 2 * interval
|
||||
is_stale = age_secs > threshold
|
||||
|
||||
if is_stale:
|
||||
message = (
|
||||
f"STALE (last {age_secs:.0f}s ago, interval {interval}s"
|
||||
f" — exceeds 2x threshold of {threshold}s)"
|
||||
)
|
||||
else:
|
||||
message = f"OK (last {age_secs:.0f}s ago, interval {interval}s)"
|
||||
|
||||
jobs.append({
|
||||
"job": job_name,
|
||||
"healthy": not is_stale,
|
||||
"age_secs": age_secs,
|
||||
"interval": interval,
|
||||
"last_seen": last_seen,
|
||||
"message": message,
|
||||
})
|
||||
|
||||
stale_count = sum(1 for j in jobs if not j["healthy"])
|
||||
healthy_count = sum(1 for j in jobs if j["healthy"])
|
||||
|
||||
return {
|
||||
"checked_at": checked_at,
|
||||
"jobs": jobs,
|
||||
"stale_count": stale_count,
|
||||
"healthy_count": healthy_count,
|
||||
}
|
||||
|
||||
|
||||
# ── Alert file writer ────────────────────────────────────────────────
|
||||
|
||||
def write_alert(heartbeat_dir: str, job_info: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Write an alert file for a stale job to <heartbeat_dir>/alerts/<job>.alert
|
||||
|
||||
Alert files are watched by external monitoring. They persist until the
|
||||
job runs again and clears stale status on the next check cycle.
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
alerts_dir = Path(heartbeat_dir) / "alerts"
|
||||
try:
|
||||
alerts_dir.mkdir(parents=True, exist_ok=True)
|
||||
except OSError as exc:
|
||||
logger.warning("Cannot create alerts dir %s: %s", alerts_dir, exc)
|
||||
return
|
||||
|
||||
alert_file = alerts_dir / f"{job_info['job']}.alert"
|
||||
now_str = datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
content = {
|
||||
"alert_level": "P1",
|
||||
"job": job_info["job"],
|
||||
"message": job_info["message"],
|
||||
"age_secs": job_info["age_secs"],
|
||||
"interval": job_info["interval"],
|
||||
"last_seen": job_info["last_seen"],
|
||||
"detected_at": now_str,
|
||||
}
|
||||
|
||||
# Atomic write via temp + rename (same poka-yoke pattern as the writer)
|
||||
tmp_file = alert_file.with_suffix(f".alert.tmp.{os.getpid()}")
|
||||
try:
|
||||
tmp_file.write_text(json.dumps(content, indent=2), encoding="utf-8")
|
||||
tmp_file.rename(alert_file)
|
||||
except OSError as exc:
|
||||
logger.warning("Failed to write alert file %s: %s", alert_file, exc)
|
||||
tmp_file.unlink(missing_ok=True)
|
||||
|
||||
|
||||
# ── Main runner ──────────────────────────────────────────────────────
|
||||
|
||||
def run_check(heartbeat_dir: str, dry_run: bool = False, output_json: bool = False) -> int:
|
||||
"""
|
||||
Run a full heartbeat check cycle. Returns exit code (0/1/2).
|
||||
|
||||
Exit codes:
|
||||
0 — all healthy (or no .last files found yet)
|
||||
1 — stale beats detected
|
||||
2 — heartbeat dir unreadable (permissions, etc.)
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
hb_path = Path(heartbeat_dir)
|
||||
|
||||
# Check if dir exists but is unreadable (permissions)
|
||||
if hb_path.exists() and not os.access(heartbeat_dir, os.R_OK):
|
||||
logger.error("Heartbeat dir unreadable: %s", heartbeat_dir)
|
||||
return 2
|
||||
|
||||
result = check_cron_heartbeats(heartbeat_dir)
|
||||
|
||||
if output_json:
|
||||
print(json.dumps(result, indent=2))
|
||||
return 1 if result["stale_count"] > 0 else 0
|
||||
|
||||
# Human-readable output
|
||||
if not result["jobs"]:
|
||||
logger.warning(
|
||||
"No .last files found in %s — bezalel not yet provisioned or no jobs registered.",
|
||||
heartbeat_dir,
|
||||
)
|
||||
return 0
|
||||
|
||||
for job in result["jobs"]:
|
||||
if job["healthy"]:
|
||||
logger.info(" + %s: %s", job["job"], job["message"])
|
||||
else:
|
||||
logger.error(" - %s: %s", job["job"], job["message"])
|
||||
|
||||
if result["stale_count"] > 0:
|
||||
for job in result["jobs"]:
|
||||
if not job["healthy"]:
|
||||
# P1 alert to stderr
|
||||
print(
|
||||
f"[P1-ALERT] STALE CRON JOB: {job['job']} — {job['message']}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
if not dry_run:
|
||||
write_alert(heartbeat_dir, job)
|
||||
else:
|
||||
logger.info("DRY RUN — would write alert for stale job: %s", job["job"])
|
||||
|
||||
logger.error(
|
||||
"Heartbeat check FAILED: %d stale, %d healthy",
|
||||
result["stale_count"],
|
||||
result["healthy_count"],
|
||||
)
|
||||
return 1
|
||||
|
||||
logger.info(
|
||||
"Heartbeat check PASSED: %d healthy, %d stale",
|
||||
result["healthy_count"],
|
||||
result["stale_count"],
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
# ── CLI entrypoint ───────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description=(
|
||||
"Bezalel Meta-Heartbeat Checker — detect silent cron failures (poka-yoke #1096)"
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--heartbeat-dir",
|
||||
default=DEFAULT_HEARTBEAT_DIR,
|
||||
help=f"Directory containing .last heartbeat files (default: {DEFAULT_HEARTBEAT_DIR})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Check and report but do not write alert files",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json",
|
||||
action="store_true",
|
||||
dest="output_json",
|
||||
help="Output results as JSON (for integration with other tools)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
exit_code = run_check(
|
||||
heartbeat_dir=args.heartbeat_dir,
|
||||
dry_run=args.dry_run,
|
||||
output_json=args.output_json,
|
||||
)
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
246
docs/bezalel/evennia/cmd_palace.py
Normal file
246
docs/bezalel/evennia/cmd_palace.py
Normal file
@@ -0,0 +1,246 @@
|
||||
"""
|
||||
Palace commands — bridge Evennia to the local MemPalace memory system.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.commands.command import Command
|
||||
from evennia import create_object, search_object
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
def _search_mempalace(query, wing=None, room=None, n=5, fleet=False):
|
||||
"""Call the helper script and return parsed results."""
|
||||
cmd = ["/root/wizards/bezalel/hermes/venv/bin/python", PALACE_SCRIPT, query]
|
||||
cmd.append(wing or "none")
|
||||
cmd.append(room or "none")
|
||||
cmd.append(str(n))
|
||||
if fleet:
|
||||
cmd.append("--fleet")
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
data = json.loads(result.stdout)
|
||||
return data.get("results", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def _get_wing(caller):
|
||||
"""Return the caller's wing, defaulting to their key or 'general'."""
|
||||
return caller.db.wing if caller.attributes.has("wing") else (caller.key.lower() if caller.key else "general")
|
||||
|
||||
|
||||
class CmdPalaceSearch(Command):
|
||||
"""
|
||||
Search your memory palace.
|
||||
|
||||
Usage:
|
||||
palace/search <query>
|
||||
palace/search <query> [--room <room>]
|
||||
palace/recall <topic>
|
||||
palace/file <name> = <content>
|
||||
palace/status
|
||||
"""
|
||||
|
||||
key = "palace"
|
||||
aliases = ["pal"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Usage: palace/search <query> | palace/recall <topic> | palace/file <name> = <content> | palace/status")
|
||||
return
|
||||
|
||||
parts = self.args.strip().split(" ", 1)
|
||||
subcmd = parts[0].lower()
|
||||
rest = parts[1] if len(parts) > 1 else ""
|
||||
|
||||
if subcmd == "search":
|
||||
self._do_search(rest)
|
||||
elif subcmd == "recall":
|
||||
self._do_recall(rest)
|
||||
elif subcmd == "file":
|
||||
self._do_file(rest)
|
||||
elif subcmd == "status":
|
||||
self._do_status()
|
||||
else:
|
||||
self._do_search(self.args.strip())
|
||||
|
||||
def _do_search(self, query):
|
||||
if not query:
|
||||
self.caller.msg("Search for what?")
|
||||
return
|
||||
self.caller.msg(f"Searching the palace for: |c{query}|n...")
|
||||
wing = _get_wing(self.caller)
|
||||
results = _search_mempalace(query, wing=wing)
|
||||
if not results:
|
||||
self.caller.msg("The palace is silent on that matter.")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for i, r in enumerate(results[:5], 1):
|
||||
room = r.get("room", "unknown")
|
||||
source = r.get("source", "unknown")
|
||||
content = r.get("content", "")[:400]
|
||||
lines.append(f"\n|g[{i}]|n |c{room}|n — |x{source}|n")
|
||||
lines.append(f"{content}\n")
|
||||
self.caller.msg("\n".join(lines))
|
||||
|
||||
def _do_recall(self, topic):
|
||||
if not topic:
|
||||
self.caller.msg("Recall what topic?")
|
||||
return
|
||||
results = _search_mempalace(topic, wing=_get_wing(self.caller), n=1)
|
||||
if not results:
|
||||
self.caller.msg("Nothing to recall.")
|
||||
return
|
||||
|
||||
r = results[0]
|
||||
content = r.get("content", "")
|
||||
source = r.get("source", "unknown")
|
||||
|
||||
from typeclasses.memory_object import MemoryObject
|
||||
obj = create_object(
|
||||
MemoryObject,
|
||||
key=f"memory:{topic}",
|
||||
location=self.caller.location,
|
||||
)
|
||||
obj.db.memory_content = content
|
||||
obj.db.source_file = source
|
||||
obj.db.room_name = r.get("room", "general")
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() conjure() a memory shard from the palace: |m{obj.key}|n.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
|
||||
def _do_file(self, rest):
|
||||
if "=" not in rest:
|
||||
self.caller.msg("Usage: palace/file <name> = <content>")
|
||||
return
|
||||
name, content = rest.split("=", 1)
|
||||
name = name.strip()
|
||||
content = content.strip()
|
||||
if not name or not content:
|
||||
self.caller.msg("Both name and content are required.")
|
||||
return
|
||||
|
||||
from typeclasses.memory_object import MemoryObject
|
||||
obj = create_object(
|
||||
MemoryObject,
|
||||
key=f"memory:{name}",
|
||||
location=self.caller.location,
|
||||
)
|
||||
obj.db.memory_content = content
|
||||
obj.db.source_file = f"filed by {self.caller.key}"
|
||||
obj.db.room_name = self.caller.location.key if self.caller.location else "general"
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() file() a new memory in the palace: |m{obj.key}|n.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
|
||||
def _do_status(self):
|
||||
cmd = [
|
||||
"/root/wizards/bezalel/hermes/venv/bin/mempalace",
|
||||
"--palace", "/root/wizards/bezalel/.mempalace/palace",
|
||||
"status"
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
||||
self.caller.msg(result.stdout or result.stderr)
|
||||
except Exception as e:
|
||||
self.caller.msg(f"Could not reach the palace: {e}")
|
||||
|
||||
|
||||
class CmdRecall(Command):
|
||||
"""
|
||||
Recall a memory from the palace.
|
||||
|
||||
Usage:
|
||||
recall <query>
|
||||
recall <query> --fleet
|
||||
recall <query> --room <room>
|
||||
"""
|
||||
|
||||
key = "recall"
|
||||
aliases = ["remember", "mem"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Recall what? Usage: recall <query> [--fleet] [--room <room>]")
|
||||
return
|
||||
|
||||
args = self.args.strip()
|
||||
fleet = "--fleet" in args
|
||||
room = None
|
||||
|
||||
if "--room" in args:
|
||||
parts = args.split("--room")
|
||||
args = parts[0].strip()
|
||||
room = parts[1].strip().split()[0] if len(parts) > 1 else None
|
||||
|
||||
if "--fleet" in args:
|
||||
args = args.replace("--fleet", "").strip()
|
||||
|
||||
self.caller.msg(f"Recalling from the {'fleet' if fleet else 'personal'} palace: |c{args}|n...")
|
||||
|
||||
wing = None if fleet else _get_wing(self.caller)
|
||||
results = _search_mempalace(args, wing=wing, room=room, n=5, fleet=fleet)
|
||||
if not results:
|
||||
self.caller.msg("The palace is silent on that matter.")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for i, r in enumerate(results[:5], 1):
|
||||
room_name = r.get("room", "unknown")
|
||||
source = r.get("source", "unknown")
|
||||
content = r.get("content", "")[:400]
|
||||
wing_label = r.get("wing", "unknown")
|
||||
wing_tag = f" |y[{wing_label}]|n" if fleet else ""
|
||||
lines.append(f"\n|g[{i}]|n |c{room_name}|n{wing_tag} — |x{source}|n")
|
||||
lines.append(f"{content}\n")
|
||||
self.caller.msg("\n".join(lines))
|
||||
|
||||
|
||||
class CmdEnterRoom(Command):
|
||||
"""
|
||||
Enter a room in the mind palace by topic.
|
||||
|
||||
Usage:
|
||||
enter room <topic>
|
||||
"""
|
||||
|
||||
key = "enter room"
|
||||
aliases = ["enter palace", "go room"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Enter which room? Usage: enter room <topic>")
|
||||
return
|
||||
|
||||
topic = self.args.strip().lower().replace(" ", "-")
|
||||
wing = _get_wing(self.caller)
|
||||
room_key = f"palace:{wing}:{topic}"
|
||||
|
||||
# Search for existing room
|
||||
rooms = search_object(room_key, typeclass="typeclasses.palace_room.PalaceRoom")
|
||||
if rooms:
|
||||
room = rooms[0]
|
||||
else:
|
||||
# Create the room dynamically
|
||||
from typeclasses.palace_room import PalaceRoom
|
||||
room = create_object(
|
||||
PalaceRoom,
|
||||
key=room_key,
|
||||
)
|
||||
room.db.memory_topic = topic
|
||||
room.db.wing = wing
|
||||
room.update_description()
|
||||
|
||||
self.caller.move_to(room, move_type="teleport")
|
||||
self.caller.msg(f"You step into the |c{topic}|n room of your mind palace.")
|
||||
166
docs/bezalel/evennia/cmd_record.py
Normal file
166
docs/bezalel/evennia/cmd_record.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""
|
||||
Live memory commands — write new memories into the palace from Evennia.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.commands.command import Command
|
||||
from evennia import create_object
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
|
||||
ADDER_SCRIPT = "/root/wizards/bezalel/evennia/palace_add.py"
|
||||
|
||||
|
||||
def _add_drawer(content, wing, room, source):
|
||||
"""Add a verbatim drawer to the palace via the helper script."""
|
||||
cmd = [
|
||||
"/root/wizards/bezalel/hermes/venv/bin/python",
|
||||
ADDER_SCRIPT,
|
||||
content,
|
||||
wing,
|
||||
room,
|
||||
source,
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
||||
return result.returncode == 0 and "OK" in result.stdout
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class CmdRecord(Command):
|
||||
"""
|
||||
Record a decision into the palace hall_facts.
|
||||
|
||||
Usage:
|
||||
record <text>
|
||||
record We decided to use PostgreSQL over MySQL.
|
||||
"""
|
||||
|
||||
key = "record"
|
||||
aliases = ["decide"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Record what decision? Usage: record <text>")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
text = self.args.strip()
|
||||
full_text = f"DECISION ({wing}): {text}\nRecorded by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() record() a decision in the palace archives.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdNote(Command):
|
||||
"""
|
||||
Note a breakthrough into the palace hall_discoveries.
|
||||
|
||||
Usage:
|
||||
note <text>
|
||||
note The GraphQL schema can be auto-generated from our typeclasses.
|
||||
"""
|
||||
|
||||
key = "note"
|
||||
aliases = ["jot"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Note what? Usage: note <text>")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
text = self.args.strip()
|
||||
full_text = f"BREAKTHROUGH ({wing}): {text}\nNoted by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() inscribe() a breakthrough into the palace scrolls.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdEvent(Command):
|
||||
"""
|
||||
Log an event into the palace hall_events.
|
||||
|
||||
Usage:
|
||||
event <text>
|
||||
event Gitea runner came back online after being offline for 6 hours.
|
||||
"""
|
||||
|
||||
key = "event"
|
||||
aliases = ["log"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Log what event? Usage: event <text>")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
text = self.args.strip()
|
||||
full_text = f"EVENT ({wing}): {text}\nLogged by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() chronicle() an event in the palace records.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdPalaceWrite(Command):
|
||||
"""
|
||||
Directly write a memory into a specific palace room.
|
||||
|
||||
Usage:
|
||||
palace/write <room> = <text>
|
||||
"""
|
||||
|
||||
key = "palace/write"
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if "=" not in self.args:
|
||||
self.caller.msg("Usage: palace/write <room> = <text>")
|
||||
return
|
||||
|
||||
room, text = self.args.split("=", 1)
|
||||
room = room.strip()
|
||||
text = text.strip()
|
||||
|
||||
if not room or not text:
|
||||
self.caller.msg("Both room and text are required.")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
full_text = f"MEMORY ({wing}/{room}): {text}\nWritten by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, room, f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() etch() a memory into the |c{room}|n room of the palace.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
105
docs/bezalel/evennia/cmd_steward.py
Normal file
105
docs/bezalel/evennia/cmd_steward.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""
|
||||
Steward commands — ask a palace steward about memories.
|
||||
"""
|
||||
|
||||
from evennia.commands.command import Command
|
||||
from evennia import search_object
|
||||
|
||||
|
||||
class CmdAskSteward(Command):
|
||||
"""
|
||||
Ask a steward NPC about a topic from the palace memory.
|
||||
|
||||
Usage:
|
||||
ask <steward> about <topic>
|
||||
ask <steward> about <topic> --fleet
|
||||
|
||||
Example:
|
||||
ask bezalel-steward about nightly watch
|
||||
ask bezalel-steward about runner outage --fleet
|
||||
"""
|
||||
|
||||
key = "ask"
|
||||
aliases = ["question"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def parse(self):
|
||||
"""Parse 'ask <target> about <topic>' syntax."""
|
||||
raw = self.args.strip()
|
||||
fleet = "--fleet" in raw
|
||||
if fleet:
|
||||
raw = raw.replace("--fleet", "").strip()
|
||||
|
||||
if " about " in raw.lower():
|
||||
parts = raw.split(" about ", 1)
|
||||
self.target_name = parts[0].strip()
|
||||
self.topic = parts[1].strip()
|
||||
else:
|
||||
self.target_name = ""
|
||||
self.topic = raw
|
||||
self.fleet = fleet
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Usage: ask <steward> about <topic> [--fleet]")
|
||||
return
|
||||
|
||||
self.parse()
|
||||
|
||||
if not self.target_name:
|
||||
self.caller.msg("Ask whom? Usage: ask <steward> about <topic>")
|
||||
return
|
||||
|
||||
# Find steward NPC in current room
|
||||
stewards = [
|
||||
obj for obj in self.caller.location.contents
|
||||
if hasattr(obj, "respond_to_question")
|
||||
and self.target_name.lower() in obj.key.lower()
|
||||
]
|
||||
|
||||
if not stewards:
|
||||
self.caller.msg(f"There is no steward here matching '{self.target_name}'.")
|
||||
return
|
||||
|
||||
steward = stewards[0]
|
||||
self.caller.msg(f"You ask |c{steward.key}|n about '{self.topic}'...")
|
||||
steward.respond_to_question(self.topic, self.caller, fleet=self.fleet)
|
||||
|
||||
|
||||
class CmdSummonSteward(Command):
|
||||
"""
|
||||
Summon your wing's steward NPC to your current location.
|
||||
|
||||
Usage:
|
||||
summon steward
|
||||
"""
|
||||
|
||||
key = "summon steward"
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
steward_key = f"{wing}-steward"
|
||||
|
||||
# Search for existing steward
|
||||
from typeclasses.steward_npc import StewardNPC
|
||||
stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")
|
||||
|
||||
if stewards:
|
||||
steward = stewards[0]
|
||||
steward.move_to(self.caller.location, move_type="teleport")
|
||||
self.caller.location.msg_contents(
|
||||
f"A shimmer of light coalesces into |c{steward.key}|n.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
steward = StewardNPC.create(steward_key)[0]
|
||||
steward.db.wing = wing
|
||||
steward.db.steward_name = self.caller.key
|
||||
steward.move_to(self.caller.location, move_type="teleport")
|
||||
self.caller.location.msg_contents(
|
||||
f"You call forth |c{steward.key}|n from the palace archives.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
83
docs/bezalel/evennia/hall_of_wings.py
Normal file
83
docs/bezalel/evennia/hall_of_wings.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""
|
||||
Hall of Wings — Builds the central MemPalace zone in Evennia.
|
||||
|
||||
Usage (from Evennia shell or script):
|
||||
from world.hall_of_wings import build_hall_of_wings
|
||||
build_hall_of_wings()
|
||||
"""
|
||||
|
||||
from evennia import create_object
|
||||
from typeclasses.palace_room import PalaceRoom
|
||||
from typeclasses.steward_npc import StewardNPC
|
||||
from typeclasses.rooms import Room
|
||||
from typeclasses.exits import Exit
|
||||
|
||||
HALL_KEY = "hall_of_wings"
|
||||
HALL_NAME = "Hall of Wings"
|
||||
|
||||
DEFAULT_WINGS = [
|
||||
"bezalel",
|
||||
"timmy",
|
||||
"allegro",
|
||||
"ezra",
|
||||
]
|
||||
|
||||
|
||||
def build_hall_of_wings():
|
||||
"""Create or update the central Hall of Wings and attach steward chambers."""
|
||||
# Find or create the hall
|
||||
from evennia import search_object
|
||||
halls = search_object(HALL_KEY, typeclass="typeclasses.rooms.Room")
|
||||
if halls:
|
||||
hall = halls[0]
|
||||
else:
|
||||
hall = create_object(Room, key=HALL_KEY)
|
||||
hall.db.desc = (
|
||||
"|cThe Hall of Wings|n\n"
|
||||
"A vast circular chamber of pale stone and shifting starlight.\n"
|
||||
"Arched doorways line the perimeter, each leading to a steward's chamber.\n"
|
||||
"Here, the memories of the fleet converge.\n\n"
|
||||
"Use |wsummon steward|n to call your wing's steward, or\n"
|
||||
"|wask <steward> about <topic>|n to query the palace archives."
|
||||
)
|
||||
|
||||
for wing in DEFAULT_WINGS:
|
||||
chamber_key = f"chamber:{wing}"
|
||||
chambers = search_object(chamber_key, typeclass="typeclasses.palace_room.PalaceRoom")
|
||||
if chambers:
|
||||
chamber = chambers[0]
|
||||
else:
|
||||
chamber = create_object(PalaceRoom, key=chamber_key)
|
||||
chamber.db.memory_topic = wing
|
||||
chamber.db.wing = wing
|
||||
chamber.db.desc = (
|
||||
f"|cThe Chamber of {wing.title()}|n\n"
|
||||
f"This room holds the accumulated memories of the {wing} wing.\n"
|
||||
f"A steward stands ready to answer questions."
|
||||
)
|
||||
chamber.update_description()
|
||||
|
||||
# Link hall <-> chamber with exits
|
||||
exit_name = f"{wing}-chamber"
|
||||
existing_exits = [ex for ex in hall.exits if ex.key == exit_name]
|
||||
if not existing_exits:
|
||||
create_object(Exit, key=exit_name, location=hall, destination=chamber)
|
||||
|
||||
return_exits = [ex for ex in chamber.exits if ex.key == "hall"]
|
||||
if not return_exits:
|
||||
create_object(Exit, key="hall", location=chamber, destination=hall)
|
||||
|
||||
# Place or summon steward
|
||||
steward_key = f"{wing}-steward"
|
||||
stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")
|
||||
if stewards:
|
||||
steward = stewards[0]
|
||||
if steward.location != chamber:
|
||||
steward.move_to(chamber, move_type="teleport")
|
||||
else:
|
||||
steward = create_object(StewardNPC, key=steward_key)
|
||||
steward.db.wing = wing
|
||||
steward.db.steward_name = wing.title()
|
||||
steward.move_to(chamber, move_type="teleport")
|
||||
|
||||
return hall
|
||||
87
docs/bezalel/evennia/palace_room.py
Normal file
87
docs/bezalel/evennia/palace_room.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""
|
||||
PalaceRoom
|
||||
|
||||
A Room that represents a topic in the memory palace.
|
||||
Memory objects spawned here embody concepts retrieved from mempalace.
|
||||
Its description auto-populates from a palace search on the memory topic.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.objects.objects import DefaultRoom
|
||||
from .objects import ObjectParent
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
class PalaceRoom(ObjectParent, DefaultRoom):
|
||||
"""
|
||||
A room in the mind palace. Its db.memory_topic describes what
|
||||
kind of memories are stored here. The description is populated
|
||||
from a live MemPalace search.
|
||||
"""
|
||||
|
||||
def at_object_creation(self):
|
||||
super().at_object_creation()
|
||||
self.db.memory_topic = ""
|
||||
self.db.wing = "bezalel"
|
||||
self.db.desc = (
|
||||
f"This is the |c{self.key}|n room of your mind palace.\n"
|
||||
"Memories and concepts drift here like motes of light.\n"
|
||||
"Use |wpalace/search <query>|n or |wrecall <topic>|n to summon memories."
|
||||
)
|
||||
|
||||
def _search_palace(self, query, wing=None, room=None, n=3):
|
||||
"""Call the helper script and return parsed results."""
|
||||
cmd = ["/root/wizards/bezalel/hermes/venv/bin/python", PALACE_SCRIPT, query]
|
||||
cmd.append(wing or "none")
|
||||
cmd.append(room or "none")
|
||||
cmd.append(str(n))
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
data = json.loads(result.stdout)
|
||||
return data.get("results", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def update_description(self):
|
||||
"""Refresh the room description from a palace search on its topic."""
|
||||
topic = self.db.memory_topic or self.key.split(":")[-1] if ":" in self.key else self.key
|
||||
wing = self.db.wing or "bezalel"
|
||||
results = self._search_palace(topic, wing=wing, n=3)
|
||||
|
||||
header = (
|
||||
f"=|c {topic.upper()} |n="
|
||||
)
|
||||
desc_lines = [
|
||||
header,
|
||||
f"You stand in the |c{topic}|n room of the |y{wing}|n wing.",
|
||||
"Memories drift here like motes of light.",
|
||||
"",
|
||||
]
|
||||
|
||||
if results:
|
||||
desc_lines.append("|gNearby memories:|n")
|
||||
for i, r in enumerate(results, 1):
|
||||
content = r.get("content", "")[:200]
|
||||
source = r.get("source", "unknown")
|
||||
room_name = r.get("room", "unknown")
|
||||
desc_lines.append(f" |m[{i}]|n |c{room_name}|n — {content}... |x({source})|n")
|
||||
else:
|
||||
desc_lines.append("|xThe palace is quiet here. No memories resonate with this topic yet.|n")
|
||||
|
||||
desc_lines.append("")
|
||||
desc_lines.append("Use |wrecall <query>|n to search deeper, or |wpalace/search <query>|n.")
|
||||
self.db.desc = "\n".join(desc_lines)
|
||||
|
||||
def at_object_receive(self, moved_obj, source_location, **kwargs):
|
||||
"""Refresh description when someone enters."""
|
||||
if moved_obj.has_account:
|
||||
self.update_description()
|
||||
super().at_object_receive(moved_obj, source_location, **kwargs)
|
||||
|
||||
def return_appearance(self, looker):
|
||||
text = super().return_appearance(looker)
|
||||
if self.db.memory_topic:
|
||||
text += f"\n|xTopic: {self.db.memory_topic}|n"
|
||||
return text
|
||||
70
docs/bezalel/evennia/steward_npc.py
Normal file
70
docs/bezalel/evennia/steward_npc.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""
|
||||
StewardNPC
|
||||
|
||||
A palace steward NPC that answers questions by querying the local
|
||||
or fleet MemPalace backend. One steward per wizard wing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.objects.objects import DefaultCharacter
|
||||
from typeclasses.objects import ObjectParent
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
class StewardNPC(ObjectParent, DefaultCharacter):
|
||||
"""
|
||||
A steward of the mind palace. Ask it about memories,
|
||||
decisions, or events from its wing.
|
||||
"""
|
||||
|
||||
def at_object_creation(self):
|
||||
super().at_object_creation()
|
||||
self.db.wing = "bezalel"
|
||||
self.db.steward_name = "Bezalel"
|
||||
self.db.desc = (
|
||||
f"|c{self.key}|n stands here quietly, eyes like polished steel, "
|
||||
"waiting to recall anything from the palace archives."
|
||||
)
|
||||
self.locks.add("get:false();delete:perm(Admin)")
|
||||
|
||||
def _search_palace(self, query, fleet=False, n=3):
|
||||
cmd = [
|
||||
"/root/wizards/bezalel/hermes/venv/bin/python",
|
||||
PALACE_SCRIPT,
|
||||
query,
|
||||
"none" if fleet else self.db.wing,
|
||||
"none",
|
||||
str(n),
|
||||
]
|
||||
if fleet:
|
||||
cmd.append("--fleet")
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
data = json.loads(result.stdout)
|
||||
return data.get("results", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def _summarize_for_speech(self, results, query):
|
||||
"""Convert search results into in-character dialogue."""
|
||||
if not results:
|
||||
return "I find no memory of that in the palace."
|
||||
|
||||
lines = [f"Regarding '{query}':"]
|
||||
for r in results:
|
||||
room = r.get("room", "unknown")
|
||||
content = r.get("content", "")[:300]
|
||||
source = r.get("source", "unknown")
|
||||
lines.append(f" From the |c{room}|n room: {content}... |x[{source}]|n")
|
||||
return "\n".join(lines)
|
||||
|
||||
def respond_to_question(self, question, asker, fleet=False):
|
||||
results = self._search_palace(question, fleet=fleet, n=3)
|
||||
speech = self._summarize_for_speech(results, question)
|
||||
self.location.msg_contents(
|
||||
f"|c{self.key}|n says to $you(asker): \"{speech}\"",
|
||||
mapping={"asker": asker},
|
||||
from_obj=self,
|
||||
)
|
||||
@@ -1,44 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Apply branch protections to all repositories
|
||||
# Requires GITEA_TOKEN env var
|
||||
|
||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
||||
|
||||
for repo in "${REPOS[@]}"
|
||||
do
|
||||
curl -X POST "https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/$repo/branches/main/protection" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"required_reviews": 1,
|
||||
"dismiss_stale_reviews": true,
|
||||
"block_force_push": true,
|
||||
"block_deletions": true
|
||||
}'
|
||||
done
|
||||
#!/bin/bash
|
||||
|
||||
# Gitea API credentials
|
||||
GITEA_TOKEN="your-personal-access-token"
|
||||
GITEA_API="https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
# Repos to protect
|
||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
||||
|
||||
for REPO in "${REPO[@]}"; do
|
||||
echo "Configuring branch protection for $REPO..."
|
||||
|
||||
curl -X POST -H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"name": "main",
|
||||
"require_pull_request": true,
|
||||
"required_approvals": 1,
|
||||
"dismiss_stale_approvals": true,
|
||||
"required_status_checks": '"$(test "$REPO" = "hermes-agent" && echo "true" || echo "false")"',
|
||||
"block_force_push": true,
|
||||
"block_delete": true
|
||||
}' \
|
||||
"$GITEA_API/repos/Timmy_Foundation/$REPO/branch_protection"
|
||||
done
|
||||
# Wrapper for the canonical branch-protection sync script.
|
||||
# Usage: ./gitea-branch-protection.sh
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
python3 scripts/sync_branch_protection.py
|
||||
|
||||
186
mempalace/fleet_api.py
Normal file
186
mempalace/fleet_api.py
Normal file
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
fleet_api.py — Lightweight HTTP API for the shared fleet palace.
|
||||
|
||||
Exposes fleet memory search over HTTP so that Alpha servers and other
|
||||
wizard deployments can query the palace without direct filesystem access.
|
||||
|
||||
Endpoints:
|
||||
GET /health
|
||||
Returns {"status": "ok", "palace": "<path>"}
|
||||
|
||||
GET /search?q=<query>[&room=<room>][&n=<int>]
|
||||
Returns {"results": [...], "query": "...", "room": "...", "count": N}
|
||||
Each result: {"text": "...", "room": "...", "wing": "...", "score": 0.9}
|
||||
|
||||
GET /wings
|
||||
Returns {"wings": ["bezalel", ...]} — distinct wizard wings present
|
||||
|
||||
Error responses use {"error": "<message>"} with appropriate HTTP status codes.
|
||||
|
||||
Usage:
|
||||
# Default: localhost:7771, fleet palace at /var/lib/mempalace/fleet
|
||||
python mempalace/fleet_api.py
|
||||
|
||||
# Custom host/port/palace:
|
||||
FLEET_PALACE_PATH=/data/fleet python mempalace/fleet_api.py --host 0.0.0.0 --port 8080
|
||||
|
||||
Refs: #1078, #1075
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from pathlib import Path
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
# Add repo root to path so we can import nexus.mempalace
|
||||
_HERE = Path(__file__).resolve().parent
|
||||
_REPO_ROOT = _HERE.parent
|
||||
if str(_REPO_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_REPO_ROOT))
|
||||
|
||||
DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 7771
|
||||
MAX_RESULTS = 50
|
||||
|
||||
|
||||
def _get_palace_path() -> Path:
|
||||
return Path(os.environ.get("FLEET_PALACE_PATH", "/var/lib/mempalace/fleet"))
|
||||
|
||||
|
||||
def _json_response(handler: BaseHTTPRequestHandler, status: int, body: dict) -> None:
|
||||
payload = json.dumps(body).encode()
|
||||
handler.send_response(status)
|
||||
handler.send_header("Content-Type", "application/json")
|
||||
handler.send_header("Content-Length", str(len(payload)))
|
||||
handler.end_headers()
|
||||
handler.wfile.write(payload)
|
||||
|
||||
|
||||
def _handle_health(handler: BaseHTTPRequestHandler) -> None:
|
||||
palace = _get_palace_path()
|
||||
_json_response(handler, 200, {
|
||||
"status": "ok",
|
||||
"palace": str(palace),
|
||||
"palace_exists": palace.exists(),
|
||||
})
|
||||
|
||||
|
||||
def _handle_search(handler: BaseHTTPRequestHandler, qs: dict) -> None:
|
||||
query_terms = qs.get("q", [""])
|
||||
q = query_terms[0].strip() if query_terms else ""
|
||||
if not q:
|
||||
_json_response(handler, 400, {"error": "Missing required parameter: q"})
|
||||
return
|
||||
|
||||
room_terms = qs.get("room", [])
|
||||
room = room_terms[0].strip() if room_terms else None
|
||||
|
||||
n_terms = qs.get("n", [])
|
||||
try:
|
||||
n = max(1, min(int(n_terms[0]), MAX_RESULTS)) if n_terms else 10
|
||||
except (ValueError, IndexError):
|
||||
_json_response(handler, 400, {"error": "Invalid parameter: n must be an integer"})
|
||||
return
|
||||
|
||||
try:
|
||||
from nexus.mempalace.searcher import search_fleet, MemPalaceUnavailable
|
||||
except ImportError as exc:
|
||||
_json_response(handler, 503, {"error": f"MemPalace module not available: {exc}"})
|
||||
return
|
||||
|
||||
try:
|
||||
results = search_fleet(q, room=room, n_results=n)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
_json_response(handler, 503, {"error": str(exc)})
|
||||
return
|
||||
|
||||
_json_response(handler, 200, {
|
||||
"query": q,
|
||||
"room": room,
|
||||
"count": len(results),
|
||||
"results": [
|
||||
{
|
||||
"text": r.text,
|
||||
"room": r.room,
|
||||
"wing": r.wing,
|
||||
"score": round(r.score, 4),
|
||||
}
|
||||
for r in results
|
||||
],
|
||||
})
|
||||
|
||||
|
||||
def _handle_wings(handler: BaseHTTPRequestHandler) -> None:
|
||||
"""Return distinct wizard wing names found in the fleet palace directory."""
|
||||
palace = _get_palace_path()
|
||||
if not palace.exists():
|
||||
_json_response(handler, 503, {
|
||||
"error": f"Fleet palace not found: {palace}",
|
||||
})
|
||||
return
|
||||
|
||||
wings = sorted({
|
||||
p.name for p in palace.iterdir() if p.is_dir()
|
||||
})
|
||||
_json_response(handler, 200, {"wings": wings})
|
||||
|
||||
|
||||
class FleetAPIHandler(BaseHTTPRequestHandler):
|
||||
"""Request handler for the fleet memory API."""
|
||||
|
||||
def log_message(self, fmt: str, *args) -> None: # noqa: ANN001
|
||||
# Prefix with tag for easier log filtering
|
||||
sys.stderr.write(f"[fleet_api] {fmt % args}\n")
|
||||
|
||||
def do_GET(self) -> None: # noqa: N802
|
||||
parsed = urlparse(self.path)
|
||||
path = parsed.path.rstrip("/") or "/"
|
||||
qs = parse_qs(parsed.query)
|
||||
|
||||
if path == "/health":
|
||||
_handle_health(self)
|
||||
elif path == "/search":
|
||||
_handle_search(self, qs)
|
||||
elif path == "/wings":
|
||||
_handle_wings(self)
|
||||
else:
|
||||
_json_response(self, 404, {
|
||||
"error": f"Unknown endpoint: {path}",
|
||||
"endpoints": ["/health", "/search", "/wings"],
|
||||
})
|
||||
|
||||
|
||||
def make_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> HTTPServer:
|
||||
return HTTPServer((host, port), FleetAPIHandler)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Fleet palace HTTP API server."
|
||||
)
|
||||
parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind host (default: {DEFAULT_HOST})")
|
||||
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Bind port (default: {DEFAULT_PORT})")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
palace = _get_palace_path()
|
||||
print(f"[fleet_api] Palace: {palace}")
|
||||
if not palace.exists():
|
||||
print(f"[fleet_api] WARNING: palace path does not exist yet: {palace}", file=sys.stderr)
|
||||
|
||||
server = make_server(args.host, args.port)
|
||||
print(f"[fleet_api] Listening on http://{args.host}:{args.port}")
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("\n[fleet_api] Shutting down.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -2,11 +2,17 @@
|
||||
Morning Report Generator — runs at 0600 to compile overnight activity.
|
||||
Gathers: cycles executed, issues closed, PRs merged, commits pushed.
|
||||
Outputs a structured report for delivery to the main channel.
|
||||
|
||||
Includes a HEARTBEAT PANEL that checks all cron job heartbeats via
|
||||
bezalel_heartbeat_check.py (poka-yoke #1096). Any stale jobs surface
|
||||
as blockers in the report.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
@@ -118,7 +124,46 @@ def generate_morning_report():
|
||||
if not report["highlights"] and not report["blockers"]:
|
||||
print("No significant activity or blockers detected.")
|
||||
print("")
|
||||
|
||||
|
||||
# ── Heartbeat panel (poka-yoke #1096) ────────────────────────────────────
|
||||
# Import bezalel_heartbeat_check via importlib so we don't need __init__.py
|
||||
# or a sys.path hack. If the module is missing or the dir doesn't exist,
|
||||
# we print a "not provisioned" notice and continue — never crash the report.
|
||||
_hb_result = None
|
||||
try:
|
||||
_project_root = Path(__file__).parent.parent
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"bezalel_heartbeat_check",
|
||||
_project_root / "bin" / "bezalel_heartbeat_check.py",
|
||||
)
|
||||
if _hb_spec is not None:
|
||||
_hb_mod = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules.setdefault("bezalel_heartbeat_check", _hb_mod)
|
||||
_hb_spec.loader.exec_module(_hb_mod) # type: ignore[union-attr]
|
||||
_hb_result = _hb_mod.check_cron_heartbeats()
|
||||
except Exception:
|
||||
_hb_result = None
|
||||
|
||||
print("HEARTBEAT PANEL:")
|
||||
if _hb_result is None or not _hb_result.get("jobs"):
|
||||
print(" HEARTBEAT PANEL: no data (bezalel not provisioned)")
|
||||
report["heartbeat_panel"] = {"status": "not_provisioned"}
|
||||
else:
|
||||
for _job in _hb_result["jobs"]:
|
||||
_prefix = "+" if _job["healthy"] else "-"
|
||||
print(f" {_prefix} {_job['job']}: {_job['message']}")
|
||||
if not _job["healthy"]:
|
||||
report["blockers"].append(
|
||||
f"Stale heartbeat: {_job['job']} — {_job['message']}"
|
||||
)
|
||||
print("")
|
||||
report["heartbeat_panel"] = {
|
||||
"checked_at": _hb_result.get("checked_at"),
|
||||
"healthy_count": _hb_result.get("healthy_count", 0),
|
||||
"stale_count": _hb_result.get("stale_count", 0),
|
||||
"jobs": _hb_result.get("jobs", []),
|
||||
}
|
||||
|
||||
# Save report
|
||||
report_dir = Path(os.path.expanduser("~/.local/timmy/reports"))
|
||||
report_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
95
scripts/audit_mempalace_privacy.py
Normal file
95
scripts/audit_mempalace_privacy.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Audit the fleet shared palace for privacy violations.
|
||||
Ensures no raw drawers, full source paths, or private workspace leaks exist.
|
||||
|
||||
Usage:
|
||||
python audit_mempalace_privacy.py /path/to/fleet/palace
|
||||
|
||||
Exit codes:
|
||||
0 = clean
|
||||
1 = violations found
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import chromadb
|
||||
except ImportError:
|
||||
print("ERROR: chromadb not installed")
|
||||
sys.exit(1)
|
||||
|
||||
VIOLATION_KEYWORDS = [
|
||||
"/root/wizards/",
|
||||
"/home/",
|
||||
"/Users/",
|
||||
"private_key",
|
||||
"-----BEGIN",
|
||||
"GITEA_TOKEN=",
|
||||
"OPENAI_API_KEY",
|
||||
"ANTHROPIC_API_KEY",
|
||||
]
|
||||
|
||||
|
||||
def audit(palace_path: Path):
|
||||
violations = []
|
||||
client = chromadb.PersistentClient(path=str(palace_path))
|
||||
try:
|
||||
col = client.get_collection("mempalace_drawers")
|
||||
except Exception as e:
|
||||
print(f"ERROR: Could not open collection: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
all_data = col.get(include=["documents", "metadatas"])
|
||||
docs = all_data["documents"]
|
||||
metas = all_data["metadatas"]
|
||||
|
||||
for doc, meta in zip(docs, metas):
|
||||
source = meta.get("source_file", "")
|
||||
doc_type = meta.get("type", "")
|
||||
|
||||
# Rule 1: Fleet palace should only contain closets or explicitly typed entries
|
||||
if doc_type not in ("closet", "summary", "fleet"):
|
||||
violations.append(
|
||||
f"VIOLATION: Document type is '{doc_type}' (expected closet/summary/fleet). "
|
||||
f"Source: {source}"
|
||||
)
|
||||
|
||||
# Rule 2: No full absolute paths from private workspaces
|
||||
if any(abs_path in source for abs_path in ["/root/wizards/", "/home/", "/Users/"]):
|
||||
violations.append(
|
||||
f"VIOLATION: Source contains absolute path: {source}"
|
||||
)
|
||||
|
||||
# Rule 3: No raw secrets in document text
|
||||
for kw in VIOLATION_KEYWORDS:
|
||||
if kw in doc:
|
||||
violations.append(
|
||||
f"VIOLATION: Document contains sensitive keyword '{kw}'. Source: {source}"
|
||||
)
|
||||
break # one violation per doc is enough
|
||||
|
||||
return violations
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Audit fleet palace privacy")
|
||||
parser.add_argument("palace", default="/var/lib/mempalace/fleet", nargs="?", help="Path to fleet palace")
|
||||
args = parser.parse_args()
|
||||
|
||||
violations = audit(Path(args.palace))
|
||||
|
||||
if violations:
|
||||
print(f"FAIL: {len(violations)} privacy violation(s) found")
|
||||
for v in violations:
|
||||
print(f" {v}")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("PASS: No privacy violations detected")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
167
scripts/audit_merge_reviews.py
Normal file
167
scripts/audit_merge_reviews.py
Normal file
@@ -0,0 +1,167 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fleet Merge Review Audit
|
||||
========================
|
||||
Scans all Timmy_Foundation repos for merges in the last 7 days
|
||||
and validates that each merged PR had at least one approving review.
|
||||
|
||||
Exit 0 = no unreviewed merges
|
||||
Exit 1 = unreviewed merges found (and issues created if --create-issues)
|
||||
|
||||
Usage:
|
||||
python scripts/audit_merge_reviews.py
|
||||
python scripts/audit_merge_reviews.py --create-issues
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import json
|
||||
|
||||
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||
ORG = "Timmy_Foundation"
|
||||
DAYS_BACK = 7
|
||||
SECURITY_LABEL = "security"
|
||||
|
||||
|
||||
def api_request(path: str) -> dict | list:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def api_post(path: str, payload: dict) -> dict:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
data = json.dumps(payload).encode()
|
||||
req = urllib.request.Request(url, data=data, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def get_repos() -> list[str]:
|
||||
repos = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}")
|
||||
if not batch:
|
||||
break
|
||||
repos.extend([r["name"] for r in batch])
|
||||
page += 1
|
||||
return repos
|
||||
|
||||
|
||||
def get_merged_prs(repo: str, since: str) -> list[dict]:
|
||||
"""Get closed (merged) PRs updated since `since` (ISO format)."""
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = api_request(
|
||||
f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
for pr in batch:
|
||||
if pr.get("merged_at") and pr["merged_at"] >= since:
|
||||
prs.append(pr)
|
||||
elif pr.get("updated_at") and pr["updated_at"] < since:
|
||||
return prs
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
|
||||
def get_reviews(repo: str, pr_number: int) -> list[dict]:
|
||||
try:
|
||||
return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 404:
|
||||
return []
|
||||
raise
|
||||
|
||||
|
||||
def create_post_mortem(repo: str, pr: dict) -> int | None:
|
||||
title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
|
||||
body = (
|
||||
f"## Unreviewed Merge Detected\n\n"
|
||||
f"- **Repository:** `{ORG}/{repo}`\n"
|
||||
f"- **PR:** #{pr['number']} — {pr['title']}\n"
|
||||
f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n"
|
||||
f"- **Merged at:** {pr['merged_at']}\n"
|
||||
f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n"
|
||||
f"This merge had **zero approving reviews** at the time of merge.\n\n"
|
||||
f"### Required Actions\n"
|
||||
f"1. Validate the merge contents are safe.\n"
|
||||
f"2. If malicious or incorrect, revert immediately.\n"
|
||||
f"3. Document root cause (bypassed branch protection? direct push?).\n"
|
||||
)
|
||||
try:
|
||||
issue = api_post(f"/repos/{ORG}/the-nexus/issues", {
|
||||
"title": title,
|
||||
"body": body,
|
||||
"labels": [SECURITY_LABEL],
|
||||
})
|
||||
return issue.get("number")
|
||||
except Exception as e:
|
||||
print(f" FAILED to create issue: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
|
||||
args = parser.parse_args()
|
||||
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN environment variable not set.")
|
||||
return 1
|
||||
|
||||
since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
|
||||
since = since_dt.isoformat()
|
||||
|
||||
repos = get_repos()
|
||||
print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")
|
||||
|
||||
unreviewed_count = 0
|
||||
for repo in repos:
|
||||
merged = get_merged_prs(repo, since)
|
||||
if not merged:
|
||||
continue
|
||||
|
||||
repo_unreviewed = []
|
||||
for pr in merged:
|
||||
reviews = get_reviews(repo, pr["number"])
|
||||
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
|
||||
if not approvals:
|
||||
repo_unreviewed.append(pr)
|
||||
|
||||
if repo_unreviewed:
|
||||
print(f"\n{repo}:")
|
||||
for pr in repo_unreviewed:
|
||||
print(f" ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})")
|
||||
unreviewed_count += 1
|
||||
if args.create_issues:
|
||||
issue_num = create_post_mortem(repo, pr)
|
||||
if issue_num:
|
||||
print(f" → Created post-mortem issue the-nexus#{issue_num}")
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
if unreviewed_count == 0:
|
||||
print("All merges in the last 7 days had at least one approving review.")
|
||||
return 0
|
||||
else:
|
||||
print(f"Found {unreviewed_count} unreviewed merge(s).")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
50
scripts/backup_databases.sh
Executable file
50
scripts/backup_databases.sh
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
|
||||
# Bezalel Database Backup — MemPalace + Evennia + Fleet
|
||||
# Runs nightly after re-mine completes. Keeps 7 days of rolling backups.
|
||||
set -euo pipefail
|
||||
|
||||
BACKUP_BASE="/root/wizards/bezalel/home/backups"
|
||||
DATE=$(date +%Y%m%d_%H%M%S)
|
||||
LOG="/var/log/bezalel_db_backup.log"
|
||||
|
||||
# Sources
|
||||
LOCAL_PALACE="/root/wizards/bezalel/.mempalace/palace"
|
||||
FLEET_PALACE="/var/lib/mempalace/fleet"
|
||||
EVENNIA_DB="/root/wizards/bezalel/evennia/bezalel_world/server/evennia.db3"
|
||||
|
||||
# Destinations
|
||||
LOCAL_BACKUP="${BACKUP_BASE}/mempalace/mempalace_${DATE}.tar.gz"
|
||||
FLEET_BACKUP="${BACKUP_BASE}/fleet/fleet_${DATE}.tar.gz"
|
||||
EVENNIA_BACKUP="${BACKUP_BASE}/evennia/evennia_${DATE}.db3.gz"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
|
||||
}
|
||||
|
||||
log "Starting database backup cycle..."
|
||||
|
||||
# 1. Backup local MemPalace
|
||||
tar -czf "$LOCAL_BACKUP" -C "$(dirname "$LOCAL_PALACE")" "$(basename "$LOCAL_PALACE")"
|
||||
log "Local palace backed up: ${LOCAL_BACKUP} ($(du -h "$LOCAL_BACKUP" | cut -f1))"
|
||||
|
||||
# 2. Backup fleet MemPalace
|
||||
tar -czf "$FLEET_BACKUP" -C "$(dirname "$FLEET_PALACE")" "$(basename "$FLEET_PALACE")"
|
||||
log "Fleet palace backed up: ${FLEET_BACKUP} ($(du -h "$FLEET_BACKUP" | cut -f1))"
|
||||
|
||||
# 3. Backup Evennia DB (gzip for space)
|
||||
gzip -c "$EVENNIA_DB" > "$EVENNIA_BACKUP"
|
||||
log "Evennia DB backed up: ${EVENNIA_BACKUP} ($(du -h "$EVENNIA_BACKUP" | cut -f1))"
|
||||
|
||||
# 4. Prune backups older than 7 days
|
||||
find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' -mtime +7 -delete
|
||||
find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' -mtime +7 -delete
|
||||
find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' -mtime +7 -delete
|
||||
log "Pruned backups older than 7 days"
|
||||
|
||||
# 5. Report counts
|
||||
MP_COUNT=$(find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' | wc -l)
|
||||
FL_COUNT=$(find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' | wc -l)
|
||||
EV_COUNT=$(find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' | wc -l)
|
||||
log "Backup cycle complete. Retained: mempalace=${MP_COUNT}, fleet=${FL_COUNT}, evennia=${EV_COUNT}"
|
||||
|
||||
touch /var/lib/bezalel/heartbeats/db_backup.last
|
||||
135
scripts/ci_auto_revert.py
Normal file
135
scripts/ci_auto_revert.py
Normal file
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
CI Auto-Revert — Poka-yoke for broken merges.
|
||||
Monitors the main branch post-merge and auto-reverts via local git if CI fails.
|
||||
|
||||
Usage:
|
||||
python ci_auto_revert.py <repo_owner>/<repo_name>
|
||||
python ci_auto_revert.py Timmy_Foundation/hermes-agent
|
||||
|
||||
Recommended cron: */10 * * * *
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
import tempfile
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
REVERT_WINDOW_MINUTES = 10
|
||||
|
||||
|
||||
def api_call(method, path):
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
||||
req = request.Request(url, method=method, headers=headers)
|
||||
try:
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except error.HTTPError as e:
|
||||
return {"error": e.read().decode(), "status": e.code}
|
||||
|
||||
|
||||
def get_recent_commits(owner, repo, since):
|
||||
since_iso = since.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
return api_call("GET", f"/repos/{owner}/{repo}/commits?sha=main&since={since_iso}&limit=20")
|
||||
|
||||
|
||||
def get_commit_status(owner, repo, sha):
|
||||
return api_call("GET", f"/repos/{owner}/{repo}/commits/{sha}/status")
|
||||
|
||||
|
||||
def revert_via_git(clone_url, sha, msg, owner, repo):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Clone with token
|
||||
auth_url = clone_url.replace("https://", f"https://bezalel:{GITEA_TOKEN}@")
|
||||
subprocess.run(["git", "clone", "--depth", "10", auth_url, tmpdir], check=True, capture_output=True)
|
||||
|
||||
# Configure git
|
||||
subprocess.run(["git", "-C", tmpdir, "config", "user.email", "bezalel@timmy.foundation"], check=True, capture_output=True)
|
||||
subprocess.run(["git", "-C", tmpdir, "config", "user.name", "Bezalel"], check=True, capture_output=True)
|
||||
|
||||
# Revert the commit
|
||||
revert_msg = f"[auto-revert] {msg}\n\nOriginal commit {sha} failed CI."
|
||||
result = subprocess.run(
|
||||
["git", "-C", tmpdir, "revert", "--no-edit", "-m", revert_msg, sha],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return {"error": f"git revert failed: {result.stderr}"}
|
||||
|
||||
# Push
|
||||
push_result = subprocess.run(
|
||||
["git", "-C", tmpdir, "push", "origin", "main"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if push_result.returncode != 0:
|
||||
return {"error": f"git push failed: {push_result.stderr}"}
|
||||
|
||||
return {"ok": True, "reverted_sha": sha}
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print(f"Usage: {sys.argv[0]} <owner/repo>")
|
||||
sys.exit(1)
|
||||
|
||||
repo_full = sys.argv[1]
|
||||
owner, repo = repo_full.split("/", 1)
|
||||
|
||||
since = datetime.now(timezone.utc) - timedelta(minutes=REVERT_WINDOW_MINUTES + 5)
|
||||
commits = get_recent_commits(owner, repo, since)
|
||||
|
||||
if not isinstance(commits, list):
|
||||
print(f"ERROR fetching commits: {commits}")
|
||||
sys.exit(1)
|
||||
|
||||
reverted = 0
|
||||
for commit in commits:
|
||||
sha = commit.get("sha", "")
|
||||
msg = commit.get("commit", {}).get("message", "").split("\n")[0]
|
||||
commit_time = commit.get("commit", {}).get("committer", {}).get("date", "")
|
||||
if not commit_time:
|
||||
continue
|
||||
|
||||
commit_dt = datetime.fromisoformat(commit_time.replace("Z", "+00:00"))
|
||||
age_min = (datetime.now(timezone.utc) - commit_dt).total_seconds() / 60
|
||||
|
||||
if age_min > REVERT_WINDOW_MINUTES:
|
||||
continue
|
||||
|
||||
status = get_commit_status(owner, repo, sha)
|
||||
state = status.get("state", "")
|
||||
|
||||
if state == "failure":
|
||||
print(f"ALERT: Commit {sha[:8]} '{msg}' failed CI ({age_min:.1f}m old). Initiating revert...")
|
||||
repo_info = api_call("GET", f"/repos/{owner}/{repo}")
|
||||
clone_url = repo_info.get("clone_url", "")
|
||||
if not clone_url:
|
||||
print(f" Cannot find clone URL")
|
||||
continue
|
||||
result = revert_via_git(clone_url, sha, msg, owner, repo)
|
||||
if "error" in result:
|
||||
print(f" Revert failed: {result['error']}")
|
||||
else:
|
||||
print(f" Reverted successfully.")
|
||||
reverted += 1
|
||||
elif state == "success":
|
||||
print(f"OK: Commit {sha[:8]} '{msg}' passed CI.")
|
||||
elif state == "pending":
|
||||
print(f"PENDING: Commit {sha[:8]} '{msg}' still running CI.")
|
||||
else:
|
||||
print(f"UNKNOWN: Commit {sha[:8]} '{msg}' has CI state '{state}'.")
|
||||
|
||||
if reverted == 0:
|
||||
print("No broken merges found in the last 10 minutes.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
115
scripts/cron-heartbeat-write.sh
Executable file
115
scripts/cron-heartbeat-write.sh
Executable file
@@ -0,0 +1,115 @@
|
||||
#!/usr/bin/env bash
|
||||
# cron-heartbeat-write.sh — Bezalel Cron Heartbeat Writer (poka-yoke #1096)
|
||||
# Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
#
|
||||
# POKA-YOKE design:
|
||||
# Prevention — Cron jobs declare their identity + expected interval up front.
|
||||
# Detection — bezalel_heartbeat_check.py reads these files every 15 min and
|
||||
# alerts P1 if any job is silent for > 2× its interval.
|
||||
# Correction — Alerts fire fast enough for manual intervention or auto-restart
|
||||
# before the next scheduled run window expires.
|
||||
#
|
||||
# Usage:
|
||||
# cron-heartbeat-write.sh <job-name> [interval-seconds]
|
||||
#
|
||||
# <job-name> Unique identifier for this cron job (e.g. "morning-report")
|
||||
# [interval-seconds] Expected run interval in seconds (default: 3600)
|
||||
#
|
||||
# The heartbeat file is written to:
|
||||
# /var/run/bezalel/heartbeats/<job-name>.last
|
||||
#
|
||||
# File format (JSON):
|
||||
# {"job":"<name>","timestamp":<epoch_float>,"interval":<secs>,"pid":<pid>}
|
||||
#
|
||||
# This script ALWAYS exits 0 — it must never crash the calling cron job.
|
||||
#
|
||||
# Typical crontab usage:
|
||||
# 0 * * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh hourly-job 3600
|
||||
# 0 6 * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh morning-report 86400
|
||||
|
||||
set -uo pipefail
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────────────
|
||||
HEARTBEAT_DIR="${BEZALEL_HEARTBEAT_DIR:-/var/run/bezalel/heartbeats}"
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT: $*"; }
|
||||
warn() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT WARNING: $*" >&2; }
|
||||
|
||||
# ── Input validation ──────────────────────────────────────────────────────────
|
||||
if [[ $# -lt 1 ]]; then
|
||||
warn "Usage: $0 <job-name> [interval-seconds]"
|
||||
warn "No job name provided — heartbeat not written."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
JOB_NAME="$1"
|
||||
INTERVAL_SECS="${2:-3600}"
|
||||
|
||||
# Sanitize job name to prevent path traversal / weird filenames
|
||||
# Allow alphanumeric, dash, underscore, dot only
|
||||
SAFE_JOB_NAME="${JOB_NAME//[^a-zA-Z0-9_.-]/}"
|
||||
if [[ -z "$SAFE_JOB_NAME" ]]; then
|
||||
warn "Job name '${JOB_NAME}' contains only unsafe characters — heartbeat not written."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [[ "$SAFE_JOB_NAME" != "$JOB_NAME" ]]; then
|
||||
warn "Job name sanitized: '${JOB_NAME}' → '${SAFE_JOB_NAME}'"
|
||||
fi
|
||||
|
||||
# Validate interval is a positive integer
|
||||
if ! [[ "$INTERVAL_SECS" =~ ^[0-9]+$ ]] || (( INTERVAL_SECS < 1 )); then
|
||||
warn "Invalid interval '${INTERVAL_SECS}' — using default 3600."
|
||||
INTERVAL_SECS=3600
|
||||
fi
|
||||
|
||||
# ── Create heartbeat directory ────────────────────────────────────────────────
|
||||
if ! mkdir -p "$HEARTBEAT_DIR" 2>/dev/null; then
|
||||
warn "Cannot create heartbeat dir '${HEARTBEAT_DIR}' — heartbeat not written."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# ── Build JSON payload ────────────────────────────────────────────────────────
|
||||
# Use python3 for reliable epoch float and JSON encoding.
|
||||
# Falls back to date-based approach if python3 unavailable.
|
||||
TIMESTAMP=$(python3 -c "import time; print(time.time())" 2>/dev/null \
|
||||
|| date +%s)
|
||||
|
||||
CURRENT_PID=$$
|
||||
|
||||
PAYLOAD=$(python3 -c "
|
||||
import json, sys
|
||||
print(json.dumps({
|
||||
'job': sys.argv[1],
|
||||
'timestamp': float(sys.argv[2]),
|
||||
'interval': int(sys.argv[3]),
|
||||
'pid': int(sys.argv[4]),
|
||||
}))
|
||||
" "$SAFE_JOB_NAME" "$TIMESTAMP" "$INTERVAL_SECS" "$CURRENT_PID" 2>/dev/null)
|
||||
|
||||
if [[ -z "$PAYLOAD" ]]; then
|
||||
# Minimal fallback if python3 fails
|
||||
PAYLOAD="{\"job\":\"${SAFE_JOB_NAME}\",\"timestamp\":${TIMESTAMP},\"interval\":${INTERVAL_SECS},\"pid\":${CURRENT_PID}}"
|
||||
fi
|
||||
|
||||
# ── Atomic write via temp + rename ────────────────────────────────────────────
|
||||
# Writes to a temp file first then renames, so bezalel_heartbeat_check.py
|
||||
# never sees a partial file mid-write. This is the poka-yoke atomic guarantee.
|
||||
TARGET_FILE="${HEARTBEAT_DIR}/${SAFE_JOB_NAME}.last"
|
||||
TMP_FILE="${HEARTBEAT_DIR}/.${SAFE_JOB_NAME}.last.tmp.$$"
|
||||
|
||||
if printf '%s\n' "$PAYLOAD" > "$TMP_FILE" 2>/dev/null; then
|
||||
if mv "$TMP_FILE" "$TARGET_FILE" 2>/dev/null; then
|
||||
log "Heartbeat written: ${TARGET_FILE} (job=${SAFE_JOB_NAME}, interval=${INTERVAL_SECS}s)"
|
||||
else
|
||||
warn "mv failed for '${TMP_FILE}' → '${TARGET_FILE}' — heartbeat not committed."
|
||||
rm -f "$TMP_FILE" 2>/dev/null || true
|
||||
fi
|
||||
else
|
||||
warn "Write to temp file '${TMP_FILE}' failed — heartbeat not written."
|
||||
rm -f "$TMP_FILE" 2>/dev/null || true
|
||||
fi
|
||||
|
||||
# Always exit 0 — never crash the calling cron job.
|
||||
exit 0
|
||||
75
scripts/mempalace_export.py
Normal file
75
scripts/mempalace_export.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Export closets from a local MemPalace wing for fleet-wide sharing.
|
||||
|
||||
Privacy rule: only summaries/closets are exported. No raw source_file paths.
|
||||
Source filenames are anonymized to just the basename.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import chromadb
|
||||
|
||||
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
|
||||
FLEET_INCOMING = "/var/lib/mempalace/fleet/incoming"
|
||||
WING = "bezalel"
|
||||
DOCS_PER_ROOM = 5
|
||||
|
||||
|
||||
def main():
|
||||
client = chromadb.PersistentClient(path=PALACE_PATH)
|
||||
col = client.get_collection("mempalace_drawers")
|
||||
|
||||
# Discover rooms in this wing
|
||||
all_meta = col.get(include=["metadatas"])["metadatas"]
|
||||
rooms = set()
|
||||
for m in all_meta:
|
||||
if m.get("wing") == WING:
|
||||
rooms.add(m.get("room", "general"))
|
||||
|
||||
Path(FLEET_INCOMING).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
closets = []
|
||||
for room in sorted(rooms):
|
||||
results = col.query(
|
||||
query_texts=[room],
|
||||
n_results=DOCS_PER_ROOM,
|
||||
where={"$and": [{"wing": WING}, {"room": room}]},
|
||||
include=["documents", "metadatas"],
|
||||
)
|
||||
docs = results["documents"][0]
|
||||
metas = results["metadatas"][0]
|
||||
|
||||
entries = []
|
||||
for doc, meta in zip(docs, metas):
|
||||
# Sanitize content: strip absolute workspace paths
|
||||
sanitized = doc[:800]
|
||||
sanitized = sanitized.replace("/root/wizards/bezalel/", "~/")
|
||||
sanitized = sanitized.replace("/root/wizards/", "~/")
|
||||
sanitized = sanitized.replace("/home/bezalel/", "~/")
|
||||
sanitized = sanitized.replace("/home/", "~/")
|
||||
entries.append({
|
||||
"content": sanitized,
|
||||
"source_basename": Path(meta.get("source_file", "?")).name,
|
||||
})
|
||||
|
||||
closet = {
|
||||
"wing": WING,
|
||||
"room": room,
|
||||
"type": "closet",
|
||||
"entries": entries,
|
||||
}
|
||||
closets.append(closet)
|
||||
|
||||
out_file = Path(FLEET_INCOMING) / f"{WING}_closets.json"
|
||||
with open(out_file, "w") as f:
|
||||
json.dump(closets, f, indent=2)
|
||||
|
||||
print(f"Exported {len(closets)} closets to {out_file}")
|
||||
for c in closets:
|
||||
print(f" {c['wing']} / {c['room']} : {len(c['entries'])} entries")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
24
scripts/mempalace_nightly.sh
Executable file
24
scripts/mempalace_nightly.sh
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env bash
|
||||
# Bezalel MemPalace Nightly Re-mine + Fleet Sync
|
||||
set -euo pipefail
|
||||
|
||||
PALACE="/root/wizards/bezalel/.mempalace/palace"
|
||||
MINER="/root/wizards/bezalel/hermes/venv/bin/mempalace"
|
||||
WING_DIR="/root/wizards/bezalel"
|
||||
LOG="/var/log/bezalel_mempalace.log"
|
||||
EXPORTER="/root/wizards/bezalel/hermes/venv/bin/python /root/wizards/bezalel/mempalace_export.py"
|
||||
IMPORTER="/root/wizards/bezalel/hermes/venv/bin/python /var/lib/mempalace/fleet_import.py"
|
||||
|
||||
echo "[$(date -Iseconds)] Starting mempalace re-mine" >> "$LOG"
|
||||
cd "$WING_DIR"
|
||||
"$MINER" --palace "$PALACE" mine "$WING_DIR" --agent bezalel >> "$LOG" 2>&1 || true
|
||||
echo "[$(date -Iseconds)] Finished mempalace re-mine" >> "$LOG"
|
||||
"$MINER" --palace "$PALACE" status >> "$LOG" 2>&1 || true
|
||||
|
||||
echo "[$(date -Iseconds)] Starting fleet closet export" >> "$LOG"
|
||||
$EXPORTER >> "$LOG" 2>&1 || true
|
||||
echo "[$(date -Iseconds)] Starting fleet closet import" >> "$LOG"
|
||||
$IMPORTER >> "$LOG" 2>&1 || true
|
||||
echo "[$(date -Iseconds)] Fleet sync complete" >> "$LOG"
|
||||
|
||||
touch /var/lib/bezalel/heartbeats/mempalace_nightly.last
|
||||
53
scripts/meta_heartbeat.sh
Executable file
53
scripts/meta_heartbeat.sh
Executable file
@@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env bash
|
||||
# Meta-heartbeat — checks all Bezalel cron jobs for stale timestamps
|
||||
set -euo pipefail
|
||||
|
||||
HEARTBEAT_DIR="/var/lib/bezalel/heartbeats"
|
||||
ALERT_LOG="/var/log/bezalel_meta_heartbeat.log"
|
||||
STALE_MINUTES=30
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
|
||||
}
|
||||
|
||||
mkdir -p "$HEARTBEAT_DIR"
|
||||
|
||||
# Define expected heartbeats: name => max_stale_minutes
|
||||
HEARTBEATS=(
|
||||
"nightly_watch:150" # 2.5h — runs at 02:00
|
||||
"mempalace_nightly:150" # 2.5h — runs at 03:00
|
||||
"db_backup:150" # 2.5h — runs at 03:30
|
||||
"runner_health:15" # 15m — every 5 min
|
||||
)
|
||||
|
||||
NOW_EPOCH=$(date +%s)
|
||||
FAILURES=0
|
||||
|
||||
for entry in "${HEARTBEATS[@]}"; do
|
||||
name="${entry%%:*}"
|
||||
max_minutes="${entry##*:}"
|
||||
file="${HEARTBEAT_DIR}/${name}.last"
|
||||
|
||||
if [[ ! -f "$file" ]]; then
|
||||
log "MISSING: $name heartbeat file not found ($file)"
|
||||
FAILURES=$((FAILURES + 1))
|
||||
continue
|
||||
fi
|
||||
|
||||
LAST_EPOCH=$(stat -c %Y "$file")
|
||||
AGE_MIN=$(( (NOW_EPOCH - LAST_EPOCH) / 60 ))
|
||||
|
||||
if [[ $AGE_MIN -gt $max_minutes ]]; then
|
||||
log "STALE: $name is ${AGE_MIN}m old (max ${max_minutes}m)"
|
||||
FAILURES=$((FAILURES + 1))
|
||||
else
|
||||
log "OK: $name is ${AGE_MIN}m old"
|
||||
fi
|
||||
done
|
||||
|
||||
if [[ $FAILURES -gt 0 ]]; then
|
||||
log "ALERT: $FAILURES stale/missing heartbeat(s) detected."
|
||||
exit 1
|
||||
else
|
||||
log "ALL_OK: All heartbeats healthy."
|
||||
fi
|
||||
70
scripts/review_gate.py
Normal file
70
scripts/review_gate.py
Normal file
@@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Review Gate — Poka-yoke for unreviewed merges.
|
||||
Fails if the current PR has fewer than 1 approving review.
|
||||
|
||||
Usage in Gitea workflow:
|
||||
- name: Review Approval Gate
|
||||
run: python scripts/review_gate.py
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
REPO = os.environ.get("GITEA_REPO", "")
|
||||
PR_NUMBER = os.environ.get("PR_NUMBER", "")
|
||||
|
||||
|
||||
def api_call(method, path):
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
||||
req = request.Request(url, method=method, headers=headers)
|
||||
try:
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except error.HTTPError as e:
|
||||
return {"error": e.read().decode(), "status": e.code}
|
||||
|
||||
|
||||
def main():
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN not set")
|
||||
sys.exit(1)
|
||||
|
||||
if not REPO:
|
||||
print("ERROR: GITEA_REPO not set")
|
||||
sys.exit(1)
|
||||
|
||||
pr_number = PR_NUMBER
|
||||
if not pr_number:
|
||||
# Try to infer from Gitea Actions environment
|
||||
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
|
||||
|
||||
if not pr_number:
|
||||
print("ERROR: Could not determine PR number")
|
||||
sys.exit(1)
|
||||
|
||||
reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
|
||||
if isinstance(reviews, dict) and "error" in reviews:
|
||||
print(f"ERROR fetching reviews: {reviews}")
|
||||
sys.exit(1)
|
||||
|
||||
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
|
||||
if len(approvals) >= 1:
|
||||
print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
|
||||
print("Merges are not permitted without at least one approval.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
46
scripts/runner_health_probe.sh
Executable file
46
scripts/runner_health_probe.sh
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env bash
|
||||
# Gitea Runner Health Probe — Poka-yoke for unregistered runners
|
||||
set -euo pipefail
|
||||
|
||||
GITEA_TOKEN="${GITEA_TOKEN:-}"
|
||||
GITEA_URL="https://forge.alexanderwhitestone.com"
|
||||
ALERT_LOG="/var/log/bezalel_runner_health.log"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
|
||||
}
|
||||
|
||||
if [[ -z "$GITEA_TOKEN" ]]; then
|
||||
log "ERROR: GITEA_TOKEN not set"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
ACTIVE_RUNNERS=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
"${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
|
||||
python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))")
|
||||
|
||||
log "Active runners: ${ACTIVE_RUNNERS}"
|
||||
|
||||
if [[ "$ACTIVE_RUNNERS" -eq 0 ]]; then
|
||||
log "CRITICAL: Zero active runners detected. Attempting self-healing restart."
|
||||
pkill -f "act_runner daemon" 2>/dev/null || true
|
||||
sleep 2
|
||||
cd /opt/gitea-runner && nohup ./act_runner daemon > /var/log/gitea-runner.log 2>&1 &
|
||||
sleep 3
|
||||
# Re-check
|
||||
ACTIVE_RUNNERS_AFTER=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
"${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
|
||||
python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))")
|
||||
log "Active runners after restart: ${ACTIVE_RUNNERS_AFTER}"
|
||||
if [[ "$ACTIVE_RUNNERS_AFTER" -eq 0 ]]; then
|
||||
log "CRITICAL: Self-healing failed. Runner still offline."
|
||||
touch /var/lib/bezalel/heartbeats/runner_health.last
|
||||
exit 1
|
||||
else
|
||||
log "RECOVERED: Runner back online."
|
||||
fi
|
||||
else
|
||||
log "OK: ${ACTIVE_RUNNERS} runner(s) online."
|
||||
fi
|
||||
|
||||
touch /var/lib/bezalel/heartbeats/runner_health.last
|
||||
50
scripts/secret_guard.sh
Executable file
50
scripts/secret_guard.sh
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
|
||||
# Secret Guard — Poka-yoke for world-readable credentials
|
||||
set -euo pipefail
|
||||
|
||||
ALERT_LOG="/var/log/bezalel_secret_guard.log"
|
||||
QUARANTINE_DIR="/root/wizards/bezalel/home/quarantine"
|
||||
|
||||
mkdir -p "$QUARANTINE_DIR"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
|
||||
}
|
||||
|
||||
# Scan for world-readable files with sensitive keywords in /root, /home, /etc, /tmp, /var/log
|
||||
# Exclude binary files, large files (>1MB), and known safe paths
|
||||
BAD_FILES=$(find /root /home /etc /tmp /var/log -maxdepth 4 -type f -perm /o+r 2>/dev/null \
|
||||
! -path "*/.git/*" \
|
||||
! -path "*/node_modules/*" \
|
||||
! -path "*/venv/*" \
|
||||
! -path "*/.venv/*" \
|
||||
! -path "*/__pycache__/*" \
|
||||
! -path "*/.pyc" \
|
||||
! -size +1M \
|
||||
-exec grep -l -i -E 'password|token|secret|nsec|api_key|private_key|aws_access_key_id|aws_secret_access_key' {} + 2>/dev/null | head -50)
|
||||
|
||||
VIOLATIONS=0
|
||||
for file in $BAD_FILES; do
|
||||
# Skip if already quarantined
|
||||
if [[ "$file" == "$QUARANTINE_DIR"* ]]; then
|
||||
continue
|
||||
fi
|
||||
# Skip log files that are expected to be world-readable
|
||||
if [[ "$file" == /var/log/* ]]; then
|
||||
continue
|
||||
fi
|
||||
|
||||
VIOLATIONS=$((VIOLATIONS + 1))
|
||||
basename=$(basename "$file")
|
||||
quarantine_path="${QUARANTINE_DIR}/${basename}.$(date +%s)"
|
||||
cp "$file" "$quarantine_path"
|
||||
chmod 600 "$quarantine_path"
|
||||
chmod 600 "$file"
|
||||
log "QUARANTINED: $file -> $quarantine_path (permissions fixed to 600)"
|
||||
done
|
||||
|
||||
if [[ $VIOLATIONS -gt 0 ]]; then
|
||||
log "ALERT: $VIOLATIONS world-readable secret file(s) detected and quarantined."
|
||||
else
|
||||
log "OK: No world-readable secret files found."
|
||||
fi
|
||||
77
scripts/staging_gate.py
Normal file
77
scripts/staging_gate.py
Normal file
@@ -0,0 +1,77 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Staging Gate — Poka-yoke for production deployments.
|
||||
Checks if the PR that introduced the current commit was marked `staging-verified`.
|
||||
Fails the workflow if not, blocking deploy.yml from proceeding.
|
||||
|
||||
Usage in Gitea workflow:
|
||||
- name: Staging Verification Gate
|
||||
run: python scripts/staging_gate.py
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
REPO = os.environ.get("GITEA_REPO", "Timmy_Foundation/the-nexus")
|
||||
|
||||
|
||||
def api_call(method, path):
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
||||
req = request.Request(url, method=method, headers=headers)
|
||||
try:
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except error.HTTPError as e:
|
||||
return {"error": e.read().decode(), "status": e.code}
|
||||
|
||||
|
||||
def get_commit_sha():
|
||||
result = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True)
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def get_pr_for_commit(sha):
|
||||
# Search open and closed PRs for this commit
|
||||
for state in ["closed", "open"]:
|
||||
prs = api_call("GET", f"/repos/{REPO}/pulls?state={state}&limit=50")
|
||||
if isinstance(prs, list):
|
||||
for pr in prs:
|
||||
if pr.get("merge_commit_sha") == sha:
|
||||
return pr
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN not set")
|
||||
sys.exit(1)
|
||||
|
||||
sha = get_commit_sha()
|
||||
pr = get_pr_for_commit(sha)
|
||||
|
||||
if not pr:
|
||||
# Direct push to main without PR — block unless explicitly forced
|
||||
print("WARNING: No PR found for this commit. Blocking deploy as a safety measure.")
|
||||
print("To bypass, merge via PR and add the 'staging-verified' label.")
|
||||
sys.exit(1)
|
||||
|
||||
labels = {label["name"] for label in pr.get("labels", [])}
|
||||
if "staging-verified" in labels:
|
||||
print(f"OK: PR #{pr['number']} has 'staging-verified' label. Deploy permitted.")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print(f"BLOCKED: PR #{pr['number']} is missing the 'staging-verified' label.")
|
||||
print("Deploy to production is not permitted until staging is verified.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
81
scripts/sync_branch_protection.py
Normal file
81
scripts/sync_branch_protection.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
|
||||
Correctly uses the Gitea 1.25+ API (not GitHub-style).
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import urllib.request
|
||||
import yaml
|
||||
|
||||
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||
ORG = "Timmy_Foundation"
|
||||
CONFIG_DIR = ".gitea/branch-protection"
|
||||
|
||||
|
||||
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
data = json.dumps(payload).encode() if payload else None
|
||||
req = urllib.request.Request(url, data=data, method=method, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def apply_protection(repo: str, rules: dict) -> bool:
|
||||
branch = rules.pop("branch", "main")
|
||||
# Check if protection already exists
|
||||
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
|
||||
exists = any(r.get("branch_name") == branch for r in existing)
|
||||
|
||||
payload = {
|
||||
"branch_name": branch,
|
||||
"rule_name": branch,
|
||||
"required_approvals": rules.get("required_approvals", 1),
|
||||
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
|
||||
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
|
||||
"block_deletions": rules.get("block_deletions", True),
|
||||
"block_force_push": rules.get("block_force_push", True),
|
||||
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
|
||||
"enable_status_check": rules.get("require_ci_to_merge", False),
|
||||
"status_check_contexts": rules.get("status_check_contexts", []),
|
||||
}
|
||||
|
||||
try:
|
||||
if exists:
|
||||
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
|
||||
else:
|
||||
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
|
||||
print(f"✅ {repo}:{branch} synced")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ {repo}:{branch} failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def main() -> int:
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN not set")
|
||||
return 1
|
||||
|
||||
ok = 0
|
||||
for fname in os.listdir(CONFIG_DIR):
|
||||
if not fname.endswith(".yml"):
|
||||
continue
|
||||
repo = fname[:-4]
|
||||
with open(os.path.join(CONFIG_DIR, fname)) as f:
|
||||
cfg = yaml.safe_load(f)
|
||||
if apply_protection(repo, cfg.get("rules", {})):
|
||||
ok += 1
|
||||
|
||||
print(f"\nSynced {ok} repo(s)")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
30
scripts/sync_fleet_to_alpha.sh
Normal file
30
scripts/sync_fleet_to_alpha.sh
Normal file
@@ -0,0 +1,30 @@
|
||||
#!/usr/bin/env bash
|
||||
# Sync Fleet MemPalace from Beta to Alpha
|
||||
# Usage: ./sync_fleet_to_alpha.sh
|
||||
set -euo pipefail
|
||||
|
||||
FLEET_DIR="/var/lib/mempalace/fleet"
|
||||
ALPHA_HOST="167.99.126.228"
|
||||
ALPHA_USER="root"
|
||||
ALPHA_DEST="/var/lib/mempalace/fleet"
|
||||
LOG="/var/log/bezalel_alpha_sync.log"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
|
||||
}
|
||||
|
||||
log "Starting fleet palace sync to Alpha (${ALPHA_HOST})..."
|
||||
|
||||
# Ensure Alpha destination exists (SSH must be configured key-based or agent-forwarded)
|
||||
ssh -o ConnectTimeout=10 "${ALPHA_USER}@${ALPHA_HOST}" "mkdir -p ${ALPHA_DEST}" || {
|
||||
log "ERROR: Cannot reach Alpha host. Aborting."
|
||||
exit 1
|
||||
}
|
||||
|
||||
# rsync the fleet palace directory (ChromaDB files + incoming closets)
|
||||
rsync -avz --delete \
|
||||
-e "ssh -o ConnectTimeout=10" \
|
||||
"${FLEET_DIR}/" \
|
||||
"${ALPHA_USER}@${ALPHA_HOST}:${ALPHA_DEST}/" >> "$LOG" 2>&1
|
||||
|
||||
log "Fleet palace sync complete."
|
||||
11
scripts/systemd/bezalel-meta-heartbeat.service
Normal file
11
scripts/systemd/bezalel-meta-heartbeat.service
Normal file
@@ -0,0 +1,11 @@
|
||||
[Unit]
|
||||
Description=Bezalel Meta-Heartbeat — stale cron detection (poka-yoke #1096)
|
||||
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=oneshot
|
||||
ExecStart=/root/wizards/the-nexus/bin/bezalel_heartbeat_check.py
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
Environment=HOME=/root
|
||||
11
scripts/systemd/bezalel-meta-heartbeat.timer
Normal file
11
scripts/systemd/bezalel-meta-heartbeat.timer
Normal file
@@ -0,0 +1,11 @@
|
||||
[Unit]
|
||||
Description=Bezalel Meta-Heartbeat — fires every 15 minutes (poka-yoke #1096)
|
||||
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
|
||||
[Timer]
|
||||
OnBootSec=5min
|
||||
OnUnitActiveSec=15min
|
||||
Persistent=true
|
||||
|
||||
[Install]
|
||||
WantedBy=timers.target
|
||||
123
scripts/validate_mempalace_taxonomy.py
Normal file
123
scripts/validate_mempalace_taxonomy.py
Normal file
@@ -0,0 +1,123 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Validate a wizard's mempalace.yaml against the fleet taxonomy standard.
|
||||
|
||||
Usage:
|
||||
python validate_mempalace_taxonomy.py /path/to/mempalace.yaml
|
||||
python validate_mempalace_taxonomy.py --ci /path/to/mempalace.yaml
|
||||
|
||||
Exit codes:
|
||||
0 = valid
|
||||
1 = missing required rooms or other violations
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
REQUIRED_ROOMS = {
|
||||
"forge",
|
||||
"hermes",
|
||||
"nexus",
|
||||
"issues",
|
||||
"experiments",
|
||||
}
|
||||
|
||||
|
||||
def load_standard():
|
||||
# Try to find the fleet standard in the-nexus clone or local path
|
||||
candidates = [
|
||||
Path(__file__).parent.parent / "mempalace_taxonomy.yaml",
|
||||
Path("/tmp/nexus_clone/docs/mempalace_taxonomy.yaml"),
|
||||
Path(__file__).parent.parent.parent / "the-nexus" / "docs" / "mempalace_taxonomy.yaml",
|
||||
]
|
||||
for c in candidates:
|
||||
if c.exists():
|
||||
with open(c) as f:
|
||||
return yaml.safe_load(f)
|
||||
return None
|
||||
|
||||
|
||||
def validate(path: Path):
|
||||
errors = []
|
||||
warnings = []
|
||||
|
||||
if not path.exists():
|
||||
errors.append(f"File not found: {path}")
|
||||
return errors, warnings
|
||||
|
||||
with open(path) as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not data:
|
||||
errors.append("Empty or invalid YAML")
|
||||
return errors, warnings
|
||||
|
||||
rooms = data.get("rooms", data.get("wings", {}).get("bezalel", {}).get("rooms", []))
|
||||
if isinstance(rooms, list) and rooms and isinstance(rooms[0], dict):
|
||||
room_names = {r.get("name") for r in rooms if isinstance(r, dict)}
|
||||
elif isinstance(rooms, dict):
|
||||
room_names = set(rooms.keys())
|
||||
else:
|
||||
room_names = set()
|
||||
|
||||
missing = REQUIRED_ROOMS - room_names
|
||||
if missing:
|
||||
errors.append(f"Missing required rooms: {', '.join(sorted(missing))}")
|
||||
|
||||
# Check for duplicate room names
|
||||
if len(room_names) < len(list(rooms) if isinstance(rooms, list) else rooms):
|
||||
errors.append("Duplicate room names detected")
|
||||
|
||||
# Check for empty keywords
|
||||
if isinstance(rooms, list):
|
||||
for r in rooms:
|
||||
if isinstance(r, dict):
|
||||
kw = r.get("keywords", [])
|
||||
if not kw:
|
||||
warnings.append(f"Room '{r.get('name')}' has no keywords")
|
||||
|
||||
standard = load_standard()
|
||||
if standard:
|
||||
std_optional = set(standard.get("optional_rooms", {}).keys())
|
||||
unknown = room_names - REQUIRED_ROOMS - std_optional
|
||||
if unknown:
|
||||
warnings.append(f"Non-standard rooms (OK but not in fleet spec): {', '.join(sorted(unknown))}")
|
||||
|
||||
return errors, warnings
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Validate MemPalace taxonomy")
|
||||
parser.add_argument("config", help="Path to mempalace.yaml")
|
||||
parser.add_argument("--ci", action="store_true", help="CI mode: fail on warnings too")
|
||||
args = parser.parse_args()
|
||||
|
||||
errors, warnings = validate(Path(args.config))
|
||||
|
||||
if warnings:
|
||||
for w in warnings:
|
||||
print(f"WARNING: {w}")
|
||||
|
||||
if errors:
|
||||
for e in errors:
|
||||
print(f"ERROR: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if args.ci and warnings:
|
||||
print("Validation failed in CI mode (warnings treated as errors)")
|
||||
sys.exit(1)
|
||||
|
||||
print("OK: Taxonomy validation passed")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "forge",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "CI pipeline green on main. All 253 tests passing.",
|
||||
"source_file": "forge.closet.json",
|
||||
"closet": true
|
||||
},
|
||||
{
|
||||
"text": "Deployed nexus heartbeat cron fix to Beta. Poka-yoke checks pass.",
|
||||
"source_file": "forge.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "hermes",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "Hermes gateway v2 deployed. MCP tools registered: mempalace, gitea, cron.",
|
||||
"source_file": "hermes.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "issues",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "MemPalace x Evennia milestone: 6 of 8 issues closed. #1078 and #1083 in progress.",
|
||||
"source_file": "issues.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
334
tests/test_bezalel_heartbeat.py
Normal file
334
tests/test_bezalel_heartbeat.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""
|
||||
Tests for Bezalel Cron Heartbeat system (poka-yoke #1096).
|
||||
|
||||
Validates:
|
||||
- check_cron_heartbeats() with healthy and stale jobs
|
||||
- Empty heartbeat dir (no .last files) returns safely
|
||||
- Corrupt JSON in a .last file is handled gracefully
|
||||
- Mixed healthy/stale jobs
|
||||
- Alert file writing (write_alert)
|
||||
- The 2× interval staleness threshold is applied correctly
|
||||
|
||||
Uses importlib to load bin/bezalel_heartbeat_check.py without __init__.py,
|
||||
following the same pattern as test_nexus_watchdog.py.
|
||||
|
||||
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Load module under test ────────────────────────────────────────────────────
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"bezalel_heartbeat_check_test",
|
||||
PROJECT_ROOT / "bin" / "bezalel_heartbeat_check.py",
|
||||
)
|
||||
_hb_mod = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules["bezalel_heartbeat_check_test"] = _hb_mod
|
||||
_hb_spec.loader.exec_module(_hb_mod)
|
||||
|
||||
check_cron_heartbeats = _hb_mod.check_cron_heartbeats
|
||||
write_alert = _hb_mod.write_alert
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def write_heartbeat_file(
|
||||
directory: Path,
|
||||
job: str,
|
||||
timestamp: float,
|
||||
interval: int = 3600,
|
||||
pid: int = 12345,
|
||||
) -> Path:
|
||||
"""Write a valid .last heartbeat file for testing."""
|
||||
directory.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"job": job,
|
||||
"timestamp": timestamp,
|
||||
"interval": interval,
|
||||
"pid": pid,
|
||||
}
|
||||
path = directory / f"{job}.last"
|
||||
path.write_text(json.dumps(payload), encoding="utf-8")
|
||||
return path
|
||||
|
||||
|
||||
# ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCheckCronHeartbeats:
|
||||
|
||||
def test_healthy_job(self, tmp_path: Path) -> None:
|
||||
"""A job with a recent timestamp is reported as healthy."""
|
||||
now = time.time()
|
||||
write_heartbeat_file(tmp_path, "morning-report", timestamp=now - 100, interval=3600)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 0
|
||||
assert result["healthy_count"] == 1
|
||||
assert len(result["jobs"]) == 1
|
||||
|
||||
job = result["jobs"][0]
|
||||
assert job["job"] == "morning-report"
|
||||
assert job["healthy"] is True
|
||||
assert job["age_secs"] == pytest.approx(100, abs=5)
|
||||
assert "OK" in job["message"]
|
||||
|
||||
def test_stale_job(self, tmp_path: Path) -> None:
|
||||
"""A job silent for > 2× its interval is reported as stale."""
|
||||
now = time.time()
|
||||
# 3 hours ago with 1-hour interval → 3 > 2×1 → stale
|
||||
write_heartbeat_file(tmp_path, "hourly-sync", timestamp=now - 10800, interval=3600)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 1
|
||||
assert result["healthy_count"] == 0
|
||||
|
||||
job = result["jobs"][0]
|
||||
assert job["job"] == "hourly-sync"
|
||||
assert job["healthy"] is False
|
||||
assert "STALE" in job["message"]
|
||||
assert "exceeds 2x threshold" in job["message"]
|
||||
|
||||
def test_just_within_threshold(self, tmp_path: Path) -> None:
|
||||
"""A job at exactly 2× interval is NOT stale (threshold is strictly >)."""
|
||||
fake_now = 1700000000.0
|
||||
# age = 7200, threshold = 2 * 3600 = 7200 — NOT stale (not strictly greater)
|
||||
write_heartbeat_file(tmp_path, "edge-job", timestamp=fake_now - 7200, interval=3600)
|
||||
|
||||
with patch("time.time", return_value=fake_now):
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
# age_secs == 7200 and threshold = 7200, so not stale (age > threshold is False)
|
||||
assert result["stale_count"] == 0
|
||||
|
||||
def test_stale_threshold_just_over(self, tmp_path: Path) -> None:
|
||||
"""A job silent for 2× interval + 1 second is stale."""
|
||||
now = time.time()
|
||||
# age = 7201, threshold = 7200 — IS stale
|
||||
write_heartbeat_file(tmp_path, "edge-job", timestamp=now - 7201, interval=3600)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 1
|
||||
|
||||
def test_empty_dir_returns_safely(self, tmp_path: Path) -> None:
|
||||
"""Empty heartbeat directory returns zero jobs without error."""
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 0
|
||||
assert result["healthy_count"] == 0
|
||||
assert result["jobs"] == []
|
||||
assert "checked_at" in result
|
||||
|
||||
def test_nonexistent_dir_returns_safely(self, tmp_path: Path) -> None:
|
||||
"""Non-existent heartbeat dir returns empty result without error."""
|
||||
missing = str(tmp_path / "does-not-exist")
|
||||
result = check_cron_heartbeats(missing)
|
||||
|
||||
assert result["stale_count"] == 0
|
||||
assert result["healthy_count"] == 0
|
||||
assert result["jobs"] == []
|
||||
|
||||
def test_corrupt_json_handled_gracefully(self, tmp_path: Path) -> None:
|
||||
"""Corrupt JSON in a .last file is reported as stale with an error message."""
|
||||
bad_file = tmp_path / "broken-job.last"
|
||||
bad_file.write_text("{this is not valid json!}", encoding="utf-8")
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 1
|
||||
assert result["healthy_count"] == 0
|
||||
|
||||
job = result["jobs"][0]
|
||||
assert job["job"] == "broken-job"
|
||||
assert job["healthy"] is False
|
||||
assert "CORRUPT" in job["message"]
|
||||
assert job["last_seen"] is None
|
||||
|
||||
def test_multiple_jobs_mixed(self, tmp_path: Path) -> None:
|
||||
"""Mixed healthy and stale jobs are correctly counted."""
|
||||
now = time.time()
|
||||
|
||||
# 3 healthy jobs (recent)
|
||||
write_heartbeat_file(tmp_path, "job-a", timestamp=now - 60, interval=3600)
|
||||
write_heartbeat_file(tmp_path, "job-b", timestamp=now - 1800, interval=3600)
|
||||
write_heartbeat_file(tmp_path, "job-c", timestamp=now - 3599, interval=3600)
|
||||
|
||||
# 2 stale jobs
|
||||
write_heartbeat_file(tmp_path, "job-d", timestamp=now - 10000, interval=3600)
|
||||
write_heartbeat_file(tmp_path, "job-e", timestamp=now - 86400, interval=3600)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 2
|
||||
assert result["healthy_count"] == 3
|
||||
assert len(result["jobs"]) == 5
|
||||
|
||||
stale_jobs = {j["job"] for j in result["jobs"] if not j["healthy"]}
|
||||
healthy_jobs = {j["job"] for j in result["jobs"] if j["healthy"]}
|
||||
assert stale_jobs == {"job-d", "job-e"}
|
||||
assert healthy_jobs == {"job-a", "job-b", "job-c"}
|
||||
|
||||
def test_result_contains_required_keys(self, tmp_path: Path) -> None:
|
||||
"""Result dict contains all required keys."""
|
||||
now = time.time()
|
||||
write_heartbeat_file(tmp_path, "test-job", timestamp=now - 100, interval=3600)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert "checked_at" in result
|
||||
assert "jobs" in result
|
||||
assert "stale_count" in result
|
||||
assert "healthy_count" in result
|
||||
|
||||
job = result["jobs"][0]
|
||||
assert "job" in job
|
||||
assert "healthy" in job
|
||||
assert "age_secs" in job
|
||||
assert "interval" in job
|
||||
assert "last_seen" in job
|
||||
assert "message" in job
|
||||
|
||||
def test_job_last_seen_is_iso_timestamp(self, tmp_path: Path) -> None:
|
||||
"""last_seen field is a valid ISO 8601 timestamp string."""
|
||||
from datetime import datetime
|
||||
now = time.time()
|
||||
write_heartbeat_file(tmp_path, "ts-job", timestamp=now - 100, interval=3600)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
job = result["jobs"][0]
|
||||
|
||||
# Should be parseable as an ISO timestamp
|
||||
assert job["last_seen"] is not None
|
||||
dt = datetime.fromisoformat(job["last_seen"])
|
||||
assert dt is not None
|
||||
|
||||
def test_checked_at_is_iso_timestamp(self, tmp_path: Path) -> None:
|
||||
"""checked_at is a valid ISO 8601 timestamp string."""
|
||||
from datetime import datetime
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
dt = datetime.fromisoformat(result["checked_at"])
|
||||
assert dt is not None
|
||||
|
||||
def test_custom_interval_applied(self, tmp_path: Path) -> None:
|
||||
"""Custom interval (e.g. daily) is respected for stale detection."""
|
||||
now = time.time()
|
||||
# 25 hours ago with 12-hour interval → 25 > 2×12 = 24 → stale
|
||||
write_heartbeat_file(tmp_path, "daily-job", timestamp=now - 90000, interval=43200)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 1
|
||||
job = result["jobs"][0]
|
||||
assert job["interval"] == 43200
|
||||
assert not job["healthy"]
|
||||
|
||||
def test_custom_interval_healthy(self, tmp_path: Path) -> None:
|
||||
"""Job within 2× custom interval is healthy."""
|
||||
now = time.time()
|
||||
# 23 hours ago with 12-hour interval → 23 < 2×12 = 24 → healthy
|
||||
write_heartbeat_file(tmp_path, "daily-job", timestamp=now - 82800, interval=43200)
|
||||
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
assert result["stale_count"] == 0
|
||||
assert result["healthy_count"] == 1
|
||||
|
||||
def test_deterministic_with_mocked_time(self, tmp_path: Path) -> None:
|
||||
"""Test with mocked time.time() for fully deterministic assertion."""
|
||||
fake_now = 1700000000.0
|
||||
|
||||
write_heartbeat_file(tmp_path, "frozen-job", timestamp=fake_now - 500, interval=3600)
|
||||
|
||||
with patch("time.time", return_value=fake_now):
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
job = result["jobs"][0]
|
||||
# age should be exactly 500s
|
||||
assert job["age_secs"] == pytest.approx(500.0, abs=0.01)
|
||||
assert job["healthy"] is True # 500 < 7200
|
||||
|
||||
def test_stale_with_mocked_time(self, tmp_path: Path) -> None:
|
||||
"""Stale detection with mocked time is exact."""
|
||||
fake_now = 1700000000.0
|
||||
|
||||
# 8000s ago with 3600s interval → 8000 > 7200 → stale
|
||||
write_heartbeat_file(tmp_path, "frozen-stale", timestamp=fake_now - 8000, interval=3600)
|
||||
|
||||
with patch("time.time", return_value=fake_now):
|
||||
result = check_cron_heartbeats(str(tmp_path))
|
||||
|
||||
job = result["jobs"][0]
|
||||
assert job["age_secs"] == pytest.approx(8000.0, abs=0.01)
|
||||
assert job["healthy"] is False
|
||||
|
||||
|
||||
class TestWriteAlert:
|
||||
|
||||
def test_alert_file_created(self, tmp_path: Path) -> None:
|
||||
"""write_alert creates an alert file in the alerts subdirectory."""
|
||||
job_info = {
|
||||
"job": "test-job",
|
||||
"healthy": False,
|
||||
"age_secs": 8000.0,
|
||||
"interval": 3600,
|
||||
"last_seen": "2024-01-01T00:00:00+00:00",
|
||||
"message": "STALE (last 8000s ago, interval 3600s — exceeds 2x threshold of 7200s)",
|
||||
}
|
||||
write_alert(str(tmp_path), job_info)
|
||||
|
||||
alert_file = tmp_path / "alerts" / "test-job.alert"
|
||||
assert alert_file.exists()
|
||||
|
||||
def test_alert_file_content(self, tmp_path: Path) -> None:
|
||||
"""Alert file contains correct JSON fields."""
|
||||
job_info = {
|
||||
"job": "my-job",
|
||||
"healthy": False,
|
||||
"age_secs": 9000.0,
|
||||
"interval": 3600,
|
||||
"last_seen": "2024-06-01T12:00:00+00:00",
|
||||
"message": "STALE",
|
||||
}
|
||||
write_alert(str(tmp_path), job_info)
|
||||
|
||||
alert_file = tmp_path / "alerts" / "my-job.alert"
|
||||
data = json.loads(alert_file.read_text())
|
||||
|
||||
assert data["alert_level"] == "P1"
|
||||
assert data["job"] == "my-job"
|
||||
assert data["age_secs"] == 9000.0
|
||||
assert data["interval"] == 3600
|
||||
assert "detected_at" in data
|
||||
|
||||
def test_alert_no_partial_files_left(self, tmp_path: Path) -> None:
|
||||
"""No temp files remain after a successful write."""
|
||||
job_info = {
|
||||
"job": "clean-job",
|
||||
"healthy": False,
|
||||
"age_secs": 8000.0,
|
||||
"interval": 3600,
|
||||
"last_seen": None,
|
||||
"message": "STALE",
|
||||
}
|
||||
write_alert(str(tmp_path), job_info)
|
||||
|
||||
alerts_dir = tmp_path / "alerts"
|
||||
# Only the .alert file should exist — no .tmp files
|
||||
files = list(alerts_dir.iterdir())
|
||||
assert len(files) == 1
|
||||
assert files[0].suffix == ".alert"
|
||||
239
tests/test_mempalace_fleet_api.py
Normal file
239
tests/test_mempalace_fleet_api.py
Normal file
@@ -0,0 +1,239 @@
|
||||
"""
|
||||
Tests for mempalace/fleet_api.py — Alpha-side HTTP fleet memory API.
|
||||
|
||||
Refs: #1078, #1075
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Import handler directly so we can test without running a server process.
|
||||
from mempalace.fleet_api import FleetAPIHandler, _handle_health, _handle_search, _handle_wings, make_server
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _FakeSocket:
|
||||
"""Minimal socket stub for BaseHTTPRequestHandler."""
|
||||
|
||||
def makefile(self, mode: str, *args, **kwargs): # noqa: ANN001
|
||||
return io.BytesIO(b"")
|
||||
|
||||
|
||||
def _make_handler(path: str = "/health") -> tuple[FleetAPIHandler, io.BytesIO]:
|
||||
"""Construct a handler pointed at *path*, capture wfile output."""
|
||||
buf = io.BytesIO()
|
||||
request = _FakeSocket()
|
||||
client_address = ("127.0.0.1", 0)
|
||||
|
||||
handler = FleetAPIHandler.__new__(FleetAPIHandler)
|
||||
handler.path = path
|
||||
handler.request = request
|
||||
handler.client_address = client_address
|
||||
handler.server = MagicMock()
|
||||
handler.wfile = buf
|
||||
handler.rfile = io.BytesIO(b"")
|
||||
handler.command = "GET"
|
||||
handler._headers_buffer = []
|
||||
|
||||
# Stub send_response / send_header / end_headers to write minimal HTTP
|
||||
handler._response_code = None
|
||||
def _send_response(code, message=None): # noqa: ANN001
|
||||
handler._response_code = code
|
||||
def _send_header(k, v): # noqa: ANN001
|
||||
pass
|
||||
def _end_headers(): # noqa: ANN001
|
||||
pass
|
||||
handler.send_response = _send_response
|
||||
handler.send_header = _send_header
|
||||
handler.end_headers = _end_headers
|
||||
|
||||
return handler, buf
|
||||
|
||||
|
||||
def _parse_response(buf: io.BytesIO) -> dict:
|
||||
buf.seek(0)
|
||||
return json.loads(buf.read())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /health
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_health_returns_ok(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
handler, buf = _make_handler("/health")
|
||||
_handle_health(handler)
|
||||
data = _parse_response(buf)
|
||||
assert data["status"] == "ok"
|
||||
assert data["palace_exists"] is True
|
||||
|
||||
|
||||
def test_health_missing_palace(tmp_path, monkeypatch):
|
||||
missing = tmp_path / "nonexistent"
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
|
||||
handler, buf = _make_handler("/health")
|
||||
_handle_health(handler)
|
||||
data = _parse_response(buf)
|
||||
assert data["status"] == "ok"
|
||||
assert data["palace_exists"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _mock_search_fleet(results):
|
||||
"""Return a patch target that returns *results*."""
|
||||
mock = MagicMock(return_value=results)
|
||||
return mock
|
||||
|
||||
|
||||
def _make_result(text="hello", room="forge", wing="bezalel", score=0.9):
|
||||
from nexus.mempalace.searcher import MemPalaceResult
|
||||
return MemPalaceResult(text=text, room=room, wing=wing, score=score)
|
||||
|
||||
|
||||
def test_search_missing_q_param():
|
||||
handler, buf = _make_handler("/search")
|
||||
_handle_search(handler, {})
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 400
|
||||
|
||||
|
||||
def test_search_returns_results(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
(tmp_path / "chroma.sqlite3").touch()
|
||||
result = _make_result(text="CI green", room="forge", wing="bezalel", score=0.95)
|
||||
|
||||
with patch("mempalace.fleet_api.FleetAPIHandler") as _:
|
||||
handler, buf = _make_handler("/search?q=CI")
|
||||
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", return_value=[result]):
|
||||
import importlib
|
||||
import mempalace.fleet_api as api_module
|
||||
# Patch search_fleet inside the handler's import context
|
||||
with patch("nexus.mempalace.searcher.search_fleet", return_value=[result]):
|
||||
_handle_search(handler, {"q": ["CI"]})
|
||||
|
||||
data = _parse_response(buf)
|
||||
assert data["count"] == 1
|
||||
assert data["results"][0]["text"] == "CI green"
|
||||
assert data["results"][0]["room"] == "forge"
|
||||
assert data["results"][0]["wing"] == "bezalel"
|
||||
assert data["results"][0]["score"] == 0.95
|
||||
assert handler._response_code == 200
|
||||
|
||||
|
||||
def test_search_with_room_filter(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
result = _make_result()
|
||||
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", return_value=[result]) as mock_sf:
|
||||
_handle_search(MagicMock(), {"q": ["test"], "room": ["hermes"]})
|
||||
|
||||
# Verify room was passed through
|
||||
mock_sf.assert_called_once_with("test", room="hermes", n_results=10)
|
||||
|
||||
|
||||
def test_search_invalid_n_param():
|
||||
handler, buf = _make_handler("/search?q=test&n=bad")
|
||||
_handle_search(handler, {"q": ["test"], "n": ["bad"]})
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 400
|
||||
|
||||
|
||||
def test_search_palace_unavailable(monkeypatch):
|
||||
from nexus.mempalace.searcher import MemPalaceUnavailable
|
||||
|
||||
handler, buf = _make_handler("/search?q=test")
|
||||
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", side_effect=MemPalaceUnavailable("no palace")):
|
||||
_handle_search(handler, {"q": ["test"]})
|
||||
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 503
|
||||
|
||||
|
||||
def test_search_n_clamped_to_max():
|
||||
"""n > MAX_RESULTS is silently clamped."""
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", return_value=[]) as mock_sf:
|
||||
handler = MagicMock()
|
||||
_handle_search(handler, {"q": ["test"], "n": ["9999"]})
|
||||
|
||||
mock_sf.assert_called_once_with("test", room=None, n_results=50)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /wings
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_wings_returns_list(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
(tmp_path / "bezalel").mkdir()
|
||||
(tmp_path / "timmy").mkdir()
|
||||
# A file should not appear in wings
|
||||
(tmp_path / "README.txt").touch()
|
||||
|
||||
handler, buf = _make_handler("/wings")
|
||||
_handle_wings(handler)
|
||||
data = _parse_response(buf)
|
||||
assert set(data["wings"]) == {"bezalel", "timmy"}
|
||||
assert handler._response_code == 200
|
||||
|
||||
|
||||
def test_wings_missing_palace(tmp_path, monkeypatch):
|
||||
missing = tmp_path / "nonexistent"
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
|
||||
handler, buf = _make_handler("/wings")
|
||||
_handle_wings(handler)
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 503
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 404 unknown endpoint
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_unknown_endpoint():
|
||||
handler, buf = _make_handler("/foobar")
|
||||
handler.do_GET()
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 404
|
||||
assert "/search" in data["endpoints"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# audit fixture smoke test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_audit_fixture_is_clean():
|
||||
"""Ensure tests/fixtures/fleet_palace/ passes privacy audit (no violations)."""
|
||||
from mempalace.audit_privacy import audit_palace
|
||||
|
||||
fixture_dir = Path(__file__).parent / "fixtures" / "fleet_palace"
|
||||
assert fixture_dir.exists(), f"Fixture directory missing: {fixture_dir}"
|
||||
|
||||
result = audit_palace(fixture_dir)
|
||||
assert result.clean, (
|
||||
f"Privacy violations found in CI fixture:\n" +
|
||||
"\n".join(f" [{v.rule}] {v.path}: {v.detail}" for v in result.violations)
|
||||
)
|
||||
Reference in New Issue
Block a user