Compare commits

...

6 Commits

Author SHA1 Message Date
Bezalel
b445c04037 feat(ci): staging verification gate + review approval gate (#1095, #1098)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Staging Verification Gate / verify-staging (push) Has been cancelled
2026-04-07 14:58:39 +00:00
60bd9a05ff fix(security): replace broken branch protection scripts with Gitea-native sync (#1098)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:56:31 +00:00
c7468a3c6a [claude] Weekly privacy audit cron + fleet HTTP API (#1075) (#1109)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:54:48 +00:00
07a4be3bb9 [claude] Weekly privacy audit cron + fleet HTTP API (#1075) (#1109)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:54:41 +00:00
804536a3f2 feat(security): add fleet merge-review audit script (#1098)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:53:07 +00:00
Bezalel
a0ee7858ff feat(bezalel): MemPalace ecosystem — validation, audit, sync, auto-revert, Evennia integration
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:47:12 +00:00
29 changed files with 2370 additions and 43 deletions

View File

@@ -0,0 +1,21 @@
name: Review Approval Gate
on:
pull_request:
branches: [main]
jobs:
verify-review:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Verify PR has approving review
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
GITEA_REPO: Timmy_Foundation/the-nexus
PR_NUMBER: ${{ gitea.event.pull_request.number }}
run: |
python3 scripts/review_gate.py

View File

@@ -0,0 +1,20 @@
name: Staging Verification Gate
on:
push:
branches: [main]
jobs:
verify-staging:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Verify staging label on merge PR
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
GITEA_REPO: Timmy_Foundation/the-nexus
run: |
python3 scripts/staging_gate.py

View File

@@ -0,0 +1,28 @@
name: Weekly Privacy Audit
# Runs every Monday at 05:00 UTC against a CI test fixture.
# On production wizards this same script should be run via cron:
# 0 5 * * 1 python /opt/nexus/mempalace/audit_privacy.py /var/lib/mempalace/fleet
#
# Refs: #1083, #1075
on:
schedule:
- cron: "0 5 * * 1" # Monday 05:00 UTC
workflow_dispatch: {} # allow manual trigger
jobs:
privacy-audit:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.x"
- name: Run privacy audit against CI fixture
run: |
python mempalace/audit_privacy.py tests/fixtures/fleet_palace

View File

@@ -0,0 +1,246 @@
"""
Palace commands — bridge Evennia to the local MemPalace memory system.
"""
import json
import subprocess
from evennia.commands.command import Command
from evennia import create_object, search_object
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
def _search_mempalace(query, wing=None, room=None, n=5, fleet=False):
"""Call the helper script and return parsed results."""
cmd = ["/root/wizards/bezalel/hermes/venv/bin/python", PALACE_SCRIPT, query]
cmd.append(wing or "none")
cmd.append(room or "none")
cmd.append(str(n))
if fleet:
cmd.append("--fleet")
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
data = json.loads(result.stdout)
return data.get("results", [])
except Exception:
return []
def _get_wing(caller):
"""Return the caller's wing, defaulting to their key or 'general'."""
return caller.db.wing if caller.attributes.has("wing") else (caller.key.lower() if caller.key else "general")
class CmdPalaceSearch(Command):
"""
Search your memory palace.
Usage:
palace/search <query>
palace/search <query> [--room <room>]
palace/recall <topic>
palace/file <name> = <content>
palace/status
"""
key = "palace"
aliases = ["pal"]
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
if not self.args.strip():
self.caller.msg("Usage: palace/search <query> | palace/recall <topic> | palace/file <name> = <content> | palace/status")
return
parts = self.args.strip().split(" ", 1)
subcmd = parts[0].lower()
rest = parts[1] if len(parts) > 1 else ""
if subcmd == "search":
self._do_search(rest)
elif subcmd == "recall":
self._do_recall(rest)
elif subcmd == "file":
self._do_file(rest)
elif subcmd == "status":
self._do_status()
else:
self._do_search(self.args.strip())
def _do_search(self, query):
if not query:
self.caller.msg("Search for what?")
return
self.caller.msg(f"Searching the palace for: |c{query}|n...")
wing = _get_wing(self.caller)
results = _search_mempalace(query, wing=wing)
if not results:
self.caller.msg("The palace is silent on that matter.")
return
lines = []
for i, r in enumerate(results[:5], 1):
room = r.get("room", "unknown")
source = r.get("source", "unknown")
content = r.get("content", "")[:400]
lines.append(f"\n|g[{i}]|n |c{room}|n — |x{source}|n")
lines.append(f"{content}\n")
self.caller.msg("\n".join(lines))
def _do_recall(self, topic):
if not topic:
self.caller.msg("Recall what topic?")
return
results = _search_mempalace(topic, wing=_get_wing(self.caller), n=1)
if not results:
self.caller.msg("Nothing to recall.")
return
r = results[0]
content = r.get("content", "")
source = r.get("source", "unknown")
from typeclasses.memory_object import MemoryObject
obj = create_object(
MemoryObject,
key=f"memory:{topic}",
location=self.caller.location,
)
obj.db.memory_content = content
obj.db.source_file = source
obj.db.room_name = r.get("room", "general")
self.caller.location.msg_contents(
f"$You() conjure() a memory shard from the palace: |m{obj.key}|n.",
from_obj=self.caller,
)
def _do_file(self, rest):
if "=" not in rest:
self.caller.msg("Usage: palace/file <name> = <content>")
return
name, content = rest.split("=", 1)
name = name.strip()
content = content.strip()
if not name or not content:
self.caller.msg("Both name and content are required.")
return
from typeclasses.memory_object import MemoryObject
obj = create_object(
MemoryObject,
key=f"memory:{name}",
location=self.caller.location,
)
obj.db.memory_content = content
obj.db.source_file = f"filed by {self.caller.key}"
obj.db.room_name = self.caller.location.key if self.caller.location else "general"
self.caller.location.msg_contents(
f"$You() file() a new memory in the palace: |m{obj.key}|n.",
from_obj=self.caller,
)
def _do_status(self):
cmd = [
"/root/wizards/bezalel/hermes/venv/bin/mempalace",
"--palace", "/root/wizards/bezalel/.mempalace/palace",
"status"
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
self.caller.msg(result.stdout or result.stderr)
except Exception as e:
self.caller.msg(f"Could not reach the palace: {e}")
class CmdRecall(Command):
"""
Recall a memory from the palace.
Usage:
recall <query>
recall <query> --fleet
recall <query> --room <room>
"""
key = "recall"
aliases = ["remember", "mem"]
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
if not self.args.strip():
self.caller.msg("Recall what? Usage: recall <query> [--fleet] [--room <room>]")
return
args = self.args.strip()
fleet = "--fleet" in args
room = None
if "--room" in args:
parts = args.split("--room")
args = parts[0].strip()
room = parts[1].strip().split()[0] if len(parts) > 1 else None
if "--fleet" in args:
args = args.replace("--fleet", "").strip()
self.caller.msg(f"Recalling from the {'fleet' if fleet else 'personal'} palace: |c{args}|n...")
wing = None if fleet else _get_wing(self.caller)
results = _search_mempalace(args, wing=wing, room=room, n=5, fleet=fleet)
if not results:
self.caller.msg("The palace is silent on that matter.")
return
lines = []
for i, r in enumerate(results[:5], 1):
room_name = r.get("room", "unknown")
source = r.get("source", "unknown")
content = r.get("content", "")[:400]
wing_label = r.get("wing", "unknown")
wing_tag = f" |y[{wing_label}]|n" if fleet else ""
lines.append(f"\n|g[{i}]|n |c{room_name}|n{wing_tag} — |x{source}|n")
lines.append(f"{content}\n")
self.caller.msg("\n".join(lines))
class CmdEnterRoom(Command):
"""
Enter a room in the mind palace by topic.
Usage:
enter room <topic>
"""
key = "enter room"
aliases = ["enter palace", "go room"]
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
if not self.args.strip():
self.caller.msg("Enter which room? Usage: enter room <topic>")
return
topic = self.args.strip().lower().replace(" ", "-")
wing = _get_wing(self.caller)
room_key = f"palace:{wing}:{topic}"
# Search for existing room
rooms = search_object(room_key, typeclass="typeclasses.palace_room.PalaceRoom")
if rooms:
room = rooms[0]
else:
# Create the room dynamically
from typeclasses.palace_room import PalaceRoom
room = create_object(
PalaceRoom,
key=room_key,
)
room.db.memory_topic = topic
room.db.wing = wing
room.update_description()
self.caller.move_to(room, move_type="teleport")
self.caller.msg(f"You step into the |c{topic}|n room of your mind palace.")

View File

@@ -0,0 +1,166 @@
"""
Live memory commands — write new memories into the palace from Evennia.
"""
import json
import subprocess
from evennia.commands.command import Command
from evennia import create_object
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
ADDER_SCRIPT = "/root/wizards/bezalel/evennia/palace_add.py"
def _add_drawer(content, wing, room, source):
"""Add a verbatim drawer to the palace via the helper script."""
cmd = [
"/root/wizards/bezalel/hermes/venv/bin/python",
ADDER_SCRIPT,
content,
wing,
room,
source,
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
return result.returncode == 0 and "OK" in result.stdout
except Exception:
return False
class CmdRecord(Command):
"""
Record a decision into the palace hall_facts.
Usage:
record <text>
record We decided to use PostgreSQL over MySQL.
"""
key = "record"
aliases = ["decide"]
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
if not self.args.strip():
self.caller.msg("Record what decision? Usage: record <text>")
return
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
text = self.args.strip()
full_text = f"DECISION ({wing}): {text}\nRecorded by {self.caller.key} via Evennia."
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
if ok:
self.caller.location.msg_contents(
f"$You() record() a decision in the palace archives.",
from_obj=self.caller,
)
else:
self.caller.msg("The palace scribes could not write that down.")
class CmdNote(Command):
"""
Note a breakthrough into the palace hall_discoveries.
Usage:
note <text>
note The GraphQL schema can be auto-generated from our typeclasses.
"""
key = "note"
aliases = ["jot"]
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
if not self.args.strip():
self.caller.msg("Note what? Usage: note <text>")
return
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
text = self.args.strip()
full_text = f"BREAKTHROUGH ({wing}): {text}\nNoted by {self.caller.key} via Evennia."
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
if ok:
self.caller.location.msg_contents(
f"$You() inscribe() a breakthrough into the palace scrolls.",
from_obj=self.caller,
)
else:
self.caller.msg("The palace scribes could not write that down.")
class CmdEvent(Command):
"""
Log an event into the palace hall_events.
Usage:
event <text>
event Gitea runner came back online after being offline for 6 hours.
"""
key = "event"
aliases = ["log"]
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
if not self.args.strip():
self.caller.msg("Log what event? Usage: event <text>")
return
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
text = self.args.strip()
full_text = f"EVENT ({wing}): {text}\nLogged by {self.caller.key} via Evennia."
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
if ok:
self.caller.location.msg_contents(
f"$You() chronicle() an event in the palace records.",
from_obj=self.caller,
)
else:
self.caller.msg("The palace scribes could not write that down.")
class CmdPalaceWrite(Command):
"""
Directly write a memory into a specific palace room.
Usage:
palace/write <room> = <text>
"""
key = "palace/write"
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
if "=" not in self.args:
self.caller.msg("Usage: palace/write <room> = <text>")
return
room, text = self.args.split("=", 1)
room = room.strip()
text = text.strip()
if not room or not text:
self.caller.msg("Both room and text are required.")
return
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
full_text = f"MEMORY ({wing}/{room}): {text}\nWritten by {self.caller.key} via Evennia."
ok = _add_drawer(full_text, wing, room, f"evennia:{self.caller.key}")
if ok:
self.caller.location.msg_contents(
f"$You() etch() a memory into the |c{room}|n room of the palace.",
from_obj=self.caller,
)
else:
self.caller.msg("The palace scribes could not write that down.")

View File

@@ -0,0 +1,105 @@
"""
Steward commands — ask a palace steward about memories.
"""
from evennia.commands.command import Command
from evennia import search_object
class CmdAskSteward(Command):
"""
Ask a steward NPC about a topic from the palace memory.
Usage:
ask <steward> about <topic>
ask <steward> about <topic> --fleet
Example:
ask bezalel-steward about nightly watch
ask bezalel-steward about runner outage --fleet
"""
key = "ask"
aliases = ["question"]
locks = "cmd:all()"
help_category = "Mind Palace"
def parse(self):
"""Parse 'ask <target> about <topic>' syntax."""
raw = self.args.strip()
fleet = "--fleet" in raw
if fleet:
raw = raw.replace("--fleet", "").strip()
if " about " in raw.lower():
parts = raw.split(" about ", 1)
self.target_name = parts[0].strip()
self.topic = parts[1].strip()
else:
self.target_name = ""
self.topic = raw
self.fleet = fleet
def func(self):
if not self.args.strip():
self.caller.msg("Usage: ask <steward> about <topic> [--fleet]")
return
self.parse()
if not self.target_name:
self.caller.msg("Ask whom? Usage: ask <steward> about <topic>")
return
# Find steward NPC in current room
stewards = [
obj for obj in self.caller.location.contents
if hasattr(obj, "respond_to_question")
and self.target_name.lower() in obj.key.lower()
]
if not stewards:
self.caller.msg(f"There is no steward here matching '{self.target_name}'.")
return
steward = stewards[0]
self.caller.msg(f"You ask |c{steward.key}|n about '{self.topic}'...")
steward.respond_to_question(self.topic, self.caller, fleet=self.fleet)
class CmdSummonSteward(Command):
"""
Summon your wing's steward NPC to your current location.
Usage:
summon steward
"""
key = "summon steward"
locks = "cmd:all()"
help_category = "Mind Palace"
def func(self):
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
steward_key = f"{wing}-steward"
# Search for existing steward
from typeclasses.steward_npc import StewardNPC
stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")
if stewards:
steward = stewards[0]
steward.move_to(self.caller.location, move_type="teleport")
self.caller.location.msg_contents(
f"A shimmer of light coalesces into |c{steward.key}|n.",
from_obj=self.caller,
)
else:
steward = StewardNPC.create(steward_key)[0]
steward.db.wing = wing
steward.db.steward_name = self.caller.key
steward.move_to(self.caller.location, move_type="teleport")
self.caller.location.msg_contents(
f"You call forth |c{steward.key}|n from the palace archives.",
from_obj=self.caller,
)

View File

@@ -0,0 +1,83 @@
"""
Hall of Wings — Builds the central MemPalace zone in Evennia.
Usage (from Evennia shell or script):
from world.hall_of_wings import build_hall_of_wings
build_hall_of_wings()
"""
from evennia import create_object
from typeclasses.palace_room import PalaceRoom
from typeclasses.steward_npc import StewardNPC
from typeclasses.rooms import Room
from typeclasses.exits import Exit
HALL_KEY = "hall_of_wings"
HALL_NAME = "Hall of Wings"
DEFAULT_WINGS = [
"bezalel",
"timmy",
"allegro",
"ezra",
]
def build_hall_of_wings():
"""Create or update the central Hall of Wings and attach steward chambers."""
# Find or create the hall
from evennia import search_object
halls = search_object(HALL_KEY, typeclass="typeclasses.rooms.Room")
if halls:
hall = halls[0]
else:
hall = create_object(Room, key=HALL_KEY)
hall.db.desc = (
"|cThe Hall of Wings|n\n"
"A vast circular chamber of pale stone and shifting starlight.\n"
"Arched doorways line the perimeter, each leading to a steward's chamber.\n"
"Here, the memories of the fleet converge.\n\n"
"Use |wsummon steward|n to call your wing's steward, or\n"
"|wask <steward> about <topic>|n to query the palace archives."
)
for wing in DEFAULT_WINGS:
chamber_key = f"chamber:{wing}"
chambers = search_object(chamber_key, typeclass="typeclasses.palace_room.PalaceRoom")
if chambers:
chamber = chambers[0]
else:
chamber = create_object(PalaceRoom, key=chamber_key)
chamber.db.memory_topic = wing
chamber.db.wing = wing
chamber.db.desc = (
f"|cThe Chamber of {wing.title()}|n\n"
f"This room holds the accumulated memories of the {wing} wing.\n"
f"A steward stands ready to answer questions."
)
chamber.update_description()
# Link hall <-> chamber with exits
exit_name = f"{wing}-chamber"
existing_exits = [ex for ex in hall.exits if ex.key == exit_name]
if not existing_exits:
create_object(Exit, key=exit_name, location=hall, destination=chamber)
return_exits = [ex for ex in chamber.exits if ex.key == "hall"]
if not return_exits:
create_object(Exit, key="hall", location=chamber, destination=hall)
# Place or summon steward
steward_key = f"{wing}-steward"
stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")
if stewards:
steward = stewards[0]
if steward.location != chamber:
steward.move_to(chamber, move_type="teleport")
else:
steward = create_object(StewardNPC, key=steward_key)
steward.db.wing = wing
steward.db.steward_name = wing.title()
steward.move_to(chamber, move_type="teleport")
return hall

View File

@@ -0,0 +1,87 @@
"""
PalaceRoom
A Room that represents a topic in the memory palace.
Memory objects spawned here embody concepts retrieved from mempalace.
Its description auto-populates from a palace search on the memory topic.
"""
import json
import subprocess
from evennia.objects.objects import DefaultRoom
from .objects import ObjectParent
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
class PalaceRoom(ObjectParent, DefaultRoom):
"""
A room in the mind palace. Its db.memory_topic describes what
kind of memories are stored here. The description is populated
from a live MemPalace search.
"""
def at_object_creation(self):
super().at_object_creation()
self.db.memory_topic = ""
self.db.wing = "bezalel"
self.db.desc = (
f"This is the |c{self.key}|n room of your mind palace.\n"
"Memories and concepts drift here like motes of light.\n"
"Use |wpalace/search <query>|n or |wrecall <topic>|n to summon memories."
)
def _search_palace(self, query, wing=None, room=None, n=3):
"""Call the helper script and return parsed results."""
cmd = ["/root/wizards/bezalel/hermes/venv/bin/python", PALACE_SCRIPT, query]
cmd.append(wing or "none")
cmd.append(room or "none")
cmd.append(str(n))
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
data = json.loads(result.stdout)
return data.get("results", [])
except Exception:
return []
def update_description(self):
"""Refresh the room description from a palace search on its topic."""
topic = self.db.memory_topic or self.key.split(":")[-1] if ":" in self.key else self.key
wing = self.db.wing or "bezalel"
results = self._search_palace(topic, wing=wing, n=3)
header = (
f"=|c {topic.upper()} |n="
)
desc_lines = [
header,
f"You stand in the |c{topic}|n room of the |y{wing}|n wing.",
"Memories drift here like motes of light.",
"",
]
if results:
desc_lines.append("|gNearby memories:|n")
for i, r in enumerate(results, 1):
content = r.get("content", "")[:200]
source = r.get("source", "unknown")
room_name = r.get("room", "unknown")
desc_lines.append(f" |m[{i}]|n |c{room_name}|n — {content}... |x({source})|n")
else:
desc_lines.append("|xThe palace is quiet here. No memories resonate with this topic yet.|n")
desc_lines.append("")
desc_lines.append("Use |wrecall <query>|n to search deeper, or |wpalace/search <query>|n.")
self.db.desc = "\n".join(desc_lines)
def at_object_receive(self, moved_obj, source_location, **kwargs):
"""Refresh description when someone enters."""
if moved_obj.has_account:
self.update_description()
super().at_object_receive(moved_obj, source_location, **kwargs)
def return_appearance(self, looker):
text = super().return_appearance(looker)
if self.db.memory_topic:
text += f"\n|xTopic: {self.db.memory_topic}|n"
return text

View File

@@ -0,0 +1,70 @@
"""
StewardNPC
A palace steward NPC that answers questions by querying the local
or fleet MemPalace backend. One steward per wizard wing.
"""
import json
import subprocess
from evennia.objects.objects import DefaultCharacter
from typeclasses.objects import ObjectParent
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
class StewardNPC(ObjectParent, DefaultCharacter):
"""
A steward of the mind palace. Ask it about memories,
decisions, or events from its wing.
"""
def at_object_creation(self):
super().at_object_creation()
self.db.wing = "bezalel"
self.db.steward_name = "Bezalel"
self.db.desc = (
f"|c{self.key}|n stands here quietly, eyes like polished steel, "
"waiting to recall anything from the palace archives."
)
self.locks.add("get:false();delete:perm(Admin)")
def _search_palace(self, query, fleet=False, n=3):
cmd = [
"/root/wizards/bezalel/hermes/venv/bin/python",
PALACE_SCRIPT,
query,
"none" if fleet else self.db.wing,
"none",
str(n),
]
if fleet:
cmd.append("--fleet")
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
data = json.loads(result.stdout)
return data.get("results", [])
except Exception:
return []
def _summarize_for_speech(self, results, query):
"""Convert search results into in-character dialogue."""
if not results:
return "I find no memory of that in the palace."
lines = [f"Regarding '{query}':"]
for r in results:
room = r.get("room", "unknown")
content = r.get("content", "")[:300]
source = r.get("source", "unknown")
lines.append(f" From the |c{room}|n room: {content}... |x[{source}]|n")
return "\n".join(lines)
def respond_to_question(self, question, asker, fleet=False):
results = self._search_palace(question, fleet=fleet, n=3)
speech = self._summarize_for_speech(results, question)
self.location.msg_contents(
f"|c{self.key}|n says to $you(asker): \"{speech}\"",
mapping={"asker": asker},
from_obj=self,
)

View File

@@ -1,44 +1,6 @@
#!/bin/bash
# Apply branch protections to all repositories
# Requires GITEA_TOKEN env var
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
for repo in "${REPOS[@]}"
do
curl -X POST "https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/$repo/branches/main/protection" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"required_reviews": 1,
"dismiss_stale_reviews": true,
"block_force_push": true,
"block_deletions": true
}'
done
#!/bin/bash
# Gitea API credentials
GITEA_TOKEN="your-personal-access-token"
GITEA_API="https://forge.alexanderwhitestone.com/api/v1"
# Repos to protect
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
for REPO in "${REPO[@]}"; do
echo "Configuring branch protection for $REPO..."
curl -X POST -H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "main",
"require_pull_request": true,
"required_approvals": 1,
"dismiss_stale_approvals": true,
"required_status_checks": '"$(test "$REPO" = "hermes-agent" && echo "true" || echo "false")"',
"block_force_push": true,
"block_delete": true
}' \
"$GITEA_API/repos/Timmy_Foundation/$REPO/branch_protection"
done
# Wrapper for the canonical branch-protection sync script.
# Usage: ./gitea-branch-protection.sh
set -euo pipefail
cd "$(dirname "$0")"
python3 scripts/sync_branch_protection.py

186
mempalace/fleet_api.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
fleet_api.py — Lightweight HTTP API for the shared fleet palace.
Exposes fleet memory search over HTTP so that Alpha servers and other
wizard deployments can query the palace without direct filesystem access.
Endpoints:
GET /health
Returns {"status": "ok", "palace": "<path>"}
GET /search?q=<query>[&room=<room>][&n=<int>]
Returns {"results": [...], "query": "...", "room": "...", "count": N}
Each result: {"text": "...", "room": "...", "wing": "...", "score": 0.9}
GET /wings
Returns {"wings": ["bezalel", ...]} — distinct wizard wings present
Error responses use {"error": "<message>"} with appropriate HTTP status codes.
Usage:
# Default: localhost:7771, fleet palace at /var/lib/mempalace/fleet
python mempalace/fleet_api.py
# Custom host/port/palace:
FLEET_PALACE_PATH=/data/fleet python mempalace/fleet_api.py --host 0.0.0.0 --port 8080
Refs: #1078, #1075
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse
# Add repo root to path so we can import nexus.mempalace
_HERE = Path(__file__).resolve().parent
_REPO_ROOT = _HERE.parent
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 7771
MAX_RESULTS = 50
def _get_palace_path() -> Path:
return Path(os.environ.get("FLEET_PALACE_PATH", "/var/lib/mempalace/fleet"))
def _json_response(handler: BaseHTTPRequestHandler, status: int, body: dict) -> None:
payload = json.dumps(body).encode()
handler.send_response(status)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", str(len(payload)))
handler.end_headers()
handler.wfile.write(payload)
def _handle_health(handler: BaseHTTPRequestHandler) -> None:
palace = _get_palace_path()
_json_response(handler, 200, {
"status": "ok",
"palace": str(palace),
"palace_exists": palace.exists(),
})
def _handle_search(handler: BaseHTTPRequestHandler, qs: dict) -> None:
query_terms = qs.get("q", [""])
q = query_terms[0].strip() if query_terms else ""
if not q:
_json_response(handler, 400, {"error": "Missing required parameter: q"})
return
room_terms = qs.get("room", [])
room = room_terms[0].strip() if room_terms else None
n_terms = qs.get("n", [])
try:
n = max(1, min(int(n_terms[0]), MAX_RESULTS)) if n_terms else 10
except (ValueError, IndexError):
_json_response(handler, 400, {"error": "Invalid parameter: n must be an integer"})
return
try:
from nexus.mempalace.searcher import search_fleet, MemPalaceUnavailable
except ImportError as exc:
_json_response(handler, 503, {"error": f"MemPalace module not available: {exc}"})
return
try:
results = search_fleet(q, room=room, n_results=n)
except Exception as exc: # noqa: BLE001
_json_response(handler, 503, {"error": str(exc)})
return
_json_response(handler, 200, {
"query": q,
"room": room,
"count": len(results),
"results": [
{
"text": r.text,
"room": r.room,
"wing": r.wing,
"score": round(r.score, 4),
}
for r in results
],
})
def _handle_wings(handler: BaseHTTPRequestHandler) -> None:
"""Return distinct wizard wing names found in the fleet palace directory."""
palace = _get_palace_path()
if not palace.exists():
_json_response(handler, 503, {
"error": f"Fleet palace not found: {palace}",
})
return
wings = sorted({
p.name for p in palace.iterdir() if p.is_dir()
})
_json_response(handler, 200, {"wings": wings})
class FleetAPIHandler(BaseHTTPRequestHandler):
"""Request handler for the fleet memory API."""
def log_message(self, fmt: str, *args) -> None: # noqa: ANN001
# Prefix with tag for easier log filtering
sys.stderr.write(f"[fleet_api] {fmt % args}\n")
def do_GET(self) -> None: # noqa: N802
parsed = urlparse(self.path)
path = parsed.path.rstrip("/") or "/"
qs = parse_qs(parsed.query)
if path == "/health":
_handle_health(self)
elif path == "/search":
_handle_search(self, qs)
elif path == "/wings":
_handle_wings(self)
else:
_json_response(self, 404, {
"error": f"Unknown endpoint: {path}",
"endpoints": ["/health", "/search", "/wings"],
})
def make_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> HTTPServer:
return HTTPServer((host, port), FleetAPIHandler)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Fleet palace HTTP API server."
)
parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind host (default: {DEFAULT_HOST})")
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Bind port (default: {DEFAULT_PORT})")
args = parser.parse_args(argv)
palace = _get_palace_path()
print(f"[fleet_api] Palace: {palace}")
if not palace.exists():
print(f"[fleet_api] WARNING: palace path does not exist yet: {palace}", file=sys.stderr)
server = make_server(args.host, args.port)
print(f"[fleet_api] Listening on http://{args.host}:{args.port}")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n[fleet_api] Shutting down.")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
Audit the fleet shared palace for privacy violations.
Ensures no raw drawers, full source paths, or private workspace leaks exist.
Usage:
python audit_mempalace_privacy.py /path/to/fleet/palace
Exit codes:
0 = clean
1 = violations found
"""
import sys
from pathlib import Path
try:
import chromadb
except ImportError:
print("ERROR: chromadb not installed")
sys.exit(1)
VIOLATION_KEYWORDS = [
"/root/wizards/",
"/home/",
"/Users/",
"private_key",
"-----BEGIN",
"GITEA_TOKEN=",
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
]
def audit(palace_path: Path):
violations = []
client = chromadb.PersistentClient(path=str(palace_path))
try:
col = client.get_collection("mempalace_drawers")
except Exception as e:
print(f"ERROR: Could not open collection: {e}")
sys.exit(1)
all_data = col.get(include=["documents", "metadatas"])
docs = all_data["documents"]
metas = all_data["metadatas"]
for doc, meta in zip(docs, metas):
source = meta.get("source_file", "")
doc_type = meta.get("type", "")
# Rule 1: Fleet palace should only contain closets or explicitly typed entries
if doc_type not in ("closet", "summary", "fleet"):
violations.append(
f"VIOLATION: Document type is '{doc_type}' (expected closet/summary/fleet). "
f"Source: {source}"
)
# Rule 2: No full absolute paths from private workspaces
if any(abs_path in source for abs_path in ["/root/wizards/", "/home/", "/Users/"]):
violations.append(
f"VIOLATION: Source contains absolute path: {source}"
)
# Rule 3: No raw secrets in document text
for kw in VIOLATION_KEYWORDS:
if kw in doc:
violations.append(
f"VIOLATION: Document contains sensitive keyword '{kw}'. Source: {source}"
)
break # one violation per doc is enough
return violations
def main():
import argparse
parser = argparse.ArgumentParser(description="Audit fleet palace privacy")
parser.add_argument("palace", default="/var/lib/mempalace/fleet", nargs="?", help="Path to fleet palace")
args = parser.parse_args()
violations = audit(Path(args.palace))
if violations:
print(f"FAIL: {len(violations)} privacy violation(s) found")
for v in violations:
print(f" {v}")
sys.exit(1)
else:
print("PASS: No privacy violations detected")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Fleet Merge Review Audit
========================
Scans all Timmy_Foundation repos for merges in the last 7 days
and validates that each merged PR had at least one approving review.
Exit 0 = no unreviewed merges
Exit 1 = unreviewed merges found (and issues created if --create-issues)
Usage:
python scripts/audit_merge_reviews.py
python scripts/audit_merge_reviews.py --create-issues
"""
import os
import sys
import argparse
from datetime import datetime, timedelta, timezone
import urllib.request
import urllib.error
import json
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
DAYS_BACK = 7
SECURITY_LABEL = "security"
def api_request(path: str) -> dict | list:
url = f"{GITEA_URL}/api/v1{path}"
req = urllib.request.Request(url, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def api_post(path: str, payload: dict) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def get_repos() -> list[str]:
repos = []
page = 1
while True:
batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}")
if not batch:
break
repos.extend([r["name"] for r in batch])
page += 1
return repos
def get_merged_prs(repo: str, since: str) -> list[dict]:
"""Get closed (merged) PRs updated since `since` (ISO format)."""
prs = []
page = 1
while True:
batch = api_request(
f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
)
if not batch:
break
for pr in batch:
if pr.get("merged_at") and pr["merged_at"] >= since:
prs.append(pr)
elif pr.get("updated_at") and pr["updated_at"] < since:
return prs
page += 1
return prs
def get_reviews(repo: str, pr_number: int) -> list[dict]:
try:
return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
except urllib.error.HTTPError as e:
if e.code == 404:
return []
raise
def create_post_mortem(repo: str, pr: dict) -> int | None:
title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
body = (
f"## Unreviewed Merge Detected\n\n"
f"- **Repository:** `{ORG}/{repo}`\n"
f"- **PR:** #{pr['number']}{pr['title']}\n"
f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n"
f"- **Merged at:** {pr['merged_at']}\n"
f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n"
f"This merge had **zero approving reviews** at the time of merge.\n\n"
f"### Required Actions\n"
f"1. Validate the merge contents are safe.\n"
f"2. If malicious or incorrect, revert immediately.\n"
f"3. Document root cause (bypassed branch protection? direct push?).\n"
)
try:
issue = api_post(f"/repos/{ORG}/the-nexus/issues", {
"title": title,
"body": body,
"labels": [SECURITY_LABEL],
})
return issue.get("number")
except Exception as e:
print(f" FAILED to create issue: {e}")
return None
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
args = parser.parse_args()
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN environment variable not set.")
return 1
since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
since = since_dt.isoformat()
repos = get_repos()
print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")
unreviewed_count = 0
for repo in repos:
merged = get_merged_prs(repo, since)
if not merged:
continue
repo_unreviewed = []
for pr in merged:
reviews = get_reviews(repo, pr["number"])
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if not approvals:
repo_unreviewed.append(pr)
if repo_unreviewed:
print(f"\n{repo}:")
for pr in repo_unreviewed:
print(f" ! UNREVIEWED merge: PR #{pr['number']}{pr['title']} ({pr['merged_at'][:10]})")
unreviewed_count += 1
if args.create_issues:
issue_num = create_post_mortem(repo, pr)
if issue_num:
print(f" → Created post-mortem issue the-nexus#{issue_num}")
print(f"\n{'='*60}")
if unreviewed_count == 0:
print("All merges in the last 7 days had at least one approving review.")
return 0
else:
print(f"Found {unreviewed_count} unreviewed merge(s).")
return 1
if __name__ == "__main__":
raise SystemExit(main())

50
scripts/backup_databases.sh Executable file
View File

@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Bezalel Database Backup — MemPalace + Evennia + Fleet
# Runs nightly after re-mine completes. Keeps 7 days of rolling backups.
set -euo pipefail
BACKUP_BASE="/root/wizards/bezalel/home/backups"
DATE=$(date +%Y%m%d_%H%M%S)
LOG="/var/log/bezalel_db_backup.log"
# Sources
LOCAL_PALACE="/root/wizards/bezalel/.mempalace/palace"
FLEET_PALACE="/var/lib/mempalace/fleet"
EVENNIA_DB="/root/wizards/bezalel/evennia/bezalel_world/server/evennia.db3"
# Destinations
LOCAL_BACKUP="${BACKUP_BASE}/mempalace/mempalace_${DATE}.tar.gz"
FLEET_BACKUP="${BACKUP_BASE}/fleet/fleet_${DATE}.tar.gz"
EVENNIA_BACKUP="${BACKUP_BASE}/evennia/evennia_${DATE}.db3.gz"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
}
log "Starting database backup cycle..."
# 1. Backup local MemPalace
tar -czf "$LOCAL_BACKUP" -C "$(dirname "$LOCAL_PALACE")" "$(basename "$LOCAL_PALACE")"
log "Local palace backed up: ${LOCAL_BACKUP} ($(du -h "$LOCAL_BACKUP" | cut -f1))"
# 2. Backup fleet MemPalace
tar -czf "$FLEET_BACKUP" -C "$(dirname "$FLEET_PALACE")" "$(basename "$FLEET_PALACE")"
log "Fleet palace backed up: ${FLEET_BACKUP} ($(du -h "$FLEET_BACKUP" | cut -f1))"
# 3. Backup Evennia DB (gzip for space)
gzip -c "$EVENNIA_DB" > "$EVENNIA_BACKUP"
log "Evennia DB backed up: ${EVENNIA_BACKUP} ($(du -h "$EVENNIA_BACKUP" | cut -f1))"
# 4. Prune backups older than 7 days
find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' -mtime +7 -delete
find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' -mtime +7 -delete
find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' -mtime +7 -delete
log "Pruned backups older than 7 days"
# 5. Report counts
MP_COUNT=$(find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' | wc -l)
FL_COUNT=$(find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' | wc -l)
EV_COUNT=$(find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' | wc -l)
log "Backup cycle complete. Retained: mempalace=${MP_COUNT}, fleet=${FL_COUNT}, evennia=${EV_COUNT}"
touch /var/lib/bezalel/heartbeats/db_backup.last

135
scripts/ci_auto_revert.py Normal file
View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""
CI Auto-Revert — Poka-yoke for broken merges.
Monitors the main branch post-merge and auto-reverts via local git if CI fails.
Usage:
python ci_auto_revert.py <repo_owner>/<repo_name>
python ci_auto_revert.py Timmy_Foundation/hermes-agent
Recommended cron: */10 * * * *
"""
import os
import sys
import json
import subprocess
import tempfile
from datetime import datetime, timedelta, timezone
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REVERT_WINDOW_MINUTES = 10
def api_call(method, path):
url = f"{GITEA_URL}/api/v1{path}"
headers = {"Authorization": f"token {GITEA_TOKEN}"}
req = request.Request(url, method=method, headers=headers)
try:
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except error.HTTPError as e:
return {"error": e.read().decode(), "status": e.code}
def get_recent_commits(owner, repo, since):
since_iso = since.strftime("%Y-%m-%dT%H:%M:%SZ")
return api_call("GET", f"/repos/{owner}/{repo}/commits?sha=main&since={since_iso}&limit=20")
def get_commit_status(owner, repo, sha):
return api_call("GET", f"/repos/{owner}/{repo}/commits/{sha}/status")
def revert_via_git(clone_url, sha, msg, owner, repo):
with tempfile.TemporaryDirectory() as tmpdir:
# Clone with token
auth_url = clone_url.replace("https://", f"https://bezalel:{GITEA_TOKEN}@")
subprocess.run(["git", "clone", "--depth", "10", auth_url, tmpdir], check=True, capture_output=True)
# Configure git
subprocess.run(["git", "-C", tmpdir, "config", "user.email", "bezalel@timmy.foundation"], check=True, capture_output=True)
subprocess.run(["git", "-C", tmpdir, "config", "user.name", "Bezalel"], check=True, capture_output=True)
# Revert the commit
revert_msg = f"[auto-revert] {msg}\n\nOriginal commit {sha} failed CI."
result = subprocess.run(
["git", "-C", tmpdir, "revert", "--no-edit", "-m", revert_msg, sha],
capture_output=True,
text=True,
)
if result.returncode != 0:
return {"error": f"git revert failed: {result.stderr}"}
# Push
push_result = subprocess.run(
["git", "-C", tmpdir, "push", "origin", "main"],
capture_output=True,
text=True,
)
if push_result.returncode != 0:
return {"error": f"git push failed: {push_result.stderr}"}
return {"ok": True, "reverted_sha": sha}
def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <owner/repo>")
sys.exit(1)
repo_full = sys.argv[1]
owner, repo = repo_full.split("/", 1)
since = datetime.now(timezone.utc) - timedelta(minutes=REVERT_WINDOW_MINUTES + 5)
commits = get_recent_commits(owner, repo, since)
if not isinstance(commits, list):
print(f"ERROR fetching commits: {commits}")
sys.exit(1)
reverted = 0
for commit in commits:
sha = commit.get("sha", "")
msg = commit.get("commit", {}).get("message", "").split("\n")[0]
commit_time = commit.get("commit", {}).get("committer", {}).get("date", "")
if not commit_time:
continue
commit_dt = datetime.fromisoformat(commit_time.replace("Z", "+00:00"))
age_min = (datetime.now(timezone.utc) - commit_dt).total_seconds() / 60
if age_min > REVERT_WINDOW_MINUTES:
continue
status = get_commit_status(owner, repo, sha)
state = status.get("state", "")
if state == "failure":
print(f"ALERT: Commit {sha[:8]} '{msg}' failed CI ({age_min:.1f}m old). Initiating revert...")
repo_info = api_call("GET", f"/repos/{owner}/{repo}")
clone_url = repo_info.get("clone_url", "")
if not clone_url:
print(f" Cannot find clone URL")
continue
result = revert_via_git(clone_url, sha, msg, owner, repo)
if "error" in result:
print(f" Revert failed: {result['error']}")
else:
print(f" Reverted successfully.")
reverted += 1
elif state == "success":
print(f"OK: Commit {sha[:8]} '{msg}' passed CI.")
elif state == "pending":
print(f"PENDING: Commit {sha[:8]} '{msg}' still running CI.")
else:
print(f"UNKNOWN: Commit {sha[:8]} '{msg}' has CI state '{state}'.")
if reverted == 0:
print("No broken merges found in the last 10 minutes.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
Export closets from a local MemPalace wing for fleet-wide sharing.
Privacy rule: only summaries/closets are exported. No raw source_file paths.
Source filenames are anonymized to just the basename.
"""
import json
import sys
from pathlib import Path
import chromadb
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
FLEET_INCOMING = "/var/lib/mempalace/fleet/incoming"
WING = "bezalel"
DOCS_PER_ROOM = 5
def main():
client = chromadb.PersistentClient(path=PALACE_PATH)
col = client.get_collection("mempalace_drawers")
# Discover rooms in this wing
all_meta = col.get(include=["metadatas"])["metadatas"]
rooms = set()
for m in all_meta:
if m.get("wing") == WING:
rooms.add(m.get("room", "general"))
Path(FLEET_INCOMING).mkdir(parents=True, exist_ok=True)
closets = []
for room in sorted(rooms):
results = col.query(
query_texts=[room],
n_results=DOCS_PER_ROOM,
where={"$and": [{"wing": WING}, {"room": room}]},
include=["documents", "metadatas"],
)
docs = results["documents"][0]
metas = results["metadatas"][0]
entries = []
for doc, meta in zip(docs, metas):
# Sanitize content: strip absolute workspace paths
sanitized = doc[:800]
sanitized = sanitized.replace("/root/wizards/bezalel/", "~/")
sanitized = sanitized.replace("/root/wizards/", "~/")
sanitized = sanitized.replace("/home/bezalel/", "~/")
sanitized = sanitized.replace("/home/", "~/")
entries.append({
"content": sanitized,
"source_basename": Path(meta.get("source_file", "?")).name,
})
closet = {
"wing": WING,
"room": room,
"type": "closet",
"entries": entries,
}
closets.append(closet)
out_file = Path(FLEET_INCOMING) / f"{WING}_closets.json"
with open(out_file, "w") as f:
json.dump(closets, f, indent=2)
print(f"Exported {len(closets)} closets to {out_file}")
for c in closets:
print(f" {c['wing']} / {c['room']} : {len(c['entries'])} entries")
if __name__ == "__main__":
main()

24
scripts/mempalace_nightly.sh Executable file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env bash
# Bezalel MemPalace Nightly Re-mine + Fleet Sync
set -euo pipefail
PALACE="/root/wizards/bezalel/.mempalace/palace"
MINER="/root/wizards/bezalel/hermes/venv/bin/mempalace"
WING_DIR="/root/wizards/bezalel"
LOG="/var/log/bezalel_mempalace.log"
EXPORTER="/root/wizards/bezalel/hermes/venv/bin/python /root/wizards/bezalel/mempalace_export.py"
IMPORTER="/root/wizards/bezalel/hermes/venv/bin/python /var/lib/mempalace/fleet_import.py"
echo "[$(date -Iseconds)] Starting mempalace re-mine" >> "$LOG"
cd "$WING_DIR"
"$MINER" --palace "$PALACE" mine "$WING_DIR" --agent bezalel >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Finished mempalace re-mine" >> "$LOG"
"$MINER" --palace "$PALACE" status >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Starting fleet closet export" >> "$LOG"
$EXPORTER >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Starting fleet closet import" >> "$LOG"
$IMPORTER >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Fleet sync complete" >> "$LOG"
touch /var/lib/bezalel/heartbeats/mempalace_nightly.last

53
scripts/meta_heartbeat.sh Executable file
View File

@@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Meta-heartbeat — checks all Bezalel cron jobs for stale timestamps
set -euo pipefail
HEARTBEAT_DIR="/var/lib/bezalel/heartbeats"
ALERT_LOG="/var/log/bezalel_meta_heartbeat.log"
STALE_MINUTES=30
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
mkdir -p "$HEARTBEAT_DIR"
# Define expected heartbeats: name => max_stale_minutes
HEARTBEATS=(
"nightly_watch:150" # 2.5h — runs at 02:00
"mempalace_nightly:150" # 2.5h — runs at 03:00
"db_backup:150" # 2.5h — runs at 03:30
"runner_health:15" # 15m — every 5 min
)
NOW_EPOCH=$(date +%s)
FAILURES=0
for entry in "${HEARTBEATS[@]}"; do
name="${entry%%:*}"
max_minutes="${entry##*:}"
file="${HEARTBEAT_DIR}/${name}.last"
if [[ ! -f "$file" ]]; then
log "MISSING: $name heartbeat file not found ($file)"
FAILURES=$((FAILURES + 1))
continue
fi
LAST_EPOCH=$(stat -c %Y "$file")
AGE_MIN=$(( (NOW_EPOCH - LAST_EPOCH) / 60 ))
if [[ $AGE_MIN -gt $max_minutes ]]; then
log "STALE: $name is ${AGE_MIN}m old (max ${max_minutes}m)"
FAILURES=$((FAILURES + 1))
else
log "OK: $name is ${AGE_MIN}m old"
fi
done
if [[ $FAILURES -gt 0 ]]; then
log "ALERT: $FAILURES stale/missing heartbeat(s) detected."
exit 1
else
log "ALL_OK: All heartbeats healthy."
fi

70
scripts/review_gate.py Normal file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""
Review Gate — Poka-yoke for unreviewed merges.
Fails if the current PR has fewer than 1 approving review.
Usage in Gitea workflow:
- name: Review Approval Gate
run: python scripts/review_gate.py
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
"""
import os
import sys
import json
import subprocess
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REPO = os.environ.get("GITEA_REPO", "")
PR_NUMBER = os.environ.get("PR_NUMBER", "")
def api_call(method, path):
url = f"{GITEA_URL}/api/v1{path}"
headers = {"Authorization": f"token {GITEA_TOKEN}"}
req = request.Request(url, method=method, headers=headers)
try:
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except error.HTTPError as e:
return {"error": e.read().decode(), "status": e.code}
def main():
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
sys.exit(1)
if not REPO:
print("ERROR: GITEA_REPO not set")
sys.exit(1)
pr_number = PR_NUMBER
if not pr_number:
# Try to infer from Gitea Actions environment
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
if not pr_number:
print("ERROR: Could not determine PR number")
sys.exit(1)
reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
if isinstance(reviews, dict) and "error" in reviews:
print(f"ERROR fetching reviews: {reviews}")
sys.exit(1)
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
sys.exit(0)
else:
print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
print("Merges are not permitted without at least one approval.")
sys.exit(1)
if __name__ == "__main__":
main()

46
scripts/runner_health_probe.sh Executable file
View File

@@ -0,0 +1,46 @@
#!/usr/bin/env bash
# Gitea Runner Health Probe — Poka-yoke for unregistered runners
set -euo pipefail
GITEA_TOKEN="${GITEA_TOKEN:-}"
GITEA_URL="https://forge.alexanderwhitestone.com"
ALERT_LOG="/var/log/bezalel_runner_health.log"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
if [[ -z "$GITEA_TOKEN" ]]; then
log "ERROR: GITEA_TOKEN not set"
exit 1
fi
ACTIVE_RUNNERS=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
"${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))")
log "Active runners: ${ACTIVE_RUNNERS}"
if [[ "$ACTIVE_RUNNERS" -eq 0 ]]; then
log "CRITICAL: Zero active runners detected. Attempting self-healing restart."
pkill -f "act_runner daemon" 2>/dev/null || true
sleep 2
cd /opt/gitea-runner && nohup ./act_runner daemon > /var/log/gitea-runner.log 2>&1 &
sleep 3
# Re-check
ACTIVE_RUNNERS_AFTER=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
"${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))")
log "Active runners after restart: ${ACTIVE_RUNNERS_AFTER}"
if [[ "$ACTIVE_RUNNERS_AFTER" -eq 0 ]]; then
log "CRITICAL: Self-healing failed. Runner still offline."
touch /var/lib/bezalel/heartbeats/runner_health.last
exit 1
else
log "RECOVERED: Runner back online."
fi
else
log "OK: ${ACTIVE_RUNNERS} runner(s) online."
fi
touch /var/lib/bezalel/heartbeats/runner_health.last

50
scripts/secret_guard.sh Executable file
View File

@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Secret Guard — Poka-yoke for world-readable credentials
set -euo pipefail
ALERT_LOG="/var/log/bezalel_secret_guard.log"
QUARANTINE_DIR="/root/wizards/bezalel/home/quarantine"
mkdir -p "$QUARANTINE_DIR"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
# Scan for world-readable files with sensitive keywords in /root, /home, /etc, /tmp, /var/log
# Exclude binary files, large files (>1MB), and known safe paths
BAD_FILES=$(find /root /home /etc /tmp /var/log -maxdepth 4 -type f -perm /o+r 2>/dev/null \
! -path "*/.git/*" \
! -path "*/node_modules/*" \
! -path "*/venv/*" \
! -path "*/.venv/*" \
! -path "*/__pycache__/*" \
! -path "*/.pyc" \
! -size +1M \
-exec grep -l -i -E 'password|token|secret|nsec|api_key|private_key|aws_access_key_id|aws_secret_access_key' {} + 2>/dev/null | head -50)
VIOLATIONS=0
for file in $BAD_FILES; do
# Skip if already quarantined
if [[ "$file" == "$QUARANTINE_DIR"* ]]; then
continue
fi
# Skip log files that are expected to be world-readable
if [[ "$file" == /var/log/* ]]; then
continue
fi
VIOLATIONS=$((VIOLATIONS + 1))
basename=$(basename "$file")
quarantine_path="${QUARANTINE_DIR}/${basename}.$(date +%s)"
cp "$file" "$quarantine_path"
chmod 600 "$quarantine_path"
chmod 600 "$file"
log "QUARANTINED: $file -> $quarantine_path (permissions fixed to 600)"
done
if [[ $VIOLATIONS -gt 0 ]]; then
log "ALERT: $VIOLATIONS world-readable secret file(s) detected and quarantined."
else
log "OK: No world-readable secret files found."
fi

77
scripts/staging_gate.py Normal file
View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""
Staging Gate — Poka-yoke for production deployments.
Checks if the PR that introduced the current commit was marked `staging-verified`.
Fails the workflow if not, blocking deploy.yml from proceeding.
Usage in Gitea workflow:
- name: Staging Verification Gate
run: python scripts/staging_gate.py
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
"""
import os
import sys
import json
import subprocess
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REPO = os.environ.get("GITEA_REPO", "Timmy_Foundation/the-nexus")
def api_call(method, path):
url = f"{GITEA_URL}/api/v1{path}"
headers = {"Authorization": f"token {GITEA_TOKEN}"}
req = request.Request(url, method=method, headers=headers)
try:
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except error.HTTPError as e:
return {"error": e.read().decode(), "status": e.code}
def get_commit_sha():
result = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True)
return result.stdout.strip()
def get_pr_for_commit(sha):
# Search open and closed PRs for this commit
for state in ["closed", "open"]:
prs = api_call("GET", f"/repos/{REPO}/pulls?state={state}&limit=50")
if isinstance(prs, list):
for pr in prs:
if pr.get("merge_commit_sha") == sha:
return pr
return None
def main():
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
sys.exit(1)
sha = get_commit_sha()
pr = get_pr_for_commit(sha)
if not pr:
# Direct push to main without PR — block unless explicitly forced
print("WARNING: No PR found for this commit. Blocking deploy as a safety measure.")
print("To bypass, merge via PR and add the 'staging-verified' label.")
sys.exit(1)
labels = {label["name"] for label in pr.get("labels", [])}
if "staging-verified" in labels:
print(f"OK: PR #{pr['number']} has 'staging-verified' label. Deploy permitted.")
sys.exit(0)
else:
print(f"BLOCKED: PR #{pr['number']} is missing the 'staging-verified' label.")
print("Deploy to production is not permitted until staging is verified.")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
Correctly uses the Gitea 1.25+ API (not GitHub-style).
"""
import os
import sys
import json
import urllib.request
import yaml
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
CONFIG_DIR = ".gitea/branch-protection"
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode() if payload else None
req = urllib.request.Request(url, data=data, method=method, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.pop("branch", "main")
# Check if protection already exists
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(r.get("branch_name") == branch for r in existing)
payload = {
"branch_name": branch,
"rule_name": branch,
"required_approvals": rules.get("required_approvals", 1),
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
"block_deletions": rules.get("block_deletions", True),
"block_force_push": rules.get("block_force_push", True),
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
"enable_status_check": rules.get("require_ci_to_merge", False),
"status_check_contexts": rules.get("status_check_contexts", []),
}
try:
if exists:
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
else:
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
print(f"{repo}:{branch} synced")
return True
except Exception as e:
print(f"{repo}:{branch} failed: {e}")
return False
def main() -> int:
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
return 1
ok = 0
for fname in os.listdir(CONFIG_DIR):
if not fname.endswith(".yml"):
continue
repo = fname[:-4]
with open(os.path.join(CONFIG_DIR, fname)) as f:
cfg = yaml.safe_load(f)
if apply_protection(repo, cfg.get("rules", {})):
ok += 1
print(f"\nSynced {ok} repo(s)")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Sync Fleet MemPalace from Beta to Alpha
# Usage: ./sync_fleet_to_alpha.sh
set -euo pipefail
FLEET_DIR="/var/lib/mempalace/fleet"
ALPHA_HOST="167.99.126.228"
ALPHA_USER="root"
ALPHA_DEST="/var/lib/mempalace/fleet"
LOG="/var/log/bezalel_alpha_sync.log"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
}
log "Starting fleet palace sync to Alpha (${ALPHA_HOST})..."
# Ensure Alpha destination exists (SSH must be configured key-based or agent-forwarded)
ssh -o ConnectTimeout=10 "${ALPHA_USER}@${ALPHA_HOST}" "mkdir -p ${ALPHA_DEST}" || {
log "ERROR: Cannot reach Alpha host. Aborting."
exit 1
}
# rsync the fleet palace directory (ChromaDB files + incoming closets)
rsync -avz --delete \
-e "ssh -o ConnectTimeout=10" \
"${FLEET_DIR}/" \
"${ALPHA_USER}@${ALPHA_HOST}:${ALPHA_DEST}/" >> "$LOG" 2>&1
log "Fleet palace sync complete."

View File

@@ -0,0 +1,123 @@
#!/usr/bin/env python3
"""
Validate a wizard's mempalace.yaml against the fleet taxonomy standard.
Usage:
python validate_mempalace_taxonomy.py /path/to/mempalace.yaml
python validate_mempalace_taxonomy.py --ci /path/to/mempalace.yaml
Exit codes:
0 = valid
1 = missing required rooms or other violations
"""
import sys
from pathlib import Path
try:
import yaml
except ImportError:
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
sys.exit(1)
REQUIRED_ROOMS = {
"forge",
"hermes",
"nexus",
"issues",
"experiments",
}
def load_standard():
# Try to find the fleet standard in the-nexus clone or local path
candidates = [
Path(__file__).parent.parent / "mempalace_taxonomy.yaml",
Path("/tmp/nexus_clone/docs/mempalace_taxonomy.yaml"),
Path(__file__).parent.parent.parent / "the-nexus" / "docs" / "mempalace_taxonomy.yaml",
]
for c in candidates:
if c.exists():
with open(c) as f:
return yaml.safe_load(f)
return None
def validate(path: Path):
errors = []
warnings = []
if not path.exists():
errors.append(f"File not found: {path}")
return errors, warnings
with open(path) as f:
data = yaml.safe_load(f)
if not data:
errors.append("Empty or invalid YAML")
return errors, warnings
rooms = data.get("rooms", data.get("wings", {}).get("bezalel", {}).get("rooms", []))
if isinstance(rooms, list) and rooms and isinstance(rooms[0], dict):
room_names = {r.get("name") for r in rooms if isinstance(r, dict)}
elif isinstance(rooms, dict):
room_names = set(rooms.keys())
else:
room_names = set()
missing = REQUIRED_ROOMS - room_names
if missing:
errors.append(f"Missing required rooms: {', '.join(sorted(missing))}")
# Check for duplicate room names
if len(room_names) < len(list(rooms) if isinstance(rooms, list) else rooms):
errors.append("Duplicate room names detected")
# Check for empty keywords
if isinstance(rooms, list):
for r in rooms:
if isinstance(r, dict):
kw = r.get("keywords", [])
if not kw:
warnings.append(f"Room '{r.get('name')}' has no keywords")
standard = load_standard()
if standard:
std_optional = set(standard.get("optional_rooms", {}).keys())
unknown = room_names - REQUIRED_ROOMS - std_optional
if unknown:
warnings.append(f"Non-standard rooms (OK but not in fleet spec): {', '.join(sorted(unknown))}")
return errors, warnings
def main():
import argparse
parser = argparse.ArgumentParser(description="Validate MemPalace taxonomy")
parser.add_argument("config", help="Path to mempalace.yaml")
parser.add_argument("--ci", action="store_true", help="CI mode: fail on warnings too")
args = parser.parse_args()
errors, warnings = validate(Path(args.config))
if warnings:
for w in warnings:
print(f"WARNING: {w}")
if errors:
for e in errors:
print(f"ERROR: {e}")
sys.exit(1)
if args.ci and warnings:
print("Validation failed in CI mode (warnings treated as errors)")
sys.exit(1)
print("OK: Taxonomy validation passed")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,16 @@
{
"wizard": "bezalel",
"room": "forge",
"drawers": [
{
"text": "CI pipeline green on main. All 253 tests passing.",
"source_file": "forge.closet.json",
"closet": true
},
{
"text": "Deployed nexus heartbeat cron fix to Beta. Poka-yoke checks pass.",
"source_file": "forge.closet.json",
"closet": true
}
]
}

View File

@@ -0,0 +1,11 @@
{
"wizard": "bezalel",
"room": "hermes",
"drawers": [
{
"text": "Hermes gateway v2 deployed. MCP tools registered: mempalace, gitea, cron.",
"source_file": "hermes.closet.json",
"closet": true
}
]
}

View File

@@ -0,0 +1,11 @@
{
"wizard": "bezalel",
"room": "issues",
"drawers": [
{
"text": "MemPalace x Evennia milestone: 6 of 8 issues closed. #1078 and #1083 in progress.",
"source_file": "issues.closet.json",
"closet": true
}
]
}

View File

@@ -0,0 +1,239 @@
"""
Tests for mempalace/fleet_api.py — Alpha-side HTTP fleet memory API.
Refs: #1078, #1075
"""
from __future__ import annotations
import io
import json
import threading
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Import handler directly so we can test without running a server process.
from mempalace.fleet_api import FleetAPIHandler, _handle_health, _handle_search, _handle_wings, make_server
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class _FakeSocket:
"""Minimal socket stub for BaseHTTPRequestHandler."""
def makefile(self, mode: str, *args, **kwargs): # noqa: ANN001
return io.BytesIO(b"")
def _make_handler(path: str = "/health") -> tuple[FleetAPIHandler, io.BytesIO]:
"""Construct a handler pointed at *path*, capture wfile output."""
buf = io.BytesIO()
request = _FakeSocket()
client_address = ("127.0.0.1", 0)
handler = FleetAPIHandler.__new__(FleetAPIHandler)
handler.path = path
handler.request = request
handler.client_address = client_address
handler.server = MagicMock()
handler.wfile = buf
handler.rfile = io.BytesIO(b"")
handler.command = "GET"
handler._headers_buffer = []
# Stub send_response / send_header / end_headers to write minimal HTTP
handler._response_code = None
def _send_response(code, message=None): # noqa: ANN001
handler._response_code = code
def _send_header(k, v): # noqa: ANN001
pass
def _end_headers(): # noqa: ANN001
pass
handler.send_response = _send_response
handler.send_header = _send_header
handler.end_headers = _end_headers
return handler, buf
def _parse_response(buf: io.BytesIO) -> dict:
buf.seek(0)
return json.loads(buf.read())
# ---------------------------------------------------------------------------
# /health
# ---------------------------------------------------------------------------
def test_health_returns_ok(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
handler, buf = _make_handler("/health")
_handle_health(handler)
data = _parse_response(buf)
assert data["status"] == "ok"
assert data["palace_exists"] is True
def test_health_missing_palace(tmp_path, monkeypatch):
missing = tmp_path / "nonexistent"
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
handler, buf = _make_handler("/health")
_handle_health(handler)
data = _parse_response(buf)
assert data["status"] == "ok"
assert data["palace_exists"] is False
# ---------------------------------------------------------------------------
# /search
# ---------------------------------------------------------------------------
def _mock_search_fleet(results):
"""Return a patch target that returns *results*."""
mock = MagicMock(return_value=results)
return mock
def _make_result(text="hello", room="forge", wing="bezalel", score=0.9):
from nexus.mempalace.searcher import MemPalaceResult
return MemPalaceResult(text=text, room=room, wing=wing, score=score)
def test_search_missing_q_param():
handler, buf = _make_handler("/search")
_handle_search(handler, {})
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 400
def test_search_returns_results(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
(tmp_path / "chroma.sqlite3").touch()
result = _make_result(text="CI green", room="forge", wing="bezalel", score=0.95)
with patch("mempalace.fleet_api.FleetAPIHandler") as _:
handler, buf = _make_handler("/search?q=CI")
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", return_value=[result]):
import importlib
import mempalace.fleet_api as api_module
# Patch search_fleet inside the handler's import context
with patch("nexus.mempalace.searcher.search_fleet", return_value=[result]):
_handle_search(handler, {"q": ["CI"]})
data = _parse_response(buf)
assert data["count"] == 1
assert data["results"][0]["text"] == "CI green"
assert data["results"][0]["room"] == "forge"
assert data["results"][0]["wing"] == "bezalel"
assert data["results"][0]["score"] == 0.95
assert handler._response_code == 200
def test_search_with_room_filter(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
result = _make_result()
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", return_value=[result]) as mock_sf:
_handle_search(MagicMock(), {"q": ["test"], "room": ["hermes"]})
# Verify room was passed through
mock_sf.assert_called_once_with("test", room="hermes", n_results=10)
def test_search_invalid_n_param():
handler, buf = _make_handler("/search?q=test&n=bad")
_handle_search(handler, {"q": ["test"], "n": ["bad"]})
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 400
def test_search_palace_unavailable(monkeypatch):
from nexus.mempalace.searcher import MemPalaceUnavailable
handler, buf = _make_handler("/search?q=test")
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", side_effect=MemPalaceUnavailable("no palace")):
_handle_search(handler, {"q": ["test"]})
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 503
def test_search_n_clamped_to_max():
"""n > MAX_RESULTS is silently clamped."""
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", return_value=[]) as mock_sf:
handler = MagicMock()
_handle_search(handler, {"q": ["test"], "n": ["9999"]})
mock_sf.assert_called_once_with("test", room=None, n_results=50)
# ---------------------------------------------------------------------------
# /wings
# ---------------------------------------------------------------------------
def test_wings_returns_list(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
(tmp_path / "bezalel").mkdir()
(tmp_path / "timmy").mkdir()
# A file should not appear in wings
(tmp_path / "README.txt").touch()
handler, buf = _make_handler("/wings")
_handle_wings(handler)
data = _parse_response(buf)
assert set(data["wings"]) == {"bezalel", "timmy"}
assert handler._response_code == 200
def test_wings_missing_palace(tmp_path, monkeypatch):
missing = tmp_path / "nonexistent"
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
handler, buf = _make_handler("/wings")
_handle_wings(handler)
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 503
# ---------------------------------------------------------------------------
# 404 unknown endpoint
# ---------------------------------------------------------------------------
def test_unknown_endpoint():
handler, buf = _make_handler("/foobar")
handler.do_GET()
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 404
assert "/search" in data["endpoints"]
# ---------------------------------------------------------------------------
# audit fixture smoke test
# ---------------------------------------------------------------------------
def test_audit_fixture_is_clean():
"""Ensure tests/fixtures/fleet_palace/ passes privacy audit (no violations)."""
from mempalace.audit_privacy import audit_palace
fixture_dir = Path(__file__).parent / "fixtures" / "fleet_palace"
assert fixture_dir.exists(), f"Fixture directory missing: {fixture_dir}"
result = audit_palace(fixture_dir)
assert result.clean, (
f"Privacy violations found in CI fixture:\n" +
"\n".join(f" [{v.rule}] {v.path}: {v.detail}" for v in result.violations)
)