Compare commits
6 Commits
groq/issue
...
groq/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b445c04037 | ||
| 60bd9a05ff | |||
| c7468a3c6a | |||
| 07a4be3bb9 | |||
| 804536a3f2 | |||
|
|
a0ee7858ff |
21
.gitea/workflows/review_gate.yml
Normal file
21
.gitea/workflows/review_gate.yml
Normal file
@@ -0,0 +1,21 @@
|
||||
name: Review Approval Gate
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
verify-review:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Verify PR has approving review
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||
PR_NUMBER: ${{ gitea.event.pull_request.number }}
|
||||
run: |
|
||||
python3 scripts/review_gate.py
|
||||
20
.gitea/workflows/staging_gate.yml
Normal file
20
.gitea/workflows/staging_gate.yml
Normal file
@@ -0,0 +1,20 @@
|
||||
name: Staging Verification Gate
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
verify-staging:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Verify staging label on merge PR
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||
run: |
|
||||
python3 scripts/staging_gate.py
|
||||
28
.gitea/workflows/weekly-audit.yml
Normal file
28
.gitea/workflows/weekly-audit.yml
Normal file
@@ -0,0 +1,28 @@
|
||||
name: Weekly Privacy Audit
|
||||
|
||||
# Runs every Monday at 05:00 UTC against a CI test fixture.
|
||||
# On production wizards this same script should be run via cron:
|
||||
# 0 5 * * 1 python /opt/nexus/mempalace/audit_privacy.py /var/lib/mempalace/fleet
|
||||
#
|
||||
# Refs: #1083, #1075
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 5 * * 1" # Monday 05:00 UTC
|
||||
workflow_dispatch: {} # allow manual trigger
|
||||
|
||||
jobs:
|
||||
privacy-audit:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: "3.x"
|
||||
|
||||
- name: Run privacy audit against CI fixture
|
||||
run: |
|
||||
python mempalace/audit_privacy.py tests/fixtures/fleet_palace
|
||||
246
docs/bezalel/evennia/cmd_palace.py
Normal file
246
docs/bezalel/evennia/cmd_palace.py
Normal file
@@ -0,0 +1,246 @@
|
||||
"""
|
||||
Palace commands — bridge Evennia to the local MemPalace memory system.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.commands.command import Command
|
||||
from evennia import create_object, search_object
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
def _search_mempalace(query, wing=None, room=None, n=5, fleet=False):
|
||||
"""Call the helper script and return parsed results."""
|
||||
cmd = ["/root/wizards/bezalel/hermes/venv/bin/python", PALACE_SCRIPT, query]
|
||||
cmd.append(wing or "none")
|
||||
cmd.append(room or "none")
|
||||
cmd.append(str(n))
|
||||
if fleet:
|
||||
cmd.append("--fleet")
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
data = json.loads(result.stdout)
|
||||
return data.get("results", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def _get_wing(caller):
|
||||
"""Return the caller's wing, defaulting to their key or 'general'."""
|
||||
return caller.db.wing if caller.attributes.has("wing") else (caller.key.lower() if caller.key else "general")
|
||||
|
||||
|
||||
class CmdPalaceSearch(Command):
|
||||
"""
|
||||
Search your memory palace.
|
||||
|
||||
Usage:
|
||||
palace/search <query>
|
||||
palace/search <query> [--room <room>]
|
||||
palace/recall <topic>
|
||||
palace/file <name> = <content>
|
||||
palace/status
|
||||
"""
|
||||
|
||||
key = "palace"
|
||||
aliases = ["pal"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Usage: palace/search <query> | palace/recall <topic> | palace/file <name> = <content> | palace/status")
|
||||
return
|
||||
|
||||
parts = self.args.strip().split(" ", 1)
|
||||
subcmd = parts[0].lower()
|
||||
rest = parts[1] if len(parts) > 1 else ""
|
||||
|
||||
if subcmd == "search":
|
||||
self._do_search(rest)
|
||||
elif subcmd == "recall":
|
||||
self._do_recall(rest)
|
||||
elif subcmd == "file":
|
||||
self._do_file(rest)
|
||||
elif subcmd == "status":
|
||||
self._do_status()
|
||||
else:
|
||||
self._do_search(self.args.strip())
|
||||
|
||||
def _do_search(self, query):
|
||||
if not query:
|
||||
self.caller.msg("Search for what?")
|
||||
return
|
||||
self.caller.msg(f"Searching the palace for: |c{query}|n...")
|
||||
wing = _get_wing(self.caller)
|
||||
results = _search_mempalace(query, wing=wing)
|
||||
if not results:
|
||||
self.caller.msg("The palace is silent on that matter.")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for i, r in enumerate(results[:5], 1):
|
||||
room = r.get("room", "unknown")
|
||||
source = r.get("source", "unknown")
|
||||
content = r.get("content", "")[:400]
|
||||
lines.append(f"\n|g[{i}]|n |c{room}|n — |x{source}|n")
|
||||
lines.append(f"{content}\n")
|
||||
self.caller.msg("\n".join(lines))
|
||||
|
||||
def _do_recall(self, topic):
|
||||
if not topic:
|
||||
self.caller.msg("Recall what topic?")
|
||||
return
|
||||
results = _search_mempalace(topic, wing=_get_wing(self.caller), n=1)
|
||||
if not results:
|
||||
self.caller.msg("Nothing to recall.")
|
||||
return
|
||||
|
||||
r = results[0]
|
||||
content = r.get("content", "")
|
||||
source = r.get("source", "unknown")
|
||||
|
||||
from typeclasses.memory_object import MemoryObject
|
||||
obj = create_object(
|
||||
MemoryObject,
|
||||
key=f"memory:{topic}",
|
||||
location=self.caller.location,
|
||||
)
|
||||
obj.db.memory_content = content
|
||||
obj.db.source_file = source
|
||||
obj.db.room_name = r.get("room", "general")
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() conjure() a memory shard from the palace: |m{obj.key}|n.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
|
||||
def _do_file(self, rest):
|
||||
if "=" not in rest:
|
||||
self.caller.msg("Usage: palace/file <name> = <content>")
|
||||
return
|
||||
name, content = rest.split("=", 1)
|
||||
name = name.strip()
|
||||
content = content.strip()
|
||||
if not name or not content:
|
||||
self.caller.msg("Both name and content are required.")
|
||||
return
|
||||
|
||||
from typeclasses.memory_object import MemoryObject
|
||||
obj = create_object(
|
||||
MemoryObject,
|
||||
key=f"memory:{name}",
|
||||
location=self.caller.location,
|
||||
)
|
||||
obj.db.memory_content = content
|
||||
obj.db.source_file = f"filed by {self.caller.key}"
|
||||
obj.db.room_name = self.caller.location.key if self.caller.location else "general"
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() file() a new memory in the palace: |m{obj.key}|n.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
|
||||
def _do_status(self):
|
||||
cmd = [
|
||||
"/root/wizards/bezalel/hermes/venv/bin/mempalace",
|
||||
"--palace", "/root/wizards/bezalel/.mempalace/palace",
|
||||
"status"
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
||||
self.caller.msg(result.stdout or result.stderr)
|
||||
except Exception as e:
|
||||
self.caller.msg(f"Could not reach the palace: {e}")
|
||||
|
||||
|
||||
class CmdRecall(Command):
|
||||
"""
|
||||
Recall a memory from the palace.
|
||||
|
||||
Usage:
|
||||
recall <query>
|
||||
recall <query> --fleet
|
||||
recall <query> --room <room>
|
||||
"""
|
||||
|
||||
key = "recall"
|
||||
aliases = ["remember", "mem"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Recall what? Usage: recall <query> [--fleet] [--room <room>]")
|
||||
return
|
||||
|
||||
args = self.args.strip()
|
||||
fleet = "--fleet" in args
|
||||
room = None
|
||||
|
||||
if "--room" in args:
|
||||
parts = args.split("--room")
|
||||
args = parts[0].strip()
|
||||
room = parts[1].strip().split()[0] if len(parts) > 1 else None
|
||||
|
||||
if "--fleet" in args:
|
||||
args = args.replace("--fleet", "").strip()
|
||||
|
||||
self.caller.msg(f"Recalling from the {'fleet' if fleet else 'personal'} palace: |c{args}|n...")
|
||||
|
||||
wing = None if fleet else _get_wing(self.caller)
|
||||
results = _search_mempalace(args, wing=wing, room=room, n=5, fleet=fleet)
|
||||
if not results:
|
||||
self.caller.msg("The palace is silent on that matter.")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for i, r in enumerate(results[:5], 1):
|
||||
room_name = r.get("room", "unknown")
|
||||
source = r.get("source", "unknown")
|
||||
content = r.get("content", "")[:400]
|
||||
wing_label = r.get("wing", "unknown")
|
||||
wing_tag = f" |y[{wing_label}]|n" if fleet else ""
|
||||
lines.append(f"\n|g[{i}]|n |c{room_name}|n{wing_tag} — |x{source}|n")
|
||||
lines.append(f"{content}\n")
|
||||
self.caller.msg("\n".join(lines))
|
||||
|
||||
|
||||
class CmdEnterRoom(Command):
|
||||
"""
|
||||
Enter a room in the mind palace by topic.
|
||||
|
||||
Usage:
|
||||
enter room <topic>
|
||||
"""
|
||||
|
||||
key = "enter room"
|
||||
aliases = ["enter palace", "go room"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Enter which room? Usage: enter room <topic>")
|
||||
return
|
||||
|
||||
topic = self.args.strip().lower().replace(" ", "-")
|
||||
wing = _get_wing(self.caller)
|
||||
room_key = f"palace:{wing}:{topic}"
|
||||
|
||||
# Search for existing room
|
||||
rooms = search_object(room_key, typeclass="typeclasses.palace_room.PalaceRoom")
|
||||
if rooms:
|
||||
room = rooms[0]
|
||||
else:
|
||||
# Create the room dynamically
|
||||
from typeclasses.palace_room import PalaceRoom
|
||||
room = create_object(
|
||||
PalaceRoom,
|
||||
key=room_key,
|
||||
)
|
||||
room.db.memory_topic = topic
|
||||
room.db.wing = wing
|
||||
room.update_description()
|
||||
|
||||
self.caller.move_to(room, move_type="teleport")
|
||||
self.caller.msg(f"You step into the |c{topic}|n room of your mind palace.")
|
||||
166
docs/bezalel/evennia/cmd_record.py
Normal file
166
docs/bezalel/evennia/cmd_record.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""
|
||||
Live memory commands — write new memories into the palace from Evennia.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.commands.command import Command
|
||||
from evennia import create_object
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
|
||||
ADDER_SCRIPT = "/root/wizards/bezalel/evennia/palace_add.py"
|
||||
|
||||
|
||||
def _add_drawer(content, wing, room, source):
|
||||
"""Add a verbatim drawer to the palace via the helper script."""
|
||||
cmd = [
|
||||
"/root/wizards/bezalel/hermes/venv/bin/python",
|
||||
ADDER_SCRIPT,
|
||||
content,
|
||||
wing,
|
||||
room,
|
||||
source,
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
|
||||
return result.returncode == 0 and "OK" in result.stdout
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class CmdRecord(Command):
|
||||
"""
|
||||
Record a decision into the palace hall_facts.
|
||||
|
||||
Usage:
|
||||
record <text>
|
||||
record We decided to use PostgreSQL over MySQL.
|
||||
"""
|
||||
|
||||
key = "record"
|
||||
aliases = ["decide"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Record what decision? Usage: record <text>")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
text = self.args.strip()
|
||||
full_text = f"DECISION ({wing}): {text}\nRecorded by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() record() a decision in the palace archives.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdNote(Command):
|
||||
"""
|
||||
Note a breakthrough into the palace hall_discoveries.
|
||||
|
||||
Usage:
|
||||
note <text>
|
||||
note The GraphQL schema can be auto-generated from our typeclasses.
|
||||
"""
|
||||
|
||||
key = "note"
|
||||
aliases = ["jot"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Note what? Usage: note <text>")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
text = self.args.strip()
|
||||
full_text = f"BREAKTHROUGH ({wing}): {text}\nNoted by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() inscribe() a breakthrough into the palace scrolls.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdEvent(Command):
|
||||
"""
|
||||
Log an event into the palace hall_events.
|
||||
|
||||
Usage:
|
||||
event <text>
|
||||
event Gitea runner came back online after being offline for 6 hours.
|
||||
"""
|
||||
|
||||
key = "event"
|
||||
aliases = ["log"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Log what event? Usage: event <text>")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
text = self.args.strip()
|
||||
full_text = f"EVENT ({wing}): {text}\nLogged by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() chronicle() an event in the palace records.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdPalaceWrite(Command):
|
||||
"""
|
||||
Directly write a memory into a specific palace room.
|
||||
|
||||
Usage:
|
||||
palace/write <room> = <text>
|
||||
"""
|
||||
|
||||
key = "palace/write"
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
if "=" not in self.args:
|
||||
self.caller.msg("Usage: palace/write <room> = <text>")
|
||||
return
|
||||
|
||||
room, text = self.args.split("=", 1)
|
||||
room = room.strip()
|
||||
text = text.strip()
|
||||
|
||||
if not room or not text:
|
||||
self.caller.msg("Both room and text are required.")
|
||||
return
|
||||
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
full_text = f"MEMORY ({wing}/{room}): {text}\nWritten by {self.caller.key} via Evennia."
|
||||
|
||||
ok = _add_drawer(full_text, wing, room, f"evennia:{self.caller.key}")
|
||||
if ok:
|
||||
self.caller.location.msg_contents(
|
||||
f"$You() etch() a memory into the |c{room}|n room of the palace.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
self.caller.msg("The palace scribes could not write that down.")
|
||||
105
docs/bezalel/evennia/cmd_steward.py
Normal file
105
docs/bezalel/evennia/cmd_steward.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""
|
||||
Steward commands — ask a palace steward about memories.
|
||||
"""
|
||||
|
||||
from evennia.commands.command import Command
|
||||
from evennia import search_object
|
||||
|
||||
|
||||
class CmdAskSteward(Command):
|
||||
"""
|
||||
Ask a steward NPC about a topic from the palace memory.
|
||||
|
||||
Usage:
|
||||
ask <steward> about <topic>
|
||||
ask <steward> about <topic> --fleet
|
||||
|
||||
Example:
|
||||
ask bezalel-steward about nightly watch
|
||||
ask bezalel-steward about runner outage --fleet
|
||||
"""
|
||||
|
||||
key = "ask"
|
||||
aliases = ["question"]
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def parse(self):
|
||||
"""Parse 'ask <target> about <topic>' syntax."""
|
||||
raw = self.args.strip()
|
||||
fleet = "--fleet" in raw
|
||||
if fleet:
|
||||
raw = raw.replace("--fleet", "").strip()
|
||||
|
||||
if " about " in raw.lower():
|
||||
parts = raw.split(" about ", 1)
|
||||
self.target_name = parts[0].strip()
|
||||
self.topic = parts[1].strip()
|
||||
else:
|
||||
self.target_name = ""
|
||||
self.topic = raw
|
||||
self.fleet = fleet
|
||||
|
||||
def func(self):
|
||||
if not self.args.strip():
|
||||
self.caller.msg("Usage: ask <steward> about <topic> [--fleet]")
|
||||
return
|
||||
|
||||
self.parse()
|
||||
|
||||
if not self.target_name:
|
||||
self.caller.msg("Ask whom? Usage: ask <steward> about <topic>")
|
||||
return
|
||||
|
||||
# Find steward NPC in current room
|
||||
stewards = [
|
||||
obj for obj in self.caller.location.contents
|
||||
if hasattr(obj, "respond_to_question")
|
||||
and self.target_name.lower() in obj.key.lower()
|
||||
]
|
||||
|
||||
if not stewards:
|
||||
self.caller.msg(f"There is no steward here matching '{self.target_name}'.")
|
||||
return
|
||||
|
||||
steward = stewards[0]
|
||||
self.caller.msg(f"You ask |c{steward.key}|n about '{self.topic}'...")
|
||||
steward.respond_to_question(self.topic, self.caller, fleet=self.fleet)
|
||||
|
||||
|
||||
class CmdSummonSteward(Command):
|
||||
"""
|
||||
Summon your wing's steward NPC to your current location.
|
||||
|
||||
Usage:
|
||||
summon steward
|
||||
"""
|
||||
|
||||
key = "summon steward"
|
||||
locks = "cmd:all()"
|
||||
help_category = "Mind Palace"
|
||||
|
||||
def func(self):
|
||||
wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
|
||||
steward_key = f"{wing}-steward"
|
||||
|
||||
# Search for existing steward
|
||||
from typeclasses.steward_npc import StewardNPC
|
||||
stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")
|
||||
|
||||
if stewards:
|
||||
steward = stewards[0]
|
||||
steward.move_to(self.caller.location, move_type="teleport")
|
||||
self.caller.location.msg_contents(
|
||||
f"A shimmer of light coalesces into |c{steward.key}|n.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
else:
|
||||
steward = StewardNPC.create(steward_key)[0]
|
||||
steward.db.wing = wing
|
||||
steward.db.steward_name = self.caller.key
|
||||
steward.move_to(self.caller.location, move_type="teleport")
|
||||
self.caller.location.msg_contents(
|
||||
f"You call forth |c{steward.key}|n from the palace archives.",
|
||||
from_obj=self.caller,
|
||||
)
|
||||
83
docs/bezalel/evennia/hall_of_wings.py
Normal file
83
docs/bezalel/evennia/hall_of_wings.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""
|
||||
Hall of Wings — Builds the central MemPalace zone in Evennia.
|
||||
|
||||
Usage (from Evennia shell or script):
|
||||
from world.hall_of_wings import build_hall_of_wings
|
||||
build_hall_of_wings()
|
||||
"""
|
||||
|
||||
from evennia import create_object
|
||||
from typeclasses.palace_room import PalaceRoom
|
||||
from typeclasses.steward_npc import StewardNPC
|
||||
from typeclasses.rooms import Room
|
||||
from typeclasses.exits import Exit
|
||||
|
||||
HALL_KEY = "hall_of_wings"
|
||||
HALL_NAME = "Hall of Wings"
|
||||
|
||||
DEFAULT_WINGS = [
|
||||
"bezalel",
|
||||
"timmy",
|
||||
"allegro",
|
||||
"ezra",
|
||||
]
|
||||
|
||||
|
||||
def build_hall_of_wings():
|
||||
"""Create or update the central Hall of Wings and attach steward chambers."""
|
||||
# Find or create the hall
|
||||
from evennia import search_object
|
||||
halls = search_object(HALL_KEY, typeclass="typeclasses.rooms.Room")
|
||||
if halls:
|
||||
hall = halls[0]
|
||||
else:
|
||||
hall = create_object(Room, key=HALL_KEY)
|
||||
hall.db.desc = (
|
||||
"|cThe Hall of Wings|n\n"
|
||||
"A vast circular chamber of pale stone and shifting starlight.\n"
|
||||
"Arched doorways line the perimeter, each leading to a steward's chamber.\n"
|
||||
"Here, the memories of the fleet converge.\n\n"
|
||||
"Use |wsummon steward|n to call your wing's steward, or\n"
|
||||
"|wask <steward> about <topic>|n to query the palace archives."
|
||||
)
|
||||
|
||||
for wing in DEFAULT_WINGS:
|
||||
chamber_key = f"chamber:{wing}"
|
||||
chambers = search_object(chamber_key, typeclass="typeclasses.palace_room.PalaceRoom")
|
||||
if chambers:
|
||||
chamber = chambers[0]
|
||||
else:
|
||||
chamber = create_object(PalaceRoom, key=chamber_key)
|
||||
chamber.db.memory_topic = wing
|
||||
chamber.db.wing = wing
|
||||
chamber.db.desc = (
|
||||
f"|cThe Chamber of {wing.title()}|n\n"
|
||||
f"This room holds the accumulated memories of the {wing} wing.\n"
|
||||
f"A steward stands ready to answer questions."
|
||||
)
|
||||
chamber.update_description()
|
||||
|
||||
# Link hall <-> chamber with exits
|
||||
exit_name = f"{wing}-chamber"
|
||||
existing_exits = [ex for ex in hall.exits if ex.key == exit_name]
|
||||
if not existing_exits:
|
||||
create_object(Exit, key=exit_name, location=hall, destination=chamber)
|
||||
|
||||
return_exits = [ex for ex in chamber.exits if ex.key == "hall"]
|
||||
if not return_exits:
|
||||
create_object(Exit, key="hall", location=chamber, destination=hall)
|
||||
|
||||
# Place or summon steward
|
||||
steward_key = f"{wing}-steward"
|
||||
stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")
|
||||
if stewards:
|
||||
steward = stewards[0]
|
||||
if steward.location != chamber:
|
||||
steward.move_to(chamber, move_type="teleport")
|
||||
else:
|
||||
steward = create_object(StewardNPC, key=steward_key)
|
||||
steward.db.wing = wing
|
||||
steward.db.steward_name = wing.title()
|
||||
steward.move_to(chamber, move_type="teleport")
|
||||
|
||||
return hall
|
||||
87
docs/bezalel/evennia/palace_room.py
Normal file
87
docs/bezalel/evennia/palace_room.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""
|
||||
PalaceRoom
|
||||
|
||||
A Room that represents a topic in the memory palace.
|
||||
Memory objects spawned here embody concepts retrieved from mempalace.
|
||||
Its description auto-populates from a palace search on the memory topic.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.objects.objects import DefaultRoom
|
||||
from .objects import ObjectParent
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
class PalaceRoom(ObjectParent, DefaultRoom):
|
||||
"""
|
||||
A room in the mind palace. Its db.memory_topic describes what
|
||||
kind of memories are stored here. The description is populated
|
||||
from a live MemPalace search.
|
||||
"""
|
||||
|
||||
def at_object_creation(self):
|
||||
super().at_object_creation()
|
||||
self.db.memory_topic = ""
|
||||
self.db.wing = "bezalel"
|
||||
self.db.desc = (
|
||||
f"This is the |c{self.key}|n room of your mind palace.\n"
|
||||
"Memories and concepts drift here like motes of light.\n"
|
||||
"Use |wpalace/search <query>|n or |wrecall <topic>|n to summon memories."
|
||||
)
|
||||
|
||||
def _search_palace(self, query, wing=None, room=None, n=3):
|
||||
"""Call the helper script and return parsed results."""
|
||||
cmd = ["/root/wizards/bezalel/hermes/venv/bin/python", PALACE_SCRIPT, query]
|
||||
cmd.append(wing or "none")
|
||||
cmd.append(room or "none")
|
||||
cmd.append(str(n))
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
data = json.loads(result.stdout)
|
||||
return data.get("results", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def update_description(self):
|
||||
"""Refresh the room description from a palace search on its topic."""
|
||||
topic = self.db.memory_topic or self.key.split(":")[-1] if ":" in self.key else self.key
|
||||
wing = self.db.wing or "bezalel"
|
||||
results = self._search_palace(topic, wing=wing, n=3)
|
||||
|
||||
header = (
|
||||
f"=|c {topic.upper()} |n="
|
||||
)
|
||||
desc_lines = [
|
||||
header,
|
||||
f"You stand in the |c{topic}|n room of the |y{wing}|n wing.",
|
||||
"Memories drift here like motes of light.",
|
||||
"",
|
||||
]
|
||||
|
||||
if results:
|
||||
desc_lines.append("|gNearby memories:|n")
|
||||
for i, r in enumerate(results, 1):
|
||||
content = r.get("content", "")[:200]
|
||||
source = r.get("source", "unknown")
|
||||
room_name = r.get("room", "unknown")
|
||||
desc_lines.append(f" |m[{i}]|n |c{room_name}|n — {content}... |x({source})|n")
|
||||
else:
|
||||
desc_lines.append("|xThe palace is quiet here. No memories resonate with this topic yet.|n")
|
||||
|
||||
desc_lines.append("")
|
||||
desc_lines.append("Use |wrecall <query>|n to search deeper, or |wpalace/search <query>|n.")
|
||||
self.db.desc = "\n".join(desc_lines)
|
||||
|
||||
def at_object_receive(self, moved_obj, source_location, **kwargs):
|
||||
"""Refresh description when someone enters."""
|
||||
if moved_obj.has_account:
|
||||
self.update_description()
|
||||
super().at_object_receive(moved_obj, source_location, **kwargs)
|
||||
|
||||
def return_appearance(self, looker):
|
||||
text = super().return_appearance(looker)
|
||||
if self.db.memory_topic:
|
||||
text += f"\n|xTopic: {self.db.memory_topic}|n"
|
||||
return text
|
||||
70
docs/bezalel/evennia/steward_npc.py
Normal file
70
docs/bezalel/evennia/steward_npc.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""
|
||||
StewardNPC
|
||||
|
||||
A palace steward NPC that answers questions by querying the local
|
||||
or fleet MemPalace backend. One steward per wizard wing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.objects.objects import DefaultCharacter
|
||||
from typeclasses.objects import ObjectParent
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
class StewardNPC(ObjectParent, DefaultCharacter):
|
||||
"""
|
||||
A steward of the mind palace. Ask it about memories,
|
||||
decisions, or events from its wing.
|
||||
"""
|
||||
|
||||
def at_object_creation(self):
|
||||
super().at_object_creation()
|
||||
self.db.wing = "bezalel"
|
||||
self.db.steward_name = "Bezalel"
|
||||
self.db.desc = (
|
||||
f"|c{self.key}|n stands here quietly, eyes like polished steel, "
|
||||
"waiting to recall anything from the palace archives."
|
||||
)
|
||||
self.locks.add("get:false();delete:perm(Admin)")
|
||||
|
||||
def _search_palace(self, query, fleet=False, n=3):
|
||||
cmd = [
|
||||
"/root/wizards/bezalel/hermes/venv/bin/python",
|
||||
PALACE_SCRIPT,
|
||||
query,
|
||||
"none" if fleet else self.db.wing,
|
||||
"none",
|
||||
str(n),
|
||||
]
|
||||
if fleet:
|
||||
cmd.append("--fleet")
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
data = json.loads(result.stdout)
|
||||
return data.get("results", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def _summarize_for_speech(self, results, query):
|
||||
"""Convert search results into in-character dialogue."""
|
||||
if not results:
|
||||
return "I find no memory of that in the palace."
|
||||
|
||||
lines = [f"Regarding '{query}':"]
|
||||
for r in results:
|
||||
room = r.get("room", "unknown")
|
||||
content = r.get("content", "")[:300]
|
||||
source = r.get("source", "unknown")
|
||||
lines.append(f" From the |c{room}|n room: {content}... |x[{source}]|n")
|
||||
return "\n".join(lines)
|
||||
|
||||
def respond_to_question(self, question, asker, fleet=False):
|
||||
results = self._search_palace(question, fleet=fleet, n=3)
|
||||
speech = self._summarize_for_speech(results, question)
|
||||
self.location.msg_contents(
|
||||
f"|c{self.key}|n says to $you(asker): \"{speech}\"",
|
||||
mapping={"asker": asker},
|
||||
from_obj=self,
|
||||
)
|
||||
@@ -1,44 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Apply branch protections to all repositories
|
||||
# Requires GITEA_TOKEN env var
|
||||
|
||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
||||
|
||||
for repo in "${REPOS[@]}"
|
||||
do
|
||||
curl -X POST "https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/$repo/branches/main/protection" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"required_reviews": 1,
|
||||
"dismiss_stale_reviews": true,
|
||||
"block_force_push": true,
|
||||
"block_deletions": true
|
||||
}'
|
||||
done
|
||||
#!/bin/bash
|
||||
|
||||
# Gitea API credentials
|
||||
GITEA_TOKEN="your-personal-access-token"
|
||||
GITEA_API="https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
# Repos to protect
|
||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
||||
|
||||
for REPO in "${REPO[@]}"; do
|
||||
echo "Configuring branch protection for $REPO..."
|
||||
|
||||
curl -X POST -H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"name": "main",
|
||||
"require_pull_request": true,
|
||||
"required_approvals": 1,
|
||||
"dismiss_stale_approvals": true,
|
||||
"required_status_checks": '"$(test "$REPO" = "hermes-agent" && echo "true" || echo "false")"',
|
||||
"block_force_push": true,
|
||||
"block_delete": true
|
||||
}' \
|
||||
"$GITEA_API/repos/Timmy_Foundation/$REPO/branch_protection"
|
||||
done
|
||||
# Wrapper for the canonical branch-protection sync script.
|
||||
# Usage: ./gitea-branch-protection.sh
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
python3 scripts/sync_branch_protection.py
|
||||
|
||||
186
mempalace/fleet_api.py
Normal file
186
mempalace/fleet_api.py
Normal file
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
fleet_api.py — Lightweight HTTP API for the shared fleet palace.
|
||||
|
||||
Exposes fleet memory search over HTTP so that Alpha servers and other
|
||||
wizard deployments can query the palace without direct filesystem access.
|
||||
|
||||
Endpoints:
|
||||
GET /health
|
||||
Returns {"status": "ok", "palace": "<path>"}
|
||||
|
||||
GET /search?q=<query>[&room=<room>][&n=<int>]
|
||||
Returns {"results": [...], "query": "...", "room": "...", "count": N}
|
||||
Each result: {"text": "...", "room": "...", "wing": "...", "score": 0.9}
|
||||
|
||||
GET /wings
|
||||
Returns {"wings": ["bezalel", ...]} — distinct wizard wings present
|
||||
|
||||
Error responses use {"error": "<message>"} with appropriate HTTP status codes.
|
||||
|
||||
Usage:
|
||||
# Default: localhost:7771, fleet palace at /var/lib/mempalace/fleet
|
||||
python mempalace/fleet_api.py
|
||||
|
||||
# Custom host/port/palace:
|
||||
FLEET_PALACE_PATH=/data/fleet python mempalace/fleet_api.py --host 0.0.0.0 --port 8080
|
||||
|
||||
Refs: #1078, #1075
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from pathlib import Path
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
# Add repo root to path so we can import nexus.mempalace
|
||||
_HERE = Path(__file__).resolve().parent
|
||||
_REPO_ROOT = _HERE.parent
|
||||
if str(_REPO_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_REPO_ROOT))
|
||||
|
||||
DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 7771
|
||||
MAX_RESULTS = 50
|
||||
|
||||
|
||||
def _get_palace_path() -> Path:
|
||||
return Path(os.environ.get("FLEET_PALACE_PATH", "/var/lib/mempalace/fleet"))
|
||||
|
||||
|
||||
def _json_response(handler: BaseHTTPRequestHandler, status: int, body: dict) -> None:
|
||||
payload = json.dumps(body).encode()
|
||||
handler.send_response(status)
|
||||
handler.send_header("Content-Type", "application/json")
|
||||
handler.send_header("Content-Length", str(len(payload)))
|
||||
handler.end_headers()
|
||||
handler.wfile.write(payload)
|
||||
|
||||
|
||||
def _handle_health(handler: BaseHTTPRequestHandler) -> None:
|
||||
palace = _get_palace_path()
|
||||
_json_response(handler, 200, {
|
||||
"status": "ok",
|
||||
"palace": str(palace),
|
||||
"palace_exists": palace.exists(),
|
||||
})
|
||||
|
||||
|
||||
def _handle_search(handler: BaseHTTPRequestHandler, qs: dict) -> None:
|
||||
query_terms = qs.get("q", [""])
|
||||
q = query_terms[0].strip() if query_terms else ""
|
||||
if not q:
|
||||
_json_response(handler, 400, {"error": "Missing required parameter: q"})
|
||||
return
|
||||
|
||||
room_terms = qs.get("room", [])
|
||||
room = room_terms[0].strip() if room_terms else None
|
||||
|
||||
n_terms = qs.get("n", [])
|
||||
try:
|
||||
n = max(1, min(int(n_terms[0]), MAX_RESULTS)) if n_terms else 10
|
||||
except (ValueError, IndexError):
|
||||
_json_response(handler, 400, {"error": "Invalid parameter: n must be an integer"})
|
||||
return
|
||||
|
||||
try:
|
||||
from nexus.mempalace.searcher import search_fleet, MemPalaceUnavailable
|
||||
except ImportError as exc:
|
||||
_json_response(handler, 503, {"error": f"MemPalace module not available: {exc}"})
|
||||
return
|
||||
|
||||
try:
|
||||
results = search_fleet(q, room=room, n_results=n)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
_json_response(handler, 503, {"error": str(exc)})
|
||||
return
|
||||
|
||||
_json_response(handler, 200, {
|
||||
"query": q,
|
||||
"room": room,
|
||||
"count": len(results),
|
||||
"results": [
|
||||
{
|
||||
"text": r.text,
|
||||
"room": r.room,
|
||||
"wing": r.wing,
|
||||
"score": round(r.score, 4),
|
||||
}
|
||||
for r in results
|
||||
],
|
||||
})
|
||||
|
||||
|
||||
def _handle_wings(handler: BaseHTTPRequestHandler) -> None:
|
||||
"""Return distinct wizard wing names found in the fleet palace directory."""
|
||||
palace = _get_palace_path()
|
||||
if not palace.exists():
|
||||
_json_response(handler, 503, {
|
||||
"error": f"Fleet palace not found: {palace}",
|
||||
})
|
||||
return
|
||||
|
||||
wings = sorted({
|
||||
p.name for p in palace.iterdir() if p.is_dir()
|
||||
})
|
||||
_json_response(handler, 200, {"wings": wings})
|
||||
|
||||
|
||||
class FleetAPIHandler(BaseHTTPRequestHandler):
|
||||
"""Request handler for the fleet memory API."""
|
||||
|
||||
def log_message(self, fmt: str, *args) -> None: # noqa: ANN001
|
||||
# Prefix with tag for easier log filtering
|
||||
sys.stderr.write(f"[fleet_api] {fmt % args}\n")
|
||||
|
||||
def do_GET(self) -> None: # noqa: N802
|
||||
parsed = urlparse(self.path)
|
||||
path = parsed.path.rstrip("/") or "/"
|
||||
qs = parse_qs(parsed.query)
|
||||
|
||||
if path == "/health":
|
||||
_handle_health(self)
|
||||
elif path == "/search":
|
||||
_handle_search(self, qs)
|
||||
elif path == "/wings":
|
||||
_handle_wings(self)
|
||||
else:
|
||||
_json_response(self, 404, {
|
||||
"error": f"Unknown endpoint: {path}",
|
||||
"endpoints": ["/health", "/search", "/wings"],
|
||||
})
|
||||
|
||||
|
||||
def make_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> HTTPServer:
|
||||
return HTTPServer((host, port), FleetAPIHandler)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Fleet palace HTTP API server."
|
||||
)
|
||||
parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind host (default: {DEFAULT_HOST})")
|
||||
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Bind port (default: {DEFAULT_PORT})")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
palace = _get_palace_path()
|
||||
print(f"[fleet_api] Palace: {palace}")
|
||||
if not palace.exists():
|
||||
print(f"[fleet_api] WARNING: palace path does not exist yet: {palace}", file=sys.stderr)
|
||||
|
||||
server = make_server(args.host, args.port)
|
||||
print(f"[fleet_api] Listening on http://{args.host}:{args.port}")
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("\n[fleet_api] Shutting down.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
95
scripts/audit_mempalace_privacy.py
Normal file
95
scripts/audit_mempalace_privacy.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Audit the fleet shared palace for privacy violations.
|
||||
Ensures no raw drawers, full source paths, or private workspace leaks exist.
|
||||
|
||||
Usage:
|
||||
python audit_mempalace_privacy.py /path/to/fleet/palace
|
||||
|
||||
Exit codes:
|
||||
0 = clean
|
||||
1 = violations found
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import chromadb
|
||||
except ImportError:
|
||||
print("ERROR: chromadb not installed")
|
||||
sys.exit(1)
|
||||
|
||||
VIOLATION_KEYWORDS = [
|
||||
"/root/wizards/",
|
||||
"/home/",
|
||||
"/Users/",
|
||||
"private_key",
|
||||
"-----BEGIN",
|
||||
"GITEA_TOKEN=",
|
||||
"OPENAI_API_KEY",
|
||||
"ANTHROPIC_API_KEY",
|
||||
]
|
||||
|
||||
|
||||
def audit(palace_path: Path):
|
||||
violations = []
|
||||
client = chromadb.PersistentClient(path=str(palace_path))
|
||||
try:
|
||||
col = client.get_collection("mempalace_drawers")
|
||||
except Exception as e:
|
||||
print(f"ERROR: Could not open collection: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
all_data = col.get(include=["documents", "metadatas"])
|
||||
docs = all_data["documents"]
|
||||
metas = all_data["metadatas"]
|
||||
|
||||
for doc, meta in zip(docs, metas):
|
||||
source = meta.get("source_file", "")
|
||||
doc_type = meta.get("type", "")
|
||||
|
||||
# Rule 1: Fleet palace should only contain closets or explicitly typed entries
|
||||
if doc_type not in ("closet", "summary", "fleet"):
|
||||
violations.append(
|
||||
f"VIOLATION: Document type is '{doc_type}' (expected closet/summary/fleet). "
|
||||
f"Source: {source}"
|
||||
)
|
||||
|
||||
# Rule 2: No full absolute paths from private workspaces
|
||||
if any(abs_path in source for abs_path in ["/root/wizards/", "/home/", "/Users/"]):
|
||||
violations.append(
|
||||
f"VIOLATION: Source contains absolute path: {source}"
|
||||
)
|
||||
|
||||
# Rule 3: No raw secrets in document text
|
||||
for kw in VIOLATION_KEYWORDS:
|
||||
if kw in doc:
|
||||
violations.append(
|
||||
f"VIOLATION: Document contains sensitive keyword '{kw}'. Source: {source}"
|
||||
)
|
||||
break # one violation per doc is enough
|
||||
|
||||
return violations
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Audit fleet palace privacy")
|
||||
parser.add_argument("palace", default="/var/lib/mempalace/fleet", nargs="?", help="Path to fleet palace")
|
||||
args = parser.parse_args()
|
||||
|
||||
violations = audit(Path(args.palace))
|
||||
|
||||
if violations:
|
||||
print(f"FAIL: {len(violations)} privacy violation(s) found")
|
||||
for v in violations:
|
||||
print(f" {v}")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("PASS: No privacy violations detected")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
167
scripts/audit_merge_reviews.py
Normal file
167
scripts/audit_merge_reviews.py
Normal file
@@ -0,0 +1,167 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fleet Merge Review Audit
|
||||
========================
|
||||
Scans all Timmy_Foundation repos for merges in the last 7 days
|
||||
and validates that each merged PR had at least one approving review.
|
||||
|
||||
Exit 0 = no unreviewed merges
|
||||
Exit 1 = unreviewed merges found (and issues created if --create-issues)
|
||||
|
||||
Usage:
|
||||
python scripts/audit_merge_reviews.py
|
||||
python scripts/audit_merge_reviews.py --create-issues
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import json
|
||||
|
||||
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||
ORG = "Timmy_Foundation"
|
||||
DAYS_BACK = 7
|
||||
SECURITY_LABEL = "security"
|
||||
|
||||
|
||||
def api_request(path: str) -> dict | list:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def api_post(path: str, payload: dict) -> dict:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
data = json.dumps(payload).encode()
|
||||
req = urllib.request.Request(url, data=data, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def get_repos() -> list[str]:
|
||||
repos = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}")
|
||||
if not batch:
|
||||
break
|
||||
repos.extend([r["name"] for r in batch])
|
||||
page += 1
|
||||
return repos
|
||||
|
||||
|
||||
def get_merged_prs(repo: str, since: str) -> list[dict]:
|
||||
"""Get closed (merged) PRs updated since `since` (ISO format)."""
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = api_request(
|
||||
f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
for pr in batch:
|
||||
if pr.get("merged_at") and pr["merged_at"] >= since:
|
||||
prs.append(pr)
|
||||
elif pr.get("updated_at") and pr["updated_at"] < since:
|
||||
return prs
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
|
||||
def get_reviews(repo: str, pr_number: int) -> list[dict]:
|
||||
try:
|
||||
return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 404:
|
||||
return []
|
||||
raise
|
||||
|
||||
|
||||
def create_post_mortem(repo: str, pr: dict) -> int | None:
|
||||
title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
|
||||
body = (
|
||||
f"## Unreviewed Merge Detected\n\n"
|
||||
f"- **Repository:** `{ORG}/{repo}`\n"
|
||||
f"- **PR:** #{pr['number']} — {pr['title']}\n"
|
||||
f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n"
|
||||
f"- **Merged at:** {pr['merged_at']}\n"
|
||||
f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n"
|
||||
f"This merge had **zero approving reviews** at the time of merge.\n\n"
|
||||
f"### Required Actions\n"
|
||||
f"1. Validate the merge contents are safe.\n"
|
||||
f"2. If malicious or incorrect, revert immediately.\n"
|
||||
f"3. Document root cause (bypassed branch protection? direct push?).\n"
|
||||
)
|
||||
try:
|
||||
issue = api_post(f"/repos/{ORG}/the-nexus/issues", {
|
||||
"title": title,
|
||||
"body": body,
|
||||
"labels": [SECURITY_LABEL],
|
||||
})
|
||||
return issue.get("number")
|
||||
except Exception as e:
|
||||
print(f" FAILED to create issue: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
|
||||
args = parser.parse_args()
|
||||
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN environment variable not set.")
|
||||
return 1
|
||||
|
||||
since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
|
||||
since = since_dt.isoformat()
|
||||
|
||||
repos = get_repos()
|
||||
print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")
|
||||
|
||||
unreviewed_count = 0
|
||||
for repo in repos:
|
||||
merged = get_merged_prs(repo, since)
|
||||
if not merged:
|
||||
continue
|
||||
|
||||
repo_unreviewed = []
|
||||
for pr in merged:
|
||||
reviews = get_reviews(repo, pr["number"])
|
||||
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
|
||||
if not approvals:
|
||||
repo_unreviewed.append(pr)
|
||||
|
||||
if repo_unreviewed:
|
||||
print(f"\n{repo}:")
|
||||
for pr in repo_unreviewed:
|
||||
print(f" ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})")
|
||||
unreviewed_count += 1
|
||||
if args.create_issues:
|
||||
issue_num = create_post_mortem(repo, pr)
|
||||
if issue_num:
|
||||
print(f" → Created post-mortem issue the-nexus#{issue_num}")
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
if unreviewed_count == 0:
|
||||
print("All merges in the last 7 days had at least one approving review.")
|
||||
return 0
|
||||
else:
|
||||
print(f"Found {unreviewed_count} unreviewed merge(s).")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
50
scripts/backup_databases.sh
Executable file
50
scripts/backup_databases.sh
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
|
||||
# Bezalel Database Backup — MemPalace + Evennia + Fleet
|
||||
# Runs nightly after re-mine completes. Keeps 7 days of rolling backups.
|
||||
set -euo pipefail
|
||||
|
||||
BACKUP_BASE="/root/wizards/bezalel/home/backups"
|
||||
DATE=$(date +%Y%m%d_%H%M%S)
|
||||
LOG="/var/log/bezalel_db_backup.log"
|
||||
|
||||
# Sources
|
||||
LOCAL_PALACE="/root/wizards/bezalel/.mempalace/palace"
|
||||
FLEET_PALACE="/var/lib/mempalace/fleet"
|
||||
EVENNIA_DB="/root/wizards/bezalel/evennia/bezalel_world/server/evennia.db3"
|
||||
|
||||
# Destinations
|
||||
LOCAL_BACKUP="${BACKUP_BASE}/mempalace/mempalace_${DATE}.tar.gz"
|
||||
FLEET_BACKUP="${BACKUP_BASE}/fleet/fleet_${DATE}.tar.gz"
|
||||
EVENNIA_BACKUP="${BACKUP_BASE}/evennia/evennia_${DATE}.db3.gz"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
|
||||
}
|
||||
|
||||
log "Starting database backup cycle..."
|
||||
|
||||
# 1. Backup local MemPalace
|
||||
tar -czf "$LOCAL_BACKUP" -C "$(dirname "$LOCAL_PALACE")" "$(basename "$LOCAL_PALACE")"
|
||||
log "Local palace backed up: ${LOCAL_BACKUP} ($(du -h "$LOCAL_BACKUP" | cut -f1))"
|
||||
|
||||
# 2. Backup fleet MemPalace
|
||||
tar -czf "$FLEET_BACKUP" -C "$(dirname "$FLEET_PALACE")" "$(basename "$FLEET_PALACE")"
|
||||
log "Fleet palace backed up: ${FLEET_BACKUP} ($(du -h "$FLEET_BACKUP" | cut -f1))"
|
||||
|
||||
# 3. Backup Evennia DB (gzip for space)
|
||||
gzip -c "$EVENNIA_DB" > "$EVENNIA_BACKUP"
|
||||
log "Evennia DB backed up: ${EVENNIA_BACKUP} ($(du -h "$EVENNIA_BACKUP" | cut -f1))"
|
||||
|
||||
# 4. Prune backups older than 7 days
|
||||
find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' -mtime +7 -delete
|
||||
find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' -mtime +7 -delete
|
||||
find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' -mtime +7 -delete
|
||||
log "Pruned backups older than 7 days"
|
||||
|
||||
# 5. Report counts
|
||||
MP_COUNT=$(find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' | wc -l)
|
||||
FL_COUNT=$(find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' | wc -l)
|
||||
EV_COUNT=$(find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' | wc -l)
|
||||
log "Backup cycle complete. Retained: mempalace=${MP_COUNT}, fleet=${FL_COUNT}, evennia=${EV_COUNT}"
|
||||
|
||||
touch /var/lib/bezalel/heartbeats/db_backup.last
|
||||
135
scripts/ci_auto_revert.py
Normal file
135
scripts/ci_auto_revert.py
Normal file
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
CI Auto-Revert — Poka-yoke for broken merges.
|
||||
Monitors the main branch post-merge and auto-reverts via local git if CI fails.
|
||||
|
||||
Usage:
|
||||
python ci_auto_revert.py <repo_owner>/<repo_name>
|
||||
python ci_auto_revert.py Timmy_Foundation/hermes-agent
|
||||
|
||||
Recommended cron: */10 * * * *
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
import tempfile
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
REVERT_WINDOW_MINUTES = 10
|
||||
|
||||
|
||||
def api_call(method, path):
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
||||
req = request.Request(url, method=method, headers=headers)
|
||||
try:
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except error.HTTPError as e:
|
||||
return {"error": e.read().decode(), "status": e.code}
|
||||
|
||||
|
||||
def get_recent_commits(owner, repo, since):
|
||||
since_iso = since.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
return api_call("GET", f"/repos/{owner}/{repo}/commits?sha=main&since={since_iso}&limit=20")
|
||||
|
||||
|
||||
def get_commit_status(owner, repo, sha):
|
||||
return api_call("GET", f"/repos/{owner}/{repo}/commits/{sha}/status")
|
||||
|
||||
|
||||
def revert_via_git(clone_url, sha, msg, owner, repo):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Clone with token
|
||||
auth_url = clone_url.replace("https://", f"https://bezalel:{GITEA_TOKEN}@")
|
||||
subprocess.run(["git", "clone", "--depth", "10", auth_url, tmpdir], check=True, capture_output=True)
|
||||
|
||||
# Configure git
|
||||
subprocess.run(["git", "-C", tmpdir, "config", "user.email", "bezalel@timmy.foundation"], check=True, capture_output=True)
|
||||
subprocess.run(["git", "-C", tmpdir, "config", "user.name", "Bezalel"], check=True, capture_output=True)
|
||||
|
||||
# Revert the commit
|
||||
revert_msg = f"[auto-revert] {msg}\n\nOriginal commit {sha} failed CI."
|
||||
result = subprocess.run(
|
||||
["git", "-C", tmpdir, "revert", "--no-edit", "-m", revert_msg, sha],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return {"error": f"git revert failed: {result.stderr}"}
|
||||
|
||||
# Push
|
||||
push_result = subprocess.run(
|
||||
["git", "-C", tmpdir, "push", "origin", "main"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if push_result.returncode != 0:
|
||||
return {"error": f"git push failed: {push_result.stderr}"}
|
||||
|
||||
return {"ok": True, "reverted_sha": sha}
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print(f"Usage: {sys.argv[0]} <owner/repo>")
|
||||
sys.exit(1)
|
||||
|
||||
repo_full = sys.argv[1]
|
||||
owner, repo = repo_full.split("/", 1)
|
||||
|
||||
since = datetime.now(timezone.utc) - timedelta(minutes=REVERT_WINDOW_MINUTES + 5)
|
||||
commits = get_recent_commits(owner, repo, since)
|
||||
|
||||
if not isinstance(commits, list):
|
||||
print(f"ERROR fetching commits: {commits}")
|
||||
sys.exit(1)
|
||||
|
||||
reverted = 0
|
||||
for commit in commits:
|
||||
sha = commit.get("sha", "")
|
||||
msg = commit.get("commit", {}).get("message", "").split("\n")[0]
|
||||
commit_time = commit.get("commit", {}).get("committer", {}).get("date", "")
|
||||
if not commit_time:
|
||||
continue
|
||||
|
||||
commit_dt = datetime.fromisoformat(commit_time.replace("Z", "+00:00"))
|
||||
age_min = (datetime.now(timezone.utc) - commit_dt).total_seconds() / 60
|
||||
|
||||
if age_min > REVERT_WINDOW_MINUTES:
|
||||
continue
|
||||
|
||||
status = get_commit_status(owner, repo, sha)
|
||||
state = status.get("state", "")
|
||||
|
||||
if state == "failure":
|
||||
print(f"ALERT: Commit {sha[:8]} '{msg}' failed CI ({age_min:.1f}m old). Initiating revert...")
|
||||
repo_info = api_call("GET", f"/repos/{owner}/{repo}")
|
||||
clone_url = repo_info.get("clone_url", "")
|
||||
if not clone_url:
|
||||
print(f" Cannot find clone URL")
|
||||
continue
|
||||
result = revert_via_git(clone_url, sha, msg, owner, repo)
|
||||
if "error" in result:
|
||||
print(f" Revert failed: {result['error']}")
|
||||
else:
|
||||
print(f" Reverted successfully.")
|
||||
reverted += 1
|
||||
elif state == "success":
|
||||
print(f"OK: Commit {sha[:8]} '{msg}' passed CI.")
|
||||
elif state == "pending":
|
||||
print(f"PENDING: Commit {sha[:8]} '{msg}' still running CI.")
|
||||
else:
|
||||
print(f"UNKNOWN: Commit {sha[:8]} '{msg}' has CI state '{state}'.")
|
||||
|
||||
if reverted == 0:
|
||||
print("No broken merges found in the last 10 minutes.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
75
scripts/mempalace_export.py
Normal file
75
scripts/mempalace_export.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Export closets from a local MemPalace wing for fleet-wide sharing.
|
||||
|
||||
Privacy rule: only summaries/closets are exported. No raw source_file paths.
|
||||
Source filenames are anonymized to just the basename.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import chromadb
|
||||
|
||||
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
|
||||
FLEET_INCOMING = "/var/lib/mempalace/fleet/incoming"
|
||||
WING = "bezalel"
|
||||
DOCS_PER_ROOM = 5
|
||||
|
||||
|
||||
def main():
|
||||
client = chromadb.PersistentClient(path=PALACE_PATH)
|
||||
col = client.get_collection("mempalace_drawers")
|
||||
|
||||
# Discover rooms in this wing
|
||||
all_meta = col.get(include=["metadatas"])["metadatas"]
|
||||
rooms = set()
|
||||
for m in all_meta:
|
||||
if m.get("wing") == WING:
|
||||
rooms.add(m.get("room", "general"))
|
||||
|
||||
Path(FLEET_INCOMING).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
closets = []
|
||||
for room in sorted(rooms):
|
||||
results = col.query(
|
||||
query_texts=[room],
|
||||
n_results=DOCS_PER_ROOM,
|
||||
where={"$and": [{"wing": WING}, {"room": room}]},
|
||||
include=["documents", "metadatas"],
|
||||
)
|
||||
docs = results["documents"][0]
|
||||
metas = results["metadatas"][0]
|
||||
|
||||
entries = []
|
||||
for doc, meta in zip(docs, metas):
|
||||
# Sanitize content: strip absolute workspace paths
|
||||
sanitized = doc[:800]
|
||||
sanitized = sanitized.replace("/root/wizards/bezalel/", "~/")
|
||||
sanitized = sanitized.replace("/root/wizards/", "~/")
|
||||
sanitized = sanitized.replace("/home/bezalel/", "~/")
|
||||
sanitized = sanitized.replace("/home/", "~/")
|
||||
entries.append({
|
||||
"content": sanitized,
|
||||
"source_basename": Path(meta.get("source_file", "?")).name,
|
||||
})
|
||||
|
||||
closet = {
|
||||
"wing": WING,
|
||||
"room": room,
|
||||
"type": "closet",
|
||||
"entries": entries,
|
||||
}
|
||||
closets.append(closet)
|
||||
|
||||
out_file = Path(FLEET_INCOMING) / f"{WING}_closets.json"
|
||||
with open(out_file, "w") as f:
|
||||
json.dump(closets, f, indent=2)
|
||||
|
||||
print(f"Exported {len(closets)} closets to {out_file}")
|
||||
for c in closets:
|
||||
print(f" {c['wing']} / {c['room']} : {len(c['entries'])} entries")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
24
scripts/mempalace_nightly.sh
Executable file
24
scripts/mempalace_nightly.sh
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env bash
|
||||
# Bezalel MemPalace Nightly Re-mine + Fleet Sync
|
||||
set -euo pipefail
|
||||
|
||||
PALACE="/root/wizards/bezalel/.mempalace/palace"
|
||||
MINER="/root/wizards/bezalel/hermes/venv/bin/mempalace"
|
||||
WING_DIR="/root/wizards/bezalel"
|
||||
LOG="/var/log/bezalel_mempalace.log"
|
||||
EXPORTER="/root/wizards/bezalel/hermes/venv/bin/python /root/wizards/bezalel/mempalace_export.py"
|
||||
IMPORTER="/root/wizards/bezalel/hermes/venv/bin/python /var/lib/mempalace/fleet_import.py"
|
||||
|
||||
echo "[$(date -Iseconds)] Starting mempalace re-mine" >> "$LOG"
|
||||
cd "$WING_DIR"
|
||||
"$MINER" --palace "$PALACE" mine "$WING_DIR" --agent bezalel >> "$LOG" 2>&1 || true
|
||||
echo "[$(date -Iseconds)] Finished mempalace re-mine" >> "$LOG"
|
||||
"$MINER" --palace "$PALACE" status >> "$LOG" 2>&1 || true
|
||||
|
||||
echo "[$(date -Iseconds)] Starting fleet closet export" >> "$LOG"
|
||||
$EXPORTER >> "$LOG" 2>&1 || true
|
||||
echo "[$(date -Iseconds)] Starting fleet closet import" >> "$LOG"
|
||||
$IMPORTER >> "$LOG" 2>&1 || true
|
||||
echo "[$(date -Iseconds)] Fleet sync complete" >> "$LOG"
|
||||
|
||||
touch /var/lib/bezalel/heartbeats/mempalace_nightly.last
|
||||
53
scripts/meta_heartbeat.sh
Executable file
53
scripts/meta_heartbeat.sh
Executable file
@@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env bash
|
||||
# Meta-heartbeat — checks all Bezalel cron jobs for stale timestamps
|
||||
set -euo pipefail
|
||||
|
||||
HEARTBEAT_DIR="/var/lib/bezalel/heartbeats"
|
||||
ALERT_LOG="/var/log/bezalel_meta_heartbeat.log"
|
||||
STALE_MINUTES=30
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
|
||||
}
|
||||
|
||||
mkdir -p "$HEARTBEAT_DIR"
|
||||
|
||||
# Define expected heartbeats: name => max_stale_minutes
|
||||
HEARTBEATS=(
|
||||
"nightly_watch:150" # 2.5h — runs at 02:00
|
||||
"mempalace_nightly:150" # 2.5h — runs at 03:00
|
||||
"db_backup:150" # 2.5h — runs at 03:30
|
||||
"runner_health:15" # 15m — every 5 min
|
||||
)
|
||||
|
||||
NOW_EPOCH=$(date +%s)
|
||||
FAILURES=0
|
||||
|
||||
for entry in "${HEARTBEATS[@]}"; do
|
||||
name="${entry%%:*}"
|
||||
max_minutes="${entry##*:}"
|
||||
file="${HEARTBEAT_DIR}/${name}.last"
|
||||
|
||||
if [[ ! -f "$file" ]]; then
|
||||
log "MISSING: $name heartbeat file not found ($file)"
|
||||
FAILURES=$((FAILURES + 1))
|
||||
continue
|
||||
fi
|
||||
|
||||
LAST_EPOCH=$(stat -c %Y "$file")
|
||||
AGE_MIN=$(( (NOW_EPOCH - LAST_EPOCH) / 60 ))
|
||||
|
||||
if [[ $AGE_MIN -gt $max_minutes ]]; then
|
||||
log "STALE: $name is ${AGE_MIN}m old (max ${max_minutes}m)"
|
||||
FAILURES=$((FAILURES + 1))
|
||||
else
|
||||
log "OK: $name is ${AGE_MIN}m old"
|
||||
fi
|
||||
done
|
||||
|
||||
if [[ $FAILURES -gt 0 ]]; then
|
||||
log "ALERT: $FAILURES stale/missing heartbeat(s) detected."
|
||||
exit 1
|
||||
else
|
||||
log "ALL_OK: All heartbeats healthy."
|
||||
fi
|
||||
70
scripts/review_gate.py
Normal file
70
scripts/review_gate.py
Normal file
@@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Review Gate — Poka-yoke for unreviewed merges.
|
||||
Fails if the current PR has fewer than 1 approving review.
|
||||
|
||||
Usage in Gitea workflow:
|
||||
- name: Review Approval Gate
|
||||
run: python scripts/review_gate.py
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
REPO = os.environ.get("GITEA_REPO", "")
|
||||
PR_NUMBER = os.environ.get("PR_NUMBER", "")
|
||||
|
||||
|
||||
def api_call(method, path):
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
||||
req = request.Request(url, method=method, headers=headers)
|
||||
try:
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except error.HTTPError as e:
|
||||
return {"error": e.read().decode(), "status": e.code}
|
||||
|
||||
|
||||
def main():
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN not set")
|
||||
sys.exit(1)
|
||||
|
||||
if not REPO:
|
||||
print("ERROR: GITEA_REPO not set")
|
||||
sys.exit(1)
|
||||
|
||||
pr_number = PR_NUMBER
|
||||
if not pr_number:
|
||||
# Try to infer from Gitea Actions environment
|
||||
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
|
||||
|
||||
if not pr_number:
|
||||
print("ERROR: Could not determine PR number")
|
||||
sys.exit(1)
|
||||
|
||||
reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
|
||||
if isinstance(reviews, dict) and "error" in reviews:
|
||||
print(f"ERROR fetching reviews: {reviews}")
|
||||
sys.exit(1)
|
||||
|
||||
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
|
||||
if len(approvals) >= 1:
|
||||
print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
|
||||
print("Merges are not permitted without at least one approval.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
46
scripts/runner_health_probe.sh
Executable file
46
scripts/runner_health_probe.sh
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env bash
|
||||
# Gitea Runner Health Probe — Poka-yoke for unregistered runners
|
||||
set -euo pipefail
|
||||
|
||||
GITEA_TOKEN="${GITEA_TOKEN:-}"
|
||||
GITEA_URL="https://forge.alexanderwhitestone.com"
|
||||
ALERT_LOG="/var/log/bezalel_runner_health.log"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
|
||||
}
|
||||
|
||||
if [[ -z "$GITEA_TOKEN" ]]; then
|
||||
log "ERROR: GITEA_TOKEN not set"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
ACTIVE_RUNNERS=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
"${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
|
||||
python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))")
|
||||
|
||||
log "Active runners: ${ACTIVE_RUNNERS}"
|
||||
|
||||
if [[ "$ACTIVE_RUNNERS" -eq 0 ]]; then
|
||||
log "CRITICAL: Zero active runners detected. Attempting self-healing restart."
|
||||
pkill -f "act_runner daemon" 2>/dev/null || true
|
||||
sleep 2
|
||||
cd /opt/gitea-runner && nohup ./act_runner daemon > /var/log/gitea-runner.log 2>&1 &
|
||||
sleep 3
|
||||
# Re-check
|
||||
ACTIVE_RUNNERS_AFTER=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
|
||||
"${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
|
||||
python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))")
|
||||
log "Active runners after restart: ${ACTIVE_RUNNERS_AFTER}"
|
||||
if [[ "$ACTIVE_RUNNERS_AFTER" -eq 0 ]]; then
|
||||
log "CRITICAL: Self-healing failed. Runner still offline."
|
||||
touch /var/lib/bezalel/heartbeats/runner_health.last
|
||||
exit 1
|
||||
else
|
||||
log "RECOVERED: Runner back online."
|
||||
fi
|
||||
else
|
||||
log "OK: ${ACTIVE_RUNNERS} runner(s) online."
|
||||
fi
|
||||
|
||||
touch /var/lib/bezalel/heartbeats/runner_health.last
|
||||
50
scripts/secret_guard.sh
Executable file
50
scripts/secret_guard.sh
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
|
||||
# Secret Guard — Poka-yoke for world-readable credentials
|
||||
set -euo pipefail
|
||||
|
||||
ALERT_LOG="/var/log/bezalel_secret_guard.log"
|
||||
QUARANTINE_DIR="/root/wizards/bezalel/home/quarantine"
|
||||
|
||||
mkdir -p "$QUARANTINE_DIR"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
|
||||
}
|
||||
|
||||
# Scan for world-readable files with sensitive keywords in /root, /home, /etc, /tmp, /var/log
|
||||
# Exclude binary files, large files (>1MB), and known safe paths
|
||||
BAD_FILES=$(find /root /home /etc /tmp /var/log -maxdepth 4 -type f -perm /o+r 2>/dev/null \
|
||||
! -path "*/.git/*" \
|
||||
! -path "*/node_modules/*" \
|
||||
! -path "*/venv/*" \
|
||||
! -path "*/.venv/*" \
|
||||
! -path "*/__pycache__/*" \
|
||||
! -path "*/.pyc" \
|
||||
! -size +1M \
|
||||
-exec grep -l -i -E 'password|token|secret|nsec|api_key|private_key|aws_access_key_id|aws_secret_access_key' {} + 2>/dev/null | head -50)
|
||||
|
||||
VIOLATIONS=0
|
||||
for file in $BAD_FILES; do
|
||||
# Skip if already quarantined
|
||||
if [[ "$file" == "$QUARANTINE_DIR"* ]]; then
|
||||
continue
|
||||
fi
|
||||
# Skip log files that are expected to be world-readable
|
||||
if [[ "$file" == /var/log/* ]]; then
|
||||
continue
|
||||
fi
|
||||
|
||||
VIOLATIONS=$((VIOLATIONS + 1))
|
||||
basename=$(basename "$file")
|
||||
quarantine_path="${QUARANTINE_DIR}/${basename}.$(date +%s)"
|
||||
cp "$file" "$quarantine_path"
|
||||
chmod 600 "$quarantine_path"
|
||||
chmod 600 "$file"
|
||||
log "QUARANTINED: $file -> $quarantine_path (permissions fixed to 600)"
|
||||
done
|
||||
|
||||
if [[ $VIOLATIONS -gt 0 ]]; then
|
||||
log "ALERT: $VIOLATIONS world-readable secret file(s) detected and quarantined."
|
||||
else
|
||||
log "OK: No world-readable secret files found."
|
||||
fi
|
||||
77
scripts/staging_gate.py
Normal file
77
scripts/staging_gate.py
Normal file
@@ -0,0 +1,77 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Staging Gate — Poka-yoke for production deployments.
|
||||
Checks if the PR that introduced the current commit was marked `staging-verified`.
|
||||
Fails the workflow if not, blocking deploy.yml from proceeding.
|
||||
|
||||
Usage in Gitea workflow:
|
||||
- name: Staging Verification Gate
|
||||
run: python scripts/staging_gate.py
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
REPO = os.environ.get("GITEA_REPO", "Timmy_Foundation/the-nexus")
|
||||
|
||||
|
||||
def api_call(method, path):
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
||||
req = request.Request(url, method=method, headers=headers)
|
||||
try:
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except error.HTTPError as e:
|
||||
return {"error": e.read().decode(), "status": e.code}
|
||||
|
||||
|
||||
def get_commit_sha():
|
||||
result = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True)
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def get_pr_for_commit(sha):
|
||||
# Search open and closed PRs for this commit
|
||||
for state in ["closed", "open"]:
|
||||
prs = api_call("GET", f"/repos/{REPO}/pulls?state={state}&limit=50")
|
||||
if isinstance(prs, list):
|
||||
for pr in prs:
|
||||
if pr.get("merge_commit_sha") == sha:
|
||||
return pr
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN not set")
|
||||
sys.exit(1)
|
||||
|
||||
sha = get_commit_sha()
|
||||
pr = get_pr_for_commit(sha)
|
||||
|
||||
if not pr:
|
||||
# Direct push to main without PR — block unless explicitly forced
|
||||
print("WARNING: No PR found for this commit. Blocking deploy as a safety measure.")
|
||||
print("To bypass, merge via PR and add the 'staging-verified' label.")
|
||||
sys.exit(1)
|
||||
|
||||
labels = {label["name"] for label in pr.get("labels", [])}
|
||||
if "staging-verified" in labels:
|
||||
print(f"OK: PR #{pr['number']} has 'staging-verified' label. Deploy permitted.")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print(f"BLOCKED: PR #{pr['number']} is missing the 'staging-verified' label.")
|
||||
print("Deploy to production is not permitted until staging is verified.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
81
scripts/sync_branch_protection.py
Normal file
81
scripts/sync_branch_protection.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
|
||||
Correctly uses the Gitea 1.25+ API (not GitHub-style).
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import urllib.request
|
||||
import yaml
|
||||
|
||||
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||
ORG = "Timmy_Foundation"
|
||||
CONFIG_DIR = ".gitea/branch-protection"
|
||||
|
||||
|
||||
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
data = json.dumps(payload).encode() if payload else None
|
||||
req = urllib.request.Request(url, data=data, method=method, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def apply_protection(repo: str, rules: dict) -> bool:
|
||||
branch = rules.pop("branch", "main")
|
||||
# Check if protection already exists
|
||||
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
|
||||
exists = any(r.get("branch_name") == branch for r in existing)
|
||||
|
||||
payload = {
|
||||
"branch_name": branch,
|
||||
"rule_name": branch,
|
||||
"required_approvals": rules.get("required_approvals", 1),
|
||||
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
|
||||
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
|
||||
"block_deletions": rules.get("block_deletions", True),
|
||||
"block_force_push": rules.get("block_force_push", True),
|
||||
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
|
||||
"enable_status_check": rules.get("require_ci_to_merge", False),
|
||||
"status_check_contexts": rules.get("status_check_contexts", []),
|
||||
}
|
||||
|
||||
try:
|
||||
if exists:
|
||||
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
|
||||
else:
|
||||
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
|
||||
print(f"✅ {repo}:{branch} synced")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ {repo}:{branch} failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def main() -> int:
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN not set")
|
||||
return 1
|
||||
|
||||
ok = 0
|
||||
for fname in os.listdir(CONFIG_DIR):
|
||||
if not fname.endswith(".yml"):
|
||||
continue
|
||||
repo = fname[:-4]
|
||||
with open(os.path.join(CONFIG_DIR, fname)) as f:
|
||||
cfg = yaml.safe_load(f)
|
||||
if apply_protection(repo, cfg.get("rules", {})):
|
||||
ok += 1
|
||||
|
||||
print(f"\nSynced {ok} repo(s)")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
30
scripts/sync_fleet_to_alpha.sh
Normal file
30
scripts/sync_fleet_to_alpha.sh
Normal file
@@ -0,0 +1,30 @@
|
||||
#!/usr/bin/env bash
|
||||
# Sync Fleet MemPalace from Beta to Alpha
|
||||
# Usage: ./sync_fleet_to_alpha.sh
|
||||
set -euo pipefail
|
||||
|
||||
FLEET_DIR="/var/lib/mempalace/fleet"
|
||||
ALPHA_HOST="167.99.126.228"
|
||||
ALPHA_USER="root"
|
||||
ALPHA_DEST="/var/lib/mempalace/fleet"
|
||||
LOG="/var/log/bezalel_alpha_sync.log"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
|
||||
}
|
||||
|
||||
log "Starting fleet palace sync to Alpha (${ALPHA_HOST})..."
|
||||
|
||||
# Ensure Alpha destination exists (SSH must be configured key-based or agent-forwarded)
|
||||
ssh -o ConnectTimeout=10 "${ALPHA_USER}@${ALPHA_HOST}" "mkdir -p ${ALPHA_DEST}" || {
|
||||
log "ERROR: Cannot reach Alpha host. Aborting."
|
||||
exit 1
|
||||
}
|
||||
|
||||
# rsync the fleet palace directory (ChromaDB files + incoming closets)
|
||||
rsync -avz --delete \
|
||||
-e "ssh -o ConnectTimeout=10" \
|
||||
"${FLEET_DIR}/" \
|
||||
"${ALPHA_USER}@${ALPHA_HOST}:${ALPHA_DEST}/" >> "$LOG" 2>&1
|
||||
|
||||
log "Fleet palace sync complete."
|
||||
123
scripts/validate_mempalace_taxonomy.py
Normal file
123
scripts/validate_mempalace_taxonomy.py
Normal file
@@ -0,0 +1,123 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Validate a wizard's mempalace.yaml against the fleet taxonomy standard.
|
||||
|
||||
Usage:
|
||||
python validate_mempalace_taxonomy.py /path/to/mempalace.yaml
|
||||
python validate_mempalace_taxonomy.py --ci /path/to/mempalace.yaml
|
||||
|
||||
Exit codes:
|
||||
0 = valid
|
||||
1 = missing required rooms or other violations
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
REQUIRED_ROOMS = {
|
||||
"forge",
|
||||
"hermes",
|
||||
"nexus",
|
||||
"issues",
|
||||
"experiments",
|
||||
}
|
||||
|
||||
|
||||
def load_standard():
|
||||
# Try to find the fleet standard in the-nexus clone or local path
|
||||
candidates = [
|
||||
Path(__file__).parent.parent / "mempalace_taxonomy.yaml",
|
||||
Path("/tmp/nexus_clone/docs/mempalace_taxonomy.yaml"),
|
||||
Path(__file__).parent.parent.parent / "the-nexus" / "docs" / "mempalace_taxonomy.yaml",
|
||||
]
|
||||
for c in candidates:
|
||||
if c.exists():
|
||||
with open(c) as f:
|
||||
return yaml.safe_load(f)
|
||||
return None
|
||||
|
||||
|
||||
def validate(path: Path):
|
||||
errors = []
|
||||
warnings = []
|
||||
|
||||
if not path.exists():
|
||||
errors.append(f"File not found: {path}")
|
||||
return errors, warnings
|
||||
|
||||
with open(path) as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not data:
|
||||
errors.append("Empty or invalid YAML")
|
||||
return errors, warnings
|
||||
|
||||
rooms = data.get("rooms", data.get("wings", {}).get("bezalel", {}).get("rooms", []))
|
||||
if isinstance(rooms, list) and rooms and isinstance(rooms[0], dict):
|
||||
room_names = {r.get("name") for r in rooms if isinstance(r, dict)}
|
||||
elif isinstance(rooms, dict):
|
||||
room_names = set(rooms.keys())
|
||||
else:
|
||||
room_names = set()
|
||||
|
||||
missing = REQUIRED_ROOMS - room_names
|
||||
if missing:
|
||||
errors.append(f"Missing required rooms: {', '.join(sorted(missing))}")
|
||||
|
||||
# Check for duplicate room names
|
||||
if len(room_names) < len(list(rooms) if isinstance(rooms, list) else rooms):
|
||||
errors.append("Duplicate room names detected")
|
||||
|
||||
# Check for empty keywords
|
||||
if isinstance(rooms, list):
|
||||
for r in rooms:
|
||||
if isinstance(r, dict):
|
||||
kw = r.get("keywords", [])
|
||||
if not kw:
|
||||
warnings.append(f"Room '{r.get('name')}' has no keywords")
|
||||
|
||||
standard = load_standard()
|
||||
if standard:
|
||||
std_optional = set(standard.get("optional_rooms", {}).keys())
|
||||
unknown = room_names - REQUIRED_ROOMS - std_optional
|
||||
if unknown:
|
||||
warnings.append(f"Non-standard rooms (OK but not in fleet spec): {', '.join(sorted(unknown))}")
|
||||
|
||||
return errors, warnings
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Validate MemPalace taxonomy")
|
||||
parser.add_argument("config", help="Path to mempalace.yaml")
|
||||
parser.add_argument("--ci", action="store_true", help="CI mode: fail on warnings too")
|
||||
args = parser.parse_args()
|
||||
|
||||
errors, warnings = validate(Path(args.config))
|
||||
|
||||
if warnings:
|
||||
for w in warnings:
|
||||
print(f"WARNING: {w}")
|
||||
|
||||
if errors:
|
||||
for e in errors:
|
||||
print(f"ERROR: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if args.ci and warnings:
|
||||
print("Validation failed in CI mode (warnings treated as errors)")
|
||||
sys.exit(1)
|
||||
|
||||
print("OK: Taxonomy validation passed")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "forge",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "CI pipeline green on main. All 253 tests passing.",
|
||||
"source_file": "forge.closet.json",
|
||||
"closet": true
|
||||
},
|
||||
{
|
||||
"text": "Deployed nexus heartbeat cron fix to Beta. Poka-yoke checks pass.",
|
||||
"source_file": "forge.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "hermes",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "Hermes gateway v2 deployed. MCP tools registered: mempalace, gitea, cron.",
|
||||
"source_file": "hermes.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "issues",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "MemPalace x Evennia milestone: 6 of 8 issues closed. #1078 and #1083 in progress.",
|
||||
"source_file": "issues.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
239
tests/test_mempalace_fleet_api.py
Normal file
239
tests/test_mempalace_fleet_api.py
Normal file
@@ -0,0 +1,239 @@
|
||||
"""
|
||||
Tests for mempalace/fleet_api.py — Alpha-side HTTP fleet memory API.
|
||||
|
||||
Refs: #1078, #1075
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Import handler directly so we can test without running a server process.
|
||||
from mempalace.fleet_api import FleetAPIHandler, _handle_health, _handle_search, _handle_wings, make_server
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _FakeSocket:
|
||||
"""Minimal socket stub for BaseHTTPRequestHandler."""
|
||||
|
||||
def makefile(self, mode: str, *args, **kwargs): # noqa: ANN001
|
||||
return io.BytesIO(b"")
|
||||
|
||||
|
||||
def _make_handler(path: str = "/health") -> tuple[FleetAPIHandler, io.BytesIO]:
|
||||
"""Construct a handler pointed at *path*, capture wfile output."""
|
||||
buf = io.BytesIO()
|
||||
request = _FakeSocket()
|
||||
client_address = ("127.0.0.1", 0)
|
||||
|
||||
handler = FleetAPIHandler.__new__(FleetAPIHandler)
|
||||
handler.path = path
|
||||
handler.request = request
|
||||
handler.client_address = client_address
|
||||
handler.server = MagicMock()
|
||||
handler.wfile = buf
|
||||
handler.rfile = io.BytesIO(b"")
|
||||
handler.command = "GET"
|
||||
handler._headers_buffer = []
|
||||
|
||||
# Stub send_response / send_header / end_headers to write minimal HTTP
|
||||
handler._response_code = None
|
||||
def _send_response(code, message=None): # noqa: ANN001
|
||||
handler._response_code = code
|
||||
def _send_header(k, v): # noqa: ANN001
|
||||
pass
|
||||
def _end_headers(): # noqa: ANN001
|
||||
pass
|
||||
handler.send_response = _send_response
|
||||
handler.send_header = _send_header
|
||||
handler.end_headers = _end_headers
|
||||
|
||||
return handler, buf
|
||||
|
||||
|
||||
def _parse_response(buf: io.BytesIO) -> dict:
|
||||
buf.seek(0)
|
||||
return json.loads(buf.read())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /health
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_health_returns_ok(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
handler, buf = _make_handler("/health")
|
||||
_handle_health(handler)
|
||||
data = _parse_response(buf)
|
||||
assert data["status"] == "ok"
|
||||
assert data["palace_exists"] is True
|
||||
|
||||
|
||||
def test_health_missing_palace(tmp_path, monkeypatch):
|
||||
missing = tmp_path / "nonexistent"
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
|
||||
handler, buf = _make_handler("/health")
|
||||
_handle_health(handler)
|
||||
data = _parse_response(buf)
|
||||
assert data["status"] == "ok"
|
||||
assert data["palace_exists"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _mock_search_fleet(results):
|
||||
"""Return a patch target that returns *results*."""
|
||||
mock = MagicMock(return_value=results)
|
||||
return mock
|
||||
|
||||
|
||||
def _make_result(text="hello", room="forge", wing="bezalel", score=0.9):
|
||||
from nexus.mempalace.searcher import MemPalaceResult
|
||||
return MemPalaceResult(text=text, room=room, wing=wing, score=score)
|
||||
|
||||
|
||||
def test_search_missing_q_param():
|
||||
handler, buf = _make_handler("/search")
|
||||
_handle_search(handler, {})
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 400
|
||||
|
||||
|
||||
def test_search_returns_results(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
(tmp_path / "chroma.sqlite3").touch()
|
||||
result = _make_result(text="CI green", room="forge", wing="bezalel", score=0.95)
|
||||
|
||||
with patch("mempalace.fleet_api.FleetAPIHandler") as _:
|
||||
handler, buf = _make_handler("/search?q=CI")
|
||||
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", return_value=[result]):
|
||||
import importlib
|
||||
import mempalace.fleet_api as api_module
|
||||
# Patch search_fleet inside the handler's import context
|
||||
with patch("nexus.mempalace.searcher.search_fleet", return_value=[result]):
|
||||
_handle_search(handler, {"q": ["CI"]})
|
||||
|
||||
data = _parse_response(buf)
|
||||
assert data["count"] == 1
|
||||
assert data["results"][0]["text"] == "CI green"
|
||||
assert data["results"][0]["room"] == "forge"
|
||||
assert data["results"][0]["wing"] == "bezalel"
|
||||
assert data["results"][0]["score"] == 0.95
|
||||
assert handler._response_code == 200
|
||||
|
||||
|
||||
def test_search_with_room_filter(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
result = _make_result()
|
||||
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", return_value=[result]) as mock_sf:
|
||||
_handle_search(MagicMock(), {"q": ["test"], "room": ["hermes"]})
|
||||
|
||||
# Verify room was passed through
|
||||
mock_sf.assert_called_once_with("test", room="hermes", n_results=10)
|
||||
|
||||
|
||||
def test_search_invalid_n_param():
|
||||
handler, buf = _make_handler("/search?q=test&n=bad")
|
||||
_handle_search(handler, {"q": ["test"], "n": ["bad"]})
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 400
|
||||
|
||||
|
||||
def test_search_palace_unavailable(monkeypatch):
|
||||
from nexus.mempalace.searcher import MemPalaceUnavailable
|
||||
|
||||
handler, buf = _make_handler("/search?q=test")
|
||||
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", side_effect=MemPalaceUnavailable("no palace")):
|
||||
_handle_search(handler, {"q": ["test"]})
|
||||
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 503
|
||||
|
||||
|
||||
def test_search_n_clamped_to_max():
|
||||
"""n > MAX_RESULTS is silently clamped."""
|
||||
import nexus.mempalace.searcher as s_module
|
||||
with patch.object(s_module, "search_fleet", return_value=[]) as mock_sf:
|
||||
handler = MagicMock()
|
||||
_handle_search(handler, {"q": ["test"], "n": ["9999"]})
|
||||
|
||||
mock_sf.assert_called_once_with("test", room=None, n_results=50)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /wings
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_wings_returns_list(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
|
||||
(tmp_path / "bezalel").mkdir()
|
||||
(tmp_path / "timmy").mkdir()
|
||||
# A file should not appear in wings
|
||||
(tmp_path / "README.txt").touch()
|
||||
|
||||
handler, buf = _make_handler("/wings")
|
||||
_handle_wings(handler)
|
||||
data = _parse_response(buf)
|
||||
assert set(data["wings"]) == {"bezalel", "timmy"}
|
||||
assert handler._response_code == 200
|
||||
|
||||
|
||||
def test_wings_missing_palace(tmp_path, monkeypatch):
|
||||
missing = tmp_path / "nonexistent"
|
||||
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
|
||||
handler, buf = _make_handler("/wings")
|
||||
_handle_wings(handler)
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 503
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 404 unknown endpoint
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_unknown_endpoint():
|
||||
handler, buf = _make_handler("/foobar")
|
||||
handler.do_GET()
|
||||
data = _parse_response(buf)
|
||||
assert "error" in data
|
||||
assert handler._response_code == 404
|
||||
assert "/search" in data["endpoints"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# audit fixture smoke test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_audit_fixture_is_clean():
|
||||
"""Ensure tests/fixtures/fleet_palace/ passes privacy audit (no violations)."""
|
||||
from mempalace.audit_privacy import audit_palace
|
||||
|
||||
fixture_dir = Path(__file__).parent / "fixtures" / "fleet_palace"
|
||||
assert fixture_dir.exists(), f"Fixture directory missing: {fixture_dir}"
|
||||
|
||||
result = audit_palace(fixture_dir)
|
||||
assert result.clean, (
|
||||
f"Privacy violations found in CI fixture:\n" +
|
||||
"\n".join(f" [{v.rule}] {v.path}: {v.detail}" for v in result.violations)
|
||||
)
|
||||
Reference in New Issue
Block a user