Compare commits
6 Commits
groq/issue...claude/iss
| Author | SHA1 | Date |
|---|---|---|
|  | e644b00dff |  |
|  | b445c04037 |  |
|  | 60bd9a05ff |  |
|  | c7468a3c6a |  |
|  | 07a4be3bb9 |  |
|  | 804536a3f2 |  |
21  .gitea/workflows/review_gate.yml  Normal file
@@ -0,0 +1,21 @@
name: Review Approval Gate

on:
  pull_request:
    branches: [main]

jobs:
  verify-review:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Verify PR has approving review
        env:
          GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
          GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
          GITEA_REPO: Timmy_Foundation/the-nexus
          PR_NUMBER: ${{ gitea.event.pull_request.number }}
        run: |
          python3 scripts/review_gate.py
20  .gitea/workflows/staging_gate.yml  Normal file
@@ -0,0 +1,20 @@
name: Staging Verification Gate

on:
  push:
    branches: [main]

jobs:
  verify-staging:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Verify staging label on merge PR
        env:
          GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
          GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
          GITEA_REPO: Timmy_Foundation/the-nexus
        run: |
          python3 scripts/staging_gate.py
34  .gitea/workflows/weekly-audit.yml  Normal file
@@ -0,0 +1,34 @@
name: Weekly Privacy Audit

# Runs every Monday at 05:00 UTC against a CI test fixture.
# On production wizards these same scripts should run via cron:
# 0 5 * * 1 python /opt/nexus/mempalace/audit_privacy.py /var/lib/mempalace/fleet
# 0 5 * * 1 python /opt/nexus/mempalace/retain_closets.py /var/lib/mempalace/fleet --days 90
#
# Refs: #1083, #1075

on:
  schedule:
    - cron: "0 5 * * 1"  # Monday 05:00 UTC
  workflow_dispatch: {}  # allow manual trigger

jobs:
  privacy-audit:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.x"

      - name: Run privacy audit against CI fixture
        run: |
          python mempalace/audit_privacy.py tests/fixtures/fleet_palace

      - name: Dry-run retention enforcement against CI fixture
        # Real enforcement runs on the live VPS; CI verifies the script runs cleanly.
        run: |
          python mempalace/retain_closets.py tests/fixtures/fleet_palace --days 90 --dry-run
gitea-branch-protection.sh
@@ -1,44 +1,6 @@
 #!/bin/bash
-
-# Apply branch protections to all repositories
-# Requires GITEA_TOKEN env var
-
-REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
-
-for repo in "${REPOS[@]}"
-do
-  curl -X POST "https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/$repo/branches/main/protection" \
-    -H "Authorization: token $GITEA_TOKEN" \
-    -H "Content-Type: application/json" \
-    -d '{
-      "required_reviews": 1,
-      "dismiss_stale_reviews": true,
-      "block_force_push": true,
-      "block_deletions": true
-    }'
-done
-#!/bin/bash
-
-# Gitea API credentials
-GITEA_TOKEN="your-personal-access-token"
-GITEA_API="https://forge.alexanderwhitestone.com/api/v1"
-
-# Repos to protect
-REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
-
-for REPO in "${REPO[@]}"; do
-  echo "Configuring branch protection for $REPO..."
-
-  curl -X POST -H "Authorization: token $GITEA_TOKEN" \
-    -H "Content-Type: application/json" \
-    -d '{
-      "name": "main",
-      "require_pull_request": true,
-      "required_approvals": 1,
-      "dismiss_stale_approvals": true,
-      "required_status_checks": '"$(test "$REPO" = "hermes-agent" && echo "true" || echo "false")"',
-      "block_force_push": true,
-      "block_delete": true
-    }' \
-    "$GITEA_API/repos/Timmy_Foundation/$REPO/branch_protection"
-done
+# Wrapper for the canonical branch-protection sync script.
+# Usage: ./gitea-branch-protection.sh
+set -euo pipefail
+cd "$(dirname "$0")"
+python3 scripts/sync_branch_protection.py
186  mempalace/fleet_api.py  Normal file
@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
fleet_api.py — Lightweight HTTP API for the shared fleet palace.

Exposes fleet memory search over HTTP so that Alpha servers and other
wizard deployments can query the palace without direct filesystem access.

Endpoints:
    GET /health
        Returns {"status": "ok", "palace": "<path>", "palace_exists": true|false}

    GET /search?q=<query>[&room=<room>][&n=<int>]
        Returns {"results": [...], "query": "...", "room": "...", "count": N}
        Each result: {"text": "...", "room": "...", "wing": "...", "score": 0.9}

    GET /wings
        Returns {"wings": ["bezalel", ...]} — distinct wizard wings present

Error responses use {"error": "<message>"} with appropriate HTTP status codes.

Usage:
    # Default: localhost:7771, fleet palace at /var/lib/mempalace/fleet
    python mempalace/fleet_api.py

    # Custom host/port/palace:
    FLEET_PALACE_PATH=/data/fleet python mempalace/fleet_api.py --host 0.0.0.0 --port 8080

Refs: #1078, #1075
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlparse

# Add repo root to path so we can import nexus.mempalace
_HERE = Path(__file__).resolve().parent
_REPO_ROOT = _HERE.parent
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))

DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 7771
MAX_RESULTS = 50


def _get_palace_path() -> Path:
    return Path(os.environ.get("FLEET_PALACE_PATH", "/var/lib/mempalace/fleet"))


def _json_response(handler: BaseHTTPRequestHandler, status: int, body: dict) -> None:
    payload = json.dumps(body).encode()
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json")
    handler.send_header("Content-Length", str(len(payload)))
    handler.end_headers()
    handler.wfile.write(payload)


def _handle_health(handler: BaseHTTPRequestHandler) -> None:
    palace = _get_palace_path()
    _json_response(handler, 200, {
        "status": "ok",
        "palace": str(palace),
        "palace_exists": palace.exists(),
    })


def _handle_search(handler: BaseHTTPRequestHandler, qs: dict) -> None:
    query_terms = qs.get("q", [""])
    q = query_terms[0].strip() if query_terms else ""
    if not q:
        _json_response(handler, 400, {"error": "Missing required parameter: q"})
        return

    room_terms = qs.get("room", [])
    room = room_terms[0].strip() if room_terms else None

    n_terms = qs.get("n", [])
    try:
        n = max(1, min(int(n_terms[0]), MAX_RESULTS)) if n_terms else 10
    except (ValueError, IndexError):
        _json_response(handler, 400, {"error": "Invalid parameter: n must be an integer"})
        return

    try:
        from nexus.mempalace.searcher import search_fleet, MemPalaceUnavailable
    except ImportError as exc:
        _json_response(handler, 503, {"error": f"MemPalace module not available: {exc}"})
        return

    try:
        results = search_fleet(q, room=room, n_results=n)
    except Exception as exc:  # noqa: BLE001
        _json_response(handler, 503, {"error": str(exc)})
        return

    _json_response(handler, 200, {
        "query": q,
        "room": room,
        "count": len(results),
        "results": [
            {
                "text": r.text,
                "room": r.room,
                "wing": r.wing,
                "score": round(r.score, 4),
            }
            for r in results
        ],
    })


def _handle_wings(handler: BaseHTTPRequestHandler) -> None:
    """Return distinct wizard wing names found in the fleet palace directory."""
    palace = _get_palace_path()
    if not palace.exists():
        _json_response(handler, 503, {
            "error": f"Fleet palace not found: {palace}",
        })
        return

    wings = sorted({
        p.name for p in palace.iterdir() if p.is_dir()
    })
    _json_response(handler, 200, {"wings": wings})


class FleetAPIHandler(BaseHTTPRequestHandler):
    """Request handler for the fleet memory API."""

    def log_message(self, fmt: str, *args) -> None:  # noqa: ANN001
        # Prefix with tag for easier log filtering
        sys.stderr.write(f"[fleet_api] {fmt % args}\n")

    def do_GET(self) -> None:  # noqa: N802
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        qs = parse_qs(parsed.query)

        if path == "/health":
            _handle_health(self)
        elif path == "/search":
            _handle_search(self, qs)
        elif path == "/wings":
            _handle_wings(self)
        else:
            _json_response(self, 404, {
                "error": f"Unknown endpoint: {path}",
                "endpoints": ["/health", "/search", "/wings"],
            })


def make_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> HTTPServer:
    return HTTPServer((host, port), FleetAPIHandler)


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(
        description="Fleet palace HTTP API server."
    )
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind host (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Bind port (default: {DEFAULT_PORT})")
    args = parser.parse_args(argv)

    palace = _get_palace_path()
    print(f"[fleet_api] Palace: {palace}")
    if not palace.exists():
        print(f"[fleet_api] WARNING: palace path does not exist yet: {palace}", file=sys.stderr)

    server = make_server(args.host, args.port)
    print(f"[fleet_api] Listening on http://{args.host}:{args.port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n[fleet_api] Shutting down.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
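A minimal client sketch for the endpoints above, assuming a fleet_api.py instance is already listening on the default 127.0.0.1:7771; the query and room values are illustrative:

#!/usr/bin/env python3
# Minimal fleet API client sketch. Assumes fleet_api.py is running on the
# default 127.0.0.1:7771; the query and room below are illustrative.
from __future__ import annotations

import json
import urllib.request
from urllib.parse import quote

BASE = "http://127.0.0.1:7771"


def fleet_search(query: str, room: str | None = None, n: int = 10) -> dict:
    """GET /search and return the decoded JSON payload."""
    url = f"{BASE}/search?q={quote(query)}&n={n}"
    if room:
        url += f"&room={quote(room)}"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.loads(resp.read())


if __name__ == "__main__":
    data = fleet_search("deploy", room="forge")
    for hit in data["results"]:
        print(f'{hit["wing"]}/{hit["room"]} ({hit["score"]}): {hit["text"]}')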
163  mempalace/retain_closets.py  Normal file
@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""
retain_closets.py — Retention policy enforcement for fleet palace closets.

Removes closet files older than a configurable retention window (default: 90 days).
Run this on the Alpha host (or any fleet palace directory) to enforce the
closet aging policy described in #1083.

Usage:
    # Dry-run: show what would be removed (no deletions)
    python mempalace/retain_closets.py --dry-run

    # Enforce 90-day retention (default)
    python mempalace/retain_closets.py

    # Custom retention window
    python mempalace/retain_closets.py --days 30

    # Custom palace path
    python mempalace/retain_closets.py /data/fleet --days 90

Exits:
    0 — success (clean, or pruned without error)
    1 — error (e.g., palace directory not found)

Refs: #1083, #1075
"""

from __future__ import annotations

import argparse
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path

DEFAULT_RETENTION_DAYS = 90
DEFAULT_PALACE_PATH = "/var/lib/mempalace/fleet"


@dataclass
class RetentionResult:
    scanned: int = 0
    removed: int = 0
    kept: int = 0
    errors: list[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return len(self.errors) == 0


def _file_age_days(path: Path) -> float:
    """Return the age of a file in days based on mtime."""
    mtime = path.stat().st_mtime
    now = time.time()
    return (now - mtime) / 86400.0


def enforce_retention(
    palace_dir: Path,
    retention_days: int = DEFAULT_RETENTION_DAYS,
    dry_run: bool = False,
) -> RetentionResult:
    """
    Remove *.closet.json files older than *retention_days* from *palace_dir*.

    Only closet files are pruned — raw drawer files are never present in a
    compliant fleet palace, so this script does not touch them.

    Args:
        palace_dir: Root directory of the fleet palace to scan.
        retention_days: Files older than this many days will be removed.
        dry_run: If True, report what would be removed but make no changes.

    Returns:
        RetentionResult with counts and any errors.
    """
    result = RetentionResult()

    for closet_file in sorted(palace_dir.rglob("*.closet.json")):
        result.scanned += 1
        try:
            age = _file_age_days(closet_file)
        except OSError as exc:
            result.errors.append(f"Could not stat {closet_file}: {exc}")
            continue

        if age > retention_days:
            if dry_run:
                print(
                    f"[retain_closets] DRY-RUN would remove ({age:.0f}d old): {closet_file}"
                )
                result.removed += 1
            else:
                try:
                    closet_file.unlink()
                    print(f"[retain_closets] Removed ({age:.0f}d old): {closet_file}")
                    result.removed += 1
                except OSError as exc:
                    result.errors.append(f"Could not remove {closet_file}: {exc}")
        else:
            result.kept += 1

    return result


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(
        description="Enforce retention policy on fleet palace closets."
    )
    parser.add_argument(
        "palace_dir",
        nargs="?",
        default=os.environ.get("FLEET_PALACE_PATH", DEFAULT_PALACE_PATH),
        help=f"Fleet palace directory (default: {DEFAULT_PALACE_PATH})",
    )
    parser.add_argument(
        "--days",
        type=int,
        default=DEFAULT_RETENTION_DAYS,
        metavar="N",
        help=f"Retention window in days (default: {DEFAULT_RETENTION_DAYS})",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be removed without deleting anything.",
    )
    args = parser.parse_args(argv)

    palace_dir = Path(args.palace_dir)
    if not palace_dir.exists():
        print(
            f"[retain_closets] ERROR: palace directory not found: {palace_dir}",
            file=sys.stderr,
        )
        return 1

    mode = "DRY-RUN" if args.dry_run else "LIVE"
    print(
        f"[retain_closets] {mode} — scanning {palace_dir} "
        f"(retention: {args.days} days)"
    )

    result = enforce_retention(palace_dir, retention_days=args.days, dry_run=args.dry_run)

    if result.errors:
        for err in result.errors:
            print(f"[retain_closets] ERROR: {err}", file=sys.stderr)
        return 1

    action = "would remove" if args.dry_run else "removed"
    print(
        f"[retain_closets] Done — scanned {result.scanned}, "
        f"{action} {result.removed}, kept {result.kept}."
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())
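enforce_retention() can also be driven from other tooling rather than the CLI; a sketch, where the palace path is an assumption and dry_run=True keeps it side-effect free:

# Sketch: drive the retention logic from other tooling.
# The palace path is an assumption; dry_run=True makes this side-effect free.
from pathlib import Path

from mempalace.retain_closets import enforce_retention

result = enforce_retention(
    Path("/var/lib/mempalace/fleet"), retention_days=90, dry_run=True
)
print(f"scanned={result.scanned} would_remove={result.removed} "
      f"kept={result.kept} ok={result.ok}")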
308  mempalace/tunnel_sync.py  Normal file
@@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""
tunnel_sync.py — Pull closets from a remote wizard's fleet API into the local palace.

This is the client-side tunnel mechanism for #1078. It connects to a peer
wizard's running fleet_api.py HTTP server, discovers their memory wings, and
imports the results into the local fleet palace as closet files. Once imported,
`recall <query> --fleet` in Evennia will return results from the remote wing.

The code side is complete here; the infrastructure side (second wizard running
fleet_api.py behind an SSH tunnel or VPN) is still required to use this.

Usage:
    # Pull from a remote Alpha fleet API into the default local palace
    python mempalace/tunnel_sync.py --peer http://alpha.example.com:7771

    # Custom local palace path
    FLEET_PALACE_PATH=/data/fleet python mempalace/tunnel_sync.py \\
        --peer http://alpha.example.com:7771

    # Dry-run: show what would be imported without writing files
    python mempalace/tunnel_sync.py --peer http://alpha.example.com:7771 --dry-run

    # Limit results per room (default: 50)
    python mempalace/tunnel_sync.py --peer http://alpha.example.com:7771 --n 20

Environment:
    FLEET_PALACE_PATH — local fleet palace directory (default: /var/lib/mempalace/fleet)
    FLEET_PEER_URL — remote fleet API URL (overridden by --peer flag)

Exits:
    0 — sync succeeded (or dry-run completed)
    1 — error (connection failure, invalid response, write error)

Refs: #1078, #1075
"""

from __future__ import annotations

import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.parse import quote

DEFAULT_PALACE_PATH = "/var/lib/mempalace/fleet"
DEFAULT_N_RESULTS = 50
# Broad queries for bulk room pull — used to discover representative content
_BROAD_QUERIES = [
    "the", "a", "is", "was", "and", "of", "to", "in", "it", "on",
    "commit", "issue", "error", "fix", "deploy", "event", "memory",
]
_REQUEST_TIMEOUT = 10  # seconds


@dataclass
class SyncResult:
    wings_found: list[str] = field(default_factory=list)
    rooms_pulled: int = 0
    closets_written: int = 0
    errors: list[str] = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return len(self.errors) == 0


# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------

def _get(url: str) -> dict[str, Any]:
    """GET *url*, return parsed JSON or raise on error."""
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(req, timeout=_REQUEST_TIMEOUT) as resp:
        return json.loads(resp.read())


def _peer_url(base: str, path: str) -> str:
    return base.rstrip("/") + path


# ---------------------------------------------------------------------------
# Wing / room discovery
# ---------------------------------------------------------------------------

def get_remote_wings(peer_url: str) -> list[str]:
    """Return the list of wing names from the remote fleet API."""
    data = _get(_peer_url(peer_url, "/wings"))
    return data.get("wings", [])


def search_remote_room(peer_url: str, room: str, n: int = DEFAULT_N_RESULTS) -> list[dict]:
    """
    Pull closet entries for a specific room from the remote peer.

    Uses multiple broad queries and deduplicates by text to maximize coverage
    without requiring a dedicated bulk-export endpoint.
    """
    seen_texts: set[str] = set()
    results: list[dict] = []

    for q in _BROAD_QUERIES:
        url = _peer_url(peer_url, f"/search?q={quote(q)}&room={quote(room)}&n={n}")
        try:
            data = _get(url)
        except (urllib.error.URLError, json.JSONDecodeError, OSError):
            continue

        for entry in data.get("results", []):
            text = entry.get("text", "")
            if text and text not in seen_texts:
                seen_texts.add(text)
                results.append(entry)

        if len(results) >= n:
            break

    return results[:n]


# ---------------------------------------------------------------------------
# Core sync
# ---------------------------------------------------------------------------

def _write_closet(
    palace_dir: Path,
    wing: str,
    room: str,
    entries: list[dict],
    dry_run: bool,
) -> bool:
    """Write entries as a .closet.json file under palace_dir/wing/."""
    wing_dir = palace_dir / wing
    closet_path = wing_dir / f"{room}.closet.json"

    drawers = [
        {
            "text": e.get("text", ""),
            "room": e.get("room", room),
            "wing": e.get("wing", wing),
            "score": e.get("score", 0.0),
            "closet": True,
            "source_file": f"tunnel:{wing}/{room}",
            "synced_at": int(time.time()),
        }
        for e in entries
    ]

    payload = json.dumps({"drawers": drawers, "wing": wing, "room": room}, indent=2)

    if dry_run:
        print(f"[tunnel_sync] DRY-RUN would write {len(drawers)} entries → {closet_path}")
        return True

    try:
        wing_dir.mkdir(parents=True, exist_ok=True)
        closet_path.write_text(payload)
        print(f"[tunnel_sync] Wrote {len(drawers)} entries → {closet_path}")
        return True
    except OSError as exc:
        print(f"[tunnel_sync] ERROR writing {closet_path}: {exc}", file=sys.stderr)
        return False


def sync_peer(
    peer_url: str,
    palace_dir: Path,
    n_results: int = DEFAULT_N_RESULTS,
    dry_run: bool = False,
) -> SyncResult:
    """
    Pull all wings and rooms from *peer_url* into *palace_dir*.

    Args:
        peer_url: Base URL of the remote fleet_api.py instance.
        palace_dir: Local fleet palace directory to write closets into.
        n_results: Maximum results to pull per room.
        dry_run: If True, print what would be written without touching disk.

    Returns:
        SyncResult with counts and any errors.
    """
    result = SyncResult()

    # Discover health
    try:
        health = _get(_peer_url(peer_url, "/health"))
        if health.get("status") != "ok":
            result.errors.append(f"Peer unhealthy: {health}")
            return result
    except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
        result.errors.append(f"Could not reach peer at {peer_url}: {exc}")
        return result

    # Discover wings
    try:
        wings = get_remote_wings(peer_url)
    except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
        result.errors.append(f"Could not list wings from {peer_url}: {exc}")
        return result

    result.wings_found = wings
    if not wings:
        print(f"[tunnel_sync] No wings found at {peer_url} — nothing to sync.")
        return result

    print(f"[tunnel_sync] Found wings: {wings}")

    # Import core rooms from each wing
    from nexus.mempalace.config import CORE_ROOMS

    for wing in wings:
        for room in CORE_ROOMS:
            print(f"[tunnel_sync] Pulling {wing}/{room} …")
            try:
                entries = search_remote_room(peer_url, room, n=n_results)
            except (urllib.error.URLError, json.JSONDecodeError, OSError) as exc:
                err = f"Error pulling {wing}/{room}: {exc}"
                result.errors.append(err)
                print(f"[tunnel_sync] ERROR: {err}", file=sys.stderr)
                continue

            if not entries:
                print(f"[tunnel_sync] No entries found for {wing}/{room} — skipping.")
                continue

            ok = _write_closet(palace_dir, wing, room, entries, dry_run=dry_run)
            result.rooms_pulled += 1
            if ok:
                result.closets_written += 1

    return result


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(
        description="Sync closets from a remote wizard's fleet API into the local palace."
    )
    parser.add_argument(
        "--peer",
        default=os.environ.get("FLEET_PEER_URL", ""),
        metavar="URL",
        help="Base URL of the remote fleet_api.py (e.g. http://alpha.example.com:7771)",
    )
    parser.add_argument(
        "--palace",
        default=os.environ.get("FLEET_PALACE_PATH", DEFAULT_PALACE_PATH),
        metavar="DIR",
        help=f"Local fleet palace directory (default: {DEFAULT_PALACE_PATH})",
    )
    parser.add_argument(
        "--n",
        type=int,
        default=DEFAULT_N_RESULTS,
        metavar="N",
        help=f"Max results per room (default: {DEFAULT_N_RESULTS})",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be synced without writing files.",
    )
    args = parser.parse_args(argv)

    if not args.peer:
        print(
            "[tunnel_sync] ERROR: --peer URL is required (or set FLEET_PEER_URL).",
            file=sys.stderr,
        )
        return 1

    palace_dir = Path(args.palace)
    if not palace_dir.exists() and not args.dry_run:
        print(
            f"[tunnel_sync] ERROR: local palace not found: {palace_dir}",
            file=sys.stderr,
        )
        return 1

    mode = "DRY-RUN" if args.dry_run else "LIVE"
    print(f"[tunnel_sync] {mode} — peer: {args.peer} palace: {palace_dir}")

    result = sync_peer(args.peer, palace_dir, n_results=args.n, dry_run=args.dry_run)

    if result.errors:
        for err in result.errors:
            print(f"[tunnel_sync] ERROR: {err}", file=sys.stderr)
        return 1

    print(
        f"[tunnel_sync] Done — wings: {result.wings_found}, "
        f"rooms pulled: {result.rooms_pulled}, closets written: {result.closets_written}."
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())
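sync_peer() is importable for embedding in other automation; a dry-run sketch with a placeholder peer URL and palace path:

# Sketch: embed a dry-run sync in other automation.
# The peer URL and palace path are placeholders.
from pathlib import Path

from mempalace.tunnel_sync import sync_peer

result = sync_peer(
    "http://alpha.example.com:7771",
    Path("/var/lib/mempalace/fleet"),
    n_results=20,
    dry_run=True,
)
if not result.ok:
    for err in result.errors:
        print("sync error:", err)
else:
    print(f"wings: {result.wings_found}, rooms: {result.rooms_pulled}")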
167  scripts/audit_merge_reviews.py  Normal file
@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Fleet Merge Review Audit
========================
Scans all Timmy_Foundation repos for merges in the last 7 days
and validates that each merged PR had at least one approving review.

Exit 0 = no unreviewed merges
Exit 1 = unreviewed merges found (and issues created if --create-issues)

Usage:
    python scripts/audit_merge_reviews.py
    python scripts/audit_merge_reviews.py --create-issues
"""

import os
import argparse
from datetime import datetime, timedelta, timezone
import urllib.request
import urllib.error
import json

GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
DAYS_BACK = 7
SECURITY_LABEL = "security"


def api_request(path: str) -> dict | list:
    url = f"{GITEA_URL}/api/v1{path}"
    req = urllib.request.Request(url, headers={
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())


def api_post(path: str, payload: dict) -> dict:
    url = f"{GITEA_URL}/api/v1{path}"
    data = json.dumps(payload).encode()
    req = urllib.request.Request(url, data=data, headers={
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())


def get_repos() -> list[str]:
    repos = []
    page = 1
    while True:
        batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}")
        if not batch:
            break
        repos.extend([r["name"] for r in batch])
        page += 1
    return repos


def get_merged_prs(repo: str, since: str) -> list[dict]:
    """Get closed (merged) PRs updated since `since` (ISO format)."""
    prs = []
    page = 1
    while True:
        batch = api_request(
            f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
        )
        if not batch:
            break
        for pr in batch:
            if pr.get("merged_at") and pr["merged_at"] >= since:
                prs.append(pr)
            elif pr.get("updated_at") and pr["updated_at"] < since:
                # Results are sorted by update time descending, so everything
                # past this point is older than the audit window.
                return prs
        page += 1
    return prs


def get_reviews(repo: str, pr_number: int) -> list[dict]:
    try:
        return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return []
        raise


def create_post_mortem(repo: str, pr: dict) -> int | None:
    title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
    body = (
        f"## Unreviewed Merge Detected\n\n"
        f"- **Repository:** `{ORG}/{repo}`\n"
        f"- **PR:** #{pr['number']} — {pr['title']}\n"
        f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n"
        f"- **Merged at:** {pr['merged_at']}\n"
        f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n"
        f"This merge had **zero approving reviews** at the time of merge.\n\n"
        f"### Required Actions\n"
        f"1. Validate the merge contents are safe.\n"
        f"2. If malicious or incorrect, revert immediately.\n"
        f"3. Document root cause (bypassed branch protection? direct push?).\n"
    )
    try:
        issue = api_post(f"/repos/{ORG}/the-nexus/issues", {
            "title": title,
            "body": body,
            "labels": [SECURITY_LABEL],
        })
        return issue.get("number")
    except Exception as e:
        print(f"  FAILED to create issue: {e}")
        return None


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
    args = parser.parse_args()

    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN environment variable not set.")
        return 1

    since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
    since = since_dt.isoformat()

    repos = get_repos()
    print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")

    unreviewed_count = 0
    for repo in repos:
        merged = get_merged_prs(repo, since)
        if not merged:
            continue

        repo_unreviewed = []
        for pr in merged:
            reviews = get_reviews(repo, pr["number"])
            approvals = [r for r in reviews if r.get("state") == "APPROVED"]
            if not approvals:
                repo_unreviewed.append(pr)

        if repo_unreviewed:
            print(f"\n{repo}:")
            for pr in repo_unreviewed:
                print(f"  ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})")
                unreviewed_count += 1
                if args.create_issues:
                    issue_num = create_post_mortem(repo, pr)
                    if issue_num:
                        print(f"    → Created post-mortem issue the-nexus#{issue_num}")

    print(f"\n{'='*60}")
    if unreviewed_count == 0:
        print("All merges in the last 7 days had at least one approving review.")
        return 0
    else:
        print(f"Found {unreviewed_count} unreviewed merge(s).")
        return 1


if __name__ == "__main__":
    raise SystemExit(main())
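One subtlety worth noting: get_merged_prs() compares merged_at against the since cutoff as plain strings. That is safe here because zero-padded ISO-8601 UTC timestamps sort lexicographically in chronological order; a minimal sketch of the assumption:

# Why get_merged_prs() can compare ISO-8601 UTC timestamps as strings:
# zero-padded fields make lexicographic order match chronological order.
# Caveat: Gitea emits a "Z" suffix while isoformat() emits "+00:00"; the
# suffix only matters when two timestamps agree down to the second.
from datetime import datetime, timedelta, timezone

since = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat()
merged_at = datetime.now(timezone.utc).isoformat()
assert merged_at >= since  # plain string comparison, chronologically correct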
70  scripts/review_gate.py  Normal file
@@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""
Review Gate — Poka-yoke for unreviewed merges.
Fails if the current PR has fewer than 1 approving review.

Usage in Gitea workflow:
    - name: Review Approval Gate
      run: python scripts/review_gate.py
      env:
        GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
"""

import os
import sys
import json
from urllib import request, error

GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REPO = os.environ.get("GITEA_REPO", "")
PR_NUMBER = os.environ.get("PR_NUMBER", "")


def api_call(method, path):
    url = f"{GITEA_URL}/api/v1{path}"
    headers = {"Authorization": f"token {GITEA_TOKEN}"}
    req = request.Request(url, method=method, headers=headers)
    try:
        with request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read().decode())
    except error.HTTPError as e:
        return {"error": e.read().decode(), "status": e.code}


def main():
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        sys.exit(1)

    if not REPO:
        print("ERROR: GITEA_REPO not set")
        sys.exit(1)

    pr_number = PR_NUMBER
    if not pr_number:
        # Try to infer from Gitea Actions environment
        pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")

    if not pr_number:
        print("ERROR: Could not determine PR number")
        sys.exit(1)

    reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
    if isinstance(reviews, dict) and "error" in reviews:
        print(f"ERROR fetching reviews: {reviews}")
        sys.exit(1)

    approvals = [r for r in reviews if r.get("state") == "APPROVED"]
    if len(approvals) >= 1:
        print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
        sys.exit(0)
    else:
        print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
        print("Merges are not permitted without at least one approval.")
        sys.exit(1)


if __name__ == "__main__":
    main()
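For a local smoke test of the gate, the workflow environment can be reproduced by hand; a sketch, with placeholder token and PR number:

# Sketch: run the review gate locally with a hand-built environment.
# GITEA_TOKEN and PR_NUMBER values are placeholders, not real credentials.
import os
import subprocess

env = dict(
    os.environ,
    GITEA_TOKEN="<personal-access-token>",
    GITEA_REPO="Timmy_Foundation/the-nexus",
    PR_NUMBER="42",
)
proc = subprocess.run(["python3", "scripts/review_gate.py"], env=env)
print("gate exit code:", proc.returncode)  # 0 = approved, 1 = blocked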
77  scripts/staging_gate.py  Normal file
@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""
Staging Gate — Poka-yoke for production deployments.
Checks if the PR that introduced the current commit was marked `staging-verified`.
Fails the workflow if not, blocking deploy.yml from proceeding.

Usage in Gitea workflow:
    - name: Staging Verification Gate
      run: python scripts/staging_gate.py
      env:
        GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
"""

import os
import sys
import json
import subprocess
from urllib import request, error

GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REPO = os.environ.get("GITEA_REPO", "Timmy_Foundation/the-nexus")


def api_call(method, path):
    url = f"{GITEA_URL}/api/v1{path}"
    headers = {"Authorization": f"token {GITEA_TOKEN}"}
    req = request.Request(url, method=method, headers=headers)
    try:
        with request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read().decode())
    except error.HTTPError as e:
        return {"error": e.read().decode(), "status": e.code}


def get_commit_sha():
    result = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True)
    return result.stdout.strip()


def get_pr_for_commit(sha):
    # Search open and closed PRs for this commit
    for state in ["closed", "open"]:
        prs = api_call("GET", f"/repos/{REPO}/pulls?state={state}&limit=50")
        if isinstance(prs, list):
            for pr in prs:
                if pr.get("merge_commit_sha") == sha:
                    return pr
    return None


def main():
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        sys.exit(1)

    sha = get_commit_sha()
    pr = get_pr_for_commit(sha)

    if not pr:
        # Direct push to main without PR — block unless explicitly forced
        print("WARNING: No PR found for this commit. Blocking deploy as a safety measure.")
        print("To bypass, merge via PR and add the 'staging-verified' label.")
        sys.exit(1)

    labels = {label["name"] for label in pr.get("labels", [])}
    if "staging-verified" in labels:
        print(f"OK: PR #{pr['number']} has 'staging-verified' label. Deploy permitted.")
        sys.exit(0)
    else:
        print(f"BLOCKED: PR #{pr['number']} is missing the 'staging-verified' label.")
        print("Deploy to production is not permitted until staging is verified.")
        sys.exit(1)


if __name__ == "__main__":
    main()
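The same api_call helper can be reused to inspect a PR's labels outside the gate. A sketch, assuming it is run from the repo root (scripts/ is not a package in this diff, so it is put on sys.path directly) and using a placeholder PR number:

# Sketch: inspect a PR's labels with the gate's own helper.
# Assumes execution from the repo root; the PR number is a placeholder.
import sys

sys.path.insert(0, "scripts")
from staging_gate import api_call  # noqa: E402

pr = api_call("GET", "/repos/Timmy_Foundation/the-nexus/pulls/42")
labels = {label["name"] for label in pr.get("labels", [])}
print("staging-verified" if "staging-verified" in labels else "not verified")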
81  scripts/sync_branch_protection.py  Normal file
@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""
Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
Correctly uses the Gitea 1.25+ API (not GitHub-style).
"""

import os
import json
import urllib.request
import yaml

GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
CONFIG_DIR = ".gitea/branch-protection"


def api_request(method: str, path: str, payload: dict | None = None) -> dict | list:
    url = f"{GITEA_URL}/api/v1{path}"
    data = json.dumps(payload).encode() if payload else None
    req = urllib.request.Request(url, data=data, method=method, headers={
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())


def apply_protection(repo: str, rules: dict) -> bool:
    branch = rules.pop("branch", "main")
    # Check if protection already exists
    existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
    exists = any(r.get("branch_name") == branch for r in existing)

    payload = {
        "branch_name": branch,
        "rule_name": branch,
        "required_approvals": rules.get("required_approvals", 1),
        "block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
        "dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
        "block_deletions": rules.get("block_deletions", True),
        "block_force_push": rules.get("block_force_push", True),
        "block_admin_merge_override": rules.get("block_admin_merge_override", True),
        "enable_status_check": rules.get("require_ci_to_merge", False),
        "status_check_contexts": rules.get("status_check_contexts", []),
    }

    try:
        if exists:
            api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
        else:
            api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
        print(f"✅ {repo}:{branch} synced")
        return True
    except Exception as e:
        print(f"❌ {repo}:{branch} failed: {e}")
        return False


def main() -> int:
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        return 1

    ok = 0
    for fname in os.listdir(CONFIG_DIR):
        if not fname.endswith(".yml"):
            continue
        repo = fname[:-4]  # strip the ".yml" suffix to get the repo name
        with open(os.path.join(CONFIG_DIR, fname)) as f:
            cfg = yaml.safe_load(f)
        if apply_protection(repo, cfg.get("rules", {})):
            ok += 1

    print(f"\nSynced {ok} repo(s)")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
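The script expects one YAML file per repository under .gitea/branch-protection/, named <repo>.yml, containing the keys apply_protection() reads. A hypothetical the-nexus.yml, written from Python for illustration (the "verify-review" status-check context is an assumption):

# Sketch: generate a hypothetical .gitea/branch-protection/the-nexus.yml
# using only the keys apply_protection() actually reads. The
# "verify-review" status-check context is an assumption.
from pathlib import Path
from textwrap import dedent

cfg = dedent("""\
    rules:
      branch: main
      required_approvals: 1
      dismiss_stale_approvals: true
      block_force_push: true
      block_deletions: true
      require_ci_to_merge: true
      status_check_contexts: ["verify-review"]
    """)
Path(".gitea/branch-protection/the-nexus.yml").write_text(cfg)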
16  tests/fixtures/fleet_palace/bezalel/forge.closet.json  vendored  Normal file
@@ -0,0 +1,16 @@
{
  "wizard": "bezalel",
  "room": "forge",
  "drawers": [
    {
      "text": "CI pipeline green on main. All 253 tests passing.",
      "source_file": "forge.closet.json",
      "closet": true
    },
    {
      "text": "Deployed nexus heartbeat cron fix to Beta. Poka-yoke checks pass.",
      "source_file": "forge.closet.json",
      "closet": true
    }
  ]
}
11  tests/fixtures/fleet_palace/bezalel/hermes.closet.json  vendored  Normal file
@@ -0,0 +1,11 @@
{
  "wizard": "bezalel",
  "room": "hermes",
  "drawers": [
    {
      "text": "Hermes gateway v2 deployed. MCP tools registered: mempalace, gitea, cron.",
      "source_file": "hermes.closet.json",
      "closet": true
    }
  ]
}
11  tests/fixtures/fleet_palace/bezalel/issues.closet.json  vendored  Normal file
@@ -0,0 +1,11 @@
{
  "wizard": "bezalel",
  "room": "issues",
  "drawers": [
    {
      "text": "MemPalace x Evennia milestone: 6 of 8 issues closed. #1078 and #1083 in progress.",
      "source_file": "issues.closet.json",
      "closet": true
    }
  ]
}
239  tests/test_mempalace_fleet_api.py  Normal file
@@ -0,0 +1,239 @@
"""
Tests for mempalace/fleet_api.py — Alpha-side HTTP fleet memory API.

Refs: #1078, #1075
"""

from __future__ import annotations

import io
import json
from pathlib import Path
from unittest.mock import MagicMock, patch

# Import handlers directly so we can test without running a server process.
from mempalace.fleet_api import FleetAPIHandler, _handle_health, _handle_search, _handle_wings


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

class _FakeSocket:
    """Minimal socket stub for BaseHTTPRequestHandler."""

    def makefile(self, mode: str, *args, **kwargs):  # noqa: ANN001
        return io.BytesIO(b"")


def _make_handler(path: str = "/health") -> tuple[FleetAPIHandler, io.BytesIO]:
    """Construct a handler pointed at *path*, capture wfile output."""
    buf = io.BytesIO()
    request = _FakeSocket()
    client_address = ("127.0.0.1", 0)

    handler = FleetAPIHandler.__new__(FleetAPIHandler)
    handler.path = path
    handler.request = request
    handler.client_address = client_address
    handler.server = MagicMock()
    handler.wfile = buf
    handler.rfile = io.BytesIO(b"")
    handler.command = "GET"
    handler._headers_buffer = []

    # Stub send_response / send_header / end_headers so no raw HTTP is written
    handler._response_code = None

    def _send_response(code, message=None):  # noqa: ANN001
        handler._response_code = code

    def _send_header(k, v):  # noqa: ANN001
        pass

    def _end_headers():
        pass

    handler.send_response = _send_response
    handler.send_header = _send_header
    handler.end_headers = _end_headers

    return handler, buf


def _parse_response(buf: io.BytesIO) -> dict:
    buf.seek(0)
    return json.loads(buf.read())


# ---------------------------------------------------------------------------
# /health
# ---------------------------------------------------------------------------

def test_health_returns_ok(tmp_path, monkeypatch):
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    handler, buf = _make_handler("/health")
    _handle_health(handler)
    data = _parse_response(buf)
    assert data["status"] == "ok"
    assert data["palace_exists"] is True


def test_health_missing_palace(tmp_path, monkeypatch):
    missing = tmp_path / "nonexistent"
    monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
    handler, buf = _make_handler("/health")
    _handle_health(handler)
    data = _parse_response(buf)
    assert data["status"] == "ok"
    assert data["palace_exists"] is False


# ---------------------------------------------------------------------------
# /search
# ---------------------------------------------------------------------------

def _make_result(text="hello", room="forge", wing="bezalel", score=0.9):
    from nexus.mempalace.searcher import MemPalaceResult
    return MemPalaceResult(text=text, room=room, wing=wing, score=score)


def test_search_missing_q_param():
    handler, buf = _make_handler("/search")
    _handle_search(handler, {})
    data = _parse_response(buf)
    assert "error" in data
    assert handler._response_code == 400


def test_search_returns_results(tmp_path, monkeypatch):
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    (tmp_path / "chroma.sqlite3").touch()
    result = _make_result(text="CI green", room="forge", wing="bezalel", score=0.95)

    handler, buf = _make_handler("/search?q=CI")

    # _handle_search imports search_fleet lazily, so patch it at its source module.
    with patch("nexus.mempalace.searcher.search_fleet", return_value=[result]):
        _handle_search(handler, {"q": ["CI"]})

    data = _parse_response(buf)
    assert data["count"] == 1
    assert data["results"][0]["text"] == "CI green"
    assert data["results"][0]["room"] == "forge"
    assert data["results"][0]["wing"] == "bezalel"
    assert data["results"][0]["score"] == 0.95
    assert handler._response_code == 200


def test_search_with_room_filter(tmp_path, monkeypatch):
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    result = _make_result()

    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", return_value=[result]) as mock_sf:
        _handle_search(MagicMock(), {"q": ["test"], "room": ["hermes"]})

    # Verify room was passed through
    mock_sf.assert_called_once_with("test", room="hermes", n_results=10)


def test_search_invalid_n_param():
    handler, buf = _make_handler("/search?q=test&n=bad")
    _handle_search(handler, {"q": ["test"], "n": ["bad"]})
    data = _parse_response(buf)
    assert "error" in data
    assert handler._response_code == 400


def test_search_palace_unavailable(monkeypatch):
    from nexus.mempalace.searcher import MemPalaceUnavailable

    handler, buf = _make_handler("/search?q=test")

    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", side_effect=MemPalaceUnavailable("no palace")):
        _handle_search(handler, {"q": ["test"]})

    data = _parse_response(buf)
    assert "error" in data
    assert handler._response_code == 503


def test_search_n_clamped_to_max():
    """n > MAX_RESULTS is silently clamped."""
    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", return_value=[]) as mock_sf:
        handler = MagicMock()
        _handle_search(handler, {"q": ["test"], "n": ["9999"]})

    mock_sf.assert_called_once_with("test", room=None, n_results=50)


# ---------------------------------------------------------------------------
# /wings
# ---------------------------------------------------------------------------

def test_wings_returns_list(tmp_path, monkeypatch):
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    (tmp_path / "bezalel").mkdir()
    (tmp_path / "timmy").mkdir()
    # A file should not appear in wings
    (tmp_path / "README.txt").touch()

    handler, buf = _make_handler("/wings")
    _handle_wings(handler)
    data = _parse_response(buf)
    assert set(data["wings"]) == {"bezalel", "timmy"}
    assert handler._response_code == 200


def test_wings_missing_palace(tmp_path, monkeypatch):
    missing = tmp_path / "nonexistent"
    monkeypatch.setenv("FLEET_PALACE_PATH", str(missing))
    handler, buf = _make_handler("/wings")
    _handle_wings(handler)
    data = _parse_response(buf)
    assert "error" in data
    assert handler._response_code == 503


# ---------------------------------------------------------------------------
# 404 unknown endpoint
# ---------------------------------------------------------------------------

def test_unknown_endpoint():
    handler, buf = _make_handler("/foobar")
    handler.do_GET()
    data = _parse_response(buf)
    assert "error" in data
    assert handler._response_code == 404
    assert "/search" in data["endpoints"]


# ---------------------------------------------------------------------------
# audit fixture smoke test
# ---------------------------------------------------------------------------

def test_audit_fixture_is_clean():
    """Ensure tests/fixtures/fleet_palace/ passes privacy audit (no violations)."""
    from mempalace.audit_privacy import audit_palace

    fixture_dir = Path(__file__).parent / "fixtures" / "fleet_palace"
    assert fixture_dir.exists(), f"Fixture directory missing: {fixture_dir}"

    result = audit_palace(fixture_dir)
    assert result.clean, (
        "Privacy violations found in CI fixture:\n" +
        "\n".join(f"  [{v.rule}] {v.path}: {v.detail}" for v in result.violations)
    )
139
tests/test_mempalace_retain_closets.py
Normal file
139
tests/test_mempalace_retain_closets.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""
|
||||
Tests for mempalace/retain_closets.py — 90-day closet retention enforcement.
|
||||
|
||||
Refs: #1083, #1075
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from mempalace.retain_closets import (
|
||||
RetentionResult,
|
||||
_file_age_days,
|
||||
enforce_retention,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _write_closet(directory: Path, name: str, age_days: float) -> Path:
|
||||
"""Create a *.closet.json file with a mtime set to *age_days* ago."""
|
||||
p = directory / name
|
||||
p.write_text(json.dumps({"drawers": [{"text": "summary", "closet": True}]}))
|
||||
# Set mtime to simulate age
|
||||
mtime = time.time() - age_days * 86400.0
|
||||
import os
|
||||
os.utime(p, (mtime, mtime))
|
||||
return p
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _file_age_days
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_file_age_days_recent(tmp_path):
|
||||
p = tmp_path / "recent.closet.json"
|
||||
p.write_text("{}")
|
||||
age = _file_age_days(p)
|
||||
assert 0 <= age < 1 # just created
|
||||
|
||||
|
||||
def test_file_age_days_old(tmp_path):
|
||||
p = _write_closet(tmp_path, "old.closet.json", age_days=100)
|
||||
age = _file_age_days(p)
|
||||
assert 99 < age < 101
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# enforce_retention — dry_run
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_dry_run_does_not_delete(tmp_path):
|
||||
old = _write_closet(tmp_path, "old.closet.json", age_days=100)
|
||||
_write_closet(tmp_path, "new.closet.json", age_days=10)
|
||||
|
||||
result = enforce_retention(tmp_path, retention_days=90, dry_run=True)
|
||||
|
||||
# File still exists after dry-run
|
||||
assert old.exists()
|
||||
assert result.removed == 1 # counted but not actually removed
|
||||
assert result.kept == 1
|
||||
assert result.ok
|
||||
|
||||
|
||||
def test_dry_run_keeps_recent_files(tmp_path):
|
||||
_write_closet(tmp_path, "recent.closet.json", age_days=5)
|
||||
result = enforce_retention(tmp_path, retention_days=90, dry_run=True)
|
||||
assert result.removed == 0
|
||||
assert result.kept == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# enforce_retention — live mode
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_live_removes_old_closets(tmp_path):
|
||||
old = _write_closet(tmp_path, "old.closet.json", age_days=100)
|
||||
new = _write_closet(tmp_path, "new.closet.json", age_days=10)
|
||||
|
||||
result = enforce_retention(tmp_path, retention_days=90, dry_run=False)
|
||||
|
||||
assert not old.exists()
|
||||
assert new.exists()
|
||||
assert result.removed == 1
|
||||
assert result.kept == 1
|
||||
assert result.ok
|
||||
|
||||
|
||||
def test_live_keeps_files_within_window(tmp_path):
|
||||
f = _write_closet(tmp_path, "edge.closet.json", age_days=89)
|
||||
result = enforce_retention(tmp_path, retention_days=90, dry_run=False)
|
||||
assert f.exists()
|
||||
assert result.removed == 0
|
||||
assert result.kept == 1
|
||||
|
||||
|
||||
def test_empty_directory_is_ok(tmp_path):
|
||||
result = enforce_retention(tmp_path, retention_days=90)
|
||||
assert result.scanned == 0
|
||||
assert result.removed == 0
|
||||
assert result.ok
|
||||
|
||||
|
||||
def test_subdirectory_closets_are_pruned(tmp_path):
|
||||
"""enforce_retention should recurse into subdirs (wing directories)."""
|
||||
sub = tmp_path / "bezalel"
|
||||
sub.mkdir()
|
||||
old = _write_closet(sub, "hermes.closet.json", age_days=120)
|
||||
result = enforce_retention(tmp_path, retention_days=90, dry_run=False)
|
||||
assert not old.exists()
|
||||
assert result.removed == 1
|
||||
|
||||
|
||||
def test_non_closet_files_ignored(tmp_path):
|
||||
"""Non-closet files should not be counted or touched."""
|
||||
(tmp_path / "readme.txt").write_text("hello")
|
||||
(tmp_path / "data.drawer.json").write_text("{}")
|
||||
result = enforce_retention(tmp_path, retention_days=90)
|
||||
assert result.scanned == 0
|
||||
|
||||
|
||||


# ---------------------------------------------------------------------------
# RetentionResult.ok
# ---------------------------------------------------------------------------


def test_retention_result_ok_with_no_errors():
    r = RetentionResult(scanned=5, removed=2, kept=3)
    assert r.ok is True


def test_retention_result_not_ok_with_errors():
    r = RetentionResult(errors=["could not stat file"])
    assert r.ok is False
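

# A plausible shape for RetentionResult, inferred from the assertions above.
# The _Sketch class and its field defaults are assumptions, not the real
# dataclass from the retention module.
from dataclasses import dataclass, field


@dataclass
class _RetentionResultSketch:
    scanned: int = 0
    removed: int = 0
    kept: int = 0
    errors: list = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return not self.errors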
205
tests/test_mempalace_tunnel_sync.py
Normal file
@@ -0,0 +1,205 @@
"""
Tests for mempalace/tunnel_sync.py — remote wizard wing sync client.

Refs: #1078, #1075
"""

from __future__ import annotations

import json
from unittest.mock import patch

from mempalace.tunnel_sync import (
    SyncResult,
    _peer_url,
    _write_closet,
    get_remote_wings,
    search_remote_room,
    sync_peer,
)


# ---------------------------------------------------------------------------
# _peer_url
# ---------------------------------------------------------------------------


def test_peer_url_strips_trailing_slash():
    assert _peer_url("http://host:7771/", "/wings") == "http://host:7771/wings"


def test_peer_url_with_path():
    assert _peer_url("http://host:7771", "/search") == "http://host:7771/search"
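

# Both assertions above are satisfied by stripping any trailing slash from
# the base before appending the path; a one-line sketch (the _sketch name is
# hypothetical):
def _peer_url_sketch(base: str, path: str) -> str:
    return base.rstrip("/") + path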


# ---------------------------------------------------------------------------
# get_remote_wings
# ---------------------------------------------------------------------------


def test_get_remote_wings_returns_list():
    with patch("mempalace.tunnel_sync._get", return_value={"wings": ["bezalel", "timmy"]}):
        wings = get_remote_wings("http://peer:7771")
    assert wings == ["bezalel", "timmy"]


def test_get_remote_wings_empty():
    with patch("mempalace.tunnel_sync._get", return_value={"wings": []}):
        wings = get_remote_wings("http://peer:7771")
    assert wings == []
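

# Assumed shape of the /wings exchange: _get returns the parsed JSON body and
# get_remote_wings lifts out the "wings" list. Only the dict shape comes from
# the mocked responses above; the helper below is a hypothetical sketch.
def _get_remote_wings_sketch(peer: str) -> list:
    from mempalace.tunnel_sync import _get  # the same seam the tests patch

    return _get(_peer_url_sketch(peer, "/wings")).get("wings", [])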


# ---------------------------------------------------------------------------
# search_remote_room
# ---------------------------------------------------------------------------


def _make_entry(text: str, room: str = "forge", wing: str = "bezalel", score: float = 0.9) -> dict:
    return {"text": text, "room": room, "wing": wing, "score": score}


def test_search_remote_room_deduplicates():
    entry = _make_entry("CI passed")
    # Same entry returned from multiple queries — should only appear once
    with patch("mempalace.tunnel_sync._get", return_value={"results": [entry]}):
        results = search_remote_room("http://peer:7771", "forge", n=50)
    assert len(results) == 1
    assert results[0]["text"] == "CI passed"


def test_search_remote_room_respects_n_limit():
    entries = [_make_entry(f"item {i}") for i in range(100)]
    with patch("mempalace.tunnel_sync._get", return_value={"results": entries}):
        results = search_remote_room("http://peer:7771", "forge", n=5)
    assert len(results) <= 5


def test_search_remote_room_handles_request_error():
    import urllib.error

    with patch("mempalace.tunnel_sync._get", side_effect=urllib.error.URLError("refused")):
        results = search_remote_room("http://peer:7771", "forge")
    assert results == []
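

# Hedged sketch of the search behaviour pinned above: de-duplicate entries,
# cap the result list at n, and swallow URLError into an empty list. The
# helper name and the de-dupe key (entry text) are assumptions.
def _search_remote_room_sketch(peer: str, room: str, n: int = 25) -> list:
    import urllib.error

    from mempalace.tunnel_sync import _get

    try:
        raw = _get(_peer_url_sketch(peer, f"/search?room={room}"))["results"]
    except urllib.error.URLError:
        return []  # unreachable peer degrades to "no results"
    seen, out = set(), []
    for entry in raw:
        if entry["text"] not in seen:  # drop duplicates across queries
            seen.add(entry["text"])
            out.append(entry)
    return out[:n]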


# ---------------------------------------------------------------------------
# _write_closet
# ---------------------------------------------------------------------------


def test_write_closet_creates_file(tmp_path):
    entries = [_make_entry("a memory")]
    ok = _write_closet(tmp_path, "bezalel", "forge", entries, dry_run=False)
    assert ok is True
    closet = tmp_path / "bezalel" / "forge.closet.json"
    assert closet.exists()
    data = json.loads(closet.read_text())
    assert data["wing"] == "bezalel"
    assert data["room"] == "forge"
    assert len(data["drawers"]) == 1
    assert data["drawers"][0]["closet"] is True
    assert data["drawers"][0]["text"] == "a memory"


def test_write_closet_dry_run_does_not_create(tmp_path):
    entries = [_make_entry("a memory")]
    ok = _write_closet(tmp_path, "bezalel", "forge", entries, dry_run=True)
    assert ok is True
    closet = tmp_path / "bezalel" / "forge.closet.json"
    assert not closet.exists()


def test_write_closet_creates_wing_subdirectory(tmp_path):
    entries = [_make_entry("memory")]
    _write_closet(tmp_path, "timmy", "hermes", entries, dry_run=False)
    assert (tmp_path / "timmy").is_dir()


def test_write_closet_source_file_is_tunnel_tagged(tmp_path):
    entries = [_make_entry("memory")]
    _write_closet(tmp_path, "bezalel", "hermes", entries, dry_run=False)
    closet = tmp_path / "bezalel" / "hermes.closet.json"
    data = json.loads(closet.read_text())
    assert data["drawers"][0]["source_file"].startswith("tunnel:")
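

# The closet document shape implied by the four tests above; a hedged sketch
# of the on-disk layout, not a canonical schema:
#
#   <palace>/<wing>/<room>.closet.json
#   {
#     "wing": "bezalel",
#     "room": "forge",
#     "drawers": [
#       {"text": "...", "closet": true, "source_file": "tunnel:..."}
#     ]
#   }
#
# On dry_run the file is not created but the call still reports success.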


# ---------------------------------------------------------------------------
# sync_peer
# ---------------------------------------------------------------------------


def _mock_get_responses(peer_url: str):
    """Build a minimal mock _get returning health, wings, and search results."""

    def _get(url: str) -> dict:
        if url.endswith("/health"):
            return {"status": "ok", "palace": "/var/lib/mempalace/fleet"}
        if url.endswith("/wings"):
            return {"wings": ["bezalel"]}
        if "/search" in url:
            return {"results": [_make_entry("test memory")]}
        return {}

    return _get


def test_sync_peer_writes_closets(tmp_path):
    (tmp_path / ".gitkeep").touch()  # ensure palace dir exists

    with patch("mempalace.tunnel_sync._get", side_effect=_mock_get_responses("http://peer:7771")):
        result = sync_peer("http://peer:7771", tmp_path, n_results=10)

    assert result.ok
    assert "bezalel" in result.wings_found
    assert result.closets_written > 0


def test_sync_peer_dry_run_no_files(tmp_path):
    (tmp_path / ".gitkeep").touch()

    with patch("mempalace.tunnel_sync._get", side_effect=_mock_get_responses("http://peer:7771")):
        result = sync_peer("http://peer:7771", tmp_path, n_results=10, dry_run=True)

    assert result.ok
    # No closet files should be written
    closets = list(tmp_path.rglob("*.closet.json"))
    assert closets == []


def test_sync_peer_unreachable_returns_error(tmp_path):
    import urllib.error

    with patch("mempalace.tunnel_sync._get", side_effect=urllib.error.URLError("refused")):
        result = sync_peer("http://unreachable:7771", tmp_path)

    assert not result.ok
    assert any("unreachable" in e or "refused" in e for e in result.errors)


def test_sync_peer_unhealthy_returns_error(tmp_path):
    with patch("mempalace.tunnel_sync._get", return_value={"status": "degraded"}):
        result = sync_peer("http://peer:7771", tmp_path)

    assert not result.ok
    assert any("unhealthy" in e for e in result.errors)


def test_sync_peer_no_wings_is_ok(tmp_path):
    def _get(url: str) -> dict:
        if "/health" in url:
            return {"status": "ok"}
        return {"wings": []}

    with patch("mempalace.tunnel_sync._get", side_effect=_get):
        result = sync_peer("http://peer:7771", tmp_path)

    assert result.ok
    assert result.closets_written == 0
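

# The control flow the five sync_peer tests above imply, as a hedged sketch
# with hypothetical names; how the real client discovers room names is not
# shown by these tests, so the room list here is an assumption.
def _sync_peer_sketch(peer: str, palace_root):
    import urllib.error

    from mempalace.tunnel_sync import _get, _write_closet, get_remote_wings, search_remote_room

    written, errors = 0, []
    try:
        health = _get(_peer_url_sketch(peer, "/health"))  # 1. health gate first
    except urllib.error.URLError as exc:
        return written, [f"peer unreachable: {exc.reason}"]
    if health.get("status") != "ok":
        return written, ["peer unhealthy"]
    for wing in get_remote_wings(peer):  # 2. enumerate remote wings
        for room in ("forge", "hermes"):  # 3. assumed room list
            entries = search_remote_room(peer, room)  # 4. pull memories per room
            if entries and _write_closet(palace_root, wing, room, entries, dry_run=False):
                written += 1  # 5. persist each room as a closet
    return written, errors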


# ---------------------------------------------------------------------------
# SyncResult.ok
# ---------------------------------------------------------------------------


def test_sync_result_ok_no_errors():
    r = SyncResult(wings_found=["bezalel"], rooms_pulled=5, closets_written=5)
    assert r.ok is True


def test_sync_result_not_ok_with_errors():
    r = SyncResult(errors=["connection refused"])
    assert r.ok is False
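

# A plausible SyncResult shape, mirroring RetentionResult in the retention
# tests; the _Sketch class and its defaults are assumptions:
from dataclasses import dataclass, field


@dataclass
class _SyncResultSketch:
    wings_found: list = field(default_factory=list)
    rooms_pulled: int = 0
    closets_written: int = 0
    errors: list = field(default_factory=list)

    @property
    def ok(self) -> bool:
        return not self.errors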