Compare commits
5 Commits
groq/issue
...
groq/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b445c04037 | ||
| 60bd9a05ff | |||
| c7468a3c6a | |||
| 07a4be3bb9 | |||
| 804536a3f2 |
21
.gitea/workflows/review_gate.yml
Normal file
21
.gitea/workflows/review_gate.yml
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
name: Review Approval Gate
|
||||||
|
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
branches: [main]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
verify-review:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Verify PR has approving review
|
||||||
|
env:
|
||||||
|
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||||
|
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||||
|
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||||
|
PR_NUMBER: ${{ gitea.event.pull_request.number }}
|
||||||
|
run: |
|
||||||
|
python3 scripts/review_gate.py
|
||||||
20
.gitea/workflows/staging_gate.yml
Normal file
20
.gitea/workflows/staging_gate.yml
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
name: Staging Verification Gate
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [main]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
verify-staging:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Verify staging label on merge PR
|
||||||
|
env:
|
||||||
|
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||||
|
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||||
|
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||||
|
run: |
|
||||||
|
python3 scripts/staging_gate.py
|
||||||
28
.gitea/workflows/weekly-audit.yml
Normal file
28
.gitea/workflows/weekly-audit.yml
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
name: Weekly Privacy Audit
|
||||||
|
|
||||||
|
# Runs every Monday at 05:00 UTC against a CI test fixture.
|
||||||
|
# On production wizards this same script should be run via cron:
|
||||||
|
# 0 5 * * 1 python /opt/nexus/mempalace/audit_privacy.py /var/lib/mempalace/fleet
|
||||||
|
#
|
||||||
|
# Refs: #1083, #1075
|
||||||
|
|
||||||
|
on:
|
||||||
|
schedule:
|
||||||
|
- cron: "0 5 * * 1" # Monday 05:00 UTC
|
||||||
|
workflow_dispatch: {} # allow manual trigger
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
privacy-audit:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Setup Python
|
||||||
|
uses: actions/setup-python@v4
|
||||||
|
with:
|
||||||
|
python-version: "3.x"
|
||||||
|
|
||||||
|
- name: Run privacy audit against CI fixture
|
||||||
|
run: |
|
||||||
|
python mempalace/audit_privacy.py tests/fixtures/fleet_palace
|
||||||
@@ -1,44 +1,6 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
# Wrapper for the canonical branch-protection sync script.
|
||||||
# Apply branch protections to all repositories
|
# Usage: ./gitea-branch-protection.sh
|
||||||
# Requires GITEA_TOKEN env var
|
set -euo pipefail
|
||||||
|
cd "$(dirname "$0")"
|
||||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
python3 scripts/sync_branch_protection.py
|
||||||
|
|
||||||
for repo in "${REPOS[@]}"
|
|
||||||
do
|
|
||||||
curl -X POST "https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/$repo/branches/main/protection" \
|
|
||||||
-H "Authorization: token $GITEA_TOKEN" \
|
|
||||||
-H "Content-Type: application/json" \
|
|
||||||
-d '{
|
|
||||||
"required_reviews": 1,
|
|
||||||
"dismiss_stale_reviews": true,
|
|
||||||
"block_force_push": true,
|
|
||||||
"block_deletions": true
|
|
||||||
}'
|
|
||||||
done
|
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
# Gitea API credentials
|
|
||||||
GITEA_TOKEN="your-personal-access-token"
|
|
||||||
GITEA_API="https://forge.alexanderwhitestone.com/api/v1"
|
|
||||||
|
|
||||||
# Repos to protect
|
|
||||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
|
||||||
|
|
||||||
for REPO in "${REPO[@]}"; do
|
|
||||||
echo "Configuring branch protection for $REPO..."
|
|
||||||
|
|
||||||
curl -X POST -H "Authorization: token $GITEA_TOKEN" \
|
|
||||||
-H "Content-Type: application/json" \
|
|
||||||
-d '{
|
|
||||||
"name": "main",
|
|
||||||
"require_pull_request": true,
|
|
||||||
"required_approvals": 1,
|
|
||||||
"dismiss_stale_approvals": true,
|
|
||||||
"required_status_checks": '"$(test "$REPO" = "hermes-agent" && echo "true" || echo "false")"',
|
|
||||||
"block_force_push": true,
|
|
||||||
"block_delete": true
|
|
||||||
}' \
|
|
||||||
"$GITEA_API/repos/Timmy_Foundation/$REPO/branch_protection"
|
|
||||||
done
|
|
||||||
|
|||||||
186
mempalace/fleet_api.py
Normal file
186
mempalace/fleet_api.py
Normal file
@@ -0,0 +1,186 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
fleet_api.py — Lightweight HTTP API for the shared fleet palace.
|
||||||
|
|
||||||
|
Exposes fleet memory search over HTTP so that Alpha servers and other
|
||||||
|
wizard deployments can query the palace without direct filesystem access.
|
||||||
|
|
||||||
|
Endpoints:
|
||||||
|
GET /health
|
||||||
|
Returns {"status": "ok", "palace": "<path>"}
|
||||||
|
|
||||||
|
GET /search?q=<query>[&room=<room>][&n=<int>]
|
||||||
|
Returns {"results": [...], "query": "...", "room": "...", "count": N}
|
||||||
|
Each result: {"text": "...", "room": "...", "wing": "...", "score": 0.9}
|
||||||
|
|
||||||
|
GET /wings
|
||||||
|
Returns {"wings": ["bezalel", ...]} — distinct wizard wings present
|
||||||
|
|
||||||
|
Error responses use {"error": "<message>"} with appropriate HTTP status codes.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
# Default: localhost:7771, fleet palace at /var/lib/mempalace/fleet
|
||||||
|
python mempalace/fleet_api.py
|
||||||
|
|
||||||
|
# Custom host/port/palace:
|
||||||
|
FLEET_PALACE_PATH=/data/fleet python mempalace/fleet_api.py --host 0.0.0.0 --port 8080
|
||||||
|
|
||||||
|
Refs: #1078, #1075
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||||
|
from pathlib import Path
|
||||||
|
from urllib.parse import parse_qs, urlparse
|
||||||
|
|
||||||
|
# Add repo root to path so we can import nexus.mempalace
|
||||||
|
_HERE = Path(__file__).resolve().parent
|
||||||
|
_REPO_ROOT = _HERE.parent
|
||||||
|
if str(_REPO_ROOT) not in sys.path:
|
||||||
|
sys.path.insert(0, str(_REPO_ROOT))
|
||||||
|
|
||||||
|
DEFAULT_HOST = "127.0.0.1"
|
||||||
|
DEFAULT_PORT = 7771
|
||||||
|
MAX_RESULTS = 50
|
||||||
|
|
||||||
|
|
||||||
|
def _get_palace_path() -> Path:
|
||||||
|
return Path(os.environ.get("FLEET_PALACE_PATH", "/var/lib/mempalace/fleet"))
|
||||||
|
|
||||||
|
|
||||||
|
def _json_response(handler: BaseHTTPRequestHandler, status: int, body: dict) -> None:
|
||||||
|
payload = json.dumps(body).encode()
|
||||||
|
handler.send_response(status)
|
||||||
|
handler.send_header("Content-Type", "application/json")
|
||||||
|
handler.send_header("Content-Length", str(len(payload)))
|
||||||
|
handler.end_headers()
|
||||||
|
handler.wfile.write(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_health(handler: BaseHTTPRequestHandler) -> None:
    """Report liveness, the configured palace path, and whether it exists."""
    palace = _get_palace_path()
    body = {
        "status": "ok",
        "palace": str(palace),
        "palace_exists": palace.exists(),
    }
    _json_response(handler, 200, body)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_search(handler: BaseHTTPRequestHandler, qs: dict) -> None:
    """Handle GET /search: validate query params, run the fleet search, reply JSON.

    Query params (each value is a list, as produced by parse_qs):
      q    -- required search text; blank/missing -> 400.
      room -- optional room filter; None when absent.
      n    -- optional result cap; clamped to [1, MAX_RESULTS], default 10.
    """
    query_terms = qs.get("q", [""])
    q = query_terms[0].strip() if query_terms else ""
    if not q:
        _json_response(handler, 400, {"error": "Missing required parameter: q"})
        return

    room_terms = qs.get("room", [])
    room = room_terms[0].strip() if room_terms else None

    n_terms = qs.get("n", [])
    try:
        # Clamp so a client cannot request unbounded output.
        n = max(1, min(int(n_terms[0]), MAX_RESULTS)) if n_terms else 10
    except (ValueError, IndexError):
        _json_response(handler, 400, {"error": "Invalid parameter: n must be an integer"})
        return

    # Imported lazily so the server can start (and /health keep working) even
    # when the search backend is not installed; a missing backend maps to 503.
    # NOTE(review): MemPalaceUnavailable is imported but unused here — its
    # absence also triggers the 503 path; presumably intentional, confirm.
    try:
        from nexus.mempalace.searcher import search_fleet, MemPalaceUnavailable
    except ImportError as exc:
        _json_response(handler, 503, {"error": f"MemPalace module not available: {exc}"})
        return

    try:
        results = search_fleet(q, room=room, n_results=n)
    except Exception as exc:  # noqa: BLE001
        # Any backend failure (missing palace, index error) becomes a 503.
        _json_response(handler, 503, {"error": str(exc)})
        return

    _json_response(handler, 200, {
        "query": q,
        "room": room,
        "count": len(results),
        "results": [
            {
                "text": r.text,
                "room": r.room,
                "wing": r.wing,
                # Rounded to keep payloads compact and stable.
                "score": round(r.score, 4),
            }
            for r in results
        ],
    })
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_wings(handler: BaseHTTPRequestHandler) -> None:
    """Return distinct wizard wing names found in the fleet palace directory."""
    palace = _get_palace_path()
    if not palace.exists():
        _json_response(handler, 503, {
            "error": f"Fleet palace not found: {palace}",
        })
        return

    # Each top-level subdirectory of the palace is one wizard wing.
    wings = sorted(entry.name for entry in palace.iterdir() if entry.is_dir())
    _json_response(handler, 200, {"wings": wings})
|
||||||
|
|
||||||
|
|
||||||
|
class FleetAPIHandler(BaseHTTPRequestHandler):
    """Request handler for the fleet memory API."""

    def log_message(self, fmt: str, *args) -> None:  # noqa: ANN001
        # Tagged so fleet_api lines are easy to grep out of shared logs.
        sys.stderr.write(f"[fleet_api] {fmt % args}\n")

    def do_GET(self) -> None:  # noqa: N802
        parsed = urlparse(self.path)
        # "/search/" and "/search" are equivalent; a bare "" collapses to "/".
        route = parsed.path.rstrip("/") or "/"
        params = parse_qs(parsed.query)

        if route == "/health":
            _handle_health(self)
        elif route == "/search":
            _handle_search(self, params)
        elif route == "/wings":
            _handle_wings(self)
        else:
            _json_response(self, 404, {
                "error": f"Unknown endpoint: {route}",
                "endpoints": ["/health", "/search", "/wings"],
            })
|
||||||
|
|
||||||
|
|
||||||
|
def make_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> HTTPServer:
    """Build (but do not start) an HTTPServer bound to host:port."""
    address = (host, port)
    return HTTPServer(address, FleetAPIHandler)
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str] | None = None) -> int:
    """Parse CLI args, warn if the palace is missing, and serve until Ctrl-C.

    Returns 0 on clean shutdown (KeyboardInterrupt included).
    """
    parser = argparse.ArgumentParser(
        description="Fleet palace HTTP API server."
    )
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind host (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Bind port (default: {DEFAULT_PORT})")
    args = parser.parse_args(argv)

    palace = _get_palace_path()
    print(f"[fleet_api] Palace: {palace}")
    if not palace.exists():
        # Not fatal: the palace may be created later; /health reports existence.
        print(f"[fleet_api] WARNING: palace path does not exist yet: {palace}", file=sys.stderr)

    server = make_server(args.host, args.port)
    print(f"[fleet_api] Listening on http://{args.host}:{args.port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Graceful Ctrl-C shutdown is the expected stop path.
        print("\n[fleet_api] Shutting down.")
    return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
167
scripts/audit_merge_reviews.py
Normal file
167
scripts/audit_merge_reviews.py
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Fleet Merge Review Audit
|
||||||
|
========================
|
||||||
|
Scans all Timmy_Foundation repos for merges in the last 7 days
|
||||||
|
and validates that each merged PR had at least one approving review.
|
||||||
|
|
||||||
|
Exit 0 = no unreviewed merges
|
||||||
|
Exit 1 = unreviewed merges found (and issues created if --create-issues)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python scripts/audit_merge_reviews.py
|
||||||
|
python scripts/audit_merge_reviews.py --create-issues
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
import urllib.request
|
||||||
|
import urllib.error
|
||||||
|
import json
|
||||||
|
|
||||||
|
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||||
|
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||||
|
ORG = "Timmy_Foundation"
|
||||||
|
DAYS_BACK = 7
|
||||||
|
SECURITY_LABEL = "security"
|
||||||
|
|
||||||
|
|
||||||
|
def api_request(path: str) -> dict | list:
    """GET a Gitea API path and return the decoded JSON body."""
    headers = {
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(f"{GITEA_URL}/api/v1{path}", headers=headers)
    with urllib.request.urlopen(req, timeout=30) as resp:
        raw = resp.read().decode()
    return json.loads(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def api_post(path: str, payload: dict) -> dict:
    """POST *payload* as JSON to a Gitea API path and return the decoded reply."""
    headers = {
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    }
    body = json.dumps(payload).encode()
    req = urllib.request.Request(f"{GITEA_URL}/api/v1{path}", data=body, headers=headers)
    with urllib.request.urlopen(req, timeout=30) as resp:
        raw = resp.read().decode()
    return json.loads(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def get_repos() -> list[str]:
    """List every repository name in the org, following pagination to the end."""
    names: list[str] = []
    page = 1
    # An empty page signals the end of the paginated listing.
    while batch := api_request(f"/orgs/{ORG}/repos?limit=50&page={page}"):
        names.extend(r["name"] for r in batch)
        page += 1
    return names
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_ts(ts: str):
    """Parse an RFC3339/ISO timestamp, tolerating the trailing 'Z' form."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def get_merged_prs(repo: str, since: str) -> list[dict]:
    """Get closed (merged) PRs updated since `since` (ISO format).

    Pages through closed PRs sorted by update time (descending) and stops
    early once entries older than the window are reached.

    Fix: the previous version compared ISO timestamp *strings*, but `since`
    carries a "+00:00" suffix while Gitea returns "Z"-suffixed values, making
    same-instant comparisons order-dependent on the suffix bytes. Timestamps
    are now parsed into aware datetimes before comparison.
    """
    since_dt = _parse_ts(since)
    prs: list[dict] = []
    page = 1
    while True:
        batch = api_request(
            f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
        )
        if not batch:
            break
        for pr in batch:
            merged_at = pr.get("merged_at")
            updated_at = pr.get("updated_at")
            if merged_at and _parse_ts(merged_at) >= since_dt:
                prs.append(pr)
            elif updated_at and _parse_ts(updated_at) < since_dt:
                # Sorted by updated desc: everything after this is older still.
                return prs
        page += 1
    return prs
|
||||||
|
|
||||||
|
|
||||||
|
def get_reviews(repo: str, pr_number: int) -> list[dict]:
    """Fetch the reviews for one PR; a 404 maps to an empty list."""
    try:
        return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
    except urllib.error.HTTPError as exc:
        if exc.code != 404:
            raise
        return []
|
||||||
|
|
||||||
|
|
||||||
|
def create_post_mortem(repo: str, pr: dict) -> int | None:
    """File a security post-mortem issue in the-nexus for an unreviewed merge.

    Returns the new issue number, or None if creation failed (logged, not raised).
    """
    title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
    sections = [
        "## Unreviewed Merge Detected\n",
        f"- **Repository:** `{ORG}/{repo}`",
        f"- **PR:** #{pr['number']} — {pr['title']}",
        f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}",
        f"- **Merged at:** {pr['merged_at']}",
        f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n",
        "This merge had **zero approving reviews** at the time of merge.\n",
        "### Required Actions",
        "1. Validate the merge contents are safe.",
        "2. If malicious or incorrect, revert immediately.",
        "3. Document root cause (bypassed branch protection? direct push?).",
    ]
    body = "\n".join(sections) + "\n"

    # Post-mortems always land in the-nexus, regardless of the offending repo.
    try:
        issue = api_post(f"/repos/{ORG}/the-nexus/issues", {
            "title": title,
            "body": body,
            "labels": [SECURITY_LABEL],
        })
        return issue.get("number")
    except Exception as exc:
        print(f" FAILED to create issue: {exc}")
        return None
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
    """Audit the last DAYS_BACK days of merges across all org repos.

    Returns 0 when every merged PR had at least one APPROVED review,
    1 otherwise (optionally filing one post-mortem issue per offender
    when --create-issues is passed).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
    args = parser.parse_args()

    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN environment variable not set.")
        return 1

    # Audit window: everything merged within the last DAYS_BACK days (UTC).
    since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
    since = since_dt.isoformat()

    repos = get_repos()
    print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")

    unreviewed_count = 0
    for repo in repos:
        merged = get_merged_prs(repo, since)
        if not merged:
            continue

        # Collect PRs that were merged with zero APPROVED reviews.
        repo_unreviewed = []
        for pr in merged:
            reviews = get_reviews(repo, pr["number"])
            approvals = [r for r in reviews if r.get("state") == "APPROVED"]
            if not approvals:
                repo_unreviewed.append(pr)

        if repo_unreviewed:
            print(f"\n{repo}:")
            for pr in repo_unreviewed:
                print(f" ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})")
                unreviewed_count += 1
                if args.create_issues:
                    issue_num = create_post_mortem(repo, pr)
                    if issue_num:
                        print(f" → Created post-mortem issue the-nexus#{issue_num}")

    print(f"\n{'='*60}")
    if unreviewed_count == 0:
        print("All merges in the last 7 days had at least one approving review.")
        return 0
    else:
        print(f"Found {unreviewed_count} unreviewed merge(s).")
        return 1
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
70
scripts/review_gate.py
Normal file
70
scripts/review_gate.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Review Gate — Poka-yoke for unreviewed merges.
|
||||||
|
Fails if the current PR has fewer than 1 approving review.
|
||||||
|
|
||||||
|
Usage in Gitea workflow:
|
||||||
|
- name: Review Approval Gate
|
||||||
|
run: python scripts/review_gate.py
|
||||||
|
env:
|
||||||
|
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
from urllib import request, error
|
||||||
|
|
||||||
|
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||||
|
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||||
|
REPO = os.environ.get("GITEA_REPO", "")
|
||||||
|
PR_NUMBER = os.environ.get("PR_NUMBER", "")
|
||||||
|
|
||||||
|
|
||||||
|
def api_call(method, path):
    """Issue a Gitea API request; on HTTPError return an error dict instead of raising."""
    req = request.Request(
        f"{GITEA_URL}/api/v1{path}",
        method=method,
        headers={"Authorization": f"token {GITEA_TOKEN}"},
    )
    try:
        with request.urlopen(req, timeout=30) as resp:
            raw = resp.read().decode()
    except error.HTTPError as exc:
        return {"error": exc.read().decode(), "status": exc.code}
    return json.loads(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Fail (exit 1) unless the current PR has at least one APPROVED review.

    Exit 0 only when the PR resolved from PR_NUMBER (or the Gitea Actions
    environment) carries one or more approving reviews.
    """
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        sys.exit(1)

    if not REPO:
        print("ERROR: GITEA_REPO not set")
        sys.exit(1)

    pr_number = PR_NUMBER
    if not pr_number:
        # Try to infer from Gitea Actions environment
        pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")

    if not pr_number:
        # Fail closed: without a PR number we cannot verify anything.
        print("ERROR: Could not determine PR number")
        sys.exit(1)

    reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
    # api_call returns an error dict (not a list) on HTTP failure.
    if isinstance(reviews, dict) and "error" in reviews:
        print(f"ERROR fetching reviews: {reviews}")
        sys.exit(1)

    approvals = [r for r in reviews if r.get("state") == "APPROVED"]
    if len(approvals) >= 1:
        print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
        sys.exit(0)
    else:
        print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
        print("Merges are not permitted without at least one approval.")
        sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
77
scripts/staging_gate.py
Normal file
77
scripts/staging_gate.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Staging Gate — Poka-yoke for production deployments.
|
||||||
|
Checks if the PR that introduced the current commit was marked `staging-verified`.
|
||||||
|
Fails the workflow if not, blocking deploy.yml from proceeding.
|
||||||
|
|
||||||
|
Usage in Gitea workflow:
|
||||||
|
- name: Staging Verification Gate
|
||||||
|
run: python scripts/staging_gate.py
|
||||||
|
env:
|
||||||
|
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
from urllib import request, error
|
||||||
|
|
||||||
|
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||||
|
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||||
|
REPO = os.environ.get("GITEA_REPO", "Timmy_Foundation/the-nexus")
|
||||||
|
|
||||||
|
|
||||||
|
def api_call(method, path):
    """Issue a Gitea API request; on HTTPError return an error dict instead of raising."""
    headers = {"Authorization": f"token {GITEA_TOKEN}"}
    req = request.Request(f"{GITEA_URL}/api/v1{path}", method=method, headers=headers)
    try:
        with request.urlopen(req, timeout=30) as resp:
            raw = resp.read().decode()
    except error.HTTPError as exc:
        return {"error": exc.read().decode(), "status": exc.code}
    return json.loads(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def get_commit_sha():
    """Return the SHA of HEAD via `git rev-parse`.

    Fix: the previous version ignored git's exit status, so a failure
    (e.g. not a git checkout) silently yielded "" and the gate would then
    look up an empty SHA. A deploy gate must fail closed, so exit 1 loudly
    when git cannot produce a SHA.
    """
    result = subprocess.run(["git", "rev-parse", "HEAD"], capture_output=True, text=True)
    if result.returncode != 0:
        print(f"ERROR: git rev-parse failed: {result.stderr.strip()}")
        sys.exit(1)
    return result.stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def get_pr_for_commit(sha):
    """Find the PR whose merge commit is *sha*.

    Scans the 50 most recent closed PRs, then open ones. Returns the PR
    dict, or None when no match is found (including on API errors).
    """
    for state in ("closed", "open"):
        candidates = api_call("GET", f"/repos/{REPO}/pulls?state={state}&limit=50")
        if not isinstance(candidates, list):
            # api_call returned an error dict — nothing to scan in this state.
            continue
        match = next(
            (pr for pr in candidates if pr.get("merge_commit_sha") == sha),
            None,
        )
        if match is not None:
            return match
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Fail (exit 1) unless the PR behind HEAD carries 'staging-verified'.

    Exit 0 only when HEAD's merge PR exists and is labeled; a commit with
    no associated PR is blocked as a safety measure.
    """
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        sys.exit(1)

    sha = get_commit_sha()
    pr = get_pr_for_commit(sha)

    if not pr:
        # Direct push to main without PR — block unless explicitly forced
        print("WARNING: No PR found for this commit. Blocking deploy as a safety measure.")
        print("To bypass, merge via PR and add the 'staging-verified' label.")
        sys.exit(1)

    labels = {label["name"] for label in pr.get("labels", [])}
    if "staging-verified" in labels:
        print(f"OK: PR #{pr['number']} has 'staging-verified' label. Deploy permitted.")
        sys.exit(0)
    else:
        print(f"BLOCKED: PR #{pr['number']} is missing the 'staging-verified' label.")
        print("Deploy to production is not permitted until staging is verified.")
        sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
81
scripts/sync_branch_protection.py
Normal file
81
scripts/sync_branch_protection.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
|
||||||
|
Correctly uses the Gitea 1.25+ API (not GitHub-style).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import urllib.request
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||||
|
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||||
|
ORG = "Timmy_Foundation"
|
||||||
|
CONFIG_DIR = ".gitea/branch-protection"
|
||||||
|
|
||||||
|
|
||||||
|
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
    """Send an authenticated JSON request to the Gitea API and decode the reply."""
    body = json.dumps(payload).encode() if payload else None
    headers = {
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(f"{GITEA_URL}/api/v1{path}", data=body, method=method, headers=headers)
    with urllib.request.urlopen(req, timeout=30) as resp:
        raw = resp.read().decode()
    return json.loads(raw)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_protection(repo: str, rules: dict) -> bool:
    """Create or update the branch-protection rule for *repo* on Gitea.

    NOTE: pops "branch" out of *rules*, mutating the caller's dict.
    Missing rule keys fall back to safe defaults (1 approval, all blocks on).
    Returns True on success; logs and returns False on any API failure.
    """
    branch = rules.pop("branch", "main")
    # Check if protection already exists so we PATCH instead of POST.
    existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
    exists = any(r.get("branch_name") == branch for r in existing)

    payload = {
        "branch_name": branch,
        "rule_name": branch,
        "required_approvals": rules.get("required_approvals", 1),
        "block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
        "dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
        "block_deletions": rules.get("block_deletions", True),
        "block_force_push": rules.get("block_force_push", True),
        "block_admin_merge_override": rules.get("block_admin_merge_override", True),
        # Gitea field name differs from our config key on purpose.
        "enable_status_check": rules.get("require_ci_to_merge", False),
        "status_check_contexts": rules.get("status_check_contexts", []),
    }

    try:
        if exists:
            api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
        else:
            api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
        print(f"✅ {repo}:{branch} synced")
        return True
    except Exception as e:
        # Best-effort per repo: report and keep syncing the rest.
        print(f"❌ {repo}:{branch} failed: {e}")
        return False
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
    """Sync every branch-protection config file in CONFIG_DIR to Gitea.

    Each `<repo>.yml` / `<repo>.yaml` file maps to one repository.
    Returns 1 when GITEA_TOKEN is missing, 0 otherwise.

    Fixes: the previous version only matched `.yml` and stripped the
    extension with `fname[:-4]` (silently skipping `.yaml` files), and
    crashed on an empty config file because yaml.safe_load returns None.
    """
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        return 1

    ok = 0
    for fname in sorted(os.listdir(CONFIG_DIR)):
        repo, ext = os.path.splitext(fname)
        if ext not in (".yml", ".yaml"):
            continue
        with open(os.path.join(CONFIG_DIR, fname)) as f:
            # An empty file parses to None; treat it as "no rules".
            cfg = yaml.safe_load(f) or {}
        if apply_protection(repo, cfg.get("rules", {})):
            ok += 1

    print(f"\nSynced {ok} repo(s)")
    return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
raise SystemExit(main())
|
||||||
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
{
|
||||||
|
"wizard": "bezalel",
|
||||||
|
"room": "forge",
|
||||||
|
"drawers": [
|
||||||
|
{
|
||||||
|
"text": "CI pipeline green on main. All 253 tests passing.",
|
||||||
|
"source_file": "forge.closet.json",
|
||||||
|
"closet": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"text": "Deployed nexus heartbeat cron fix to Beta. Poka-yoke checks pass.",
|
||||||
|
"source_file": "forge.closet.json",
|
||||||
|
"closet": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"wizard": "bezalel",
|
||||||
|
"room": "hermes",
|
||||||
|
"drawers": [
|
||||||
|
{
|
||||||
|
"text": "Hermes gateway v2 deployed. MCP tools registered: mempalace, gitea, cron.",
|
||||||
|
"source_file": "hermes.closet.json",
|
||||||
|
"closet": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"wizard": "bezalel",
|
||||||
|
"room": "issues",
|
||||||
|
"drawers": [
|
||||||
|
{
|
||||||
|
"text": "MemPalace x Evennia milestone: 6 of 8 issues closed. #1078 and #1083 in progress.",
|
||||||
|
"source_file": "issues.closet.json",
|
||||||
|
"closet": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
239
tests/test_mempalace_fleet_api.py
Normal file
239
tests/test_mempalace_fleet_api.py
Normal file
@@ -0,0 +1,239 @@
|
|||||||
|
"""
|
||||||
|
Tests for mempalace/fleet_api.py — Alpha-side HTTP fleet memory API.
|
||||||
|
|
||||||
|
Refs: #1078, #1075
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
import json
|
||||||
|
import threading
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Import handler directly so we can test without running a server process.
|
||||||
|
from mempalace.fleet_api import FleetAPIHandler, _handle_health, _handle_search, _handle_wings, make_server
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class _FakeSocket:
|
||||||
|
"""Minimal socket stub for BaseHTTPRequestHandler."""
|
||||||
|
|
||||||
|
def makefile(self, mode: str, *args, **kwargs): # noqa: ANN001
|
||||||
|
return io.BytesIO(b"")
|
||||||
|
|
||||||
|
|
||||||
|
def _make_handler(path: str = "/health") -> tuple[FleetAPIHandler, io.BytesIO]:
|
||||||
|
"""Construct a handler pointed at *path*, capture wfile output."""
|
||||||
|
buf = io.BytesIO()
|
||||||
|
request = _FakeSocket()
|
||||||
|
client_address = ("127.0.0.1", 0)
|
||||||
|
|
||||||
|
handler = FleetAPIHandler.__new__(FleetAPIHandler)
|
||||||
|
handler.path = path
|
||||||
|
handler.request = request
|
||||||
|
handler.client_address = client_address
|
||||||
|
handler.server = MagicMock()
|
||||||
|
handler.wfile = buf
|
||||||
|
handler.rfile = io.BytesIO(b"")
|
||||||
|
handler.command = "GET"
|
||||||
|
handler._headers_buffer = []
|
||||||
|
|
||||||
|
# Stub send_response / send_header / end_headers to write minimal HTTP
|
||||||
|
handler._response_code = None
|
||||||
|
def _send_response(code, message=None): # noqa: ANN001
|
||||||
|
handler._response_code = code
|
||||||
|
def _send_header(k, v): # noqa: ANN001
|
||||||
|
pass
|
||||||
|
def _end_headers(): # noqa: ANN001
|
||||||
|
pass
|
||||||
|
handler.send_response = _send_response
|
||||||
|
handler.send_header = _send_header
|
||||||
|
handler.end_headers = _end_headers
|
||||||
|
|
||||||
|
return handler, buf
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_response(buf: io.BytesIO) -> dict:
|
||||||
|
buf.seek(0)
|
||||||
|
return json.loads(buf.read())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# /health
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def test_health_returns_ok(tmp_path, monkeypatch):
    """/health responds with status=ok and detects an existing palace dir."""
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    handler, body = _make_handler("/health")

    _handle_health(handler)

    payload = _parse_response(body)
    assert payload["status"] == "ok"
    assert payload["palace_exists"] is True
def test_health_missing_palace(tmp_path, monkeypatch):
    """/health stays ok but reports palace_exists=False when the path is absent."""
    absent = tmp_path / "nonexistent"
    monkeypatch.setenv("FLEET_PALACE_PATH", str(absent))
    handler, body = _make_handler("/health")

    _handle_health(handler)

    payload = _parse_response(body)
    assert payload["status"] == "ok"
    assert payload["palace_exists"] is False
# ---------------------------------------------------------------------------
# /search
# ---------------------------------------------------------------------------
def _mock_search_fleet(results):
|
||||||
|
"""Return a patch target that returns *results*."""
|
||||||
|
mock = MagicMock(return_value=results)
|
||||||
|
return mock
|
||||||
|
|
||||||
|
|
||||||
|
def _make_result(text="hello", room="forge", wing="bezalel", score=0.9):
    """Build a MemPalaceResult populated with sensible test defaults."""
    from nexus.mempalace.searcher import MemPalaceResult

    fields = {"text": text, "room": room, "wing": wing, "score": score}
    return MemPalaceResult(**fields)
def test_search_missing_q_param():
    """A /search request without ?q= is rejected with HTTP 400."""
    handler, body = _make_handler("/search")

    _handle_search(handler, {})

    payload = _parse_response(body)
    assert "error" in payload
    assert handler._response_code == 400
def test_search_returns_results(tmp_path, monkeypatch):
    """/search serializes searcher results into JSON and returns HTTP 200.

    The original version patched ``mempalace.fleet_api.FleetAPIHandler`` to
    no effect, patched ``search_fleet`` twice on the same module, and
    imported ``importlib``/``api_module`` without using them; a single patch
    on ``nexus.mempalace.searcher.search_fleet`` is sufficient.
    """
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    (tmp_path / "chroma.sqlite3").touch()
    result = _make_result(text="CI green", room="forge", wing="bezalel", score=0.95)

    handler, buf = _make_handler("/search?q=CI")

    with patch("nexus.mempalace.searcher.search_fleet", return_value=[result]):
        _handle_search(handler, {"q": ["CI"]})

    data = _parse_response(buf)
    assert data["count"] == 1
    assert data["results"][0]["text"] == "CI green"
    assert data["results"][0]["room"] == "forge"
    assert data["results"][0]["wing"] == "bezalel"
    assert data["results"][0]["score"] == 0.95
    assert handler._response_code == 200
def test_search_with_room_filter(tmp_path, monkeypatch):
    """The room= query parameter is forwarded to search_fleet unchanged."""
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    hit = _make_result()

    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", return_value=[hit]) as spy:
        _handle_search(MagicMock(), {"q": ["test"], "room": ["hermes"]})

    # The room filter must be passed straight through to the searcher.
    spy.assert_called_once_with("test", room="hermes", n_results=10)
def test_search_invalid_n_param():
    """A non-numeric n= query parameter yields HTTP 400."""
    handler, body = _make_handler("/search?q=test&n=bad")

    _handle_search(handler, {"q": ["test"], "n": ["bad"]})

    payload = _parse_response(body)
    assert "error" in payload
    assert handler._response_code == 400
def test_search_palace_unavailable(monkeypatch):
    """MemPalaceUnavailable raised by the searcher maps to HTTP 503."""
    from nexus.mempalace.searcher import MemPalaceUnavailable

    handler, body = _make_handler("/search?q=test")

    import nexus.mempalace.searcher as s_module
    failure = MemPalaceUnavailable("no palace")
    with patch.object(s_module, "search_fleet", side_effect=failure):
        _handle_search(handler, {"q": ["test"]})

    payload = _parse_response(body)
    assert "error" in payload
    assert handler._response_code == 503
def test_search_n_clamped_to_max():
    """n > MAX_RESULTS is silently clamped."""
    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", return_value=[]) as spy:
        _handle_search(MagicMock(), {"q": ["test"], "n": ["9999"]})

    # 9999 must have been reduced to the 50-result ceiling.
    spy.assert_called_once_with("test", room=None, n_results=50)
# ---------------------------------------------------------------------------
# /wings
# ---------------------------------------------------------------------------
def test_wings_returns_list(tmp_path, monkeypatch):
    """/wings lists only directories under the palace root."""
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    for wing in ("bezalel", "timmy"):
        (tmp_path / wing).mkdir()
    # Plain files must be excluded from the wing listing.
    (tmp_path / "README.txt").touch()

    handler, body = _make_handler("/wings")
    _handle_wings(handler)

    payload = _parse_response(body)
    assert set(payload["wings"]) == {"bezalel", "timmy"}
    assert handler._response_code == 200
def test_wings_missing_palace(tmp_path, monkeypatch):
    """/wings returns HTTP 503 when the palace path does not exist."""
    absent = tmp_path / "nonexistent"
    monkeypatch.setenv("FLEET_PALACE_PATH", str(absent))
    handler, body = _make_handler("/wings")

    _handle_wings(handler)

    payload = _parse_response(body)
    assert "error" in payload
    assert handler._response_code == 503
# ---------------------------------------------------------------------------
# 404 unknown endpoint
# ---------------------------------------------------------------------------
def test_unknown_endpoint():
    """Unknown paths get a 404 plus a hint listing the known endpoints."""
    handler, body = _make_handler("/foobar")

    handler.do_GET()

    payload = _parse_response(body)
    assert "error" in payload
    assert handler._response_code == 404
    assert "/search" in payload["endpoints"]
# ---------------------------------------------------------------------------
# audit fixture smoke test
# ---------------------------------------------------------------------------
def test_audit_fixture_is_clean():
    """Ensure tests/fixtures/fleet_palace/ passes privacy audit (no violations).

    Fix: the original assertion message used an f-string with no
    placeholders (ruff F541) concatenated with ``+``; one real f-string
    carries the whole message.
    """
    from mempalace.audit_privacy import audit_palace

    fixture_dir = Path(__file__).parent / "fixtures" / "fleet_palace"
    assert fixture_dir.exists(), f"Fixture directory missing: {fixture_dir}"

    result = audit_palace(fixture_dir)
    detail = "\n".join(
        f"  [{v.rule}] {v.path}: {v.detail}" for v in result.violations
    )
    assert result.clean, f"Privacy violations found in CI fixture:\n{detail}"
Reference in New Issue
Block a user