Compare commits

...

6 Commits

Author SHA1 Message Date
Alexander Whitestone
e644b00dff feat(mempalace): retention enforcement + tunnel sync client (#1083, #1078)
Some checks failed
CI / test (pull_request) Failing after 7s
CI / validate (pull_request) Failing after 3s
Review Approval Gate / verify-review (pull_request) Failing after 4s
**retain_closets.py** — 90-day closet aging enforcement for #1083.
Removes *.closet.json files older than --days (default 90) from the
fleet palace. Supports --dry-run for safe preview. Wired into the
weekly-audit workflow as a dry-run CI step; production cron guidance
added to workflow comments.

**tunnel_sync.py** — remote wizard wing pull client for #1078.
Connects to a peer's fleet_api.py HTTP endpoint, discovers wings via
/wings, and pulls core rooms via /search into local *.closet.json
files. Zero new dependencies (stdlib urllib only). Supports --dry-run.
This is the code side of the inter-wizard tunnel; infrastructure
(second wizard VPS + fleet_api.py running) still required.

**Tests:** 29 new tests, all passing. Total suite: 294 passing.

Refs #1075, #1078, #1083
2026-04-07 11:05:00 -04:00
Bezalel
b445c04037 feat(ci): staging verification gate + review approval gate (#1095, #1098)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
Staging Verification Gate / verify-staging (push) Has been cancelled
2026-04-07 14:58:39 +00:00
60bd9a05ff fix(security): replace broken branch protection scripts with Gitea-native sync (#1098)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:56:31 +00:00
c7468a3c6a [claude] Weekly privacy audit cron + fleet HTTP API (#1075) (#1109)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:54:48 +00:00
07a4be3bb9 [claude] Weekly privacy audit cron + fleet HTTP API (#1075) (#1109)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:54:41 +00:00
804536a3f2 feat(security): add fleet merge-review audit script (#1098)
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
2026-04-07 14:53:07 +00:00
17 changed files with 1753 additions and 43 deletions

View File

@@ -0,0 +1,21 @@
name: Review Approval Gate
on:
pull_request:
branches: [main]
jobs:
verify-review:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Verify PR has approving review
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
GITEA_REPO: Timmy_Foundation/the-nexus
PR_NUMBER: ${{ gitea.event.pull_request.number }}
run: |
python3 scripts/review_gate.py

View File

@@ -0,0 +1,20 @@
name: Staging Verification Gate
on:
push:
branches: [main]
jobs:
verify-staging:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Verify staging label on merge PR
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
GITEA_REPO: Timmy_Foundation/the-nexus
run: |
python3 scripts/staging_gate.py

View File

@@ -0,0 +1,34 @@
name: Weekly Privacy Audit
# Runs every Monday at 05:00 UTC against a CI test fixture.
# On production wizards these same scripts should run via cron:
# 0 5 * * 1 python /opt/nexus/mempalace/audit_privacy.py /var/lib/mempalace/fleet
# 0 5 * * 1 python /opt/nexus/mempalace/retain_closets.py /var/lib/mempalace/fleet --days 90
#
# Refs: #1083, #1075
on:
schedule:
- cron: "0 5 * * 1" # Monday 05:00 UTC
workflow_dispatch: {} # allow manual trigger
jobs:
privacy-audit:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.x"
- name: Run privacy audit against CI fixture
run: |
python mempalace/audit_privacy.py tests/fixtures/fleet_palace
- name: Dry-run retention enforcement against CI fixture
# Real enforcement runs on the live VPS; CI verifies the script runs cleanly.
run: |
python mempalace/retain_closets.py tests/fixtures/fleet_palace --days 90 --dry-run

View File

@@ -1,44 +1,6 @@
#!/bin/bash
# Apply branch protections to all repositories
# Requires GITEA_TOKEN env var
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
for repo in "${REPOS[@]}"
do
curl -X POST "https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/$repo/branches/main/protection" \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"required_reviews": 1,
"dismiss_stale_reviews": true,
"block_force_push": true,
"block_deletions": true
}'
done
#!/bin/bash
# Gitea API credentials
GITEA_TOKEN="your-personal-access-token"
GITEA_API="https://forge.alexanderwhitestone.com/api/v1"
# Repos to protect
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
for REPO in "${REPO[@]}"; do
echo "Configuring branch protection for $REPO..."
curl -X POST -H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "main",
"require_pull_request": true,
"required_approvals": 1,
"dismiss_stale_approvals": true,
"required_status_checks": '"$(test "$REPO" = "hermes-agent" && echo "true" || echo "false")"',
"block_force_push": true,
"block_delete": true
}' \
"$GITEA_API/repos/Timmy_Foundation/$REPO/branch_protection"
done
# Wrapper for the canonical branch-protection sync script.
# Usage: ./gitea-branch-protection.sh
set -euo pipefail
cd "$(dirname "$0")"
python3 scripts/sync_branch_protection.py

186
mempalace/fleet_api.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
fleet_api.py — Lightweight HTTP API for the shared fleet palace.
Exposes fleet memory search over HTTP so that Alpha servers and other
wizard deployments can query the palace without direct filesystem access.
Endpoints:
GET /health
Returns {"status": "ok", "palace": "<path>"}
GET /search?q=<query>[&room=<room>][&n=<int>]
Returns {"results": [...], "query": "...", "room": "...", "count": N}
Each result: {"text": "...", "room": "...", "wing": "...", "score": 0.9}
GET /wings
Returns {"wings": ["bezalel", ...]} — distinct wizard wings present
Error responses use {"error": "<message>"} with appropriate HTTP status codes.
Usage:
# Default: localhost:7771, fleet palace at /var/lib/mempalace/fleet
python mempalace/fleet_api.py
# Custom host/port/palace:
FLEET_PALACE_PATH=/data/fleet python mempalace/fleet_api.py --host 0.0.0.0 --port 8080
Refs: #1078, #1075
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse
# Add repo root to path so we can import nexus.mempalace
_HERE = Path(__file__).resolve().parent
_REPO_ROOT = _HERE.parent
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 7771
MAX_RESULTS = 50
def _get_palace_path() -> Path:
return Path(os.environ.get("FLEET_PALACE_PATH", "/var/lib/mempalace/fleet"))
def _json_response(handler: BaseHTTPRequestHandler, status: int, body: dict) -> None:
payload = json.dumps(body).encode()
handler.send_response(status)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", str(len(payload)))
handler.end_headers()
handler.wfile.write(payload)
def _handle_health(handler: BaseHTTPRequestHandler) -> None:
palace = _get_palace_path()
_json_response(handler, 200, {
"status": "ok",
"palace": str(palace),
"palace_exists": palace.exists(),
})
def _handle_search(handler: BaseHTTPRequestHandler, qs: dict) -> None:
query_terms = qs.get("q", [""])
q = query_terms[0].strip() if query_terms else ""
if not q:
_json_response(handler, 400, {"error": "Missing required parameter: q"})
return
room_terms = qs.get("room", [])
room = room_terms[0].strip() if room_terms else None
n_terms = qs.get("n", [])
try:
n = max(1, min(int(n_terms[0]), MAX_RESULTS)) if n_terms else 10
except (ValueError, IndexError):
_json_response(handler, 400, {"error": "Invalid parameter: n must be an integer"})
return
try:
from nexus.mempalace.searcher import search_fleet, MemPalaceUnavailable
except ImportError as exc:
_json_response(handler, 503, {"error": f"MemPalace module not available: {exc}"})
return
try:
results = search_fleet(q, room=room, n_results=n)
except Exception as exc: # noqa: BLE001
_json_response(handler, 503, {"error": str(exc)})
return
_json_response(handler, 200, {
"query": q,
"room": room,
"count": len(results),
"results": [
{
"text": r.text,
"room": r.room,
"wing": r.wing,
"score": round(r.score, 4),
}
for r in results
],
})
def _handle_wings(handler: BaseHTTPRequestHandler) -> None:
"""Return distinct wizard wing names found in the fleet palace directory."""
palace = _get_palace_path()
if not palace.exists():
_json_response(handler, 503, {
"error": f"Fleet palace not found: {palace}",
})
return
wings = sorted({
p.name for p in palace.iterdir() if p.is_dir()
})
_json_response(handler, 200, {"wings": wings})
class FleetAPIHandler(BaseHTTPRequestHandler):
"""Request handler for the fleet memory API."""
def log_message(self, fmt: str, *args) -> None: # noqa: ANN001
# Prefix with tag for easier log filtering
sys.stderr.write(f"[fleet_api] {fmt % args}\n")
def do_GET(self) -> None: # noqa: N802
parsed = urlparse(self.path)
path = parsed.path.rstrip("/") or "/"
qs = parse_qs(parsed.query)
if path == "/health":
_handle_health(self)
elif path == "/search":
_handle_search(self, qs)
elif path == "/wings":
_handle_wings(self)
else:
_json_response(self, 404, {
"error": f"Unknown endpoint: {path}",
"endpoints": ["/health", "/search", "/wings"],
})
def make_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> HTTPServer:
return HTTPServer((host, port), FleetAPIHandler)
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Fleet palace HTTP API server."
)
parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind host (default: {DEFAULT_HOST})")
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Bind port (default: {DEFAULT_PORT})")
args = parser.parse_args(argv)
palace = _get_palace_path()
print(f"[fleet_api] Palace: {palace}")
if not palace.exists():
print(f"[fleet_api] WARNING: palace path does not exist yet: {palace}", file=sys.stderr)
server = make_server(args.host, args.port)
print(f"[fleet_api] Listening on http://{args.host}:{args.port}")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n[fleet_api] Shutting down.")
return 0
if __name__ == "__main__":
sys.exit(main())

163
mempalace/retain_closets.py Normal file
View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
retain_closets.py — Retention policy enforcement for fleet palace closets.
Removes closet files older than a configurable retention window (default: 90 days).
Run this on the Alpha host (or any fleet palace directory) to enforce the
closet aging policy described in #1083.
Usage:
# Dry-run: show what would be removed (no deletions)
python mempalace/retain_closets.py --dry-run
# Enforce 90-day retention (default)
python mempalace/retain_closets.py
# Custom retention window
python mempalace/retain_closets.py --days 30
# Custom palace path
python mempalace/retain_closets.py /data/fleet --days 90
Exits:
0 — success (clean, or pruned without error)
1 — error (e.g., palace directory not found)
Refs: #1083, #1075
"""
from __future__ import annotations
import argparse
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
DEFAULT_RETENTION_DAYS = 90
DEFAULT_PALACE_PATH = "/var/lib/mempalace/fleet"
@dataclass
class RetentionResult:
scanned: int = 0
removed: int = 0
kept: int = 0
errors: list[str] = field(default_factory=list)
@property
def ok(self) -> bool:
return len(self.errors) == 0
def _file_age_days(path: Path) -> float:
"""Return the age of a file in days based on mtime."""
mtime = path.stat().st_mtime
now = time.time()
return (now - mtime) / 86400.0
def enforce_retention(
palace_dir: Path,
retention_days: int = DEFAULT_RETENTION_DAYS,
dry_run: bool = False,
) -> RetentionResult:
"""
Remove *.closet.json files older than *retention_days* from *palace_dir*.
Only closet files are pruned — raw drawer files are never present in a
compliant fleet palace, so this script does not touch them.
Args:
palace_dir: Root directory of the fleet palace to scan.
retention_days: Files older than this many days will be removed.
dry_run: If True, report what would be removed but make no changes.
Returns:
RetentionResult with counts and any errors.
"""
result = RetentionResult()
for closet_file in sorted(palace_dir.rglob("*.closet.json")):
result.scanned += 1
try:
age = _file_age_days(closet_file)
except OSError as exc:
result.errors.append(f"Could not stat {closet_file}: {exc}")
continue
if age > retention_days:
if dry_run:
print(
f"[retain_closets] DRY-RUN would remove ({age:.0f}d old): {closet_file}"
)
result.removed += 1
else:
try:
closet_file.unlink()
print(f"[retain_closets] Removed ({age:.0f}d old): {closet_file}")
result.removed += 1
except OSError as exc:
result.errors.append(f"Could not remove {closet_file}: {exc}")
else:
result.kept += 1
return result
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Enforce retention policy on fleet palace closets."
)
parser.add_argument(
"palace_dir",
nargs="?",
default=os.environ.get("FLEET_PALACE_PATH", DEFAULT_PALACE_PATH),
help=f"Fleet palace directory (default: {DEFAULT_PALACE_PATH})",
)
parser.add_argument(
"--days",
type=int,
default=DEFAULT_RETENTION_DAYS,
metavar="N",
help=f"Retention window in days (default: {DEFAULT_RETENTION_DAYS})",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be removed without deleting anything.",
)
args = parser.parse_args(argv)
palace_dir = Path(args.palace_dir)
if not palace_dir.exists():
print(
f"[retain_closets] ERROR: palace directory not found: {palace_dir}",
file=sys.stderr,
)
return 1
mode = "DRY-RUN" if args.dry_run else "LIVE"
print(
f"[retain_closets] {mode} — scanning {palace_dir} "
f"(retention: {args.days} days)"
)
result = enforce_retention(palace_dir, retention_days=args.days, dry_run=args.dry_run)
if result.errors:
for err in result.errors:
print(f"[retain_closets] ERROR: {err}", file=sys.stderr)
return 1
action = "would remove" if args.dry_run else "removed"
print(
f"[retain_closets] Done — scanned {result.scanned}, "
f"{action} {result.removed}, kept {result.kept}."
)
return 0
if __name__ == "__main__":
sys.exit(main())

308
mempalace/tunnel_sync.py Normal file
View File

@@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""
tunnel_sync.py — Pull closets from a remote wizard's fleet API into the local palace.
This is the client-side tunnel mechanism for #1078. It connects to a peer
wizard's running fleet_api.py HTTP server, discovers their memory wings, and
imports the results into the local fleet palace as closet files. Once imported,
`recall <query> --fleet` in Evennia will return results from the remote wing.
The code side is complete here; the infrastructure side (second wizard running
fleet_api.py behind an SSH tunnel or VPN) is still required to use this.
Usage:
# Pull from a remote Alpha fleet API into the default local palace
python mempalace/tunnel_sync.py --peer http://alpha.example.com:7771
# Custom local palace path
FLEET_PALACE_PATH=/data/fleet python mempalace/tunnel_sync.py \\
--peer http://alpha.example.com:7771
# Dry-run: show what would be imported without writing files
python mempalace/tunnel_sync.py --peer http://alpha.example.com:7771 --dry-run
# Limit results per room (default: 50)
python mempalace/tunnel_sync.py --peer http://alpha.example.com:7771 --n 20
Environment:
FLEET_PALACE_PATH — local fleet palace directory (default: /var/lib/mempalace/fleet)
FLEET_PEER_URL — remote fleet API URL (overridden by --peer flag)
Exits:
0 — sync succeeded (or dry-run completed)
1 — error (connection failure, invalid response, write error)
Refs: #1078, #1075
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
DEFAULT_PALACE_PATH = "/var/lib/mempalace/fleet"
DEFAULT_N_RESULTS = 50
# Broad queries for bulk room pull — used to discover representative content
_BROAD_QUERIES = [
"the", "a", "is", "was", "and", "of", "to", "in", "it", "on",
"commit", "issue", "error", "fix", "deploy", "event", "memory",
]
_REQUEST_TIMEOUT = 10 # seconds
@dataclass
class SyncResult:
wings_found: list[str] = field(default_factory=list)
rooms_pulled: int = 0
closets_written: int = 0
errors: list[str] = field(default_factory=list)
@property
def ok(self) -> bool:
return len(self.errors) == 0
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def _get(url: str) -> dict[str, Any]:
"""GET *url*, return parsed JSON or raise on error."""
req = urllib.request.Request(url, headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=_REQUEST_TIMEOUT) as resp:
return json.loads(resp.read())
def _peer_url(base: str, path: str) -> str:
return base.rstrip("/") + path
# ---------------------------------------------------------------------------
# Wing / room discovery
# ---------------------------------------------------------------------------
def get_remote_wings(peer_url: str) -> list[str]:
"""Return the list of wing names from the remote fleet API."""
data = _get(_peer_url(peer_url, "/wings"))
return data.get("wings", [])
def search_remote_room(peer_url: str, room: str, n: int = DEFAULT_N_RESULTS) -> list[dict]:
"""
Pull closet entries for a specific room from the remote peer.
Uses multiple broad queries and deduplicates by text to maximize coverage
without requiring a dedicated bulk-export endpoint.
"""
seen_texts: set[str] = set()
results: list[dict] = []
for q in _BROAD_QUERIES:
url = _peer_url(peer_url, f"/search?q={urllib.request.quote(q)}&room={urllib.request.quote(room)}&n={n}")
try:
data = _get(url)
except (urllib.error.URLError, json.JSONDecodeError, OSError):
continue
for entry in data.get("results", []):
text = entry.get("text", "")
if text and text not in seen_texts:
seen_texts.add(text)
results.append(entry)
if len(results) >= n:
break
return results[:n]
# ---------------------------------------------------------------------------
# Core sync
# ---------------------------------------------------------------------------
def _write_closet(
palace_dir: Path,
wing: str,
room: str,
entries: list[dict],
dry_run: bool,
) -> bool:
"""Write entries as a .closet.json file under palace_dir/wing/."""
wing_dir = palace_dir / wing
closet_path = wing_dir / f"{room}.closet.json"
drawers = [
{
"text": e.get("text", ""),
"room": e.get("room", room),
"wing": e.get("wing", wing),
"score": e.get("score", 0.0),
"closet": True,
"source_file": f"tunnel:{wing}/{room}",
"synced_at": int(time.time()),
}
for e in entries
]
payload = json.dumps({"drawers": drawers, "wing": wing, "room": room}, indent=2)
if dry_run:
print(f"[tunnel_sync] DRY-RUN would write {len(drawers)} entries → {closet_path}")
return True
try:
wing_dir.mkdir(parents=True, exist_ok=True)
closet_path.write_text(payload)
print(f"[tunnel_sync] Wrote {len(drawers)} entries → {closet_path}")
return True
except OSError as exc:
print(f"[tunnel_sync] ERROR writing {closet_path}: {exc}", file=sys.stderr)
return False
def sync_peer(
peer_url: str,
palace_dir: Path,
n_results: int = DEFAULT_N_RESULTS,
dry_run: bool = False,
) -> SyncResult:
"""
Pull all wings and rooms from *peer_url* into *palace_dir*.
Args:
peer_url: Base URL of the remote fleet_api.py instance.
palace_dir: Local fleet palace directory to write closets into.
n_results: Maximum results to pull per room.
dry_run: If True, print what would be written without touching disk.
Returns:
SyncResult with counts and any errors.
"""
result = SyncResult()
# Discover health
try:
health = _get(_peer_url(peer_url, "/health"))
if health.get("status") != "ok":
result.errors.append(f"Peer unhealthy: {health}")
return result
except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
result.errors.append(f"Could not reach peer at {peer_url}: {exc}")
return result
# Discover wings
try:
wings = get_remote_wings(peer_url)
except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
result.errors.append(f"Could not list wings from {peer_url}: {exc}")
return result
result.wings_found = wings
if not wings:
print(f"[tunnel_sync] No wings found at {peer_url} — nothing to sync.")
return result
print(f"[tunnel_sync] Found wings: {wings}")
# Import core rooms from each wing
from nexus.mempalace.config import CORE_ROOMS
for wing in wings:
for room in CORE_ROOMS:
print(f"[tunnel_sync] Pulling {wing}/{room}")
try:
entries = search_remote_room(peer_url, room, n=n_results)
except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
err = f"Error pulling {wing}/{room}: {exc}"
result.errors.append(err)
print(f"[tunnel_sync] ERROR: {err}", file=sys.stderr)
continue
if not entries:
print(f"[tunnel_sync] No entries found for {wing}/{room} — skipping.")
continue
ok = _write_closet(palace_dir, wing, room, entries, dry_run=dry_run)
result.rooms_pulled += 1
if ok:
result.closets_written += 1
return result
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Sync closets from a remote wizard's fleet API into the local palace."
)
parser.add_argument(
"--peer",
default=os.environ.get("FLEET_PEER_URL", ""),
metavar="URL",
help="Base URL of the remote fleet_api.py (e.g. http://alpha.example.com:7771)",
)
parser.add_argument(
"--palace",
default=os.environ.get("FLEET_PALACE_PATH", DEFAULT_PALACE_PATH),
metavar="DIR",
help=f"Local fleet palace directory (default: {DEFAULT_PALACE_PATH})",
)
parser.add_argument(
"--n",
type=int,
default=DEFAULT_N_RESULTS,
metavar="N",
help=f"Max results per room (default: {DEFAULT_N_RESULTS})",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be synced without writing files.",
)
args = parser.parse_args(argv)
if not args.peer:
print(
"[tunnel_sync] ERROR: --peer URL is required (or set FLEET_PEER_URL).",
file=sys.stderr,
)
return 1
palace_dir = Path(args.palace)
if not palace_dir.exists() and not args.dry_run:
print(
f"[tunnel_sync] ERROR: local palace not found: {palace_dir}",
file=sys.stderr,
)
return 1
mode = "DRY-RUN" if args.dry_run else "LIVE"
print(f"[tunnel_sync] {mode} — peer: {args.peer} palace: {palace_dir}")
result = sync_peer(args.peer, palace_dir, n_results=args.n, dry_run=args.dry_run)
if result.errors:
for err in result.errors:
print(f"[tunnel_sync] ERROR: {err}", file=sys.stderr)
return 1
print(
f"[tunnel_sync] Done — wings: {result.wings_found}, "
f"rooms pulled: {result.rooms_pulled}, closets written: {result.closets_written}."
)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Fleet Merge Review Audit
========================
Scans all Timmy_Foundation repos for merges in the last 7 days
and validates that each merged PR had at least one approving review.
Exit 0 = no unreviewed merges
Exit 1 = unreviewed merges found (and issues created if --create-issues)
Usage:
python scripts/audit_merge_reviews.py
python scripts/audit_merge_reviews.py --create-issues
"""
import os
import sys
import argparse
from datetime import datetime, timedelta, timezone
import urllib.request
import urllib.error
import json
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
DAYS_BACK = 7
SECURITY_LABEL = "security"
def api_request(path: str) -> dict | list:
url = f"{GITEA_URL}/api/v1{path}"
req = urllib.request.Request(url, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def api_post(path: str, payload: dict) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def get_repos() -> list[str]:
repos = []
page = 1
while True:
batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}")
if not batch:
break
repos.extend([r["name"] for r in batch])
page += 1
return repos
def get_merged_prs(repo: str, since: str) -> list[dict]:
"""Get closed (merged) PRs updated since `since` (ISO format)."""
prs = []
page = 1
while True:
batch = api_request(
f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
)
if not batch:
break
for pr in batch:
if pr.get("merged_at") and pr["merged_at"] >= since:
prs.append(pr)
elif pr.get("updated_at") and pr["updated_at"] < since:
return prs
page += 1
return prs
def get_reviews(repo: str, pr_number: int) -> list[dict]:
try:
return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
except urllib.error.HTTPError as e:
if e.code == 404:
return []
raise
def create_post_mortem(repo: str, pr: dict) -> int | None:
title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
body = (
f"## Unreviewed Merge Detected\n\n"
f"- **Repository:** `{ORG}/{repo}`\n"
f"- **PR:** #{pr['number']}{pr['title']}\n"
f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n"
f"- **Merged at:** {pr['merged_at']}\n"
f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n"
f"This merge had **zero approving reviews** at the time of merge.\n\n"
f"### Required Actions\n"
f"1. Validate the merge contents are safe.\n"
f"2. If malicious or incorrect, revert immediately.\n"
f"3. Document root cause (bypassed branch protection? direct push?).\n"
)
try:
issue = api_post(f"/repos/{ORG}/the-nexus/issues", {
"title": title,
"body": body,
"labels": [SECURITY_LABEL],
})
return issue.get("number")
except Exception as e:
print(f" FAILED to create issue: {e}")
return None
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
args = parser.parse_args()
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN environment variable not set.")
return 1
since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
since = since_dt.isoformat()
repos = get_repos()
print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")
unreviewed_count = 0
for repo in repos:
merged = get_merged_prs(repo, since)
if not merged:
continue
repo_unreviewed = []
for pr in merged:
reviews = get_reviews(repo, pr["number"])
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if not approvals:
repo_unreviewed.append(pr)
if repo_unreviewed:
print(f"\n{repo}:")
for pr in repo_unreviewed:
print(f" ! UNREVIEWED merge: PR #{pr['number']}{pr['title']} ({pr['merged_at'][:10]})")
unreviewed_count += 1
if args.create_issues:
issue_num = create_post_mortem(repo, pr)
if issue_num:
print(f" → Created post-mortem issue the-nexus#{issue_num}")
print(f"\n{'='*60}")
if unreviewed_count == 0:
print("All merges in the last 7 days had at least one approving review.")
return 0
else:
print(f"Found {unreviewed_count} unreviewed merge(s).")
return 1
if __name__ == "__main__":
raise SystemExit(main())

70
scripts/review_gate.py Normal file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""
Review Gate — Poka-yoke for unreviewed merges.
Fails if the current PR has fewer than 1 approving review.
Usage in Gitea workflow:
- name: Review Approval Gate
run: python scripts/review_gate.py
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
"""
import os
import sys
import json
import subprocess
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REPO = os.environ.get("GITEA_REPO", "")
PR_NUMBER = os.environ.get("PR_NUMBER", "")
def api_call(method, path):
url = f"{GITEA_URL}/api/v1{path}"
headers = {"Authorization": f"token {GITEA_TOKEN}"}
req = request.Request(url, method=method, headers=headers)
try:
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except error.HTTPError as e:
return {"error": e.read().decode(), "status": e.code}
def main():
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
sys.exit(1)
if not REPO:
print("ERROR: GITEA_REPO not set")
sys.exit(1)
pr_number = PR_NUMBER
if not pr_number:
# Try to infer from Gitea Actions environment
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
if not pr_number:
print("ERROR: Could not determine PR number")
sys.exit(1)
reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
if isinstance(reviews, dict) and "error" in reviews:
print(f"ERROR fetching reviews: {reviews}")
sys.exit(1)
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
if len(approvals) >= 1:
print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
sys.exit(0)
else:
print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
print("Merges are not permitted without at least one approval.")
sys.exit(1)
if __name__ == "__main__":
main()

77
scripts/staging_gate.py Normal file
View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""
Staging Gate — Poka-yoke for production deployments.
Checks if the PR that introduced the current commit was marked `staging-verified`.
Fails the workflow if not, blocking deploy.yml from proceeding.
Usage in Gitea workflow:
- name: Staging Verification Gate
run: python scripts/staging_gate.py
env:
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
"""
import os
import sys
import json
import subprocess
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REPO = os.environ.get("GITEA_REPO", "Timmy_Foundation/the-nexus")
def api_call(method, path):
url = f"{GITEA_URL}/api/v1{path}"
headers = {"Authorization": f"token {GITEA_TOKEN}"}
req = request.Request(url, method=method, headers=headers)
try:
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except error.HTTPError as e:
return {"error": e.read().decode(), "status": e.code}
def get_commit_sha():
result = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True)
return result.stdout.strip()
def get_pr_for_commit(sha):
# Search open and closed PRs for this commit
for state in ["closed", "open"]:
prs = api_call("GET", f"/repos/{REPO}/pulls?state={state}&limit=50")
if isinstance(prs, list):
for pr in prs:
if pr.get("merge_commit_sha") == sha:
return pr
return None
def main():
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
sys.exit(1)
sha = get_commit_sha()
pr = get_pr_for_commit(sha)
if not pr:
# Direct push to main without PR — block unless explicitly forced
print("WARNING: No PR found for this commit. Blocking deploy as a safety measure.")
print("To bypass, merge via PR and add the 'staging-verified' label.")
sys.exit(1)
labels = {label["name"] for label in pr.get("labels", [])}
if "staging-verified" in labels:
print(f"OK: PR #{pr['number']} has 'staging-verified' label. Deploy permitted.")
sys.exit(0)
else:
print(f"BLOCKED: PR #{pr['number']} is missing the 'staging-verified' label.")
print("Deploy to production is not permitted until staging is verified.")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
Correctly uses the Gitea 1.25+ API (not GitHub-style).
"""
import os
import sys
import json
import urllib.request
import yaml
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
CONFIG_DIR = ".gitea/branch-protection"
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode() if payload else None
req = urllib.request.Request(url, data=data, method=method, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.pop("branch", "main")
# Check if protection already exists
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(r.get("branch_name") == branch for r in existing)
payload = {
"branch_name": branch,
"rule_name": branch,
"required_approvals": rules.get("required_approvals", 1),
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
"block_deletions": rules.get("block_deletions", True),
"block_force_push": rules.get("block_force_push", True),
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
"enable_status_check": rules.get("require_ci_to_merge", False),
"status_check_contexts": rules.get("status_check_contexts", []),
}
try:
if exists:
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
else:
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
print(f"{repo}:{branch} synced")
return True
except Exception as e:
print(f"{repo}:{branch} failed: {e}")
return False
def main() -> int:
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
return 1
ok = 0
for fname in os.listdir(CONFIG_DIR):
if not fname.endswith(".yml"):
continue
repo = fname[:-4]
with open(os.path.join(CONFIG_DIR, fname)) as f:
cfg = yaml.safe_load(f)
if apply_protection(repo, cfg.get("rules", {})):
ok += 1
print(f"\nSynced {ok} repo(s)")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,16 @@
{
"wizard": "bezalel",
"room": "forge",
"drawers": [
{
"text": "CI pipeline green on main. All 253 tests passing.",
"source_file": "forge.closet.json",
"closet": true
},
{
"text": "Deployed nexus heartbeat cron fix to Beta. Poka-yoke checks pass.",
"source_file": "forge.closet.json",
"closet": true
}
]
}

View File

@@ -0,0 +1,11 @@
{
"wizard": "bezalel",
"room": "hermes",
"drawers": [
{
"text": "Hermes gateway v2 deployed. MCP tools registered: mempalace, gitea, cron.",
"source_file": "hermes.closet.json",
"closet": true
}
]
}

View File

@@ -0,0 +1,11 @@
{
"wizard": "bezalel",
"room": "issues",
"drawers": [
{
"text": "MemPalace x Evennia milestone: 6 of 8 issues closed. #1078 and #1083 in progress.",
"source_file": "issues.closet.json",
"closet": true
}
]
}

View File

@@ -0,0 +1,239 @@
"""
Tests for mempalace/fleet_api.py — Alpha-side HTTP fleet memory API.
Refs: #1078, #1075
"""
from __future__ import annotations
import io
import json
import threading
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Import handler directly so we can test without running a server process.
from mempalace.fleet_api import FleetAPIHandler, _handle_health, _handle_search, _handle_wings, make_server
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class _FakeSocket:
"""Minimal socket stub for BaseHTTPRequestHandler."""
def makefile(self, mode: str, *args, **kwargs): # noqa: ANN001
return io.BytesIO(b"")
def _make_handler(path: str = "/health") -> tuple[FleetAPIHandler, io.BytesIO]:
"""Construct a handler pointed at *path*, capture wfile output."""
buf = io.BytesIO()
request = _FakeSocket()
client_address = ("127.0.0.1", 0)
handler = FleetAPIHandler.__new__(FleetAPIHandler)
handler.path = path
handler.request = request
handler.client_address = client_address
handler.server = MagicMock()
handler.wfile = buf
handler.rfile = io.BytesIO(b"")
handler.command = "GET"
handler._headers_buffer = []
# Stub send_response / send_header / end_headers to write minimal HTTP
handler._response_code = None
def _send_response(code, message=None): # noqa: ANN001
handler._response_code = code
def _send_header(k, v): # noqa: ANN001
pass
def _end_headers(): # noqa: ANN001
pass
handler.send_response = _send_response
handler.send_header = _send_header
handler.end_headers = _end_headers
return handler, buf
def _parse_response(buf: io.BytesIO) -> dict:
buf.seek(0)
return json.loads(buf.read())
# ---------------------------------------------------------------------------
# /health
# ---------------------------------------------------------------------------
def test_health_returns_ok(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
handler, buf = _make_handler("/health")
_handle_health(handler)
data = _parse_response(buf)
assert data["status"] == "ok"
assert data["palace_exists"] is True
def test_health_missing_palace(tmp_path, monkeypatch):
missing = tmp_path / "nonexistent"
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
handler, buf = _make_handler("/health")
_handle_health(handler)
data = _parse_response(buf)
assert data["status"] == "ok"
assert data["palace_exists"] is False
# ---------------------------------------------------------------------------
# /search
# ---------------------------------------------------------------------------
def _mock_search_fleet(results):
"""Return a patch target that returns *results*."""
mock = MagicMock(return_value=results)
return mock
def _make_result(text="hello", room="forge", wing="bezalel", score=0.9):
from nexus.mempalace.searcher import MemPalaceResult
return MemPalaceResult(text=text, room=room, wing=wing, score=score)
def test_search_missing_q_param():
handler, buf = _make_handler("/search")
_handle_search(handler, {})
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 400
def test_search_returns_results(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
(tmp_path / "chroma.sqlite3").touch()
result = _make_result(text="CI green", room="forge", wing="bezalel", score=0.95)
with patch("mempalace.fleet_api.FleetAPIHandler") as _:
handler, buf = _make_handler("/search?q=CI")
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", return_value=[result]):
import importlib
import mempalace.fleet_api as api_module
# Patch search_fleet inside the handler's import context
with patch("nexus.mempalace.searcher.search_fleet", return_value=[result]):
_handle_search(handler, {"q": ["CI"]})
data = _parse_response(buf)
assert data["count"] == 1
assert data["results"][0]["text"] == "CI green"
assert data["results"][0]["room"] == "forge"
assert data["results"][0]["wing"] == "bezalel"
assert data["results"][0]["score"] == 0.95
assert handler._response_code == 200
def test_search_with_room_filter(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
result = _make_result()
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", return_value=[result]) as mock_sf:
_handle_search(MagicMock(), {"q": ["test"], "room": ["hermes"]})
# Verify room was passed through
mock_sf.assert_called_once_with("test", room="hermes", n_results=10)
def test_search_invalid_n_param():
handler, buf = _make_handler("/search?q=test&n=bad")
_handle_search(handler, {"q": ["test"], "n": ["bad"]})
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 400
def test_search_palace_unavailable(monkeypatch):
from nexus.mempalace.searcher import MemPalaceUnavailable
handler, buf = _make_handler("/search?q=test")
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", side_effect=MemPalaceUnavailable("no palace")):
_handle_search(handler, {"q": ["test"]})
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 503
def test_search_n_clamped_to_max():
"""n > MAX_RESULTS is silently clamped."""
import nexus.mempalace.searcher as s_module
with patch.object(s_module, "search_fleet", return_value=[]) as mock_sf:
handler = MagicMock()
_handle_search(handler, {"q": ["test"], "n": ["9999"]})
mock_sf.assert_called_once_with("test", room=None, n_results=50)
# ---------------------------------------------------------------------------
# /wings
# ---------------------------------------------------------------------------
def test_wings_returns_list(tmp_path, monkeypatch):
monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
(tmp_path / "bezalel").mkdir()
(tmp_path / "timmy").mkdir()
# A file should not appear in wings
(tmp_path / "README.txt").touch()
handler, buf = _make_handler("/wings")
_handle_wings(handler)
data = _parse_response(buf)
assert set(data["wings"]) == {"bezalel", "timmy"}
assert handler._response_code == 200
def test_wings_missing_palace(tmp_path, monkeypatch):
missing = tmp_path / "nonexistent"
monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
handler, buf = _make_handler("/wings")
_handle_wings(handler)
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 503
# ---------------------------------------------------------------------------
# 404 unknown endpoint
# ---------------------------------------------------------------------------
def test_unknown_endpoint():
handler, buf = _make_handler("/foobar")
handler.do_GET()
data = _parse_response(buf)
assert "error" in data
assert handler._response_code == 404
assert "/search" in data["endpoints"]
# ---------------------------------------------------------------------------
# audit fixture smoke test
# ---------------------------------------------------------------------------
def test_audit_fixture_is_clean():
"""Ensure tests/fixtures/fleet_palace/ passes privacy audit (no violations)."""
from mempalace.audit_privacy import audit_palace
fixture_dir = Path(__file__).parent / "fixtures" / "fleet_palace"
assert fixture_dir.exists(), f"Fixture directory missing: {fixture_dir}"
result = audit_palace(fixture_dir)
assert result.clean, (
f"Privacy violations found in CI fixture:\n" +
"\n".join(f" [{v.rule}] {v.path}: {v.detail}" for v in result.violations)
)

View File

@@ -0,0 +1,139 @@
"""
Tests for mempalace/retain_closets.py — 90-day closet retention enforcement.
Refs: #1083, #1075
"""
from __future__ import annotations
import json
import time
from pathlib import Path
import pytest
from mempalace.retain_closets import (
RetentionResult,
_file_age_days,
enforce_retention,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _write_closet(directory: Path, name: str, age_days: float) -> Path:
"""Create a *.closet.json file with a mtime set to *age_days* ago."""
p = directory / name
p.write_text(json.dumps({"drawers": [{"text": "summary", "closet": True}]}))
# Set mtime to simulate age
mtime = time.time() - age_days * 86400.0
import os
os.utime(p, (mtime, mtime))
return p
# ---------------------------------------------------------------------------
# _file_age_days
# ---------------------------------------------------------------------------
def test_file_age_days_recent(tmp_path):
p = tmp_path / "recent.closet.json"
p.write_text("{}")
age = _file_age_days(p)
assert 0 <= age < 1 # just created
def test_file_age_days_old(tmp_path):
p = _write_closet(tmp_path, "old.closet.json", age_days=100)
age = _file_age_days(p)
assert 99 < age < 101
# ---------------------------------------------------------------------------
# enforce_retention — dry_run
# ---------------------------------------------------------------------------
def test_dry_run_does_not_delete(tmp_path):
old = _write_closet(tmp_path, "old.closet.json", age_days=100)
_write_closet(tmp_path, "new.closet.json", age_days=10)
result = enforce_retention(tmp_path, retention_days=90, dry_run=True)
# File still exists after dry-run
assert old.exists()
assert result.removed == 1 # counted but not actually removed
assert result.kept == 1
assert result.ok
def test_dry_run_keeps_recent_files(tmp_path):
_write_closet(tmp_path, "recent.closet.json", age_days=5)
result = enforce_retention(tmp_path, retention_days=90, dry_run=True)
assert result.removed == 0
assert result.kept == 1
# ---------------------------------------------------------------------------
# enforce_retention — live mode
# ---------------------------------------------------------------------------
def test_live_removes_old_closets(tmp_path):
old = _write_closet(tmp_path, "old.closet.json", age_days=100)
new = _write_closet(tmp_path, "new.closet.json", age_days=10)
result = enforce_retention(tmp_path, retention_days=90, dry_run=False)
assert not old.exists()
assert new.exists()
assert result.removed == 1
assert result.kept == 1
assert result.ok
def test_live_keeps_files_within_window(tmp_path):
f = _write_closet(tmp_path, "edge.closet.json", age_days=89)
result = enforce_retention(tmp_path, retention_days=90, dry_run=False)
assert f.exists()
assert result.removed == 0
assert result.kept == 1
def test_empty_directory_is_ok(tmp_path):
result = enforce_retention(tmp_path, retention_days=90)
assert result.scanned == 0
assert result.removed == 0
assert result.ok
def test_subdirectory_closets_are_pruned(tmp_path):
"""enforce_retention should recurse into subdirs (wing directories)."""
sub = tmp_path / "bezalel"
sub.mkdir()
old = _write_closet(sub, "hermes.closet.json", age_days=120)
result = enforce_retention(tmp_path, retention_days=90, dry_run=False)
assert not old.exists()
assert result.removed == 1
def test_non_closet_files_ignored(tmp_path):
"""Non-closet files should not be counted or touched."""
(tmp_path / "readme.txt").write_text("hello")
(tmp_path / "data.drawer.json").write_text("{}")
result = enforce_retention(tmp_path, retention_days=90)
assert result.scanned == 0
# ---------------------------------------------------------------------------
# RetentionResult.ok
# ---------------------------------------------------------------------------
def test_retention_result_ok_with_no_errors():
r = RetentionResult(scanned=5, removed=2, kept=3)
assert r.ok is True
def test_retention_result_not_ok_with_errors():
r = RetentionResult(errors=["could not stat file"])
assert r.ok is False

View File

@@ -0,0 +1,205 @@
"""
Tests for mempalace/tunnel_sync.py — remote wizard wing sync client.
Refs: #1078, #1075
"""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from mempalace.tunnel_sync import (
SyncResult,
_peer_url,
_write_closet,
get_remote_wings,
search_remote_room,
sync_peer,
)
# ---------------------------------------------------------------------------
# _peer_url
# ---------------------------------------------------------------------------
def test_peer_url_strips_trailing_slash():
assert _peer_url("http://host:7771/", "/wings") == "http://host:7771/wings"
def test_peer_url_with_path():
assert _peer_url("http://host:7771", "/search") == "http://host:7771/search"
# ---------------------------------------------------------------------------
# get_remote_wings
# ---------------------------------------------------------------------------
def test_get_remote_wings_returns_list():
with patch("mempalace.tunnel_sync._get", return_value={"wings": ["bezalel", "timmy"]}):
wings = get_remote_wings("http://peer:7771")
assert wings == ["bezalel", "timmy"]
def test_get_remote_wings_empty():
with patch("mempalace.tunnel_sync._get", return_value={"wings": []}):
wings = get_remote_wings("http://peer:7771")
assert wings == []
# ---------------------------------------------------------------------------
# search_remote_room
# ---------------------------------------------------------------------------
def _make_entry(text: str, room: str = "forge", wing: str = "bezalel", score: float = 0.9) -> dict:
return {"text": text, "room": room, "wing": wing, "score": score}
def test_search_remote_room_deduplicates():
entry = _make_entry("CI passed")
# Same entry returned from multiple queries — should only appear once
with patch("mempalace.tunnel_sync._get", return_value={"results": [entry]}):
results = search_remote_room("http://peer:7771", "forge", n=50)
assert len(results) == 1
assert results[0]["text"] == "CI passed"
def test_search_remote_room_respects_n_limit():
entries = [_make_entry(f"item {i}") for i in range(100)]
with patch("mempalace.tunnel_sync._get", return_value={"results": entries}):
results = search_remote_room("http://peer:7771", "forge", n=5)
assert len(results) <= 5
def test_search_remote_room_handles_request_error():
import urllib.error
with patch("mempalace.tunnel_sync._get", side_effect=urllib.error.URLError("refused")):
results = search_remote_room("http://peer:7771", "forge")
assert results == []
# ---------------------------------------------------------------------------
# _write_closet
# ---------------------------------------------------------------------------
def test_write_closet_creates_file(tmp_path):
entries = [_make_entry("a memory")]
ok = _write_closet(tmp_path, "bezalel", "forge", entries, dry_run=False)
assert ok is True
closet = tmp_path / "bezalel" / "forge.closet.json"
assert closet.exists()
data = json.loads(closet.read_text())
assert data["wing"] == "bezalel"
assert data["room"] == "forge"
assert len(data["drawers"]) == 1
assert data["drawers"][0]["closet"] is True
assert data["drawers"][0]["text"] == "a memory"
def test_write_closet_dry_run_does_not_create(tmp_path):
entries = [_make_entry("a memory")]
ok = _write_closet(tmp_path, "bezalel", "forge", entries, dry_run=True)
assert ok is True
closet = tmp_path / "bezalel" / "forge.closet.json"
assert not closet.exists()
def test_write_closet_creates_wing_subdirectory(tmp_path):
entries = [_make_entry("memory")]
_write_closet(tmp_path, "timmy", "hermes", entries, dry_run=False)
assert (tmp_path / "timmy").is_dir()
def test_write_closet_source_file_is_tunnel_tagged(tmp_path):
entries = [_make_entry("memory")]
_write_closet(tmp_path, "bezalel", "hermes", entries, dry_run=False)
closet = tmp_path / "bezalel" / "hermes.closet.json"
data = json.loads(closet.read_text())
assert data["drawers"][0]["source_file"].startswith("tunnel:")
# ---------------------------------------------------------------------------
# sync_peer
# ---------------------------------------------------------------------------
def _mock_get_responses(peer_url: str) -> dict:
"""Minimal mock _get returning health, wings, and search results."""
def _get(url: str) -> dict:
if url.endswith("/health"):
return {"status": "ok", "palace": "/var/lib/mempalace/fleet"}
if url.endswith("/wings"):
return {"wings": ["bezalel"]}
if "/search" in url:
return {"results": [_make_entry("test memory")]}
return {}
return _get
def test_sync_peer_writes_closets(tmp_path):
(tmp_path / ".gitkeep").touch() # ensure palace dir exists
with patch("mempalace.tunnel_sync._get", side_effect=_mock_get_responses("http://peer:7771")):
result = sync_peer("http://peer:7771", tmp_path, n_results=10)
assert result.ok
assert "bezalel" in result.wings_found
assert result.closets_written > 0
def test_sync_peer_dry_run_no_files(tmp_path):
(tmp_path / ".gitkeep").touch()
with patch("mempalace.tunnel_sync._get", side_effect=_mock_get_responses("http://peer:7771")):
result = sync_peer("http://peer:7771", tmp_path, n_results=10, dry_run=True)
assert result.ok
# No closet files should be written
closets = list(tmp_path.rglob("*.closet.json"))
assert closets == []
def test_sync_peer_unreachable_returns_error(tmp_path):
import urllib.error
with patch("mempalace.tunnel_sync._get", side_effect=urllib.error.URLError("refused")):
result = sync_peer("http://unreachable:7771", tmp_path)
assert not result.ok
assert any("unreachable" in e or "refused" in e for e in result.errors)
def test_sync_peer_unhealthy_returns_error(tmp_path):
with patch("mempalace.tunnel_sync._get", return_value={"status": "degraded"}):
result = sync_peer("http://peer:7771", tmp_path)
assert not result.ok
assert any("unhealthy" in e for e in result.errors)
def test_sync_peer_no_wings_is_ok(tmp_path):
def _get(url: str) -> dict:
if "/health" in url:
return {"status": "ok"}
return {"wings": []}
with patch("mempalace.tunnel_sync._get", side_effect=_get):
result = sync_peer("http://peer:7771", tmp_path)
assert result.ok
assert result.closets_written == 0
# ---------------------------------------------------------------------------
# SyncResult.ok
# ---------------------------------------------------------------------------
def test_sync_result_ok_no_errors():
r = SyncResult(wings_found=["bezalel"], rooms_pulled=5, closets_written=5)
assert r.ok is True
def test_sync_result_not_ok_with_errors():
r = SyncResult(errors=["connection refused"])
assert r.ok is False