Compare commits
9 Commits
groq/issue
...
groq/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b445c04037 | ||
| 60bd9a05ff | |||
| c7468a3c6a | |||
| 07a4be3bb9 | |||
| 804536a3f2 | |||
|
|
a0ee7858ff | ||
| 34ec13bc29 | |||
| ea3cc6b393 | |||
| caa7823cdd |
21
.gitea/workflows/review_gate.yml
Normal file
21
.gitea/workflows/review_gate.yml
Normal file
@@ -0,0 +1,21 @@
|
||||
name: Review Approval Gate
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
verify-review:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Verify PR has approving review
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||
PR_NUMBER: ${{ gitea.event.pull_request.number }}
|
||||
run: |
|
||||
python3 scripts/review_gate.py
|
||||
20
.gitea/workflows/staging_gate.yml
Normal file
20
.gitea/workflows/staging_gate.yml
Normal file
@@ -0,0 +1,20 @@
|
||||
name: Staging Verification Gate
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
verify-staging:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Verify staging label on merge PR
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
GITEA_URL: ${{ vars.GITEA_URL || 'https://forge.alexanderwhitestone.com' }}
|
||||
GITEA_REPO: Timmy_Foundation/the-nexus
|
||||
run: |
|
||||
python3 scripts/staging_gate.py
|
||||
28
.gitea/workflows/weekly-audit.yml
Normal file
28
.gitea/workflows/weekly-audit.yml
Normal file
@@ -0,0 +1,28 @@
|
||||
name: Weekly Privacy Audit
|
||||
|
||||
# Runs every Monday at 05:00 UTC against a CI test fixture.
|
||||
# On production wizards this same script should be run via cron:
|
||||
# 0 5 * * 1 python /opt/nexus/mempalace/audit_privacy.py /var/lib/mempalace/fleet
|
||||
#
|
||||
# Refs: #1083, #1075
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 5 * * 1" # Monday 05:00 UTC
|
||||
workflow_dispatch: {} # allow manual trigger
|
||||
|
||||
jobs:
|
||||
privacy-audit:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Setup Python
|
||||
uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: "3.x"
|
||||
|
||||
- name: Run privacy audit against CI fixture
|
||||
run: |
|
||||
python mempalace/audit_privacy.py tests/fixtures/fleet_palace
|
||||
326
bin/bezalel_heartbeat_check.py
Executable file
326
bin/bezalel_heartbeat_check.py
Executable file
@@ -0,0 +1,326 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Bezalel Meta-Heartbeat Checker — stale cron detection (poka-yoke #1096)
|
||||
|
||||
Monitors all cron job heartbeat files and alerts P1 when any job has been
|
||||
silent for more than 2× its declared interval.
|
||||
|
||||
POKA-YOKE design:
|
||||
Prevention — cron-heartbeat-write.sh writes a .last file atomically after
|
||||
every successful cron job completion, stamping its interval.
|
||||
Detection — this script runs every 15 minutes (via systemd timer) and
|
||||
raises P1 on stderr + writes an alert file for any stale job.
|
||||
Correction — alerts are loud enough (P1 stderr + alert files) for
|
||||
monitoring/humans to intervene before the next run window.
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
=================
|
||||
Pure stdlib. No pip installs.
|
||||
|
||||
USAGE
|
||||
=====
|
||||
# One-shot check (default dir)
|
||||
python bin/bezalel_heartbeat_check.py
|
||||
|
||||
# Override heartbeat dir
|
||||
python bin/bezalel_heartbeat_check.py --heartbeat-dir /tmp/test-beats
|
||||
|
||||
# Dry-run (check + report, don't write alert files)
|
||||
python bin/bezalel_heartbeat_check.py --dry-run
|
||||
|
||||
# JSON output (for piping into other tools)
|
||||
python bin/bezalel_heartbeat_check.py --json
|
||||
|
||||
EXIT CODES
|
||||
==========
|
||||
0 — all jobs healthy (or no .last files found yet)
|
||||
1 — one or more stale beats detected
|
||||
2 — heartbeat dir unreadable
|
||||
|
||||
IMPORTABLE API
|
||||
==============
|
||||
from bin.bezalel_heartbeat_check import check_cron_heartbeats
|
||||
|
||||
result = check_cron_heartbeats("/var/run/bezalel/heartbeats")
|
||||
# Returns dict with keys: checked_at, jobs, stale_count, healthy_count
|
||||
|
||||
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("bezalel.heartbeat")
|
||||
|
||||
# ── Configuration ────────────────────────────────────────────────────

DEFAULT_HEARTBEAT_DIR = "/var/run/bezalel/heartbeats"


# ── Core checker ─────────────────────────────────────────────────────

def check_cron_heartbeats(heartbeat_dir: str = DEFAULT_HEARTBEAT_DIR) -> Dict[str, Any]:
    """
    Scan all .last files in heartbeat_dir and determine which jobs are stale.

    A job is stale when it has been silent for more than 2x its declared
    interval. Returns a dict:

        {
            "checked_at": "<ISO 8601 timestamp>",
            "jobs": [
                {
                    "job": str,
                    "healthy": bool,
                    "age_secs": float,
                    "interval": int,
                    "last_seen": str or None,  # ISO timestamp of last heartbeat
                    "message": str,
                },
                ...
            ],
            "stale_count": int,
            "healthy_count": int,
        }

    On empty dir (no .last files), returns jobs=[] with stale_count=0.
    On corrupt .last file (unreadable, invalid JSON, or non-numeric
    fields), reports that job as stale with an error message instead of
    letting one bad file abort the whole scan.

    Refs: #1096
    """
    now_ts = time.time()
    checked_at = datetime.fromtimestamp(now_ts, tz=timezone.utc).isoformat()

    hb_path = Path(heartbeat_dir)
    jobs: List[Dict[str, Any]] = []

    # Missing dir is not an error here: the host may simply not be
    # provisioned yet. run_check() handles the unreadable-dir case.
    if not hb_path.exists():
        return {
            "checked_at": checked_at,
            "jobs": [],
            "stale_count": 0,
            "healthy_count": 0,
        }

    last_files = sorted(hb_path.glob("*.last"))

    for last_file in last_files:
        job_name = last_file.stem  # filename without .last extension

        def _corrupt_entry(reason: str) -> Dict[str, Any]:
            # Corrupt/unreadable beats are reported as unhealthy so they
            # still trigger a P1 alert instead of silently disappearing.
            return {
                "job": job_name,
                "healthy": False,
                "age_secs": float("inf"),
                "interval": 3600,
                "last_seen": None,
                "message": reason,
            }

        # Read and parse the heartbeat file
        try:
            raw = last_file.read_text(encoding="utf-8")
            data = json.loads(raw)
        except (OSError, json.JSONDecodeError) as exc:
            jobs.append(_corrupt_entry(f"CORRUPT: cannot read/parse heartbeat file: {exc}"))
            continue

        # Extract fields with safe defaults. Non-dict payloads or
        # non-numeric values count as corruption rather than crashing
        # the scan (previously an uncaught ValueError/AttributeError).
        try:
            beat_timestamp = float(data.get("timestamp", 0))
            interval = int(data.get("interval", 3600))
        except (AttributeError, TypeError, ValueError) as exc:
            jobs.append(_corrupt_entry(f"CORRUPT: bad field types in heartbeat file: {exc}"))
            continue

        age_secs = now_ts - beat_timestamp

        # Convert beat_timestamp to a readable ISO string
        try:
            last_seen = datetime.fromtimestamp(beat_timestamp, tz=timezone.utc).isoformat()
        except (OSError, OverflowError, ValueError):
            last_seen = None

        # Stale = silent for more than 2× the declared interval
        threshold = 2 * interval
        is_stale = age_secs > threshold

        if is_stale:
            message = (
                f"STALE (last {age_secs:.0f}s ago, interval {interval}s"
                f" — exceeds 2x threshold of {threshold}s)"
            )
        else:
            message = f"OK (last {age_secs:.0f}s ago, interval {interval}s)"

        jobs.append({
            "job": job_name,
            "healthy": not is_stale,
            "age_secs": age_secs,
            "interval": interval,
            "last_seen": last_seen,
            "message": message,
        })

    stale_count = sum(1 for j in jobs if not j["healthy"])
    healthy_count = sum(1 for j in jobs if j["healthy"])

    return {
        "checked_at": checked_at,
        "jobs": jobs,
        "stale_count": stale_count,
        "healthy_count": healthy_count,
    }
|
||||
|
||||
|
||||
# ── Alert file writer ────────────────────────────────────────────────
|
||||
|
||||
def write_alert(heartbeat_dir: str, job_info: Dict[str, Any]) -> None:
    """
    Persist a P1 alert for a stale job at <heartbeat_dir>/alerts/<job>.alert

    External monitoring watches this directory; the file remains until a
    fresh heartbeat clears the stale state on a later check cycle.

    Refs: #1096
    """
    target_dir = Path(heartbeat_dir) / "alerts"
    try:
        target_dir.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        # Best-effort: a full/readonly disk must not crash the checker.
        logger.warning("Cannot create alerts dir %s: %s", target_dir, exc)
        return

    destination = target_dir / f"{job_info['job']}.alert"
    detected_at = datetime.now(tz=timezone.utc).isoformat()

    payload = {"alert_level": "P1"}
    for key in ("job", "message", "age_secs", "interval", "last_seen"):
        payload[key] = job_info[key]
    payload["detected_at"] = detected_at

    # Atomic write via temp + rename (same poka-yoke pattern as the writer)
    staging = destination.with_suffix(f".alert.tmp.{os.getpid()}")
    try:
        staging.write_text(json.dumps(payload, indent=2), encoding="utf-8")
        staging.rename(destination)
    except OSError as exc:
        logger.warning("Failed to write alert file %s: %s", destination, exc)
        staging.unlink(missing_ok=True)
|
||||
|
||||
|
||||
# ── Main runner ──────────────────────────────────────────────────────
|
||||
|
||||
def run_check(heartbeat_dir: str, dry_run: bool = False, output_json: bool = False) -> int:
    """
    Run a full heartbeat check cycle. Returns exit code (0/1/2).

    Args:
        heartbeat_dir: Directory holding the per-job ``*.last`` files.
        dry_run: When True, report stale jobs but do not write alert files.
        output_json: When True, print the raw result dict as JSON and skip
            the human-readable log output and alert-file writing entirely.

    Exit codes:
      0 — all healthy (or no .last files found yet)
      1 — stale beats detected
      2 — heartbeat dir unreadable (permissions, etc.)

    Refs: #1096
    """
    hb_path = Path(heartbeat_dir)

    # Check if dir exists but is unreadable (permissions)
    if hb_path.exists() and not os.access(heartbeat_dir, os.R_OK):
        logger.error("Heartbeat dir unreadable: %s", heartbeat_dir)
        return 2

    result = check_cron_heartbeats(heartbeat_dir)

    if output_json:
        # NOTE(review): corrupt jobs carry age_secs=float("inf"), which
        # json.dumps serializes as the non-standard token Infinity —
        # strict JSON consumers may reject this output. Confirm consumers.
        print(json.dumps(result, indent=2))
        return 1 if result["stale_count"] > 0 else 0

    # Human-readable output
    if not result["jobs"]:
        # Empty dir is informational, not a failure — the writer side may
        # simply not have run yet on a freshly provisioned host.
        logger.warning(
            "No .last files found in %s — bezalel not yet provisioned or no jobs registered.",
            heartbeat_dir,
        )
        return 0

    for job in result["jobs"]:
        if job["healthy"]:
            logger.info(" + %s: %s", job["job"], job["message"])
        else:
            logger.error(" - %s: %s", job["job"], job["message"])

    if result["stale_count"] > 0:
        for job in result["jobs"]:
            if not job["healthy"]:
                # P1 alert to stderr — deliberately bypasses logging config
                # so it is visible even with logging silenced/redirected.
                print(
                    f"[P1-ALERT] STALE CRON JOB: {job['job']} — {job['message']}",
                    file=sys.stderr,
                )
                if not dry_run:
                    write_alert(heartbeat_dir, job)
                else:
                    logger.info("DRY RUN — would write alert for stale job: %s", job["job"])

        logger.error(
            "Heartbeat check FAILED: %d stale, %d healthy",
            result["stale_count"],
            result["healthy_count"],
        )
        return 1

    logger.info(
        "Heartbeat check PASSED: %d healthy, %d stale",
        result["healthy_count"],
        result["stale_count"],
    )
    return 0
|
||||
|
||||
|
||||
# ── CLI entrypoint ───────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
    """CLI entrypoint: parse flags, run one check cycle, exit with its code."""
    arg_parser = argparse.ArgumentParser(
        description=(
            "Bezalel Meta-Heartbeat Checker — detect silent cron failures (poka-yoke #1096)"
        ),
    )
    arg_parser.add_argument(
        "--heartbeat-dir",
        default=DEFAULT_HEARTBEAT_DIR,
        help=f"Directory containing .last heartbeat files (default: {DEFAULT_HEARTBEAT_DIR})",
    )
    arg_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Check and report but do not write alert files",
    )
    arg_parser.add_argument(
        "--json",
        action="store_true",
        dest="output_json",
        help="Output results as JSON (for integration with other tools)",
    )
    opts = arg_parser.parse_args()

    # run_check returns the process exit code directly (0/1/2).
    sys.exit(
        run_check(
            heartbeat_dir=opts.heartbeat_dir,
            dry_run=opts.dry_run,
            output_json=opts.output_json,
        )
    )
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
449
bin/check_cron_heartbeats.py
Normal file
449
bin/check_cron_heartbeats.py
Normal file
@@ -0,0 +1,449 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Meta-heartbeat checker — makes silent cron failures impossible.
|
||||
|
||||
Reads every ``*.last`` file in the heartbeat directory and verifies that no
|
||||
job has been silent for longer than **2× its declared interval**. If any job
|
||||
is stale, a Gitea alert issue is created (or an existing one is updated).
|
||||
When all jobs recover, the issue is closed automatically.
|
||||
|
||||
This script itself should be run as a cron job every 15 minutes so the
|
||||
meta-level is also covered:
|
||||
|
||||
*/15 * * * * cd /path/to/the-nexus && \\
|
||||
python bin/check_cron_heartbeats.py >> /var/log/bezalel/heartbeat-check.log 2>&1
|
||||
|
||||
USAGE
|
||||
-----
|
||||
# Check all jobs; create/update Gitea alert if any stale:
|
||||
python bin/check_cron_heartbeats.py
|
||||
|
||||
# Dry-run (no Gitea writes):
|
||||
python bin/check_cron_heartbeats.py --dry-run
|
||||
|
||||
# Output Night Watch heartbeat panel markdown:
|
||||
python bin/check_cron_heartbeats.py --panel
|
||||
|
||||
# Output JSON (for integration with other tools):
|
||||
python bin/check_cron_heartbeats.py --json
|
||||
|
||||
# Use a custom heartbeat directory:
|
||||
python bin/check_cron_heartbeats.py --dir /tmp/test-heartbeats
|
||||
|
||||
HEARTBEAT DIRECTORY
|
||||
-------------------
|
||||
Primary: /var/run/bezalel/heartbeats/ (set by ops, writable by cron user)
|
||||
Fallback: ~/.bezalel/heartbeats/ (dev machines)
|
||||
Override: BEZALEL_HEARTBEAT_DIR env var
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
-----------------
|
||||
Pure stdlib. No pip installs required.
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("bezalel.heartbeat_checker")
|
||||
|
||||
# ── Configuration ─────────────────────────────────────────────────────
|
||||
|
||||
PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats")
|
||||
FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats"
|
||||
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_REPO = os.environ.get("NEXUS_REPO", "Timmy_Foundation/the-nexus")
|
||||
ALERT_TITLE_PREFIX = "[heartbeat-checker]"
|
||||
|
||||
# A job is stale when its age exceeds this multiple of its declared interval
|
||||
STALE_RATIO = 2.0
|
||||
# Never flag a job as stale if it completed less than this many seconds ago
|
||||
# (prevents noise immediately after deployment)
|
||||
MIN_STALE_AGE = 60
|
||||
|
||||
|
||||
def _resolve_heartbeat_dir() -> Path:
    """Return the active heartbeat directory.

    Resolution order:
      1. ``BEZALEL_HEARTBEAT_DIR`` env var (always wins, even if missing on disk).
      2. The primary system dir, if it already exists.
      3. Create the primary dir and verify writability with a probe file.
      4. Fall back to the per-user dir when creation/writing is not permitted.
    """
    env = os.environ.get("BEZALEL_HEARTBEAT_DIR")
    if env:
        return Path(env)
    if PRIMARY_HEARTBEAT_DIR.exists():
        return PRIMARY_HEARTBEAT_DIR
    # Try to create it; fall back to home dir if not permitted.
    # The touch/unlink probe catches dirs that exist but are read-only.
    try:
        PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
        probe = PRIMARY_HEARTBEAT_DIR / ".write_probe"
        probe.touch()
        probe.unlink()
        return PRIMARY_HEARTBEAT_DIR
    except (PermissionError, OSError):
        return FALLBACK_HEARTBEAT_DIR
|
||||
|
||||
|
||||
# ── Data model ────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
class JobStatus:
    """Health status for a single cron job's heartbeat.

    Produced by ``_read_job_status``; ``healthy`` is False for stale,
    missing, or corrupt heartbeat files alike.
    """
    # Job name — the .last filename without its extension.
    job: str
    # Path of the heartbeat file this status was read from.
    path: Path
    healthy: bool
    age_seconds: float  # -1 if unknown (missing/corrupt)
    interval_seconds: int  # 0 if unknown
    staleness_ratio: float  # age / interval; -1 if unknown; >STALE_RATIO = stale
    # Epoch seconds of the last beat; None when the file is missing/corrupt.
    last_timestamp: Optional[float]
    # PID recorded by the writing job, if present in the file.
    pid: Optional[int]
    raw_status: str  # value from the .last file: "ok" / "warn" / "error"
    # Human-readable one-line summary used in logs, panel, and alerts.
    message: str
|
||||
|
||||
|
||||
@dataclass
class HeartbeatReport:
    """Aggregate report for all cron job heartbeats in a directory.

    Built by ``build_report``; knows how to render itself as a Night Watch
    markdown panel, a Gitea alert body, or a JSON-serializable dict.
    """
    # Epoch seconds when the scan was performed.
    timestamp: float
    # Directory the *.last files were read from.
    heartbeat_dir: Path
    # One JobStatus per heartbeat file found (may be empty).
    jobs: List[JobStatus] = field(default_factory=list)

    @property
    def stale_jobs(self) -> List[JobStatus]:
        """Jobs currently unhealthy (stale, missing, or corrupt)."""
        return [j for j in self.jobs if not j.healthy]

    @property
    def overall_healthy(self) -> bool:
        """True when no job is stale. An empty directory counts as healthy."""
        return len(self.stale_jobs) == 0

    # ── Rendering ─────────────────────────────────────────────────

    def to_panel_markdown(self) -> str:
        """Night Watch heartbeat panel — a table of all jobs with their status."""
        ts = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime(self.timestamp))
        overall = "OK" if self.overall_healthy else "ALERT"

        lines = [
            f"## Heartbeat Panel — {ts}",
            "",
            f"**Overall:** {overall}",
            "",
            "| Job | Status | Age | Interval | Ratio |",
            "|-----|--------|-----|----------|-------|",
        ]

        if not self.jobs:
            lines.append("| *(no heartbeat files found)* | — | — | — | — |")
        else:
            for j in self.jobs:
                icon = "OK" if j.healthy else "STALE"
                # Sentinel values (-1 age/ratio, 0 interval) render as N/A.
                age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A"
                interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A"
                ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A"
                lines.append(
                    f"| `{j.job}` | {icon} | {age_str} | {interval_str} | {ratio_str} |"
                )

        if self.stale_jobs:
            lines += ["", "**Stale jobs:**"]
            for j in self.stale_jobs:
                lines.append(f"- `{j.job}`: {j.message}")

        lines += [
            "",
            f"*Heartbeat dir: `{self.heartbeat_dir}`*",
        ]
        return "\n".join(lines)

    def to_alert_body(self) -> str:
        """Gitea issue body when stale jobs are detected."""
        ts = time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(self.timestamp))
        stale = self.stale_jobs

        lines = [
            f"## Cron Heartbeat Alert — {ts}",
            "",
            f"**{len(stale)} job(s) have gone silent** (stale > {STALE_RATIO}x interval).",
            "",
            "| Job | Age | Interval | Ratio | Detail |",
            "|-----|-----|----------|-------|--------|",
        ]

        for j in stale:
            age_str = _fmt_duration(j.age_seconds) if j.age_seconds >= 0 else "N/A"
            interval_str = _fmt_duration(j.interval_seconds) if j.interval_seconds > 0 else "N/A"
            ratio_str = f"{j.staleness_ratio:.1f}x" if j.staleness_ratio >= 0 else "N/A"
            lines.append(
                f"| `{j.job}` | {age_str} | {interval_str} | {ratio_str} | {j.message} |"
            )

        lines += [
            "",
            "### What to do",
            "1. `crontab -l` — confirm the job is still scheduled",
            "2. Check the job's log for errors",
            "3. Restart the job if needed",
            "4. Close this issue once fresh heartbeats appear",
            "",
            f"*Generated by `check_cron_heartbeats.py` — dir: `{self.heartbeat_dir}`*",
        ]
        return "\n".join(lines)

    def to_json(self) -> Dict[str, Any]:
        """JSON-serializable summary. Note: omits path/pid/last_timestamp."""
        return {
            "healthy": self.overall_healthy,
            "timestamp": self.timestamp,
            "heartbeat_dir": str(self.heartbeat_dir),
            "jobs": [
                {
                    "job": j.job,
                    "healthy": j.healthy,
                    "age_seconds": j.age_seconds,
                    "interval_seconds": j.interval_seconds,
                    "staleness_ratio": j.staleness_ratio,
                    "raw_status": j.raw_status,
                    "message": j.message,
                }
                for j in self.jobs
            ],
        }
|
||||
|
||||
|
||||
def _fmt_duration(seconds: float) -> str:
|
||||
"""Format a duration in seconds as a human-readable string."""
|
||||
s = int(seconds)
|
||||
if s < 60:
|
||||
return f"{s}s"
|
||||
if s < 3600:
|
||||
return f"{s // 60}m {s % 60}s"
|
||||
return f"{s // 3600}h {(s % 3600) // 60}m"
|
||||
|
||||
|
||||
# ── Job scanning ──────────────────────────────────────────────────────
|
||||
|
||||
def scan_heartbeats(directory: Path) -> List[JobStatus]:
    """Collect a ``JobStatus`` for every ``*.last`` heartbeat file in *directory*.

    Returns an empty list when the directory itself is absent (e.g. a host
    that has never been provisioned). Results are sorted by filename so
    output ordering is deterministic.
    """
    if not directory.exists():
        return []
    statuses: List[JobStatus] = []
    for beat_file in sorted(directory.glob("*.last")):
        statuses.append(_read_job_status(beat_file.stem, beat_file))
    return statuses
|
||||
|
||||
|
||||
def _read_job_status(job: str, path: Path) -> JobStatus:
    """Parse one ``.last`` heartbeat file into a ``JobStatus``.

    Missing or unparseable files are reported as unhealthy rather than
    raising, so a single corrupt heartbeat cannot break the whole scan.
    """
    now = time.time()

    def _broken(status: str, message: str) -> JobStatus:
        # Common shape for "could not even read the beat" outcomes:
        # sentinel -1/0 values signal "unknown" to the renderers.
        return JobStatus(
            job=job, path=path,
            healthy=False,
            age_seconds=-1,
            interval_seconds=0,
            staleness_ratio=-1,
            last_timestamp=None,
            pid=None,
            raw_status=status,
            message=message,
        )

    if not path.exists():
        return _broken("missing", f"Heartbeat file missing: {path}")

    try:
        payload = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError) as exc:
        return _broken("corrupt", f"Corrupt heartbeat: {exc}")

    beat_ts = float(payload.get("timestamp", 0))
    declared_interval = int(payload.get("interval_seconds", 0))
    beat_pid = payload.get("pid")
    reported_status = payload.get("status", "ok")

    silence = now - beat_ts
    # Zero/missing interval means we cannot compute a ratio — treat as
    # infinitely stale so the MIN_STALE_AGE floor still applies.
    ratio = silence / declared_interval if declared_interval > 0 else float("inf")
    # MIN_STALE_AGE suppresses false alarms right after deployment.
    is_stale = ratio > STALE_RATIO and silence > MIN_STALE_AGE

    if is_stale:
        message = (
            f"Silent for {_fmt_duration(silence)} "
            f"({ratio:.1f}x interval of {_fmt_duration(declared_interval)})"
        )
    else:
        message = f"Last beat {_fmt_duration(silence)} ago (ratio {ratio:.1f}x)"

    return JobStatus(
        job=job, path=path,
        healthy=not is_stale,
        age_seconds=silence,
        interval_seconds=declared_interval,
        staleness_ratio=ratio,
        last_timestamp=beat_ts,
        pid=beat_pid,
        raw_status="stale" if is_stale else reported_status,
        message=message,
    )
|
||||
|
||||
|
||||
# ── Gitea alerting ────────────────────────────────────────────────────
|
||||
|
||||
def _gitea_request(method: str, path: str, data: Optional[dict] = None) -> Any:
    """Make a Gitea API request; return parsed JSON or None on error.

    Args:
        method: HTTP verb ("GET", "POST", "PATCH", ...).
        path: API path appended to ``<GITEA_URL>/api/v1`` — may carry a
            query string.
        data: Optional dict; JSON-encoded as the request body when truthy.

    Never raises: every failure is logged and surfaced as ``None`` so the
    checker keeps working when Gitea is unreachable. An empty response
    body yields ``{}``.
    """
    # Imported lazily so the module stays cheap to import for the
    # --panel/--json code paths that never touch the network.
    import urllib.request
    import urllib.error

    url = f"{GITEA_URL.rstrip('/')}/api/v1{path}"
    body = json.dumps(data).encode() if data else None
    req = urllib.request.Request(url, data=body, method=method)
    if GITEA_TOKEN:
        req.add_header("Authorization", f"token {GITEA_TOKEN}")
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")

    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read().decode()
            return json.loads(raw) if raw.strip() else {}
    except urllib.error.HTTPError as exc:
        # Truncate the error body so a huge HTML error page can't flood logs.
        logger.warning("Gitea %d: %s", exc.code, exc.read().decode()[:200])
        return None
    except Exception as exc:
        # Broad by design: DNS failures, timeouts, TLS errors — all
        # downgrade to a warning instead of crashing the cron job.
        logger.warning("Gitea request failed: %s", exc)
        return None
|
||||
|
||||
|
||||
def _find_open_alert_issue() -> Optional[dict]:
    """Return the open Gitea issue previously created by this checker, if any.

    Matches by the ``ALERT_TITLE_PREFIX`` title convention; only the first
    20 open issues are examined.
    """
    response = _gitea_request(
        "GET",
        f"/repos/{GITEA_REPO}/issues?state=open&type=issues&limit=20",
    )
    # A failed/odd API response (None, dict error payload) means "no issue".
    if not isinstance(response, list):
        return None
    return next(
        (item for item in response if item.get("title", "").startswith(ALERT_TITLE_PREFIX)),
        None,
    )
|
||||
|
||||
|
||||
def alert_on_stale(report: HeartbeatReport, dry_run: bool = False) -> None:
    """Create, update, or close a Gitea alert issue based on report health.

    State machine:
      healthy + open issue  → comment and close the issue
      healthy + no issue    → no-op
      stale   + open issue  → append a fresh report as a comment
      stale   + no issue    → create a new issue assigned to "Timmy"

    With ``dry_run`` the intended action is logged and nothing is sent.
    """
    if dry_run:
        action = "close" if report.overall_healthy else "create/update"
        logger.info("DRY RUN — would %s Gitea issue", action)
        return

    if not GITEA_TOKEN:
        # Without credentials there is nothing useful we can do remotely.
        logger.warning("GITEA_TOKEN not set — skipping Gitea alert")
        return

    existing = _find_open_alert_issue()

    if report.overall_healthy:
        if existing:
            # Auto-resolve: comment first so the close has context.
            logger.info("All heartbeats healthy — closing issue #%d", existing["number"])
            _gitea_request(
                "POST",
                f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments",
                data={"body": "All cron heartbeats are now fresh. Closing."},
            )
            _gitea_request(
                "PATCH",
                f"/repos/{GITEA_REPO}/issues/{existing['number']}",
                data={"state": "closed"},
            )
        return

    stale_names = ", ".join(j.job for j in report.stale_jobs)
    title = f"{ALERT_TITLE_PREFIX} Stale cron heartbeats: {stale_names}"
    body = report.to_alert_body()

    if existing:
        # Reuse the open issue so repeated checks don't spam new issues.
        logger.info("Still stale — updating issue #%d", existing["number"])
        _gitea_request(
            "POST",
            f"/repos/{GITEA_REPO}/issues/{existing['number']}/comments",
            data={"body": body},
        )
    else:
        result = _gitea_request(
            "POST",
            f"/repos/{GITEA_REPO}/issues",
            data={"title": title, "body": body, "assignees": ["Timmy"]},
        )
        # result is None on API failure — only log success when confirmed.
        if result and result.get("number"):
            logger.info("Created alert issue #%d", result["number"])
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────
|
||||
|
||||
def build_report(directory: Optional[Path] = None) -> HeartbeatReport:
    """Scan heartbeats and return a report. Exposed for Night Watch import.

    Args:
        directory: Explicit heartbeat dir; when None, resolve via env var /
            primary / fallback order (see ``_resolve_heartbeat_dir``).

    Returns:
        A ``HeartbeatReport`` timestamped at scan time.
    """
    hb_dir = directory if directory is not None else _resolve_heartbeat_dir()
    jobs = scan_heartbeats(hb_dir)
    return HeartbeatReport(timestamp=time.time(), heartbeat_dir=hb_dir, jobs=jobs)
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entrypoint: render a panel, emit JSON, or log + alert on Gitea.

    Exit status: 0 when all heartbeats are healthy, 1 otherwise —
    except --panel mode, which always exits 0 (report-only).
    """
    parser = argparse.ArgumentParser(
        description="Meta-heartbeat checker — detects silent cron failures",
    )
    parser.add_argument(
        "--dir", default=None,
        help="Heartbeat directory (default: auto-detect)",
    )
    parser.add_argument(
        "--panel", action="store_true",
        help="Output Night Watch heartbeat panel markdown and exit",
    )
    parser.add_argument(
        "--json", action="store_true", dest="output_json",
        help="Output results as JSON and exit",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Log results without writing Gitea issues",
    )
    args = parser.parse_args()

    report = build_report(Path(args.dir) if args.dir else None)

    if args.panel:
        # Panel mode is purely informational — no health-based exit code.
        print(report.to_panel_markdown())
        return

    if args.output_json:
        print(json.dumps(report.to_json(), indent=2))
        sys.exit(0 if report.overall_healthy else 1)

    # Default: log + alert
    if not report.jobs:
        logger.info("No heartbeat files found in %s", report.heartbeat_dir)
    else:
        for j in report.jobs:
            level = logging.INFO if j.healthy else logging.ERROR
            icon = "OK " if j.healthy else "STALE"
            logger.log(level, "[%s] %s: %s", icon, j.job, j.message)

    alert_on_stale(report, dry_run=args.dry_run)
    sys.exit(0 if report.overall_healthy else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -80,6 +80,15 @@ from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# Poka-yoke: write a cron heartbeat so check_cron_heartbeats.py can detect
|
||||
# if *this* watchdog stops running. Import lazily to stay zero-dep if the
|
||||
# nexus package is unavailable (e.g. very minimal test environments).
|
||||
try:
|
||||
from nexus.cron_heartbeat import write_cron_heartbeat as _write_cron_heartbeat
|
||||
_HAS_CRON_HEARTBEAT = True
|
||||
except ImportError:
|
||||
_HAS_CRON_HEARTBEAT = False
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
@@ -488,6 +497,15 @@ def run_once(args: argparse.Namespace) -> bool:
|
||||
elif not args.dry_run:
|
||||
alert_on_failure(report, dry_run=args.dry_run)
|
||||
|
||||
# Poka-yoke: stamp our own heartbeat so the meta-checker can detect
|
||||
# if this watchdog cron job itself goes silent. Runs every 5 minutes
|
||||
# by convention (*/5 * * * *).
|
||||
if _HAS_CRON_HEARTBEAT:
|
||||
try:
|
||||
_write_cron_heartbeat("nexus_watchdog", interval_seconds=300)
|
||||
except Exception:
|
||||
pass # never crash the watchdog over its own heartbeat
|
||||
|
||||
return report.overall_healthy
|
||||
|
||||
|
||||
|
||||
247
bin/night_watch.py
Normal file
247
bin/night_watch.py
Normal file
@@ -0,0 +1,247 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Night Watch — Bezalel nightly report generator.
|
||||
|
||||
Runs once per night (typically at 03:00 local time via cron) and writes a
|
||||
markdown report to ``reports/bezalel/nightly/<YYYY-MM-DD>.md``.
|
||||
|
||||
The report always includes a **Heartbeat Panel** (acceptance criterion #3 of
|
||||
issue #1096) so silent cron failures are visible in the morning brief.
|
||||
|
||||
USAGE
|
||||
-----
|
||||
python bin/night_watch.py # write today's report
|
||||
python bin/night_watch.py --dry-run # print to stdout, don't write file
|
||||
python bin/night_watch.py --date 2026-04-08 # specific date
|
||||
|
||||
CRONTAB
|
||||
-------
|
||||
0 3 * * * cd /path/to/the-nexus && python bin/night_watch.py \\
|
||||
>> /var/log/bezalel/night-watch.log 2>&1
|
||||
|
||||
ZERO DEPENDENCIES
|
||||
-----------------
|
||||
Pure stdlib, plus ``check_cron_heartbeats`` from this repo (also stdlib).
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import importlib.util
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger("bezalel.night_watch")
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
REPORTS_DIR = PROJECT_ROOT / "reports" / "bezalel" / "nightly"
|
||||
|
||||
# ── Load check_cron_heartbeats without relying on sys.path hacks ──────
|
||||
|
||||
def _load_checker():
    """Dynamically import ``bin/check_cron_heartbeats.py`` as a module.

    The checker lives next to this script in ``bin/`` and is not an
    installed package, so it is loaded by file path via ``importlib``
    rather than by mutating ``sys.path``.
    """
    checker_path = PROJECT_ROOT / "bin" / "check_cron_heartbeats.py"
    spec = importlib.util.spec_from_file_location(
        "_check_cron_heartbeats", checker_path
    )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
|
||||
|
||||
|
||||
# ── System checks ─────────────────────────────────────────────────────
|
||||
|
||||
def _check_service(service_name: str) -> tuple[str, str]:
|
||||
"""Return (status, detail) for a systemd service."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["systemctl", "is-active", service_name],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
active = result.stdout.strip()
|
||||
if active == "active":
|
||||
return "OK", f"{service_name} is active"
|
||||
return "WARN", f"{service_name} is {active}"
|
||||
except FileNotFoundError:
|
||||
return "OK", f"{service_name} status unknown (systemctl not available)"
|
||||
except Exception as exc:
|
||||
return "WARN", f"systemctl error: {exc}"
|
||||
|
||||
|
||||
def _check_disk(threshold_pct: int = 90) -> tuple[str, str]:
|
||||
"""Return (status, detail) for disk usage on /."""
|
||||
try:
|
||||
usage = shutil.disk_usage("/")
|
||||
pct = int(usage.used / usage.total * 100)
|
||||
status = "OK" if pct < threshold_pct else "WARN"
|
||||
return status, f"disk usage {pct}%"
|
||||
except Exception as exc:
|
||||
return "WARN", f"disk check failed: {exc}"
|
||||
|
||||
|
||||
def _check_memory(threshold_pct: int = 90) -> tuple[str, str]:
|
||||
"""Return (status, detail) for memory usage."""
|
||||
try:
|
||||
meminfo = Path("/proc/meminfo").read_text()
|
||||
data = {}
|
||||
for line in meminfo.splitlines():
|
||||
parts = line.split()
|
||||
if len(parts) >= 2:
|
||||
data[parts[0].rstrip(":")] = int(parts[1])
|
||||
total = data.get("MemTotal", 0)
|
||||
available = data.get("MemAvailable", 0)
|
||||
if total == 0:
|
||||
return "OK", "memory info unavailable"
|
||||
pct = int((total - available) / total * 100)
|
||||
status = "OK" if pct < threshold_pct else "WARN"
|
||||
return status, f"memory usage {pct}%"
|
||||
except FileNotFoundError:
|
||||
# Not Linux (e.g. macOS dev machine)
|
||||
return "OK", "memory check skipped (not Linux)"
|
||||
except Exception as exc:
|
||||
return "WARN", f"memory check failed: {exc}"
|
||||
|
||||
|
||||
def _check_gitea_reachability(gitea_url: str = "https://forge.alexanderwhitestone.com") -> tuple[str, str]:
    """Return (status, detail) for Gitea HTTPS reachability.

    Performs a plain GET against *gitea_url*. Any non-200 response or
    network error is reported as WARN; this function never raises.
    """
    import urllib.request  # local import keeps module load cheap

    try:
        with urllib.request.urlopen(gitea_url, timeout=10) as resp:
            code = resp.status
        if code == 200:
            # BUG FIX: the old detail claimed "Alpha SSH not configured
            # from Beta" — SSH is never checked here; report only what
            # this probe actually verified.
            return "OK", f"Gitea HTTPS is responding ({code})"
        return "WARN", f"Gitea returned HTTP {code}"
    except Exception as exc:
        return "WARN", f"Gitea unreachable: {exc}"
|
||||
|
||||
|
||||
def _check_world_readable_secrets() -> tuple[str, str]:
    """Scan the repo for world-readable secret-like files.

    Globs a fixed set of sensitive filename patterns under PROJECT_ROOT
    and flags any file whose permission bits grant read access to
    "other" (mode o+r). At most three offenders are named in the detail.
    """
    sensitive_patterns = ("*.key", "*.pem", "*.secret", ".env", "*.token")
    offenders: list[str] = []
    try:
        for pattern in sensitive_patterns:
            for candidate in PROJECT_ROOT.rglob(pattern):
                try:
                    if candidate.stat().st_mode & 0o004:  # world-readable bit
                        offenders.append(str(candidate.relative_to(PROJECT_ROOT)))
                except OSError:
                    # File vanished or is unstat-able — ignore it.
                    pass
        if offenders:
            return "WARN", f"world-readable sensitive files: {', '.join(offenders[:3])}"
        return "OK", "no sensitive recently-modified world-readable files found"
    except Exception as exc:
        return "WARN", f"security check failed: {exc}"
|
||||
|
||||
|
||||
# ── Report generation ─────────────────────────────────────────────────
|
||||
|
||||
def generate_report(date_str: str, checker_mod) -> str:
    """Build the full nightly report as a markdown string.

    Args:
        date_str: Report date (YYYY-MM-DD). Currently unused inside the
            report body — the caller uses it to name the output file,
            and the heading timestamp is always "now" in UTC.
        checker_mod: Loaded ``check_cron_heartbeats`` module; must expose
            ``build_report()`` returning an object with
            ``to_panel_markdown()``.

    Returns:
        Markdown with an overall verdict, a system-check table, and the
        Heartbeat Panel required by issue #1096.
    """
    now_utc = datetime.now(timezone.utc)
    # BUG FIX: was "%Y-%m-%d %02H:%M UTC" — "%02H" is a glibc-only
    # strftime extension and renders as literal text on other platforms;
    # "%H" is already zero-padded.
    ts = now_utc.strftime("%Y-%m-%d %H:%M UTC")

    rows: list[tuple[str, str, str]] = []

    service_status, service_detail = _check_service("hermes-bezalel")
    rows.append(("Service", service_status, service_detail))

    disk_status, disk_detail = _check_disk()
    rows.append(("Disk", disk_status, disk_detail))

    mem_status, mem_detail = _check_memory()
    rows.append(("Memory", mem_status, mem_detail))

    gitea_status, gitea_detail = _check_gitea_reachability()
    rows.append(("Alpha VPS", gitea_status, gitea_detail))

    sec_status, sec_detail = _check_world_readable_secrets()
    rows.append(("Security", sec_status, sec_detail))

    # A single WARN anywhere flips the overall verdict.
    overall = "OK" if all(status == "OK" for _, status, _ in rows) else "WARN"

    lines = [
        f"# Bezalel Night Watch — {ts}",
        "",
        f"**Overall:** {overall}",
        "",
        "| Check | Status | Detail |",
        "|-------|--------|--------|",
    ]
    for check, status, detail in rows:
        lines.append(f"| {check} | {status} | {detail} |")

    lines.append("")
    lines.append("---")
    lines.append("")

    # ── Heartbeat Panel (acceptance criterion #3 of issue #1096) ──────
    # A broken heartbeat checker must not take down the whole report;
    # surface the failure inside the panel instead.
    try:
        hb_report = checker_mod.build_report()
        lines.append(hb_report.to_panel_markdown())
    except Exception as exc:
        lines += [
            "## Heartbeat Panel",
            "",
            f"*(heartbeat check failed: {exc})*",
        ]

    lines += [
        "",
        "---",
        "",
        "*Automated by Bezalel Night Watch*",
        "",
    ]

    return "\n".join(lines)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
    """CLI entry point: parse arguments, build the report, print or persist."""
    parser = argparse.ArgumentParser(
        description="Bezalel Night Watch — nightly report generator",
    )
    parser.add_argument(
        "--date", default=None,
        help="Report date as YYYY-MM-DD (default: today UTC)",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Print report to stdout instead of writing to disk",
    )
    opts = parser.parse_args()

    # Default to today's date in UTC so post-midnight cron runs are
    # attributed to the correct calendar day.
    report_date = opts.date or datetime.now(timezone.utc).strftime("%Y-%m-%d")

    heartbeat_checker = _load_checker()
    rendered = generate_report(report_date, heartbeat_checker)

    if opts.dry_run:
        print(rendered)
        return

    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    target = REPORTS_DIR / f"{report_date}.md"
    target.write_text(rendered)
    logger.info("Night Watch report written to %s", target)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
168
docs/QUARANTINE_PROCESS.md
Normal file
168
docs/QUARANTINE_PROCESS.md
Normal file
@@ -0,0 +1,168 @@
|
||||
# Quarantine Process
|
||||
|
||||
**Poka-yoke principle:** a flaky or broken test must never silently rot in
|
||||
place. Quarantine is the correction step in the
|
||||
Prevention → Detection → Correction triad described in issue #1094.
|
||||
|
||||
---
|
||||
|
||||
## When to quarantine
|
||||
|
||||
Quarantine a test when **any** of the following are true:
|
||||
|
||||
| Signal | Source |
|
||||
|--------|--------|
|
||||
| `flake_detector.py` flags the test at < 95 % consistency | Automated |
|
||||
| The test fails intermittently in CI over two consecutive runs | Manual observation |
|
||||
| The test depends on infrastructure that is temporarily unavailable | Manual observation |
|
||||
| You are fixing a bug and need to defer a related test | Developer judgement |
|
||||
|
||||
Do **not** use quarantine as a way to ignore tests indefinitely. The
|
||||
quarantine directory is a **30-day time-box** — see the escalation rule below.
|
||||
|
||||
---
|
||||
|
||||
## Step-by-step workflow
|
||||
|
||||
### 1 File an issue
|
||||
|
||||
Open a Gitea issue with the title prefix `[FLAKY]` or `[BROKEN]`:
|
||||
|
||||
```
|
||||
[FLAKY] test_foo_bar non-deterministically fails with assertion error
|
||||
```
|
||||
|
||||
Note the issue number — you will need it in the next step.
|
||||
|
||||
### 2 Move the test file
|
||||
|
||||
Move (or copy) the test from `tests/` into `tests/quarantine/`.
|
||||
|
||||
```bash
|
||||
git mv tests/test_my_thing.py tests/quarantine/test_my_thing.py
|
||||
```
|
||||
|
||||
If only individual test functions are flaky, extract them into a new file in
|
||||
`tests/quarantine/` rather than moving the whole module.
|
||||
|
||||
### 3 Annotate the test
|
||||
|
||||
Add the `@pytest.mark.quarantine` marker with the issue reference:
|
||||
|
||||
```python
|
||||
import pytest
|
||||
|
||||
@pytest.mark.quarantine(reason="Flaky until #NNN is resolved")
|
||||
def test_my_thing():
|
||||
...
|
||||
```
|
||||
|
||||
This satisfies the poka-yoke skip-enforcement rule: the test is allowed to
|
||||
skip/be excluded because it is explicitly linked to a tracking issue.
|
||||
|
||||
### 4 Verify CI still passes
|
||||
|
||||
```bash
|
||||
pytest # default run — quarantine tests are excluded
|
||||
pytest --run-quarantine # optional: run quarantined tests explicitly
|
||||
```
|
||||
|
||||
The main CI run must be green before merging.
|
||||
|
||||
### 5 Add to `.test-history.json` exclusions (optional)
|
||||
|
||||
If the flake detector is tracking the test, add it to the `quarantine_list` in
|
||||
`.test-history.json` so it is excluded from the consistency report:
|
||||
|
||||
```json
|
||||
{
|
||||
"quarantine_list": [
|
||||
"tests/quarantine/test_my_thing.py::test_my_thing"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Escalation rule
|
||||
|
||||
If a quarantined test's tracking issue has had **no activity for 30 days**,
|
||||
the next developer to touch that file must:
|
||||
|
||||
1. Attempt to fix and un-quarantine the test, **or**
|
||||
2. Delete the test and close the issue with a comment explaining why, **or**
|
||||
3. Leave a comment on the issue explaining the blocker and reset the 30-day
|
||||
clock explicitly.
|
||||
|
||||
**A test may not stay in quarantine indefinitely without active attention.**
|
||||
|
||||
---
|
||||
|
||||
## Un-quarantining a test
|
||||
|
||||
When the underlying issue is resolved:
|
||||
|
||||
1. Remove `@pytest.mark.quarantine` from the test.
|
||||
2. Move the file back from `tests/quarantine/` to `tests/`.
|
||||
3. Run the full suite to confirm it passes consistently (at least 3 local runs).
|
||||
4. Close the tracking issue.
|
||||
5. Remove any entries from `.test-history.json`'s `quarantine_list`.
|
||||
|
||||
---
|
||||
|
||||
## Flake detector integration
|
||||
|
||||
The flake detector (`scripts/flake_detector.py`) is run after every CI test
|
||||
execution. It reads `.test-report.json` (produced by `pytest --json-report`)
|
||||
and updates `.test-history.json`.
|
||||
|
||||
**CI integration example (shell script or CI step):**
|
||||
|
||||
```bash
|
||||
pytest --json-report --json-report-file=.test-report.json
|
||||
python scripts/flake_detector.py
|
||||
```
|
||||
|
||||
If the flake detector exits non-zero, the CI step fails and the output lists
|
||||
the offending tests with their consistency percentages.
|
||||
|
||||
**Local usage:**
|
||||
|
||||
```bash
|
||||
# After running tests with JSON report:
|
||||
python scripts/flake_detector.py
|
||||
|
||||
# Just view current statistics without ingesting a new report:
|
||||
python scripts/flake_detector.py --no-update
|
||||
|
||||
# Lower threshold for local dev:
|
||||
python scripts/flake_detector.py --threshold 0.90
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
```
|
||||
Test fails intermittently
|
||||
│
|
||||
▼
|
||||
File [FLAKY] issue
|
||||
│
|
||||
▼
|
||||
git mv test → tests/quarantine/
|
||||
│
|
||||
▼
|
||||
Add @pytest.mark.quarantine(reason="#NNN")
|
||||
│
|
||||
▼
|
||||
Main CI green ✓
|
||||
│
|
||||
▼
|
||||
Fix the root cause (within 30 days)
|
||||
│
|
||||
▼
|
||||
git mv back → tests/
|
||||
Remove quarantine marker
|
||||
Close issue ✓
|
||||
```
|
||||
246
docs/bezalel/evennia/cmd_palace.py
Normal file
246
docs/bezalel/evennia/cmd_palace.py
Normal file
@@ -0,0 +1,246 @@
|
||||
"""
|
||||
Palace commands — bridge Evennia to the local MemPalace memory system.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.commands.command import Command
|
||||
from evennia import create_object, search_object
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
def _search_mempalace(query, wing=None, room=None, n=5, fleet=False):
    """Run the palace_search helper script and return its result list.

    The helper's positional protocol is (query, wing, room, n), with the
    literal string "none" standing in for an unset wing/room; "--fleet"
    widens the search across all wings. Any failure — missing
    interpreter, timeout, malformed JSON — degrades to an empty list so
    MUD commands never crash on palace errors.
    """
    argv = [
        "/root/wizards/bezalel/hermes/venv/bin/python",
        PALACE_SCRIPT,
        query,
        wing or "none",
        room or "none",
        str(n),
    ]
    if fleet:
        argv.append("--fleet")
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=30)
        payload = json.loads(proc.stdout)
        return payload.get("results", [])
    except Exception:
        return []
|
||||
|
||||
|
||||
def _get_wing(caller):
|
||||
"""Return the caller's wing, defaulting to their key or 'general'."""
|
||||
return caller.db.wing if caller.attributes.has("wing") else (caller.key.lower() if caller.key else "general")
|
||||
|
||||
|
||||
class CmdPalaceSearch(Command):
    """
    Search your memory palace.

    Dispatches on the first word of the argument string; an
    unrecognised first word is treated as a bare search query.

    Usage:
      palace/search <query>
      palace/search <query> [--room <room>]
      palace/recall <topic>
      palace/file <name> = <content>
      palace/status
    """

    key = "palace"
    aliases = ["pal"]
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        # Entry point called by the Evennia command handler.
        if not self.args.strip():
            self.caller.msg("Usage: palace/search <query> | palace/recall <topic> | palace/file <name> = <content> | palace/status")
            return

        parts = self.args.strip().split(" ", 1)
        subcmd = parts[0].lower()
        rest = parts[1] if len(parts) > 1 else ""

        if subcmd == "search":
            self._do_search(rest)
        elif subcmd == "recall":
            self._do_recall(rest)
        elif subcmd == "file":
            self._do_file(rest)
        elif subcmd == "status":
            self._do_status()
        else:
            # Unknown subcommand: treat the whole argument as a query.
            # NOTE(review): the "--room" flag shown in the docstring is
            # not parsed here — confirm whether it should be.
            self._do_search(self.args.strip())

    def _do_search(self, query):
        # Search the caller's wing and show up to five matches.
        if not query:
            self.caller.msg("Search for what?")
            return
        self.caller.msg(f"Searching the palace for: |c{query}|n...")
        wing = _get_wing(self.caller)
        results = _search_mempalace(query, wing=wing)
        if not results:
            self.caller.msg("The palace is silent on that matter.")
            return

        lines = []
        for i, r in enumerate(results[:5], 1):
            room = r.get("room", "unknown")
            source = r.get("source", "unknown")
            content = r.get("content", "")[:400]  # clip long drawers
            lines.append(f"\n|g[{i}]|n |c{room}|n — |x{source}|n")
            lines.append(f"{content}\n")
        self.caller.msg("\n".join(lines))

    def _do_recall(self, topic):
        # Materialise the single best match as a MemoryObject in the room.
        if not topic:
            self.caller.msg("Recall what topic?")
            return
        results = _search_mempalace(topic, wing=_get_wing(self.caller), n=1)
        if not results:
            self.caller.msg("Nothing to recall.")
            return

        r = results[0]
        content = r.get("content", "")
        source = r.get("source", "unknown")

        # Imported lazily so this module loads even when the typeclass
        # package is unavailable at import time.
        from typeclasses.memory_object import MemoryObject
        obj = create_object(
            MemoryObject,
            key=f"memory:{topic}",
            location=self.caller.location,
        )
        obj.db.memory_content = content
        obj.db.source_file = source
        obj.db.room_name = r.get("room", "general")
        self.caller.location.msg_contents(
            f"$You() conjure() a memory shard from the palace: |m{obj.key}|n.",
            from_obj=self.caller,
        )

    def _do_file(self, rest):
        # Create a MemoryObject from "<name> = <content>" user input.
        if "=" not in rest:
            self.caller.msg("Usage: palace/file <name> = <content>")
            return
        name, content = rest.split("=", 1)
        name = name.strip()
        content = content.strip()
        if not name or not content:
            self.caller.msg("Both name and content are required.")
            return

        from typeclasses.memory_object import MemoryObject
        obj = create_object(
            MemoryObject,
            key=f"memory:{name}",
            location=self.caller.location,
        )
        obj.db.memory_content = content
        obj.db.source_file = f"filed by {self.caller.key}"
        obj.db.room_name = self.caller.location.key if self.caller.location else "general"
        self.caller.location.msg_contents(
            f"$You() file() a new memory in the palace: |m{obj.key}|n.",
            from_obj=self.caller,
        )

    def _do_status(self):
        # Shell out to the mempalace CLI and relay whatever it prints.
        cmd = [
            "/root/wizards/bezalel/hermes/venv/bin/mempalace",
            "--palace", "/root/wizards/bezalel/.mempalace/palace",
            "status"
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
            self.caller.msg(result.stdout or result.stderr)
        except Exception as e:
            self.caller.msg(f"Could not reach the palace: {e}")
|
||||
|
||||
|
||||
class CmdRecall(Command):
    """
    Recall a memory from the palace.

    Usage:
      recall <query>
      recall <query> --fleet
      recall <query> --room <room>

    "--fleet" searches across all wings instead of only the caller's.
    """

    key = "recall"
    aliases = ["remember", "mem"]
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        if not self.args.strip():
            self.caller.msg("Recall what? Usage: recall <query> [--fleet] [--room <room>]")
            return

        args = self.args.strip()
        # Flag presence is checked on the full string first, so ordering
        # of --fleet relative to --room does not hide the fleet flag.
        fleet = "--fleet" in args
        room = None

        if "--room" in args:
            # Everything before --room is the query; the first word
            # after it is the room name.
            parts = args.split("--room")
            args = parts[0].strip()
            room = parts[1].strip().split()[0] if len(parts) > 1 else None

        if "--fleet" in args:
            # Strip the flag from the query text (no-op if --fleet
            # appeared after --room and was already discarded above).
            args = args.replace("--fleet", "").strip()

        self.caller.msg(f"Recalling from the {'fleet' if fleet else 'personal'} palace: |c{args}|n...")

        # A fleet-wide search passes wing=None so no wing filter applies.
        wing = None if fleet else _get_wing(self.caller)
        results = _search_mempalace(args, wing=wing, room=room, n=5, fleet=fleet)
        if not results:
            self.caller.msg("The palace is silent on that matter.")
            return

        lines = []
        for i, r in enumerate(results[:5], 1):
            room_name = r.get("room", "unknown")
            source = r.get("source", "unknown")
            content = r.get("content", "")[:400]  # clip long drawers
            wing_label = r.get("wing", "unknown")
            # Only fleet results are tagged with their wing of origin.
            wing_tag = f" |y[{wing_label}]|n" if fleet else ""
            lines.append(f"\n|g[{i}]|n |c{room_name}|n{wing_tag} — |x{source}|n")
            lines.append(f"{content}\n")
        self.caller.msg("\n".join(lines))
|
||||
|
||||
|
||||
class CmdEnterRoom(Command):
    """
    Enter a room in the mind palace by topic.

    Creates the PalaceRoom on first visit; subsequent visits reuse it.

    Usage:
      enter room <topic>
    """

    key = "enter room"
    aliases = ["enter palace", "go room"]
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        if not self.args.strip():
            self.caller.msg("Enter which room? Usage: enter room <topic>")
            return

        # Normalise the topic into a slug: "night watch" -> "night-watch".
        topic = self.args.strip().lower().replace(" ", "-")
        wing = _get_wing(self.caller)
        # Room keys are namespaced per wing so wings don't collide.
        room_key = f"palace:{wing}:{topic}"

        # Search for existing room
        rooms = search_object(room_key, typeclass="typeclasses.palace_room.PalaceRoom")
        if rooms:
            room = rooms[0]
        else:
            # Create the room dynamically on first entry; the lazy import
            # avoids loading the typeclass until it is actually needed.
            from typeclasses.palace_room import PalaceRoom
            room = create_object(
                PalaceRoom,
                key=room_key,
            )
            room.db.memory_topic = topic
            room.db.wing = wing
            # Populate the description from a live palace search.
            room.update_description()

        self.caller.move_to(room, move_type="teleport")
        self.caller.msg(f"You step into the |c{topic}|n room of your mind palace.")
|
||||
166
docs/bezalel/evennia/cmd_record.py
Normal file
166
docs/bezalel/evennia/cmd_record.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""
|
||||
Live memory commands — write new memories into the palace from Evennia.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.commands.command import Command
|
||||
from evennia import create_object
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
|
||||
ADDER_SCRIPT = "/root/wizards/bezalel/evennia/palace_add.py"
|
||||
|
||||
|
||||
def _add_drawer(content, wing, room, source):
    """Persist one verbatim drawer via the palace_add helper script.

    Returns True only when the helper exits 0 AND prints "OK"; any
    subprocess failure (missing interpreter, timeout, ...) is swallowed
    and reported as False so callers can show a friendly error.
    """
    argv = [
        "/root/wizards/bezalel/hermes/venv/bin/python",
        ADDER_SCRIPT,
        content,
        wing,
        room,
        source,
    ]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=15)
    except Exception:
        return False
    return proc.returncode == 0 and "OK" in proc.stdout
|
||||
|
||||
|
||||
class CmdRecord(Command):
    """
    Record a decision into the palace.

    NOTE(review): the original doc said "hall_facts", but the drawer is
    filed under room "general" — confirm the intended room.

    Usage:
      record <text>
      record We decided to use PostgreSQL over MySQL.
    """

    key = "record"
    aliases = ["decide"]
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        if not self.args.strip():
            self.caller.msg("Record what decision? Usage: record <text>")
            return

        # Wing resolution duplicated across this module; mirrors
        # _get_wing() in cmd_palace.py — candidate for extraction.
        wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
        text = self.args.strip()
        full_text = f"DECISION ({wing}): {text}\nRecorded by {self.caller.key} via Evennia."

        ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
        if ok:
            # $You()/record() is Evennia funcform text, expanded per viewer.
            self.caller.location.msg_contents(
                f"$You() record() a decision in the palace archives.",
                from_obj=self.caller,
            )
        else:
            self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdNote(Command):
    """
    Note a breakthrough into the palace.

    NOTE(review): the original doc said "hall_discoveries", but the
    drawer is filed under room "general" — confirm the intended room.

    Usage:
      note <text>
      note The GraphQL schema can be auto-generated from our typeclasses.
    """

    key = "note"
    aliases = ["jot"]
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        if not self.args.strip():
            self.caller.msg("Note what? Usage: note <text>")
            return

        # Same wing-resolution expression as CmdRecord; see the
        # extraction note there.
        wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
        text = self.args.strip()
        full_text = f"BREAKTHROUGH ({wing}): {text}\nNoted by {self.caller.key} via Evennia."

        ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
        if ok:
            self.caller.location.msg_contents(
                f"$You() inscribe() a breakthrough into the palace scrolls.",
                from_obj=self.caller,
            )
        else:
            self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdEvent(Command):
    """
    Log an event into the palace.

    NOTE(review): the original doc said "hall_events", but the drawer is
    filed under room "general" — confirm the intended room.

    Usage:
      event <text>
      event Gitea runner came back online after being offline for 6 hours.
    """

    key = "event"
    aliases = ["log"]
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        if not self.args.strip():
            self.caller.msg("Log what event? Usage: event <text>")
            return

        # Same wing-resolution expression as CmdRecord; see the
        # extraction note there.
        wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
        text = self.args.strip()
        full_text = f"EVENT ({wing}): {text}\nLogged by {self.caller.key} via Evennia."

        ok = _add_drawer(full_text, wing, "general", f"evennia:{self.caller.key}")
        if ok:
            self.caller.location.msg_contents(
                f"$You() chronicle() an event in the palace records.",
                from_obj=self.caller,
            )
        else:
            self.caller.msg("The palace scribes could not write that down.")
|
||||
|
||||
|
||||
class CmdPalaceWrite(Command):
    """
    Directly write a memory into a specific palace room.

    Unlike record/note/event, the target room is caller-chosen.

    Usage:
      palace/write <room> = <text>
    """

    key = "palace/write"
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        if "=" not in self.args:
            self.caller.msg("Usage: palace/write <room> = <text>")
            return

        # Split on the first "=" only so content may itself contain "=".
        room, text = self.args.split("=", 1)
        room = room.strip()
        text = text.strip()

        if not room or not text:
            self.caller.msg("Both room and text are required.")
            return

        # Same wing-resolution expression as CmdRecord; see the
        # extraction note there.
        wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
        full_text = f"MEMORY ({wing}/{room}): {text}\nWritten by {self.caller.key} via Evennia."

        ok = _add_drawer(full_text, wing, room, f"evennia:{self.caller.key}")
        if ok:
            self.caller.location.msg_contents(
                f"$You() etch() a memory into the |c{room}|n room of the palace.",
                from_obj=self.caller,
            )
        else:
            self.caller.msg("The palace scribes could not write that down.")
|
||||
105
docs/bezalel/evennia/cmd_steward.py
Normal file
105
docs/bezalel/evennia/cmd_steward.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""
|
||||
Steward commands — ask a palace steward about memories.
|
||||
"""
|
||||
|
||||
from evennia.commands.command import Command
|
||||
from evennia import search_object
|
||||
|
||||
|
||||
class CmdAskSteward(Command):
    """
    Ask a steward NPC about a topic from the palace memory.

    Usage:
      ask <steward> about <topic>
      ask <steward> about <topic> --fleet

    Example:
      ask bezalel-steward about nightly watch
      ask bezalel-steward about runner outage --fleet
    """

    key = "ask"
    aliases = ["question"]
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def parse(self):
        """Parse 'ask <target> about <topic>' syntax.

        Sets ``self.target_name``, ``self.topic`` and ``self.fleet``.
        The " about " separator is matched case-insensitively.
        """
        raw = self.args.strip()
        fleet = "--fleet" in raw
        if fleet:
            raw = raw.replace("--fleet", "").strip()

        # BUG FIX: the separator was tested against raw.lower() but then
        # split on the original-case string, so "ask Bob About x" passed
        # the check, failed the split, and parts[1] raised IndexError.
        # Locate the separator case-insensitively instead.
        marker = " about "
        idx = raw.lower().find(marker)
        if idx != -1:
            self.target_name = raw[:idx].strip()
            self.topic = raw[idx + len(marker):].strip()
        else:
            self.target_name = ""
            self.topic = raw
        self.fleet = fleet

    def func(self):
        if not self.args.strip():
            self.caller.msg("Usage: ask <steward> about <topic> [--fleet]")
            return

        # parse() is normally invoked by the Evennia command handler;
        # re-run it here so func() is safe if called directly.
        self.parse()

        if not self.target_name:
            self.caller.msg("Ask whom? Usage: ask <steward> about <topic>")
            return

        # Find a steward NPC in the current room: anything exposing
        # respond_to_question() whose key loosely matches the name.
        stewards = [
            obj for obj in self.caller.location.contents
            if hasattr(obj, "respond_to_question")
            and self.target_name.lower() in obj.key.lower()
        ]

        if not stewards:
            self.caller.msg(f"There is no steward here matching '{self.target_name}'.")
            return

        steward = stewards[0]
        self.caller.msg(f"You ask |c{steward.key}|n about '{self.topic}'...")
        steward.respond_to_question(self.topic, self.caller, fleet=self.fleet)
|
||||
|
||||
|
||||
class CmdSummonSteward(Command):
    """
    Summon your wing's steward NPC to your current location.

    Reuses an existing steward if one exists anywhere in the world,
    otherwise creates one for the caller's wing.

    Usage:
      summon steward
    """

    key = "summon steward"
    locks = "cmd:all()"
    help_category = "Mind Palace"

    def func(self):
        # Wing resolution mirrors _get_wing() in cmd_palace.py.
        wing = self.caller.db.wing if self.caller.attributes.has("wing") else (self.caller.key.lower() if self.caller.key else "general")
        steward_key = f"{wing}-steward"

        # Search for existing steward
        from typeclasses.steward_npc import StewardNPC
        stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")

        if stewards:
            steward = stewards[0]
            steward.move_to(self.caller.location, move_type="teleport")
            self.caller.location.msg_contents(
                f"A shimmer of light coalesces into |c{steward.key}|n.",
                from_obj=self.caller,
            )
        else:
            # NOTE(review): .create() appears to return a (obj, errors)
            # pair per Evennia convention — confirm [0] is the object.
            steward = StewardNPC.create(steward_key)[0]
            steward.db.wing = wing
            steward.db.steward_name = self.caller.key
            steward.move_to(self.caller.location, move_type="teleport")
            self.caller.location.msg_contents(
                f"You call forth |c{steward.key}|n from the palace archives.",
                from_obj=self.caller,
            )
|
||||
83
docs/bezalel/evennia/hall_of_wings.py
Normal file
83
docs/bezalel/evennia/hall_of_wings.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""
|
||||
Hall of Wings — Builds the central MemPalace zone in Evennia.
|
||||
|
||||
Usage (from Evennia shell or script):
|
||||
from world.hall_of_wings import build_hall_of_wings
|
||||
build_hall_of_wings()
|
||||
"""
|
||||
|
||||
from evennia import create_object
|
||||
from typeclasses.palace_room import PalaceRoom
|
||||
from typeclasses.steward_npc import StewardNPC
|
||||
from typeclasses.rooms import Room
|
||||
from typeclasses.exits import Exit
|
||||
|
||||
HALL_KEY = "hall_of_wings"
|
||||
HALL_NAME = "Hall of Wings"
|
||||
|
||||
DEFAULT_WINGS = [
|
||||
"bezalel",
|
||||
"timmy",
|
||||
"allegro",
|
||||
"ezra",
|
||||
]
|
||||
|
||||
|
||||
def build_hall_of_wings():
    """Create or update the central Hall of Wings and attach steward chambers.

    Idempotent: existing hall, chambers, exits and stewards are reused;
    descriptions and metadata are refreshed on every run. Returns the
    hall room object.
    """
    # Find or create the hall
    from evennia import search_object
    halls = search_object(HALL_KEY, typeclass="typeclasses.rooms.Room")
    if halls:
        hall = halls[0]
    else:
        hall = create_object(Room, key=HALL_KEY)
    # Description is (re)written unconditionally so wording updates
    # propagate to an existing hall.
    hall.db.desc = (
        "|cThe Hall of Wings|n\n"
        "A vast circular chamber of pale stone and shifting starlight.\n"
        "Arched doorways line the perimeter, each leading to a steward's chamber.\n"
        "Here, the memories of the fleet converge.\n\n"
        "Use |wsummon steward|n to call your wing's steward, or\n"
        "|wask <steward> about <topic>|n to query the palace archives."
    )

    for wing in DEFAULT_WINGS:
        chamber_key = f"chamber:{wing}"
        chambers = search_object(chamber_key, typeclass="typeclasses.palace_room.PalaceRoom")
        if chambers:
            chamber = chambers[0]
        else:
            chamber = create_object(PalaceRoom, key=chamber_key)
        # Refresh metadata and description on every run.
        chamber.db.memory_topic = wing
        chamber.db.wing = wing
        chamber.db.desc = (
            f"|cThe Chamber of {wing.title()}|n\n"
            f"This room holds the accumulated memories of the {wing} wing.\n"
            f"A steward stands ready to answer questions."
        )
        chamber.update_description()

        # Link hall <-> chamber with exits (skip if already linked).
        exit_name = f"{wing}-chamber"
        existing_exits = [ex for ex in hall.exits if ex.key == exit_name]
        if not existing_exits:
            create_object(Exit, key=exit_name, location=hall, destination=chamber)

        return_exits = [ex for ex in chamber.exits if ex.key == "hall"]
        if not return_exits:
            create_object(Exit, key="hall", location=chamber, destination=hall)

        # Place or summon steward
        steward_key = f"{wing}-steward"
        stewards = search_object(steward_key, typeclass="typeclasses.steward_npc.StewardNPC")
        if stewards:
            steward = stewards[0]
            if steward.location != chamber:
                steward.move_to(chamber, move_type="teleport")
        else:
            steward = create_object(StewardNPC, key=steward_key)
            steward.db.wing = wing
            steward.db.steward_name = wing.title()
            steward.move_to(chamber, move_type="teleport")

    return hall
|
||||
87
docs/bezalel/evennia/palace_room.py
Normal file
87
docs/bezalel/evennia/palace_room.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""
|
||||
PalaceRoom
|
||||
|
||||
A Room that represents a topic in the memory palace.
|
||||
Memory objects spawned here embody concepts retrieved from mempalace.
|
||||
Its description auto-populates from a palace search on the memory topic.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.objects.objects import DefaultRoom
|
||||
from .objects import ObjectParent
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
class PalaceRoom(ObjectParent, DefaultRoom):
    """
    A room in the mind palace.

    ``db.memory_topic`` names the kind of memories stored here; the room's
    description is refreshed from a live MemPalace search on that topic
    whenever an account-puppeted object enters.
    """

    def at_object_creation(self):
        """Initialize palace attributes and a placeholder description."""
        super().at_object_creation()
        self.db.memory_topic = ""
        self.db.wing = "bezalel"
        self.db.desc = (
            f"This is the |c{self.key}|n room of your mind palace.\n"
            "Memories and concepts drift here like motes of light.\n"
            "Use |wpalace/search <query>|n or |wrecall <topic>|n to summon memories."
        )

    def _search_palace(self, query, wing=None, room=None, n=3):
        """Call the helper script and return parsed results.

        Returns a list of result dicts, or [] on any failure (script error,
        timeout, unparseable output) — the room must never crash on a
        palace outage.
        """
        cmd = ["/root/wizards/bezalel/hermes/venv/bin/python", PALACE_SCRIPT, query]
        cmd.append(wing or "none")
        cmd.append(room or "none")
        cmd.append(str(n))
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            data = json.loads(result.stdout)
            return data.get("results", [])
        except Exception:
            return []

    def update_description(self):
        """Refresh the room description from a palace search on its topic."""
        # BUG FIX: the original expression
        #   self.db.memory_topic or self.key.split(":")[-1] if ":" in self.key else self.key
        # parsed as (memory_topic or split) if ":" in key else key, because a
        # conditional expression binds looser than `or`. An explicitly set
        # memory_topic was therefore ignored whenever the key had no colon.
        # Parenthesize so memory_topic always wins when set.
        topic = self.db.memory_topic or (
            self.key.split(":")[-1] if ":" in self.key else self.key
        )
        wing = self.db.wing or "bezalel"
        results = self._search_palace(topic, wing=wing, n=3)

        header = (
            f"=|c {topic.upper()} |n="
        )
        desc_lines = [
            header,
            f"You stand in the |c{topic}|n room of the |y{wing}|n wing.",
            "Memories drift here like motes of light.",
            "",
        ]

        if results:
            desc_lines.append("|gNearby memories:|n")
            for i, r in enumerate(results, 1):
                content = r.get("content", "")[:200]
                source = r.get("source", "unknown")
                room_name = r.get("room", "unknown")
                desc_lines.append(f"  |m[{i}]|n |c{room_name}|n — {content}... |x({source})|n")
        else:
            desc_lines.append("|xThe palace is quiet here. No memories resonate with this topic yet.|n")

        desc_lines.append("")
        desc_lines.append("Use |wrecall <query>|n to search deeper, or |wpalace/search <query>|n.")
        self.db.desc = "\n".join(desc_lines)

    def at_object_receive(self, moved_obj, source_location, **kwargs):
        """Refresh the description when an account-puppeted object enters."""
        if moved_obj.has_account:
            self.update_description()
        super().at_object_receive(moved_obj, source_location, **kwargs)

    def return_appearance(self, looker):
        """Append the memory topic to the standard room appearance."""
        text = super().return_appearance(looker)
        if self.db.memory_topic:
            text += f"\n|xTopic: {self.db.memory_topic}|n"
        return text
|
||||
70
docs/bezalel/evennia/steward_npc.py
Normal file
70
docs/bezalel/evennia/steward_npc.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""
|
||||
StewardNPC
|
||||
|
||||
A palace steward NPC that answers questions by querying the local
|
||||
or fleet MemPalace backend. One steward per wizard wing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from evennia.objects.objects import DefaultCharacter
|
||||
from typeclasses.objects import ObjectParent
|
||||
|
||||
PALACE_SCRIPT = "/root/wizards/bezalel/evennia/palace_search.py"
|
||||
|
||||
|
||||
class StewardNPC(ObjectParent, DefaultCharacter):
    """
    A steward of the mind palace.

    One steward exists per wizard wing; ask it about memories, decisions,
    or events and it answers from the MemPalace backend.
    """

    def at_object_creation(self):
        """Set wing defaults, the description, and protective locks."""
        super().at_object_creation()
        self.db.wing = "bezalel"
        self.db.steward_name = "Bezalel"
        self.db.desc = (
            f"|c{self.key}|n stands here quietly, eyes like polished steel, "
            "waiting to recall anything from the palace archives."
        )
        self.locks.add("get:false();delete:perm(Admin)")

    def _search_palace(self, query, fleet=False, n=3):
        """Run the palace search helper; return result dicts, or [] on failure."""
        wing_arg = "none" if fleet else self.db.wing
        cmd = [
            "/root/wizards/bezalel/hermes/venv/bin/python",
            PALACE_SCRIPT,
            query,
            wing_arg,
            "none",
            str(n),
        ]
        if fleet:
            cmd.append("--fleet")
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            payload = json.loads(proc.stdout)
        except Exception:
            return []
        return payload.get("results", [])

    def _summarize_for_speech(self, results, query):
        """Render search hits as in-character dialogue lines."""
        if not results:
            return "I find no memory of that in the palace."

        spoken = [f"Regarding '{query}':"]
        for hit in results:
            room = hit.get("room", "unknown")
            content = hit.get("content", "")[:300]
            source = hit.get("source", "unknown")
            spoken.append(f"  From the |c{room}|n room: {content}... |x[{source}]|n")
        return "\n".join(spoken)

    def respond_to_question(self, question, asker, fleet=False):
        """Answer *question* in-character, addressing *asker* in the room."""
        hits = self._search_palace(question, fleet=fleet, n=3)
        speech = self._summarize_for_speech(hits, question)
        self.location.msg_contents(
            f"|c{self.key}|n says to $you(asker): \"{speech}\"",
            mapping={"asker": asker},
            from_obj=self,
        )
|
||||
@@ -1,44 +1,6 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Apply branch protections to all repositories
|
||||
# Requires GITEA_TOKEN env var
|
||||
|
||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
||||
|
||||
for repo in "${REPOS[@]}"
|
||||
do
|
||||
curl -X POST "https://forge.alexanderwhitestone.com/api/v1/repos/Timmy_Foundation/$repo/branches/main/protection" \
|
||||
-H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"required_reviews": 1,
|
||||
"dismiss_stale_reviews": true,
|
||||
"block_force_push": true,
|
||||
"block_deletions": true
|
||||
}'
|
||||
done
|
||||
#!/bin/bash
|
||||
|
||||
# Gitea API credentials
|
||||
GITEA_TOKEN="your-personal-access-token"
|
||||
GITEA_API="https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
# Repos to protect
|
||||
REPOS=("hermes-agent" "the-nexus" "timmy-home" "timmy-config")
|
||||
|
||||
for REPO in "${REPO[@]}"; do
|
||||
echo "Configuring branch protection for $REPO..."
|
||||
|
||||
curl -X POST -H "Authorization: token $GITEA_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"name": "main",
|
||||
"require_pull_request": true,
|
||||
"required_approvals": 1,
|
||||
"dismiss_stale_approvals": true,
|
||||
"required_status_checks": '"$(test "$REPO" = "hermes-agent" && echo "true" || echo "false")"',
|
||||
"block_force_push": true,
|
||||
"block_delete": true
|
||||
}' \
|
||||
"$GITEA_API/repos/Timmy_Foundation/$REPO/branch_protection"
|
||||
done
|
||||
# Wrapper for the canonical branch-protection sync script.
|
||||
# Usage: ./gitea-branch-protection.sh
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
python3 scripts/sync_branch_protection.py
|
||||
|
||||
186
mempalace/fleet_api.py
Normal file
186
mempalace/fleet_api.py
Normal file
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
fleet_api.py — Lightweight HTTP API for the shared fleet palace.
|
||||
|
||||
Exposes fleet memory search over HTTP so that Alpha servers and other
|
||||
wizard deployments can query the palace without direct filesystem access.
|
||||
|
||||
Endpoints:
|
||||
GET /health
|
||||
Returns {"status": "ok", "palace": "<path>"}
|
||||
|
||||
GET /search?q=<query>[&room=<room>][&n=<int>]
|
||||
Returns {"results": [...], "query": "...", "room": "...", "count": N}
|
||||
Each result: {"text": "...", "room": "...", "wing": "...", "score": 0.9}
|
||||
|
||||
GET /wings
|
||||
Returns {"wings": ["bezalel", ...]} — distinct wizard wings present
|
||||
|
||||
Error responses use {"error": "<message>"} with appropriate HTTP status codes.
|
||||
|
||||
Usage:
|
||||
# Default: localhost:7771, fleet palace at /var/lib/mempalace/fleet
|
||||
python mempalace/fleet_api.py
|
||||
|
||||
# Custom host/port/palace:
|
||||
FLEET_PALACE_PATH=/data/fleet python mempalace/fleet_api.py --host 0.0.0.0 --port 8080
|
||||
|
||||
Refs: #1078, #1075
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from pathlib import Path
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
# Add repo root to path so we can import nexus.mempalace
|
||||
_HERE = Path(__file__).resolve().parent
|
||||
_REPO_ROOT = _HERE.parent
|
||||
if str(_REPO_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_REPO_ROOT))
|
||||
|
||||
DEFAULT_HOST = "127.0.0.1"
|
||||
DEFAULT_PORT = 7771
|
||||
MAX_RESULTS = 50
|
||||
|
||||
|
||||
def _get_palace_path() -> Path:
    """Return the fleet palace directory (FLEET_PALACE_PATH env var overrides)."""
    default = "/var/lib/mempalace/fleet"
    return Path(os.environ.get("FLEET_PALACE_PATH", default))
|
||||
|
||||
|
||||
def _json_response(handler: BaseHTTPRequestHandler, status: int, body: dict) -> None:
    """Serialize *body* as JSON and write a complete HTTP response on *handler*."""
    encoded = json.dumps(body).encode()
    handler.send_response(status)
    for name, value in (
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(encoded))),
    ):
        handler.send_header(name, value)
    handler.end_headers()
    handler.wfile.write(encoded)
|
||||
|
||||
|
||||
def _handle_health(handler: BaseHTTPRequestHandler) -> None:
    """Report service liveness plus the configured palace path and its existence."""
    palace = _get_palace_path()
    payload = {
        "status": "ok",
        "palace": str(palace),
        "palace_exists": palace.exists(),
    }
    _json_response(handler, 200, payload)
|
||||
|
||||
|
||||
def _handle_search(handler: BaseHTTPRequestHandler, qs: dict) -> None:
    """Validate query params, run a fleet search, and write the JSON result.

    Params: ``q`` (required), ``room`` (optional), ``n`` (optional int,
    clamped to 1..MAX_RESULTS, default 10). Errors return 400/503 with an
    {"error": ...} body.
    """
    q_vals = qs.get("q", [""])
    q = q_vals[0].strip() if q_vals else ""
    if not q:
        _json_response(handler, 400, {"error": "Missing required parameter: q"})
        return

    room_vals = qs.get("room", [])
    room = room_vals[0].strip() if room_vals else None

    n_vals = qs.get("n", [])
    try:
        n = max(1, min(int(n_vals[0]), MAX_RESULTS)) if n_vals else 10
    except (ValueError, IndexError):
        _json_response(handler, 400, {"error": "Invalid parameter: n must be an integer"})
        return

    # Imported lazily so /health works even when the module is absent.
    try:
        from nexus.mempalace.searcher import search_fleet, MemPalaceUnavailable
    except ImportError as exc:
        _json_response(handler, 503, {"error": f"MemPalace module not available: {exc}"})
        return

    try:
        hits = search_fleet(q, room=room, n_results=n)
    except Exception as exc:  # noqa: BLE001
        _json_response(handler, 503, {"error": str(exc)})
        return

    body = {
        "query": q,
        "room": room,
        "count": len(hits),
        "results": [
            {
                "text": hit.text,
                "room": hit.room,
                "wing": hit.wing,
                "score": round(hit.score, 4),
            }
            for hit in hits
        ],
    }
    _json_response(handler, 200, body)
|
||||
|
||||
|
||||
def _handle_wings(handler: BaseHTTPRequestHandler) -> None:
    """Return distinct wizard wing names found in the fleet palace directory."""
    palace = _get_palace_path()
    if not palace.exists():
        _json_response(handler, 503, {
            "error": f"Fleet palace not found: {palace}",
        })
        return

    # Each subdirectory of the palace is one wing.
    wing_names = sorted({entry.name for entry in palace.iterdir() if entry.is_dir()})
    _json_response(handler, 200, {"wings": wing_names})
|
||||
|
||||
|
||||
class FleetAPIHandler(BaseHTTPRequestHandler):
    """Request handler for the fleet memory API."""

    def log_message(self, fmt: str, *args) -> None:  # noqa: ANN001
        """Tag request-log lines with [fleet_api] for easier log filtering."""
        sys.stderr.write(f"[fleet_api] {fmt % args}\n")

    def do_GET(self) -> None:  # noqa: N802
        """Route GET requests to the matching endpoint handler."""
        parsed = urlparse(self.path)
        endpoint = parsed.path.rstrip("/") or "/"
        params = parse_qs(parsed.query)

        if endpoint == "/health":
            _handle_health(self)
            return
        if endpoint == "/search":
            _handle_search(self, params)
            return
        if endpoint == "/wings":
            _handle_wings(self)
            return
        _json_response(self, 404, {
            "error": f"Unknown endpoint: {endpoint}",
            "endpoints": ["/health", "/search", "/wings"],
        })
|
||||
|
||||
|
||||
def make_server(host: str = DEFAULT_HOST, port: int = DEFAULT_PORT) -> HTTPServer:
    """Build (but do not start) an HTTPServer bound to (host, port)."""
    address = (host, port)
    return HTTPServer(address, FleetAPIHandler)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse args, warn on a missing palace, serve forever.

    Returns 0; a KeyboardInterrupt shuts the server down cleanly.
    """
    parser = argparse.ArgumentParser(description="Fleet palace HTTP API server.")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"Bind host (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Bind port (default: {DEFAULT_PORT})")
    args = parser.parse_args(argv)

    palace = _get_palace_path()
    print(f"[fleet_api] Palace: {palace}")
    if not palace.exists():
        # Non-fatal: /health reports palace_exists=false until it appears.
        print(f"[fleet_api] WARNING: palace path does not exist yet: {palace}", file=sys.stderr)

    server = make_server(args.host, args.port)
    print(f"[fleet_api] Listening on http://{args.host}:{args.port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n[fleet_api] Shutting down.")
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
136
nexus/cron_heartbeat.py
Normal file
136
nexus/cron_heartbeat.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Poka-yoke heartbeat writer for cron jobs.
|
||||
|
||||
Every scheduled job calls write_cron_heartbeat() on successful completion so
|
||||
the meta-heartbeat checker (bin/check_cron_heartbeats.py) can verify that all
|
||||
jobs are still alive. Absence of a fresh heartbeat = silent failure.
|
||||
|
||||
Path convention
|
||||
---------------
|
||||
Primary: /var/run/bezalel/heartbeats/<job>.last
|
||||
Fallback: ~/.bezalel/heartbeats/<job>.last
|
||||
(used when /var/run/bezalel is not writable, e.g. dev machines)
|
||||
Override: BEZALEL_HEARTBEAT_DIR environment variable
|
||||
|
||||
Heartbeat file format (JSON)
|
||||
----------------------------
|
||||
{
|
||||
"job": "nexus_watchdog",
|
||||
"timestamp": 1744000000.0,
|
||||
"interval_seconds": 300,
|
||||
"pid": 12345,
|
||||
"status": "ok"
|
||||
}
|
||||
|
||||
Usage in a cron job
|
||||
-------------------
|
||||
from nexus.cron_heartbeat import write_cron_heartbeat
|
||||
|
||||
def main():
|
||||
# ... do the work ...
|
||||
write_cron_heartbeat("my_job_name", interval_seconds=300)
|
||||
|
||||
Zero-dependency shell one-liner (for scripts that can't import Python)
|
||||
-----------------------------------------------------------------------
|
||||
python -c "
|
||||
from nexus.cron_heartbeat import write_cron_heartbeat
|
||||
write_cron_heartbeat('my_job', interval_seconds=300)
|
||||
"
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
PRIMARY_HEARTBEAT_DIR = Path("/var/run/bezalel/heartbeats")
|
||||
FALLBACK_HEARTBEAT_DIR = Path.home() / ".bezalel" / "heartbeats"
|
||||
|
||||
|
||||
def _resolve_heartbeat_dir() -> Path:
    """Pick the heartbeat directory: env override, else /var/run, else ~ fallback.

    BEZALEL_HEARTBEAT_DIR wins outright (useful for tests and non-standard
    deployments). Otherwise the primary path is used when it is creatable
    and writable; failing that, the per-user fallback directory is created
    and returned.
    """
    override = os.environ.get("BEZALEL_HEARTBEAT_DIR")
    if override:
        return Path(override)

    try:
        # Probe: create the dir and verify we can actually write into it.
        PRIMARY_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
        probe_file = PRIMARY_HEARTBEAT_DIR / ".write_probe"
        probe_file.touch()
        probe_file.unlink()
    except (PermissionError, OSError):
        FALLBACK_HEARTBEAT_DIR.mkdir(parents=True, exist_ok=True)
        return FALLBACK_HEARTBEAT_DIR
    return PRIMARY_HEARTBEAT_DIR
|
||||
|
||||
|
||||
def heartbeat_path(job: str, directory: Path | None = None) -> Path:
    """Return where *job*'s heartbeat file lives, without writing anything.

    Readers (e.g. the Night Watch report) use this to locate the file;
    *directory* overrides the resolved default (mainly for tests).
    """
    base = _resolve_heartbeat_dir() if directory is None else directory
    return base / f"{job}.last"
|
||||
|
||||
|
||||
def write_cron_heartbeat(
    job: str,
    interval_seconds: int,
    status: str = "ok",
    directory: Path | None = None,
) -> Path:
    """Write a poka-yoke heartbeat file for a cron job.

    Call at the end of the job's main function. The file is written
    atomically (temp file + rename) so the checker never observes a
    partially-written heartbeat.

    Args:
        job: Unique job name, e.g. ``"nexus_watchdog"``.
        interval_seconds: Expected run cadence, e.g. ``300`` for every 5 min.
        status: Completion status: ``"ok"``, ``"warn"``, or ``"error"``.
            Only ``"ok"`` resets the stale clock.
        directory: Override the heartbeat directory (mainly for tests).

    Returns:
        Path to the heartbeat file (whether or not the write succeeded —
        the write is best-effort and never raises).
    """
    target_dir = directory if directory is not None else _resolve_heartbeat_dir()
    target_dir.mkdir(parents=True, exist_ok=True)
    out_path = target_dir / f"{job}.last"

    record = {
        "job": job,
        "timestamp": time.time(),
        "interval_seconds": interval_seconds,
        "pid": os.getpid(),
        "status": status,
    }

    # Atomic write: temp file in the same directory, then rename, so the
    # checker can never read a half-written file.
    fd, tmp_name = tempfile.mkstemp(dir=str(target_dir), prefix=f".{job}-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp_file:
            json.dump(record, tmp_file)
        os.replace(tmp_name, str(out_path))
    except Exception:
        # Best-effort — a heartbeat failure must never crash the job itself.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass

    return out_path
|
||||
@@ -2,11 +2,17 @@
|
||||
Morning Report Generator — runs at 0600 to compile overnight activity.
|
||||
Gathers: cycles executed, issues closed, PRs merged, commits pushed.
|
||||
Outputs a structured report for delivery to the main channel.
|
||||
|
||||
Includes a HEARTBEAT PANEL that checks all cron job heartbeats via
|
||||
bezalel_heartbeat_check.py (poka-yoke #1096). Any stale jobs surface
|
||||
as blockers in the report.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
@@ -118,7 +124,46 @@ def generate_morning_report():
|
||||
if not report["highlights"] and not report["blockers"]:
|
||||
print("No significant activity or blockers detected.")
|
||||
print("")
|
||||
|
||||
|
||||
# ── Heartbeat panel (poka-yoke #1096) ────────────────────────────────────
|
||||
# Import bezalel_heartbeat_check via importlib so we don't need __init__.py
|
||||
# or a sys.path hack. If the module is missing or the dir doesn't exist,
|
||||
# we print a "not provisioned" notice and continue — never crash the report.
|
||||
_hb_result = None
|
||||
try:
|
||||
_project_root = Path(__file__).parent.parent
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"bezalel_heartbeat_check",
|
||||
_project_root / "bin" / "bezalel_heartbeat_check.py",
|
||||
)
|
||||
if _hb_spec is not None:
|
||||
_hb_mod = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules.setdefault("bezalel_heartbeat_check", _hb_mod)
|
||||
_hb_spec.loader.exec_module(_hb_mod) # type: ignore[union-attr]
|
||||
_hb_result = _hb_mod.check_cron_heartbeats()
|
||||
except Exception:
|
||||
_hb_result = None
|
||||
|
||||
print("HEARTBEAT PANEL:")
|
||||
if _hb_result is None or not _hb_result.get("jobs"):
|
||||
print(" HEARTBEAT PANEL: no data (bezalel not provisioned)")
|
||||
report["heartbeat_panel"] = {"status": "not_provisioned"}
|
||||
else:
|
||||
for _job in _hb_result["jobs"]:
|
||||
_prefix = "+" if _job["healthy"] else "-"
|
||||
print(f" {_prefix} {_job['job']}: {_job['message']}")
|
||||
if not _job["healthy"]:
|
||||
report["blockers"].append(
|
||||
f"Stale heartbeat: {_job['job']} — {_job['message']}"
|
||||
)
|
||||
print("")
|
||||
report["heartbeat_panel"] = {
|
||||
"checked_at": _hb_result.get("checked_at"),
|
||||
"healthy_count": _hb_result.get("healthy_count", 0),
|
||||
"stale_count": _hb_result.get("stale_count", 0),
|
||||
"jobs": _hb_result.get("jobs", []),
|
||||
}
|
||||
|
||||
# Save report
|
||||
report_dir = Path(os.path.expanduser("~/.local/timmy/reports"))
|
||||
report_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
14
pytest.ini
Normal file
14
pytest.ini
Normal file
@@ -0,0 +1,14 @@
|
||||
[pytest]
|
||||
testpaths = tests
|
||||
asyncio_mode = auto
|
||||
|
||||
# Verbose output with short tracebacks; unknown markers are errors
|
||||
addopts =
|
||||
-v
|
||||
--tb=short
|
||||
--strict-markers
|
||||
|
||||
# Markers registered here (also registered in conftest.py for programmatic use)
|
||||
markers =
|
||||
integration: mark test as integration test (requires MCP servers)
|
||||
quarantine: mark test as quarantined (flaky/broken, tracked by issue)
|
||||
95
scripts/audit_mempalace_privacy.py
Normal file
95
scripts/audit_mempalace_privacy.py
Normal file
@@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Audit the fleet shared palace for privacy violations.
|
||||
Ensures no raw drawers, full source paths, or private workspace leaks exist.
|
||||
|
||||
Usage:
|
||||
python audit_mempalace_privacy.py /path/to/fleet/palace
|
||||
|
||||
Exit codes:
|
||||
0 = clean
|
||||
1 = violations found
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import chromadb
|
||||
except ImportError:
|
||||
print("ERROR: chromadb not installed")
|
||||
sys.exit(1)
|
||||
|
||||
VIOLATION_KEYWORDS = [
|
||||
"/root/wizards/",
|
||||
"/home/",
|
||||
"/Users/",
|
||||
"private_key",
|
||||
"-----BEGIN",
|
||||
"GITEA_TOKEN=",
|
||||
"OPENAI_API_KEY",
|
||||
"ANTHROPIC_API_KEY",
|
||||
]
|
||||
|
||||
|
||||
def audit(palace_path: Path):
    """Scan every document in the fleet palace; return a list of violation strings.

    Rules enforced:
      1. Only 'closet'/'summary'/'fleet' typed documents belong in the fleet palace.
      2. No absolute private-workspace paths in source metadata.
      3. No sensitive keywords (secrets, keys, tokens) in document text.

    Exits the process with status 1 if the collection cannot be opened.
    """
    findings = []
    client = chromadb.PersistentClient(path=str(palace_path))
    try:
        col = client.get_collection("mempalace_drawers")
    except Exception as e:
        print(f"ERROR: Could not open collection: {e}")
        sys.exit(1)

    data = col.get(include=["documents", "metadatas"])
    for doc, meta in zip(data["documents"], data["metadatas"]):
        source = meta.get("source_file", "")
        doc_type = meta.get("type", "")

        # Rule 1: fleet palace should only contain closets or explicitly typed entries
        if doc_type not in ("closet", "summary", "fleet"):
            findings.append(
                f"VIOLATION: Document type is '{doc_type}' (expected closet/summary/fleet). "
                f"Source: {source}"
            )

        # Rule 2: no full absolute paths from private workspaces
        if any(prefix in source for prefix in ["/root/wizards/", "/home/", "/Users/"]):
            findings.append(
                f"VIOLATION: Source contains absolute path: {source}"
            )

        # Rule 3: no raw secrets in document text (report at most one per doc)
        for kw in VIOLATION_KEYWORDS:
            if kw in doc:
                findings.append(
                    f"VIOLATION: Document contains sensitive keyword '{kw}'. Source: {source}"
                )
                break

    return findings
|
||||
|
||||
|
||||
def main():
    """CLI entry: audit the given palace path; exit 0 when clean, 1 on violations."""
    import argparse
    parser = argparse.ArgumentParser(description="Audit fleet palace privacy")
    parser.add_argument("palace", default="/var/lib/mempalace/fleet", nargs="?", help="Path to fleet palace")
    args = parser.parse_args()

    violations = audit(Path(args.palace))

    if not violations:
        print("PASS: No privacy violations detected")
        sys.exit(0)

    print(f"FAIL: {len(violations)} privacy violation(s) found")
    for v in violations:
        print(f"  {v}")
    sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
167
scripts/audit_merge_reviews.py
Normal file
167
scripts/audit_merge_reviews.py
Normal file
@@ -0,0 +1,167 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fleet Merge Review Audit
|
||||
========================
|
||||
Scans all Timmy_Foundation repos for merges in the last 7 days
|
||||
and validates that each merged PR had at least one approving review.
|
||||
|
||||
Exit 0 = no unreviewed merges
|
||||
Exit 1 = unreviewed merges found (and issues created if --create-issues)
|
||||
|
||||
Usage:
|
||||
python scripts/audit_merge_reviews.py
|
||||
python scripts/audit_merge_reviews.py --create-issues
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import json
|
||||
|
||||
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||
ORG = "Timmy_Foundation"
|
||||
DAYS_BACK = 7
|
||||
SECURITY_LABEL = "security"
|
||||
|
||||
|
||||
def api_request(path: str) -> dict | list:
    """GET a Gitea API path and return the decoded JSON payload."""
    req = urllib.request.Request(
        f"{GITEA_URL}/api/v1{path}",
        headers={
            "Authorization": f"token {GITEA_TOKEN}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())
||||
|
||||
|
||||
def api_post(path: str, payload: dict) -> dict:
    """POST *payload* as JSON to a Gitea API path; return the decoded response."""
    body = json.dumps(payload).encode()
    req = urllib.request.Request(
        f"{GITEA_URL}/api/v1{path}",
        data=body,
        headers={
            "Authorization": f"token {GITEA_TOKEN}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def get_repos() -> list[str]:
    """Return the names of all repos in the org, paging 50 at a time."""
    names: list[str] = []
    page = 1
    # An empty page means we have exhausted the listing.
    while batch := api_request(f"/orgs/{ORG}/repos?limit=50&page={page}"):
        names.extend(r["name"] for r in batch)
        page += 1
    return names
|
||||
|
||||
|
||||
def get_merged_prs(repo: str, since: str) -> list[dict]:
    """Return PRs in *repo* merged at/after *since* (an ISO-8601 timestamp).

    Walks closed PRs sorted by update time (descending) and stops paging as
    soon as a PR last updated before *since* is seen — everything after it
    is older still.
    """
    def _parse(ts: str) -> datetime:
        # Gitea emits "...Z" suffixes; datetime.fromisoformat() on older
        # Pythons only accepts "+00:00", so normalize first.
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))

    # BUG FIX: the original compared raw ISO strings, but `since` is produced
    # by datetime.isoformat() ("+00:00" offset, microseconds) while Gitea
    # returns "Z"-suffixed stamps — lexicographic comparison of mixed formats
    # is wrong at the boundaries. Compare real aware datetimes instead.
    since_dt = _parse(since)
    prs: list[dict] = []
    page = 1
    while True:
        batch = api_request(
            f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
        )
        if not batch:
            break
        for pr in batch:
            merged_at = pr.get("merged_at")
            updated_at = pr.get("updated_at")
            if merged_at and _parse(merged_at) >= since_dt:
                prs.append(pr)
            elif updated_at and _parse(updated_at) < since_dt:
                # Sorted by updated desc: nothing newer can follow.
                return prs
        page += 1
    return prs
|
||||
|
||||
|
||||
def get_reviews(repo: str, pr_number: int) -> list[dict]:
    """Return review objects for a PR; an HTTP 404 means no reviews ([])."""
    try:
        return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
    except urllib.error.HTTPError as e:
        if e.code != 404:
            raise
        return []
|
||||
|
||||
|
||||
def create_post_mortem(repo: str, pr: dict) -> int | None:
    """File a [SECURITY] post-mortem issue in the-nexus; return its number or None."""
    merged_by = pr.get('merged_by', {}).get('login', 'unknown')
    title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
    body = (
        f"## Unreviewed Merge Detected\n\n"
        f"- **Repository:** `{ORG}/{repo}`\n"
        f"- **PR:** #{pr['number']} — {pr['title']}\n"
        f"- **Merged by:** @{merged_by}\n"
        f"- **Merged at:** {pr['merged_at']}\n"
        f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n"
        f"This merge had **zero approving reviews** at the time of merge.\n\n"
        f"### Required Actions\n"
        f"1. Validate the merge contents are safe.\n"
        f"2. If malicious or incorrect, revert immediately.\n"
        f"3. Document root cause (bypassed branch protection? direct push?).\n"
    )
    payload = {"title": title, "body": body, "labels": [SECURITY_LABEL]}
    try:
        created = api_post(f"/repos/{ORG}/the-nexus/issues", payload)
    except Exception as e:
        print(f"  FAILED to create issue: {e}")
        return None
    return created.get("number")
|
||||
|
||||
|
||||
def main() -> int:
    """Audit the last 7 days of merges across the org.

    Returns 0 when every merged PR had at least one approving review,
    1 otherwise. With --create-issues, also files a [SECURITY] post-mortem
    issue in the-nexus for each unreviewed merge.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
    args = parser.parse_args()

    # The token is mandatory — every Gitea API call below needs it.
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN environment variable not set.")
        return 1

    since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
    since = since_dt.isoformat()

    repos = get_repos()
    print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")

    unreviewed_count = 0
    for repo in repos:
        merged = get_merged_prs(repo, since)
        if not merged:
            continue

        # Collect the PRs in this repo that were merged without a single approval.
        repo_unreviewed = []
        for pr in merged:
            reviews = get_reviews(repo, pr["number"])
            approvals = [r for r in reviews if r.get("state") == "APPROVED"]
            if not approvals:
                repo_unreviewed.append(pr)

        if repo_unreviewed:
            print(f"\n{repo}:")
            for pr in repo_unreviewed:
                print(f"  ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})")
                unreviewed_count += 1
                if args.create_issues:
                    issue_num = create_post_mortem(repo, pr)
                    if issue_num:
                        print(f"      → Created post-mortem issue the-nexus#{issue_num}")

    print(f"\n{'='*60}")
    if unreviewed_count == 0:
        print("All merges in the last 7 days had at least one approving review.")
        return 0
    else:
        print(f"Found {unreviewed_count} unreviewed merge(s).")
        return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
50
scripts/backup_databases.sh
Executable file
50
scripts/backup_databases.sh
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
# Bezalel Database Backup — MemPalace + Evennia + Fleet
# Runs nightly after re-mine completes. Keeps 7 days of rolling backups.
set -euo pipefail

BACKUP_BASE="/root/wizards/bezalel/home/backups"
DATE=$(date +%Y%m%d_%H%M%S)
LOG="/var/log/bezalel_db_backup.log"

# Sources
LOCAL_PALACE="/root/wizards/bezalel/.mempalace/palace"
FLEET_PALACE="/var/lib/mempalace/fleet"
EVENNIA_DB="/root/wizards/bezalel/evennia/bezalel_world/server/evennia.db3"

# Destinations
LOCAL_BACKUP="${BACKUP_BASE}/mempalace/mempalace_${DATE}.tar.gz"
FLEET_BACKUP="${BACKUP_BASE}/fleet/fleet_${DATE}.tar.gz"
EVENNIA_BACKUP="${BACKUP_BASE}/evennia/evennia_${DATE}.db3.gz"

log() {
    echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
}

# FIX: ensure destination directories exist before writing — with `set -e`,
# a missing directory would abort the whole cycle on the first tar.
mkdir -p "${BACKUP_BASE}/mempalace" "${BACKUP_BASE}/fleet" "${BACKUP_BASE}/evennia"

log "Starting database backup cycle..."

# 1. Backup local MemPalace
tar -czf "$LOCAL_BACKUP" -C "$(dirname "$LOCAL_PALACE")" "$(basename "$LOCAL_PALACE")"
log "Local palace backed up: ${LOCAL_BACKUP} ($(du -h "$LOCAL_BACKUP" | cut -f1))"

# 2. Backup fleet MemPalace
tar -czf "$FLEET_BACKUP" -C "$(dirname "$FLEET_PALACE")" "$(basename "$FLEET_PALACE")"
log "Fleet palace backed up: ${FLEET_BACKUP} ($(du -h "$FLEET_BACKUP" | cut -f1))"

# 3. Backup Evennia DB (gzip for space)
gzip -c "$EVENNIA_DB" > "$EVENNIA_BACKUP"
log "Evennia DB backed up: ${EVENNIA_BACKUP} ($(du -h "$EVENNIA_BACKUP" | cut -f1))"

# 4. Prune backups older than 7 days
find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' -mtime +7 -delete
find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' -mtime +7 -delete
find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' -mtime +7 -delete
log "Pruned backups older than 7 days"

# 5. Report counts
MP_COUNT=$(find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' | wc -l)
FL_COUNT=$(find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' | wc -l)
EV_COUNT=$(find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' | wc -l)
log "Backup cycle complete. Retained: mempalace=${MP_COUNT}, fleet=${FL_COUNT}, evennia=${EV_COUNT}"

# FIX: create the heartbeat directory first — `touch` fails (and, under
# `set -e`, the script exits nonzero after an otherwise successful run)
# if /var/lib/bezalel/heartbeats does not exist yet.
mkdir -p /var/lib/bezalel/heartbeats
touch /var/lib/bezalel/heartbeats/db_backup.last
|
||||
135
scripts/ci_auto_revert.py
Normal file
135
scripts/ci_auto_revert.py
Normal file
@@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
CI Auto-Revert — Poka-yoke for broken merges.
|
||||
Monitors the main branch post-merge and auto-reverts via local git if CI fails.
|
||||
|
||||
Usage:
|
||||
python ci_auto_revert.py <repo_owner>/<repo_name>
|
||||
python ci_auto_revert.py Timmy_Foundation/hermes-agent
|
||||
|
||||
Recommended cron: */10 * * * *
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
import tempfile
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from urllib import request, error
|
||||
|
||||
# Gitea API credentials/endpoint. The token comes from the environment; an
# empty string means unauthenticated, in which case API calls will fail.
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
# Only commits younger than this many minutes are eligible for auto-revert.
REVERT_WINDOW_MINUTES = 10
|
||||
|
||||
|
||||
def api_call(method, path):
    """Issue an authenticated Gitea API request and return the decoded JSON.

    On an HTTP error the response body and status code are returned as
    ``{"error": <body>, "status": <code>}`` instead of raising.
    """
    endpoint = f"{GITEA_URL}/api/v1{path}"
    api_request = request.Request(
        endpoint,
        method=method,
        headers={"Authorization": f"token {GITEA_TOKEN}"},
    )
    try:
        with request.urlopen(api_request, timeout=30) as response:
            body = response.read().decode()
    except error.HTTPError as http_err:
        return {"error": http_err.read().decode(), "status": http_err.code}
    return json.loads(body)
|
||||
|
||||
|
||||
def get_recent_commits(owner, repo, since):
    """List up to 20 commits on main committed after *since* (an aware datetime)."""
    cutoff = since.strftime("%Y-%m-%dT%H:%M:%SZ")
    return api_call(
        "GET",
        f"/repos/{owner}/{repo}/commits?sha=main&since={cutoff}&limit=20",
    )
|
||||
|
||||
|
||||
def get_commit_status(owner, repo, sha):
    """Fetch the combined CI status object for a single commit SHA."""
    status_path = f"/repos/{owner}/{repo}/commits/{sha}/status"
    return api_call("GET", status_path)
|
||||
|
||||
|
||||
def revert_via_git(clone_url, sha, msg, owner, repo):
    """Clone the repo, revert *sha* on main, and push the revert commit.

    Returns ``{"ok": True, "reverted_sha": sha}`` on success, or
    ``{"error": <details>}`` if the revert or push fails. A failed clone or
    config step raises ``subprocess.CalledProcessError`` (unchanged from the
    original behavior). *owner* and *repo* are accepted for interface
    compatibility but are not used here.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        # Clone with a token-authenticated URL; shallow depth is enough to
        # revert a recent commit.
        auth_url = clone_url.replace("https://", f"https://bezalel:{GITEA_TOKEN}@")
        subprocess.run(["git", "clone", "--depth", "10", auth_url, tmpdir], check=True, capture_output=True)

        # Commit identity is required for the revert commit.
        subprocess.run(["git", "-C", tmpdir, "config", "user.email", "bezalel@timmy.foundation"], check=True, capture_output=True)
        subprocess.run(["git", "-C", tmpdir, "config", "user.name", "Bezalel"], check=True, capture_output=True)

        # BUG FIX: `git revert -m <msg>` does NOT set the commit message —
        # `-m/--mainline` selects the parent number for reverting a merge, so
        # passing a message string made the command fail. Instead, stage the
        # revert with --no-commit and commit with the intended message.
        revert_msg = f"[auto-revert] {msg}\n\nOriginal commit {sha} failed CI."
        result = subprocess.run(
            ["git", "-C", tmpdir, "revert", "--no-commit", sha],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return {"error": f"git revert failed: {result.stderr}"}

        commit_result = subprocess.run(
            ["git", "-C", tmpdir, "commit", "-m", revert_msg],
            capture_output=True,
            text=True,
        )
        if commit_result.returncode != 0:
            return {"error": f"git revert failed: {commit_result.stderr}"}

        # Push the revert commit back to main.
        push_result = subprocess.run(
            ["git", "-C", tmpdir, "push", "origin", "main"],
            capture_output=True,
            text=True,
        )
        if push_result.returncode != 0:
            return {"error": f"git push failed: {push_result.stderr}"}

        return {"ok": True, "reverted_sha": sha}
|
||||
|
||||
|
||||
def main():
    """Scan recent main-branch commits and auto-revert any that failed CI.

    Flow: fetch commits from the last REVERT_WINDOW_MINUTES (+5m slack for
    API/clock skew), check each commit's combined CI status, and revert via
    a fresh clone when the state is "failure". Exits 1 on usage or API errors.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <owner/repo>")
        sys.exit(1)

    repo_full = sys.argv[1]
    owner, repo = repo_full.split("/", 1)

    # Over-fetch by 5 minutes; the per-commit age check below enforces the
    # real window, so boundary commits are not missed.
    since = datetime.now(timezone.utc) - timedelta(minutes=REVERT_WINDOW_MINUTES + 5)
    commits = get_recent_commits(owner, repo, since)

    # api_call returns a dict on HTTP errors; a list means success here.
    if not isinstance(commits, list):
        print(f"ERROR fetching commits: {commits}")
        sys.exit(1)

    reverted = 0
    for commit in commits:
        sha = commit.get("sha", "")
        # First line of the commit message, for log output.
        msg = commit.get("commit", {}).get("message", "").split("\n")[0]
        commit_time = commit.get("commit", {}).get("committer", {}).get("date", "")
        if not commit_time:
            # Without a committer date we cannot apply the age window; skip.
            continue

        # Gitea returns ISO-8601 with a trailing "Z"; fromisoformat needs
        # an explicit offset.
        commit_dt = datetime.fromisoformat(commit_time.replace("Z", "+00:00"))
        age_min = (datetime.now(timezone.utc) - commit_dt).total_seconds() / 60

        if age_min > REVERT_WINDOW_MINUTES:
            continue

        status = get_commit_status(owner, repo, sha)
        state = status.get("state", "")

        if state == "failure":
            print(f"ALERT: Commit {sha[:8]} '{msg}' failed CI ({age_min:.1f}m old). Initiating revert...")
            # Look up the clone URL fresh each time rather than assuming it.
            repo_info = api_call("GET", f"/repos/{owner}/{repo}")
            clone_url = repo_info.get("clone_url", "")
            if not clone_url:
                print(f"  Cannot find clone URL")
                continue
            result = revert_via_git(clone_url, sha, msg, owner, repo)
            if "error" in result:
                print(f"  Revert failed: {result['error']}")
            else:
                print(f"  Reverted successfully.")
                reverted += 1
        elif state == "success":
            print(f"OK: Commit {sha[:8]} '{msg}' passed CI.")
        elif state == "pending":
            print(f"PENDING: Commit {sha[:8]} '{msg}' still running CI.")
        else:
            print(f"UNKNOWN: Commit {sha[:8]} '{msg}' has CI state '{state}'.")

    if reverted == 0:
        # NOTE(review): "10 minutes" is hard-coded here and will drift if
        # REVERT_WINDOW_MINUTES changes — confirm intended.
        print("No broken merges found in the last 10 minutes.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
115
scripts/cron-heartbeat-write.sh
Executable file
115
scripts/cron-heartbeat-write.sh
Executable file
@@ -0,0 +1,115 @@
|
||||
#!/usr/bin/env bash
# cron-heartbeat-write.sh — Bezalel Cron Heartbeat Writer (poka-yoke #1096)
# Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
#
# POKA-YOKE design:
#   Prevention — Cron jobs declare their identity + expected interval up front.
#   Detection  — bezalel_heartbeat_check.py reads these files every 15 min and
#                alerts P1 if any job is silent for > 2× its interval.
#   Correction — Alerts fire fast enough for manual intervention or auto-restart
#                before the next scheduled run window expires.
#
# Usage:
#   cron-heartbeat-write.sh <job-name> [interval-seconds]
#
#   <job-name>          Unique identifier for this cron job (e.g. "morning-report")
#   [interval-seconds]  Expected run interval in seconds (default: 3600)
#
# The heartbeat file is written to:
#   /var/run/bezalel/heartbeats/<job-name>.last
#
# NOTE(review): other Bezalel scripts touch heartbeats under
# /var/lib/bezalel/heartbeats — confirm which directory the checker reads.
#
# File format (JSON):
#   {"job":"<name>","timestamp":<epoch_float>,"interval":<secs>,"pid":<pid>}
#
# This script ALWAYS exits 0 — it must never crash the calling cron job.
#
# Typical crontab usage:
#   0 * * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh hourly-job 3600
#   0 6 * * * /root/wizards/the-nexus/scripts/cron-heartbeat-write.sh morning-report 86400

# -e is deliberately absent: every failure path below degrades to a warning
# and a clean exit instead of aborting the caller.
set -uo pipefail

# ── Configuration ─────────────────────────────────────────────────────────────
HEARTBEAT_DIR="${BEZALEL_HEARTBEAT_DIR:-/var/run/bezalel/heartbeats}"

# ── Helpers ───────────────────────────────────────────────────────────────────
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT: $*"; }
warn() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] HEARTBEAT WARNING: $*" >&2; }

# ── Input validation ──────────────────────────────────────────────────────────
if [[ $# -lt 1 ]]; then
    warn "Usage: $0 <job-name> [interval-seconds]"
    warn "No job name provided — heartbeat not written."
    exit 0
fi

JOB_NAME="$1"
INTERVAL_SECS="${2:-3600}"

# Sanitize job name to prevent path traversal / weird filenames
# Allow alphanumeric, dash, underscore, dot only
SAFE_JOB_NAME="${JOB_NAME//[^a-zA-Z0-9_.-]/}"
if [[ -z "$SAFE_JOB_NAME" ]]; then
    warn "Job name '${JOB_NAME}' contains only unsafe characters — heartbeat not written."
    exit 0
fi

if [[ "$SAFE_JOB_NAME" != "$JOB_NAME" ]]; then
    warn "Job name sanitized: '${JOB_NAME}' → '${SAFE_JOB_NAME}'"
fi

# Validate interval is a positive integer
if ! [[ "$INTERVAL_SECS" =~ ^[0-9]+$ ]] || (( INTERVAL_SECS < 1 )); then
    warn "Invalid interval '${INTERVAL_SECS}' — using default 3600."
    INTERVAL_SECS=3600
fi

# ── Create heartbeat directory ────────────────────────────────────────────────
if ! mkdir -p "$HEARTBEAT_DIR" 2>/dev/null; then
    warn "Cannot create heartbeat dir '${HEARTBEAT_DIR}' — heartbeat not written."
    exit 0
fi

# ── Build JSON payload ────────────────────────────────────────────────────────
# Use python3 for reliable epoch float and JSON encoding.
# Falls back to date-based approach if python3 unavailable.
TIMESTAMP=$(python3 -c "import time; print(time.time())" 2>/dev/null \
    || date +%s)

CURRENT_PID=$$

PAYLOAD=$(python3 -c "
import json, sys
print(json.dumps({
    'job': sys.argv[1],
    'timestamp': float(sys.argv[2]),
    'interval': int(sys.argv[3]),
    'pid': int(sys.argv[4]),
}))
" "$SAFE_JOB_NAME" "$TIMESTAMP" "$INTERVAL_SECS" "$CURRENT_PID" 2>/dev/null)

if [[ -z "$PAYLOAD" ]]; then
    # Minimal fallback if python3 fails
    PAYLOAD="{\"job\":\"${SAFE_JOB_NAME}\",\"timestamp\":${TIMESTAMP},\"interval\":${INTERVAL_SECS},\"pid\":${CURRENT_PID}}"
fi

# ── Atomic write via temp + rename ────────────────────────────────────────────
# Writes to a temp file first then renames, so bezalel_heartbeat_check.py
# never sees a partial file mid-write. This is the poka-yoke atomic guarantee.
TARGET_FILE="${HEARTBEAT_DIR}/${SAFE_JOB_NAME}.last"
TMP_FILE="${HEARTBEAT_DIR}/.${SAFE_JOB_NAME}.last.tmp.$$"

if printf '%s\n' "$PAYLOAD" > "$TMP_FILE" 2>/dev/null; then
    if mv "$TMP_FILE" "$TARGET_FILE" 2>/dev/null; then
        log "Heartbeat written: ${TARGET_FILE} (job=${SAFE_JOB_NAME}, interval=${INTERVAL_SECS}s)"
    else
        warn "mv failed for '${TMP_FILE}' → '${TARGET_FILE}' — heartbeat not committed."
        rm -f "$TMP_FILE" 2>/dev/null || true
    fi
else
    warn "Write to temp file '${TMP_FILE}' failed — heartbeat not written."
    rm -f "$TMP_FILE" 2>/dev/null || true
fi

# Always exit 0 — never crash the calling cron job.
exit 0
|
||||
256
scripts/flake_detector.py
Executable file
256
scripts/flake_detector.py
Executable file
@@ -0,0 +1,256 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Flake detector for the Nexus test suite.
|
||||
|
||||
Reads pytest JSON reports (produced by pytest-json-report) and maintains a
|
||||
rolling history file at .test-history.json. After each update it prints a
|
||||
report of any test whose pass rate has dropped below the 95 % consistency
|
||||
threshold and exits non-zero if any flaky tests are found.
|
||||
|
||||
Usage
|
||||
-----
|
||||
Install pytest-json-report once::
|
||||
|
||||
pip install pytest-json-report
|
||||
|
||||
Then run tests with JSON output::
|
||||
|
||||
pytest --json-report --json-report-file=.test-report.json
|
||||
|
||||
Then call this script::
|
||||
|
||||
python scripts/flake_detector.py # uses .test-report.json + .test-history.json
|
||||
python scripts/flake_detector.py --report path/to/report.json
|
||||
python scripts/flake_detector.py --history path/to/history.json
|
||||
python scripts/flake_detector.py --threshold 0.90 # lower threshold for local dev
|
||||
|
||||
The script is also safe to call with no report file — it will just print the
|
||||
current history statistics without updating anything.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import TypedDict
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRecord(TypedDict):
    """Per-test rolling history.

    Counters accumulate across every ingested report; skipped runs count
    toward ``runs`` but toward neither ``passes`` nor ``failures``.
    """
    runs: int          # total times this test appeared in an ingested report
    passes: int        # runs with outcome "passed"
    failures: int      # runs with outcome "failed" or "error"
    skips: int         # runs with outcome "skipped"
    last_outcome: str  # "passed" | "failed" | "skipped" | "error"
|
||||
|
||||
|
||||
class HistoryFile(TypedDict):
    """Top-level shape of the rolling .test-history.json file."""
    total_runs: int                # number of suite reports ingested so far
    tests: dict[str, TestRecord]   # keyed by pytest nodeid
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
DEFAULT_REPORT = Path(".test-report.json")    # pytest-json-report output
DEFAULT_HISTORY = Path(".test-history.json")  # rolling history kept by this script
DEFAULT_THRESHOLD = 0.95  # 95 % consistency required
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_history(history_path: Path) -> HistoryFile:
    """Read the rolling history file, or return an empty history if absent."""
    if not history_path.exists():
        return {"total_runs": 0, "tests": {}}
    with history_path.open() as handle:
        return json.load(handle)
|
||||
|
||||
|
||||
def save_history(history: HistoryFile, history_path: Path) -> None:
    """Persist *history* as pretty-printed, key-sorted JSON."""
    payload = json.dumps(history, indent=2, sort_keys=True)
    history_path.write_text(payload)
    print(f"[flake-detector] History saved → {history_path}", file=sys.stderr)
|
||||
|
||||
|
||||
def ingest_report(report_path: Path, history: HistoryFile) -> int:
    """Merge a pytest JSON report into *history*. Returns the number of tests updated."""
    report = json.loads(report_path.read_text())

    # One more suite run has been observed, regardless of its contents.
    history["total_runs"] = history.get("total_runs", 0) + 1
    test_entries = report.get("tests", [])

    for entry in test_entries:
        node_id = entry.get("nodeid", "unknown")
        outcome = entry.get("outcome", "unknown")

        record = history["tests"].setdefault(
            node_id,
            {"runs": 0, "passes": 0, "failures": 0, "skips": 0, "last_outcome": ""},
        )
        record["runs"] += 1
        record["last_outcome"] = outcome
        if outcome == "passed":
            record["passes"] += 1
        elif outcome in ("failed", "error"):
            record["failures"] += 1
        elif outcome == "skipped":
            record["skips"] += 1

    return len(test_entries)
|
||||
|
||||
|
||||
def consistency(rec: TestRecord) -> float:
    """Return how consistently *rec* produced its dominant outcome.

    consistency = max(passes, failures) / runs

    A test that always passes → 1.0 (stable green).
    A test that always fails  → 1.0 (stable red — broken, not flaky; it is
    still *consistent*, so it is never flagged by this metric).
    A test passing 9 of 10    → 0.9 (flaky).

    DOC FIX: the original docstring claimed an always-failing test scores
    0.0, but the code (correctly, per the dominant-outcome definition)
    returns 1.0.

    An empty record (0 runs) is treated as perfectly consistent.
    NOTE(review): skipped runs count toward *runs* but toward neither
    dominant outcome, so heavy skipping lowers the score — confirm intended.
    """
    runs = rec["runs"]
    if runs == 0:
        # No evidence either way — treat as stable.
        return 1.0
    dominant = max(rec["passes"], rec["failures"])
    return dominant / runs


MIN_RUNS = 5  # need at least this many runs before flagging


def find_flaky_tests(
    history: HistoryFile,
    threshold: float = DEFAULT_THRESHOLD,
) -> list[tuple[str, TestRecord, float]]:
    """Return (node_id, record, consistency_rate) for all tests below threshold.

    Tests with fewer than MIN_RUNS recorded runs are never judged; results
    are sorted least-consistent (worst) first.
    """
    flaky: list[tuple[str, TestRecord, float]] = []
    for node_id, rec in history["tests"].items():
        if rec["runs"] < MIN_RUNS:
            continue  # not enough evidence yet
        rate = consistency(rec)
        if rate < threshold:
            flaky.append((node_id, rec, rate))
    flaky.sort(key=lambda item: item[2])  # worst first
    return flaky
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Reporting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def print_report(
    flaky: list[tuple[str, TestRecord, float]],
    history: HistoryFile,
    threshold: float,
) -> None:
    """Print the flake-detector summary, then per-test details if any are flaky."""
    total_tests = len(history["tests"])
    total_runs = history.get("total_runs", 0)
    divider = "=" * 70

    # Header banner with suite-wide statistics.
    print(f"\n{divider}")
    print("  FLAKE DETECTOR REPORT")
    print(f"{divider}")
    print(f"  Total suite runs tracked : {total_runs}")
    print(f"  Total distinct tests     : {total_tests}")
    print(f"  Consistency threshold    : {threshold:.0%}")
    print(f"  Min runs before judging  : {MIN_RUNS}")
    print(f"{divider}")

    if not flaky:
        print("  ✓ No flaky tests detected — all tests above consistency threshold.")
        print(f"{divider}\n")
        return

    print(f"  ✗ {len(flaky)} FLAKY TEST(S) DETECTED:\n")
    for node_id, rec, rate in flaky:
        print(f"  [{rate:.0%}] {node_id}")
        print(
            f"      runs={rec['runs']} passes={rec['passes']} "
            f"failures={rec['failures']} skips={rec['skips']} "
            f"last={rec['last_outcome']}"
        )
        print()

    # Remediation checklist for whoever triages the report.
    print("  ACTION REQUIRED:")
    print("  1. Move each flaky test to tests/quarantine/")
    print("  2. File a tracking issue with [FLAKY] in the title")
    print("  3. Add @pytest.mark.quarantine(reason='#NNN') to the test")
    print("  See docs/QUARANTINE_PROCESS.md for full instructions.")
    print(f"{divider}\n")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the flake-detector CLI parser and parse *argv* (None → sys.argv)."""
    cli = argparse.ArgumentParser(
        description="Detect flaky tests by analysing pytest JSON report history."
    )
    cli.add_argument(
        "--report",
        type=Path,
        default=DEFAULT_REPORT,
        help=f"Path to pytest JSON report file (default: {DEFAULT_REPORT})",
    )
    cli.add_argument(
        "--history",
        type=Path,
        default=DEFAULT_HISTORY,
        help=f"Path to rolling history JSON file (default: {DEFAULT_HISTORY})",
    )
    cli.add_argument(
        "--threshold",
        type=float,
        default=DEFAULT_THRESHOLD,
        help=f"Consistency threshold 0–1 (default: {DEFAULT_THRESHOLD})",
    )
    cli.add_argument(
        "--no-update",
        action="store_true",
        default=False,
        help="Print current statistics without ingesting a new report",
    )
    return cli.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: optionally ingest a new report, then report flaky tests.

    Returns 1 when any flaky test is found (suitable for CI gating), else 0.
    """
    args = parse_args(argv)
    history = load_history(args.history)

    if not args.no_update:
        if args.report.exists():
            ingested = ingest_report(args.report, history)
            print(
                f"[flake-detector] Ingested {ingested} test results from {args.report}",
                file=sys.stderr,
            )
            save_history(history, args.history)
        else:
            # Not a fatal error; just print current state.
            print(
                f"[flake-detector] No report file at {args.report} — "
                "run pytest with --json-report first.",
                file=sys.stderr,
            )

    flaky = find_flaky_tests(history, threshold=args.threshold)
    print_report(flaky, history, threshold=args.threshold)

    return 1 if flaky else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
75
scripts/mempalace_export.py
Normal file
75
scripts/mempalace_export.py
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Export closets from a local MemPalace wing for fleet-wide sharing.
|
||||
|
||||
Privacy rule: only summaries/closets are exported. No raw source_file paths.
|
||||
Source filenames are anonymized to just the basename.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import chromadb
|
||||
|
||||
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"  # chromadb persistent store
FLEET_INCOMING = "/var/lib/mempalace/fleet/incoming"     # shared drop directory for closets
WING = "bezalel"       # this agent's wing name, used to filter collection metadata
DOCS_PER_ROOM = 5      # max entries exported per room
|
||||
|
||||
|
||||
def main():
    """Export this wing's closets (sanitized summaries) to the fleet share.

    Privacy rule: content is truncated, workspace paths are rewritten to "~/",
    and only source-file basenames are emitted — never absolute paths.
    """
    client = chromadb.PersistentClient(path=PALACE_PATH)
    col = client.get_collection("mempalace_drawers")

    # Discover rooms in this wing
    all_meta = col.get(include=["metadatas"])["metadatas"]
    rooms = set()
    for m in all_meta:
        if m.get("wing") == WING:
            rooms.add(m.get("room", "general"))

    Path(FLEET_INCOMING).mkdir(parents=True, exist_ok=True)

    closets = []
    for room in sorted(rooms):
        # Query each room by its own name; presumably this surfaces the most
        # room-relevant drawers — TODO confirm that ranking is the intent.
        results = col.query(
            query_texts=[room],
            n_results=DOCS_PER_ROOM,
            where={"$and": [{"wing": WING}, {"room": room}]},
            include=["documents", "metadatas"],
        )
        docs = results["documents"][0]
        metas = results["metadatas"][0]

        entries = []
        for doc, meta in zip(docs, metas):
            # Sanitize content: truncate, then strip absolute workspace paths.
            # Replacement order matters: most-specific prefixes first so the
            # broader "/root/wizards/" rule doesn't pre-empt them.
            sanitized = doc[:800]
            sanitized = sanitized.replace("/root/wizards/bezalel/", "~/")
            sanitized = sanitized.replace("/root/wizards/", "~/")
            sanitized = sanitized.replace("/home/bezalel/", "~/")
            sanitized = sanitized.replace("/home/", "~/")
            entries.append({
                "content": sanitized,
                # Basename only — full paths are never exported (privacy rule).
                "source_basename": Path(meta.get("source_file", "?")).name,
            })

        closet = {
            "wing": WING,
            "room": room,
            "type": "closet",
            "entries": entries,
        }
        closets.append(closet)

    # One JSON file per wing; re-exports overwrite the previous drop.
    out_file = Path(FLEET_INCOMING) / f"{WING}_closets.json"
    with open(out_file, "w") as f:
        json.dump(closets, f, indent=2)

    print(f"Exported {len(closets)} closets to {out_file}")
    for c in closets:
        print(f"  {c['wing']} / {c['room']} : {len(c['entries'])} entries")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
24
scripts/mempalace_nightly.sh
Executable file
24
scripts/mempalace_nightly.sh
Executable file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env bash
# Bezalel MemPalace Nightly Re-mine + Fleet Sync
#
# Pipeline: re-mine the local wing into the palace store, log a status line,
# then export this wing's closets to the fleet share and import the other
# wings'. Every stage is best-effort (`|| true`) so a single failure does not
# abort the rest of the run; all output is appended to $LOG.
set -euo pipefail

PALACE="/root/wizards/bezalel/.mempalace/palace"        # palace data directory
MINER="/root/wizards/bezalel/hermes/venv/bin/mempalace" # miner CLI binary
WING_DIR="/root/wizards/bezalel"                        # wing to mine
LOG="/var/log/bezalel_mempalace.log"
# NOTE: EXPORTER/IMPORTER each hold "interpreter + script" and are expanded
# unquoted below on purpose so the shell word-splits them into argv.
EXPORTER="/root/wizards/bezalel/hermes/venv/bin/python /root/wizards/bezalel/mempalace_export.py"
IMPORTER="/root/wizards/bezalel/hermes/venv/bin/python /var/lib/mempalace/fleet_import.py"

echo "[$(date -Iseconds)] Starting mempalace re-mine" >> "$LOG"
cd "$WING_DIR"
"$MINER" --palace "$PALACE" mine "$WING_DIR" --agent bezalel >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Finished mempalace re-mine" >> "$LOG"
"$MINER" --palace "$PALACE" status >> "$LOG" 2>&1 || true

echo "[$(date -Iseconds)] Starting fleet closet export" >> "$LOG"
$EXPORTER >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Starting fleet closet import" >> "$LOG"
$IMPORTER >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Fleet sync complete" >> "$LOG"

# Heartbeat consumed by meta_heartbeat.sh staleness checks.
touch /var/lib/bezalel/heartbeats/mempalace_nightly.last
|
||||
53
scripts/meta_heartbeat.sh
Executable file
53
scripts/meta_heartbeat.sh
Executable file
@@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env bash
# Meta-heartbeat — checks all Bezalel cron jobs for stale timestamps
#
# Each cron job touches a file under $HEARTBEAT_DIR on completion; this script
# compares file mtimes against per-job staleness budgets and exits non-zero
# when any heartbeat is missing or too old.
set -euo pipefail

HEARTBEAT_DIR="/var/lib/bezalel/heartbeats"
ALERT_LOG="/var/log/bezalel_meta_heartbeat.log"
STALE_MINUTES=30  # NOTE(review): unused — the per-job budgets below govern

log() {
    echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}

mkdir -p "$HEARTBEAT_DIR"

# Define expected heartbeats: name => max_stale_minutes
HEARTBEATS=(
    "nightly_watch:150"       # 2.5h — runs at 02:00
    "mempalace_nightly:150"   # 2.5h — runs at 03:00
    "db_backup:150"           # 2.5h — runs at 03:30
    "runner_health:15"        # 15m — every 5 min
)

NOW_EPOCH=$(date +%s)
FAILURES=0

for entry in "${HEARTBEATS[@]}"; do
    # Split "name:budget" on the colon (prefix/suffix parameter expansion).
    name="${entry%%:*}"
    max_minutes="${entry##*:}"
    file="${HEARTBEAT_DIR}/${name}.last"

    if [[ ! -f "$file" ]]; then
        log "MISSING: $name heartbeat file not found ($file)"
        FAILURES=$((FAILURES + 1))
        continue
    fi

    # Age is derived from the file's mtime — jobs `touch` it on completion.
    LAST_EPOCH=$(stat -c %Y "$file")
    AGE_MIN=$(( (NOW_EPOCH - LAST_EPOCH) / 60 ))

    if [[ $AGE_MIN -gt $max_minutes ]]; then
        log "STALE: $name is ${AGE_MIN}m old (max ${max_minutes}m)"
        FAILURES=$((FAILURES + 1))
    else
        log "OK: $name is ${AGE_MIN}m old"
    fi
done

if [[ $FAILURES -gt 0 ]]; then
    log "ALERT: $FAILURES stale/missing heartbeat(s) detected."
    exit 1
else
    log "ALL_OK: All heartbeats healthy."
fi
|
||||
70
scripts/review_gate.py
Normal file
70
scripts/review_gate.py
Normal file
@@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Review Gate — Poka-yoke for unreviewed merges.
|
||||
Fails if the current PR has fewer than 1 approving review.
|
||||
|
||||
Usage in Gitea workflow:
|
||||
- name: Review Approval Gate
|
||||
run: python scripts/review_gate.py
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")  # API token (required)
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REPO = os.environ.get("GITEA_REPO", "")          # "<owner>/<repo>" (required)
PR_NUMBER = os.environ.get("PR_NUMBER", "")      # PR index; may be inferred in main()
|
||||
|
||||
|
||||
def api_call(method, path):
    """Issue an authenticated Gitea API request and return the decoded JSON.

    On an HTTP error the response body and status code are returned as
    ``{"error": <body>, "status": <code>}`` instead of raising.
    """
    endpoint = f"{GITEA_URL}/api/v1{path}"
    api_request = request.Request(
        endpoint,
        method=method,
        headers={"Authorization": f"token {GITEA_TOKEN}"},
    )
    try:
        with request.urlopen(api_request, timeout=30) as response:
            body = response.read().decode()
    except error.HTTPError as http_err:
        return {"error": http_err.read().decode(), "status": http_err.code}
    return json.loads(body)
|
||||
|
||||
|
||||
def main():
    """Gate entry point: exit 0 iff the PR has at least one approving review."""
    # Fail fast on missing configuration.
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        sys.exit(1)
    if not REPO:
        print("ERROR: GITEA_REPO not set")
        sys.exit(1)

    # Prefer the explicit PR_NUMBER env var; fall back to the index that
    # Gitea Actions exposes.
    pr_number = PR_NUMBER or os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
    if not pr_number:
        print("ERROR: Could not determine PR number")
        sys.exit(1)

    reviews = api_call("GET", f"/repos/{REPO}/pulls/{pr_number}/reviews")
    if isinstance(reviews, dict) and "error" in reviews:
        print(f"ERROR fetching reviews: {reviews}")
        sys.exit(1)

    # NOTE(review): every APPROVED review counts, including ones possibly
    # superseded by later changes — confirm this matches the intended policy.
    approvals = [review for review in reviews if review.get("state") == "APPROVED"]
    if approvals:
        print(f"OK: PR #{pr_number} has {len(approvals)} approving review(s).")
        sys.exit(0)

    print(f"BLOCKED: PR #{pr_number} has no approving reviews.")
    print("Merges are not permitted without at least one approval.")
    sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
46
scripts/runner_health_probe.sh
Executable file
46
scripts/runner_health_probe.sh
Executable file
@@ -0,0 +1,46 @@
|
||||
#!/usr/bin/env bash
# Gitea Runner Health Probe — Poka-yoke for unregistered runners
#
# Queries the Gitea API for online act_runner instances. If none are online,
# attempts a self-healing restart of the runner daemon and re-checks.
# Touches a heartbeat file so meta_heartbeat.sh can track that the probe ran.
set -euo pipefail

GITEA_TOKEN="${GITEA_TOKEN:-}"
GITEA_URL="https://forge.alexanderwhitestone.com"
ALERT_LOG="/var/log/bezalel_runner_health.log"

log() {
    echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}

if [[ -z "$GITEA_TOKEN" ]]; then
    log "ERROR: GITEA_TOKEN not set"
    exit 1
fi

# Count runners whose status is "online" according to the Gitea API.
# Factored into a function so the initial probe and the post-restart re-check
# cannot drift apart (the original duplicated this pipeline verbatim).
# The `|| echo 0` fallback prints "0" on API/parse failure instead of letting
# `set -e -o pipefail` abort the script before anything is logged.
count_online_runners() {
    curl -s -H "Authorization: token ${GITEA_TOKEN}" \
        "${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
        python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))" \
        || echo 0
}

ACTIVE_RUNNERS=$(count_online_runners)
log "Active runners: ${ACTIVE_RUNNERS}"

if [[ "$ACTIVE_RUNNERS" -eq 0 ]]; then
    log "CRITICAL: Zero active runners detected. Attempting self-healing restart."
    pkill -f "act_runner daemon" 2>/dev/null || true
    sleep 2
    cd /opt/gitea-runner && nohup ./act_runner daemon > /var/log/gitea-runner.log 2>&1 &
    sleep 3
    # Re-check
    ACTIVE_RUNNERS_AFTER=$(count_online_runners)
    log "Active runners after restart: ${ACTIVE_RUNNERS_AFTER}"
    if [[ "$ACTIVE_RUNNERS_AFTER" -eq 0 ]]; then
        log "CRITICAL: Self-healing failed. Runner still offline."
        # Heartbeat is still touched so the meta-heartbeat tracks "probe ran",
        # even though the probe itself reports failure.
        touch /var/lib/bezalel/heartbeats/runner_health.last
        exit 1
    else
        log "RECOVERED: Runner back online."
    fi
else
    log "OK: ${ACTIVE_RUNNERS} runner(s) online."
fi

touch /var/lib/bezalel/heartbeats/runner_health.last
|
||||
50
scripts/secret_guard.sh
Executable file
50
scripts/secret_guard.sh
Executable file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/env bash
# Secret Guard — Poka-yoke for world-readable credentials
#
# Scans common locations for world-readable files containing credential-like
# keywords, copies each offender into a private quarantine directory, and
# tightens permissions on both the copy and the original to 600.
set -euo pipefail

ALERT_LOG="/var/log/bezalel_secret_guard.log"
QUARANTINE_DIR="/root/wizards/bezalel/home/quarantine"

mkdir -p "$QUARANTINE_DIR"

log() {
    echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}

# Scan for world-readable files with sensitive keywords in /root, /home, /etc, /tmp, /var/log
# Exclude binary-ish trees, large files (>1MB), and known safe paths.
# BUG FIXES vs previous version:
#   * `! -path "*/.pyc"` only matched files literally named ".pyc"; use
#     `! -name "*.pyc"` to actually exclude compiled Python files.
#   * grep exits 1 when nothing matches, which under `set -e -o pipefail`
#     killed the script before the "OK" log line; `|| true` makes an empty
#     result a normal outcome.
BAD_FILES=$(find /root /home /etc /tmp /var/log -maxdepth 4 -type f -perm /o+r 2>/dev/null \
    ! -path "*/.git/*" \
    ! -path "*/node_modules/*" \
    ! -path "*/venv/*" \
    ! -path "*/.venv/*" \
    ! -path "*/__pycache__/*" \
    ! -name "*.pyc" \
    ! -size +1M \
    -exec grep -l -i -E 'password|token|secret|nsec|api_key|private_key|aws_access_key_id|aws_secret_access_key' {} + 2>/dev/null | head -50 || true)

VIOLATIONS=0
# Read line-by-line instead of word-splitting `$BAD_FILES`, so filenames
# containing spaces are handled correctly.
while IFS= read -r file; do
    [[ -z "$file" ]] && continue
    # Skip if already quarantined
    if [[ "$file" == "$QUARANTINE_DIR"* ]]; then
        continue
    fi
    # Skip log files that are expected to be world-readable
    if [[ "$file" == /var/log/* ]]; then
        continue
    fi

    VIOLATIONS=$((VIOLATIONS + 1))
    basename=$(basename "$file")
    # Timestamp suffix keeps repeated quarantines of the same name distinct.
    quarantine_path="${QUARANTINE_DIR}/${basename}.$(date +%s)"
    cp "$file" "$quarantine_path"
    chmod 600 "$quarantine_path"
    chmod 600 "$file"
    log "QUARANTINED: $file -> $quarantine_path (permissions fixed to 600)"
done <<< "$BAD_FILES"

if [[ $VIOLATIONS -gt 0 ]]; then
    log "ALERT: $VIOLATIONS world-readable secret file(s) detected and quarantined."
else
    log "OK: No world-readable secret files found."
fi
|
||||
77
scripts/staging_gate.py
Normal file
77
scripts/staging_gate.py
Normal file
@@ -0,0 +1,77 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Staging Gate — Poka-yoke for production deployments.
|
||||
Checks if the PR that introduced the current commit was marked `staging-verified`.
|
||||
Fails the workflow if not, blocking deploy.yml from proceeding.
|
||||
|
||||
Usage in Gitea workflow:
|
||||
- name: Staging Verification Gate
|
||||
run: python scripts/staging_gate.py
|
||||
env:
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
from urllib import request, error
|
||||
|
||||
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
REPO = os.environ.get("GITEA_REPO", "Timmy_Foundation/the-nexus")
|
||||
|
||||
|
||||
def api_call(method, path):
    """Perform an authenticated Gitea REST API call and return decoded JSON.

    On an HTTP error the response body and status code are returned as a
    dict ({"error": ..., "status": ...}) instead of raising, so callers can
    inspect failures without try/except.
    """
    endpoint = f"{GITEA_URL}/api/v1{path}"
    req = request.Request(
        endpoint,
        method=method,
        headers={"Authorization": f"token {GITEA_TOKEN}"},
    )
    try:
        resp = request.urlopen(req, timeout=30)
    except error.HTTPError as exc:
        return {"error": exc.read().decode(), "status": exc.code}
    with resp:
        return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def get_commit_sha():
    """Return the SHA of the current HEAD commit via `git rev-parse`."""
    proc = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        capture_output=True,
        text=True,
    )
    return proc.stdout.strip()
|
||||
|
||||
|
||||
def get_pr_for_commit(sha):
    """Find the pull request whose merge commit matches *sha*.

    Scans the 50 most recent closed PRs, then the 50 most recent open ones.
    Returns the PR dict from the Gitea API, or None when no match is found.
    """
    for state in ("closed", "open"):
        listing = api_call("GET", f"/repos/{REPO}/pulls?state={state}&limit=50")
        if not isinstance(listing, list):
            # api_call returned an error dict; try the next state.
            continue
        for candidate in listing:
            if candidate.get("merge_commit_sha") == sha:
                return candidate
    return None
|
||||
|
||||
|
||||
def main():
    """Gate entry point: exit 0 when deploy is permitted, 1 otherwise.

    Blocks (exit 1) when:
      * GITEA_TOKEN is missing — nothing can be verified,
      * HEAD's SHA cannot be determined — fail closed,
      * no PR can be associated with HEAD (direct push to main), or
      * the associated PR lacks the 'staging-verified' label.
    """
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        sys.exit(1)

    sha = get_commit_sha()
    if not sha:
        # `git rev-parse HEAD` produced nothing (not a repo / detached state
        # with no HEAD) — fail closed rather than matching no PR by accident.
        print("ERROR: Could not determine HEAD commit SHA. Blocking deploy.")
        sys.exit(1)

    pr = get_pr_for_commit(sha)

    if not pr:
        # Direct push to main without PR — block unless explicitly forced
        print("WARNING: No PR found for this commit. Blocking deploy as a safety measure.")
        print("To bypass, merge via PR and add the 'staging-verified' label.")
        sys.exit(1)

    # Gitea may return `"labels": null` rather than an empty list; guard it.
    labels = {label["name"] for label in (pr.get("labels") or [])}
    if "staging-verified" in labels:
        print(f"OK: PR #{pr['number']} has 'staging-verified' label. Deploy permitted.")
        sys.exit(0)
    else:
        print(f"BLOCKED: PR #{pr['number']} is missing the 'staging-verified' label.")
        print("Deploy to production is not permitted until staging is verified.")
        sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
81
scripts/sync_branch_protection.py
Normal file
81
scripts/sync_branch_protection.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
|
||||
Correctly uses the Gitea 1.25+ API (not GitHub-style).
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import urllib.request
|
||||
import yaml
|
||||
|
||||
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||
ORG = "Timmy_Foundation"
|
||||
CONFIG_DIR = ".gitea/branch-protection"
|
||||
|
||||
|
||||
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
    """Issue an authenticated JSON request to the Gitea API and decode the reply."""
    body = json.dumps(payload).encode() if payload else None
    headers = {
        "Authorization": f"token {GITEA_TOKEN}",
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(
        f"{GITEA_URL}/api/v1{path}",
        data=body,
        method=method,
        headers=headers,
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def apply_protection(repo: str, rules: dict) -> bool:
    """Create or update branch protection for *repo* from a rules dict.

    Returns True on success, False on any API failure.  Failures are printed
    rather than raised so one broken repo does not abort the whole sync run.
    """
    # Read — do not pop — so the caller's dict is left untouched.
    branch = rules.get("branch", "main")

    payload = {
        "branch_name": branch,
        "rule_name": branch,
        "required_approvals": rules.get("required_approvals", 1),
        "block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
        "dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
        "block_deletions": rules.get("block_deletions", True),
        "block_force_push": rules.get("block_force_push", True),
        "block_admin_merge_override": rules.get("block_admin_merge_override", True),
        "enable_status_check": rules.get("require_ci_to_merge", False),
        "status_check_contexts": rules.get("status_check_contexts", []),
    }

    try:
        # The existence check lives inside the try: an HTTP error here used
        # to escape and kill the entire sync instead of failing this repo.
        existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
        exists = isinstance(existing, list) and any(
            r.get("branch_name") == branch for r in existing
        )
        if exists:
            api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
        else:
            api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
        print(f"✅ {repo}:{branch} synced")
        return True
    except Exception as e:  # broad by design: report per-repo, keep syncing
        print(f"❌ {repo}:{branch} failed: {e}")
        return False
|
||||
|
||||
|
||||
def main() -> int:
    """Sync every .yml config in CONFIG_DIR to Gitea; return a process exit code."""
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN not set")
        return 1

    ok = 0
    for fname in os.listdir(CONFIG_DIR):
        if not fname.endswith(".yml"):
            continue
        # The config file name (minus the .yml extension) is the repo name.
        repo = fname[: -len(".yml")]
        with open(os.path.join(CONFIG_DIR, fname)) as fh:
            cfg = yaml.safe_load(fh)
        if apply_protection(repo, cfg.get("rules", {})):
            ok += 1

    print(f"\nSynced {ok} repo(s)")
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
30
scripts/sync_fleet_to_alpha.sh
Normal file
30
scripts/sync_fleet_to_alpha.sh
Normal file
@@ -0,0 +1,30 @@
|
||||
#!/usr/bin/env bash
# Sync Fleet MemPalace from Beta to Alpha
# Usage: ./sync_fleet_to_alpha.sh
set -euo pipefail

FLEET_DIR="/var/lib/mempalace/fleet"
ALPHA_HOST="167.99.126.228"
ALPHA_USER="root"
ALPHA_DEST="/var/lib/mempalace/fleet"
LOG="/var/log/bezalel_alpha_sync.log"

# Append a timestamped line to the log file and echo it to stdout.
log() {
    echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
}

log "Starting fleet palace sync to Alpha (${ALPHA_HOST})..."

# The destination must exist on Alpha before rsync runs; this also doubles
# as a reachability probe (SSH must be configured key-based or agent-forwarded).
if ! ssh -o ConnectTimeout=10 "${ALPHA_USER}@${ALPHA_HOST}" "mkdir -p ${ALPHA_DEST}"; then
    log "ERROR: Cannot reach Alpha host. Aborting."
    exit 1
fi

# Mirror the fleet palace directory (ChromaDB files + incoming closets).
rsync -avz --delete \
    -e "ssh -o ConnectTimeout=10" \
    "${FLEET_DIR}/" \
    "${ALPHA_USER}@${ALPHA_HOST}:${ALPHA_DEST}/" >> "$LOG" 2>&1

log "Fleet palace sync complete."
|
||||
11
scripts/systemd/bezalel-meta-heartbeat.service
Normal file
11
scripts/systemd/bezalel-meta-heartbeat.service
Normal file
@@ -0,0 +1,11 @@
|
||||
[Unit]
|
||||
Description=Bezalel Meta-Heartbeat — stale cron detection (poka-yoke #1096)
|
||||
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=oneshot
|
||||
ExecStart=/root/wizards/the-nexus/bin/bezalel_heartbeat_check.py
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
Environment=HOME=/root
|
||||
11
scripts/systemd/bezalel-meta-heartbeat.timer
Normal file
11
scripts/systemd/bezalel-meta-heartbeat.timer
Normal file
@@ -0,0 +1,11 @@
|
||||
[Unit]
|
||||
Description=Bezalel Meta-Heartbeat — fires every 15 minutes (poka-yoke #1096)
|
||||
Documentation=https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
|
||||
[Timer]
|
||||
OnBootSec=5min
|
||||
OnUnitActiveSec=15min
|
||||
Persistent=true
|
||||
|
||||
[Install]
|
||||
WantedBy=timers.target
|
||||
123
scripts/validate_mempalace_taxonomy.py
Normal file
123
scripts/validate_mempalace_taxonomy.py
Normal file
@@ -0,0 +1,123 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Validate a wizard's mempalace.yaml against the fleet taxonomy standard.
|
||||
|
||||
Usage:
|
||||
python validate_mempalace_taxonomy.py /path/to/mempalace.yaml
|
||||
python validate_mempalace_taxonomy.py --ci /path/to/mempalace.yaml
|
||||
|
||||
Exit codes:
|
||||
0 = valid
|
||||
1 = missing required rooms or other violations
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
REQUIRED_ROOMS = {
|
||||
"forge",
|
||||
"hermes",
|
||||
"nexus",
|
||||
"issues",
|
||||
"experiments",
|
||||
}
|
||||
|
||||
|
||||
def load_standard():
    """Locate and parse the fleet taxonomy standard, or return None if absent.

    Probes a few known locations (repo-local file, CI clone under /tmp, a
    sibling the-nexus checkout) and returns the parsed YAML of the first
    one that exists.
    """
    here = Path(__file__).parent
    candidates = (
        here.parent / "mempalace_taxonomy.yaml",
        Path("/tmp/nexus_clone/docs/mempalace_taxonomy.yaml"),
        here.parent.parent / "the-nexus" / "docs" / "mempalace_taxonomy.yaml",
    )
    for candidate in candidates:
        if not candidate.exists():
            continue
        with open(candidate) as fh:
            return yaml.safe_load(fh)
    return None
|
||||
|
||||
|
||||
def validate(path: Path):
    """Validate one mempalace.yaml against the fleet taxonomy standard.

    Returns a ``(errors, warnings)`` pair of string lists.  Errors are hard
    violations (missing file, invalid YAML, missing required rooms,
    duplicate room names); warnings are advisory (rooms without keywords,
    rooms not in the fleet spec).
    """
    errors = []
    warnings = []

    if not path.exists():
        errors.append(f"File not found: {path}")
        return errors, warnings

    with open(path) as f:
        data = yaml.safe_load(f)

    if not data:
        errors.append("Empty or invalid YAML")
        return errors, warnings

    # Rooms may live at the top level or nested under wings.bezalel.rooms,
    # and may be a list of dicts, a list of plain names, or a name->config
    # mapping.  The old code dropped plain-name lists into an empty set,
    # producing false "missing rooms" and "duplicate" errors.
    rooms = data.get("rooms", data.get("wings", {}).get("bezalel", {}).get("rooms", []))
    if isinstance(rooms, dict):
        room_name_list = list(rooms.keys())
    elif isinstance(rooms, list):
        room_name_list = [r.get("name") if isinstance(r, dict) else r for r in rooms]
    else:
        room_name_list = []
    room_names = set(room_name_list)

    missing = REQUIRED_ROOMS - room_names
    if missing:
        errors.append(f"Missing required rooms: {', '.join(sorted(missing))}")

    # Duplicate detection: a set collapses repeats, so comparing against the
    # raw name list finds them reliably.
    if len(room_names) < len(room_name_list):
        errors.append("Duplicate room names detected")

    # Advisory: every room entry should declare retrieval keywords.
    if isinstance(rooms, list):
        for r in rooms:
            if isinstance(r, dict) and not r.get("keywords", []):
                warnings.append(f"Room '{r.get('name')}' has no keywords")

    standard = load_standard()
    if standard:
        std_optional = set(standard.get("optional_rooms", {}).keys())
        unknown = room_names - REQUIRED_ROOMS - std_optional
        if unknown:
            warnings.append(f"Non-standard rooms (OK but not in fleet spec): {', '.join(sorted(unknown))}")

    return errors, warnings
|
||||
|
||||
|
||||
def main():
    """CLI entry point: validate the given config and exit non-zero on failure."""
    import argparse

    parser = argparse.ArgumentParser(description="Validate MemPalace taxonomy")
    parser.add_argument("config", help="Path to mempalace.yaml")
    parser.add_argument("--ci", action="store_true", help="CI mode: fail on warnings too")
    args = parser.parse_args()

    errors, warnings = validate(Path(args.config))

    for w in warnings:
        print(f"WARNING: {w}")

    if errors:
        for e in errors:
            print(f"ERROR: {e}")
        sys.exit(1)

    if args.ci and warnings:
        print("Validation failed in CI mode (warnings treated as errors)")
        sys.exit(1)

    print("OK: Taxonomy validation passed")
    sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,15 +1,43 @@
|
||||
"""Pytest configuration for the test suite."""
|
||||
import re
|
||||
import pytest
|
||||
|
||||
# Configure pytest-asyncio mode
|
||||
pytest_plugins = ["pytest_asyncio"]
|
||||
|
||||
# Pattern that constitutes a valid issue link in a skip reason.
|
||||
# Accepts: #NNN, https?://..., or JIRA-NNN style keys.
|
||||
_ISSUE_LINK_RE = re.compile(
|
||||
r"(#\d+|https?://\S+|[A-Z]+-\d+)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _has_issue_link(reason: str) -> bool:
|
||||
"""Return True if *reason* contains a recognisable issue reference."""
|
||||
return bool(_ISSUE_LINK_RE.search(reason or ""))
|
||||
|
||||
|
||||
def _skip_reason(report) -> str:
|
||||
"""Extract the human-readable skip reason from a pytest report."""
|
||||
longrepr = getattr(report, "longrepr", None)
|
||||
if longrepr is None:
|
||||
return ""
|
||||
if isinstance(longrepr, tuple) and len(longrepr) >= 3:
|
||||
# (filename, lineno, "Skipped: <reason>")
|
||||
return str(longrepr[2])
|
||||
return str(longrepr)
|
||||
|
||||
|
||||
def pytest_configure(config):
    """Register the custom markers used by this suite."""
    marker_lines = (
        "integration: mark test as integration test (requires MCP servers)",
        "quarantine: mark test as quarantined (flaky/broken, tracked by issue)",
    )
    for line in marker_lines:
        config.addinivalue_line("markers", line)
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
@@ -20,6 +48,12 @@ def pytest_addoption(parser):
|
||||
default=False,
|
||||
help="Run integration tests that require MCP servers",
|
||||
)
|
||||
parser.addoption(
|
||||
"--no-skip-enforcement",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Disable poka-yoke enforcement of issue-linked skip reasons (CI escape hatch)",
|
||||
)
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(config, items):
|
||||
@@ -31,3 +65,60 @@ def pytest_collection_modifyitems(config, items):
|
||||
for item in items:
|
||||
if "integration" in item.keywords:
|
||||
item.add_marker(skip_integration)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POKA-YOKE: Treat skipped tests as failures unless they carry an issue link.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Intercept skipped reports and fail them if they lack an issue link.

    Exemptions (the skip is allowed to stand):
      * ``--no-skip-enforcement`` was passed — emergency escape hatch.
      * The test carries the ``quarantine`` marker — its issue link lives on
        that marker, not on the skip reason.
      * The reason looks like an env-var CI gate (``requires X=1`` style) or
        came from this conftest's own integration gate — the *condition* is
        the gate, not a developer opt-out.
    """
    outcome = yield
    report = outcome.get_result()

    # Only skipped outcomes are subject to enforcement.
    if not report.skipped:
        return
    # Escape hatch for emergency use.
    if item.config.getoption("--no-skip-enforcement", default=False):
        return
    # Quarantined tests are tracked via their quarantine marker.
    if "quarantine" in item.keywords:
        return

    reason = _skip_reason(report)

    # Env-var-gated skipif conditions and the integration gate from
    # pytest_collection_modifyitems are legitimate CI gates.
    env_gate = re.compile(r"(require|needs|set)\s+\w+=[^\s]+", re.IGNORECASE)
    if env_gate.search(reason) or "require --run-integration" in reason:
        return

    if _has_issue_link(reason):
        return

    # No issue link: convert the skip into a failure with guidance.
    report.outcome = "failed"
    report.longrepr = (
        "[POKA-YOKE] Skip without issue link is not allowed.\n"
        f" Reason given: {reason!r}\n"
        " Fix: add an issue reference to the skip reason, e.g.:\n"
        " @pytest.mark.skip(reason='Broken until #NNN is resolved')\n"
        " Or quarantine the test: move it to tests/quarantine/ and\n"
        " file an issue — see docs/QUARANTINE_PROCESS.md"
    )
|
||||
|
||||
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
16
tests/fixtures/fleet_palace/bezalel/forge.closet.json
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "forge",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "CI pipeline green on main. All 253 tests passing.",
|
||||
"source_file": "forge.closet.json",
|
||||
"closet": true
|
||||
},
|
||||
{
|
||||
"text": "Deployed nexus heartbeat cron fix to Beta. Poka-yoke checks pass.",
|
||||
"source_file": "forge.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/hermes.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "hermes",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "Hermes gateway v2 deployed. MCP tools registered: mempalace, gitea, cron.",
|
||||
"source_file": "hermes.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
11
tests/fixtures/fleet_palace/bezalel/issues.closet.json
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"wizard": "bezalel",
|
||||
"room": "issues",
|
||||
"drawers": [
|
||||
{
|
||||
"text": "MemPalace x Evennia milestone: 6 of 8 issues closed. #1078 and #1083 in progress.",
|
||||
"source_file": "issues.closet.json",
|
||||
"closet": true
|
||||
}
|
||||
]
|
||||
}
|
||||
28
tests/quarantine/README.md
Normal file
28
tests/quarantine/README.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# tests/quarantine/
|
||||
|
||||
This directory holds tests that have been **quarantined** because they are
|
||||
flaky or temporarily broken. Quarantine keeps the main test suite green while
|
||||
ensuring no test is silently deleted or forgotten.
|
||||
|
||||
## Rules
|
||||
|
||||
1. Every file here must correspond to an open issue.
|
||||
2. Every test here must carry `@pytest.mark.quarantine(reason="#NNN")`.
|
||||
3. Quarantined tests are **excluded from the default CI run** but are included
|
||||
when you pass `--run-quarantine`.
|
||||
4. The quarantine is a **temporary holding area**, not a graveyard. If a
|
||||
quarantined test's issue has been open for more than 30 days with no
|
||||
progress, escalate it.
|
||||
|
||||
## Adding a test
|
||||
|
||||
```python
|
||||
# tests/quarantine/test_my_flaky_thing.py
|
||||
import pytest
|
||||
|
||||
@pytest.mark.quarantine(reason="Flaky: #1234")
|
||||
def test_my_flaky_thing():
|
||||
...
|
||||
```
|
||||
|
||||
See `docs/QUARANTINE_PROCESS.md` for the complete workflow.
|
||||
2
tests/quarantine/__init__.py
Normal file
2
tests/quarantine/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
# Quarantined tests live here.
|
||||
# See docs/QUARANTINE_PROCESS.md for the full quarantine workflow.
|
||||
334
tests/test_bezalel_heartbeat.py
Normal file
334
tests/test_bezalel_heartbeat.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""
|
||||
Tests for Bezalel Cron Heartbeat system (poka-yoke #1096).
|
||||
|
||||
Validates:
|
||||
- check_cron_heartbeats() with healthy and stale jobs
|
||||
- Empty heartbeat dir (no .last files) returns safely
|
||||
- Corrupt JSON in a .last file is handled gracefully
|
||||
- Mixed healthy/stale jobs
|
||||
- Alert file writing (write_alert)
|
||||
- The 2× interval staleness threshold is applied correctly
|
||||
|
||||
Uses importlib to load bin/bezalel_heartbeat_check.py without __init__.py,
|
||||
following the same pattern as test_nexus_watchdog.py.
|
||||
|
||||
Refs: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Load module under test ────────────────────────────────────────────────────
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
|
||||
_hb_spec = importlib.util.spec_from_file_location(
|
||||
"bezalel_heartbeat_check_test",
|
||||
PROJECT_ROOT / "bin" / "bezalel_heartbeat_check.py",
|
||||
)
|
||||
_hb_mod = importlib.util.module_from_spec(_hb_spec)
|
||||
sys.modules["bezalel_heartbeat_check_test"] = _hb_mod
|
||||
_hb_spec.loader.exec_module(_hb_mod)
|
||||
|
||||
check_cron_heartbeats = _hb_mod.check_cron_heartbeats
|
||||
write_alert = _hb_mod.write_alert
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def write_heartbeat_file(
    directory: Path,
    job: str,
    timestamp: float,
    interval: int = 3600,
    pid: int = 12345,
) -> Path:
    """Write a valid .last heartbeat file for testing."""
    directory.mkdir(parents=True, exist_ok=True)
    record = {
        "job": job,
        "timestamp": timestamp,
        "interval": interval,
        "pid": pid,
    }
    target = directory / f"{job}.last"
    target.write_text(json.dumps(record), encoding="utf-8")
    return target
|
||||
|
||||
|
||||
# ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCheckCronHeartbeats:
    """Behavioural tests for check_cron_heartbeats()."""

    def test_healthy_job(self, tmp_path: Path) -> None:
        """A job whose heartbeat is recent shows up as healthy."""
        base = time.time()
        write_heartbeat_file(tmp_path, "morning-report", timestamp=base - 100, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["healthy_count"] == 1
        assert report["stale_count"] == 0
        assert len(report["jobs"]) == 1

        entry = report["jobs"][0]
        assert entry["job"] == "morning-report"
        assert entry["healthy"] is True
        assert entry["age_secs"] == pytest.approx(100, abs=5)
        assert "OK" in entry["message"]

    def test_stale_job(self, tmp_path: Path) -> None:
        """A job silent for more than 2x its interval is reported as stale."""
        base = time.time()
        # Last beat 3 hours ago on a 1-hour interval: 3h > 2x1h, so stale.
        write_heartbeat_file(tmp_path, "hourly-sync", timestamp=base - 10800, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        assert report["healthy_count"] == 0

        entry = report["jobs"][0]
        assert entry["job"] == "hourly-sync"
        assert entry["healthy"] is False
        assert "STALE" in entry["message"]
        assert "exceeds 2x threshold" in entry["message"]

    def test_just_within_threshold(self, tmp_path: Path) -> None:
        """Exactly 2x interval is NOT stale: the comparison is strictly greater-than."""
        frozen = 1700000000.0
        # age = 7200, threshold = 2 * 3600 = 7200, so age > threshold is False.
        write_heartbeat_file(tmp_path, "edge-job", timestamp=frozen - 7200, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0

    def test_stale_threshold_just_over(self, tmp_path: Path) -> None:
        """One second past 2x interval tips the job into stale."""
        base = time.time()
        # age = 7201 against a threshold of 7200.
        write_heartbeat_file(tmp_path, "edge-job", timestamp=base - 7201, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1

    def test_empty_dir_returns_safely(self, tmp_path: Path) -> None:
        """An empty heartbeat directory yields a well-formed, empty result."""
        report = check_cron_heartbeats(str(tmp_path))

        assert report["jobs"] == []
        assert report["stale_count"] == 0
        assert report["healthy_count"] == 0
        assert "checked_at" in report

    def test_nonexistent_dir_returns_safely(self, tmp_path: Path) -> None:
        """A missing heartbeat directory is handled without raising."""
        report = check_cron_heartbeats(str(tmp_path / "does-not-exist"))

        assert report["jobs"] == []
        assert report["stale_count"] == 0
        assert report["healthy_count"] == 0

    def test_corrupt_json_handled_gracefully(self, tmp_path: Path) -> None:
        """A .last file with invalid JSON is surfaced as stale, not a crash."""
        (tmp_path / "broken-job.last").write_text("{this is not valid json!}", encoding="utf-8")

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        assert report["healthy_count"] == 0

        entry = report["jobs"][0]
        assert entry["job"] == "broken-job"
        assert entry["healthy"] is False
        assert "CORRUPT" in entry["message"]
        assert entry["last_seen"] is None

    def test_multiple_jobs_mixed(self, tmp_path: Path) -> None:
        """Healthy and stale jobs in the same directory are counted independently."""
        base = time.time()

        healthy_specs = [("job-a", 60), ("job-b", 1800), ("job-c", 3599)]
        stale_specs = [("job-d", 10000), ("job-e", 86400)]
        for name, age in healthy_specs + stale_specs:
            write_heartbeat_file(tmp_path, name, timestamp=base - age, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["healthy_count"] == 3
        assert report["stale_count"] == 2
        assert len(report["jobs"]) == 5

        flagged = {entry["job"] for entry in report["jobs"] if not entry["healthy"]}
        passing = {entry["job"] for entry in report["jobs"] if entry["healthy"]}
        assert flagged == {"job-d", "job-e"}
        assert passing == {"job-a", "job-b", "job-c"}

    def test_result_contains_required_keys(self, tmp_path: Path) -> None:
        """The result dict and each job entry expose the documented keys."""
        write_heartbeat_file(tmp_path, "test-job", timestamp=time.time() - 100, interval=3600)

        report = check_cron_heartbeats(str(tmp_path))

        for key in ("checked_at", "jobs", "stale_count", "healthy_count"):
            assert key in report

        entry = report["jobs"][0]
        for key in ("job", "healthy", "age_secs", "interval", "last_seen", "message"):
            assert key in entry

    def test_job_last_seen_is_iso_timestamp(self, tmp_path: Path) -> None:
        """The last_seen field parses as an ISO 8601 timestamp string."""
        from datetime import datetime

        write_heartbeat_file(tmp_path, "ts-job", timestamp=time.time() - 100, interval=3600)

        entry = check_cron_heartbeats(str(tmp_path))["jobs"][0]

        assert entry["last_seen"] is not None
        assert datetime.fromisoformat(entry["last_seen"]) is not None

    def test_checked_at_is_iso_timestamp(self, tmp_path: Path) -> None:
        """The checked_at field parses as an ISO 8601 timestamp string."""
        from datetime import datetime

        report = check_cron_heartbeats(str(tmp_path))

        assert datetime.fromisoformat(report["checked_at"]) is not None

    def test_custom_interval_applied(self, tmp_path: Path) -> None:
        """Stale detection honours a non-default (half-day) interval."""
        base = time.time()
        # 25 hours since last beat on a 12-hour interval: 25h > 24h, stale.
        write_heartbeat_file(tmp_path, "daily-job", timestamp=base - 90000, interval=43200)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 1
        entry = report["jobs"][0]
        assert entry["interval"] == 43200
        assert not entry["healthy"]

    def test_custom_interval_healthy(self, tmp_path: Path) -> None:
        """A job inside 2x of its custom interval stays healthy."""
        base = time.time()
        # 23 hours since last beat on a 12-hour interval: 23h < 24h, healthy.
        write_heartbeat_file(tmp_path, "daily-job", timestamp=base - 82800, interval=43200)

        report = check_cron_heartbeats(str(tmp_path))

        assert report["stale_count"] == 0
        assert report["healthy_count"] == 1

    def test_deterministic_with_mocked_time(self, tmp_path: Path) -> None:
        """With time.time() frozen, the reported age is exact."""
        frozen = 1700000000.0
        write_heartbeat_file(tmp_path, "frozen-job", timestamp=frozen - 500, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        entry = report["jobs"][0]
        assert entry["age_secs"] == pytest.approx(500.0, abs=0.01)
        assert entry["healthy"] is True  # 500 < 7200

    def test_stale_with_mocked_time(self, tmp_path: Path) -> None:
        """With time.time() frozen, stale detection is exact."""
        frozen = 1700000000.0
        # 8000s of silence on a 3600s interval: 8000 > 7200, stale.
        write_heartbeat_file(tmp_path, "frozen-stale", timestamp=frozen - 8000, interval=3600)

        with patch("time.time", return_value=frozen):
            report = check_cron_heartbeats(str(tmp_path))

        entry = report["jobs"][0]
        assert entry["age_secs"] == pytest.approx(8000.0, abs=0.01)
        assert entry["healthy"] is False
|
||||
|
||||
|
||||
class TestWriteAlert:
    """Behavioural tests for write_alert()."""

    def test_alert_file_created(self, tmp_path: Path) -> None:
        """write_alert drops an alert file under <dir>/alerts/."""
        stale = {
            "job": "test-job",
            "healthy": False,
            "age_secs": 8000.0,
            "interval": 3600,
            "last_seen": "2024-01-01T00:00:00+00:00",
            "message": "STALE (last 8000s ago, interval 3600s — exceeds 2x threshold of 7200s)",
        }
        write_alert(str(tmp_path), stale)

        assert (tmp_path / "alerts" / "test-job.alert").exists()

    def test_alert_file_content(self, tmp_path: Path) -> None:
        """The alert file is JSON carrying level, job identity and timing."""
        stale = {
            "job": "my-job",
            "healthy": False,
            "age_secs": 9000.0,
            "interval": 3600,
            "last_seen": "2024-06-01T12:00:00+00:00",
            "message": "STALE",
        }
        write_alert(str(tmp_path), stale)

        payload = json.loads((tmp_path / "alerts" / "my-job.alert").read_text())

        assert payload["alert_level"] == "P1"
        assert payload["job"] == "my-job"
        assert payload["age_secs"] == 9000.0
        assert payload["interval"] == 3600
        assert "detected_at" in payload

    def test_alert_no_partial_files_left(self, tmp_path: Path) -> None:
        """A successful write leaves only the .alert file — no temp leftovers."""
        stale = {
            "job": "clean-job",
            "healthy": False,
            "age_secs": 8000.0,
            "interval": 3600,
            "last_seen": None,
            "message": "STALE",
        }
        write_alert(str(tmp_path), stale)

        leftovers = list((tmp_path / "alerts").iterdir())
        assert len(leftovers) == 1
        assert leftovers[0].suffix == ".alert"
|
||||
341
tests/test_cron_heartbeats.py
Normal file
341
tests/test_cron_heartbeats.py
Normal file
@@ -0,0 +1,341 @@
|
||||
"""Tests for the poka-yoke cron heartbeat system.
|
||||
|
||||
Covers:
|
||||
- nexus/cron_heartbeat.py (write utility)
|
||||
- bin/check_cron_heartbeats.py (meta-heartbeat checker)
|
||||
|
||||
Refs: #1096
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Load modules under test ───────────────────────────────────────────
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent


def _load_module(name: str, path: Path):
    """Load *path* as module *name*, register it in sys.modules, return it.

    Factors out the spec/module/exec dance that was previously duplicated
    per module, and fails loudly if the file cannot be loaded.
    """
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot build import spec for {path}")
    module = importlib.util.module_from_spec(spec)
    # Register before exec so intra-module imports resolve correctly.
    sys.modules[name] = module
    spec.loader.exec_module(module)
    return module


_hb = _load_module("_cron_heartbeat", PROJECT_ROOT / "nexus" / "cron_heartbeat.py")
_chk = _load_module(
    "_check_cron_heartbeats", PROJECT_ROOT / "bin" / "check_cron_heartbeats.py"
)

# Re-export the names under test at module level.
write_cron_heartbeat = _hb.write_cron_heartbeat
heartbeat_path = _hb.heartbeat_path

scan_heartbeats = _chk.scan_heartbeats
build_report = _chk.build_report
HeartbeatReport = _chk.HeartbeatReport
JobStatus = _chk.JobStatus
_read_job_status = _chk._read_job_status
_fmt_duration = _chk._fmt_duration
|
||||
|
||||
|
||||
# ── nexus/cron_heartbeat.py ───────────────────────────────────────────
|
||||
|
||||
class TestWriteCronHeartbeat:
    """Behavior of the heartbeat-writing utility."""

    def test_creates_file(self, tmp_path):
        """write_cron_heartbeat creates <job>.last in the given directory."""
        result = write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path)
        expected = tmp_path / "my_job.last"
        assert result == expected
        assert expected.exists()

    def test_file_content(self, tmp_path):
        """Written file has all required fields."""
        write_cron_heartbeat("my_job", interval_seconds=600, status="ok", directory=tmp_path)
        payload = json.loads((tmp_path / "my_job.last").read_text())
        assert payload["job"] == "my_job"
        assert payload["interval_seconds"] == 600
        assert payload["status"] == "ok"
        assert payload["pid"] == os.getpid()
        # Timestamp should be "now" to within a couple of seconds.
        assert abs(payload["timestamp"] - time.time()) < 2

    def test_atomic_write_no_temp_files(self, tmp_path):
        """No temporary files remain after a successful write."""
        write_cron_heartbeat("my_job", interval_seconds=300, directory=tmp_path)
        leftovers = [entry.name for entry in tmp_path.iterdir()]
        assert leftovers == ["my_job.last"]

    def test_overwrites_cleanly(self, tmp_path):
        """Successive writes update, not append."""
        for beat_status in ("ok", "warn"):
            write_cron_heartbeat("j", interval_seconds=60, status=beat_status, directory=tmp_path)
        payload = json.loads((tmp_path / "j.last").read_text())
        assert payload["status"] == "warn"

    def test_creates_parent_dirs(self, tmp_path):
        """Parent directories are created as needed."""
        nested = tmp_path / "a" / "b" / "c"
        write_cron_heartbeat("j", interval_seconds=60, directory=nested)
        assert (nested / "j.last").exists()

    def test_heartbeat_path_helper(self, tmp_path):
        """heartbeat_path() returns the correct path without writing."""
        candidate = heartbeat_path("myjob", directory=tmp_path)
        assert candidate == tmp_path / "myjob.last"
        assert not candidate.exists()

    def test_env_var_override(self, tmp_path, monkeypatch):
        """BEZALEL_HEARTBEAT_DIR env var overrides the directory."""
        monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path))
        # No directory= argument: the env var must decide the location.
        written = write_cron_heartbeat("env_job", interval_seconds=120)
        assert written.parent == tmp_path
|
||||
|
||||
|
||||
# ── bin/check_cron_heartbeats.py ─────────────────────────────────────
|
||||
|
||||
class TestScanHeartbeats:
    """Scanning a heartbeat directory into JobStatus records."""

    def _write_beat(self, dirpath, name, **overrides):
        """Drop a <name>.last heartbeat file with sensible defaults."""
        record = {
            "job": name,
            "timestamp": time.time(),
            "interval_seconds": 300,
            "pid": 1,
            "status": "ok",
        }
        record.update(overrides)
        (dirpath / f"{name}.last").write_text(json.dumps(record))

    def test_empty_dir(self, tmp_path):
        """No .last files → empty list."""
        assert scan_heartbeats(tmp_path) == []

    def test_nonexistent_dir(self, tmp_path):
        """Missing directory → empty list (no exception)."""
        missing = tmp_path / "nope"
        assert scan_heartbeats(missing) == []

    def test_healthy_job(self, tmp_path):
        """Fresh heartbeat → healthy."""
        self._write_beat(tmp_path, "myjob")
        found = scan_heartbeats(tmp_path)
        assert len(found) == 1
        assert found[0].healthy is True
        assert found[0].job == "myjob"

    def test_stale_job(self, tmp_path):
        """Heartbeat older than 2× interval → stale."""
        # 700s old against a 300s interval → staleness ratio ≈ 2.33.
        self._write_beat(tmp_path, "slow", timestamp=time.time() - 700)
        stale = scan_heartbeats(tmp_path)[0]
        assert stale.healthy is False
        assert stale.staleness_ratio > 2.0

    def test_missing_heartbeat_file(self, tmp_path):
        """_read_job_status handles a file that disappears mid-scan."""
        status = _read_job_status("ghost", tmp_path / "ghost.last")
        assert status.healthy is False
        assert "missing" in status.raw_status

    def test_corrupt_heartbeat(self, tmp_path):
        """Corrupt JSON → unhealthy with 'corrupt' status."""
        broken = tmp_path / "bad.last"
        broken.write_text("{not valid json")
        status = _read_job_status("bad", broken)
        assert status.healthy is False
        assert "corrupt" in status.raw_status

    def test_multiple_jobs(self, tmp_path):
        """Multiple .last files are all reported."""
        names = ["alpha", "beta", "gamma"]
        for idx, name in enumerate(names):
            self._write_beat(
                tmp_path,
                name,
                timestamp=time.time() - idx * 10,
                pid=idx + 1,
            )
        found = scan_heartbeats(tmp_path)
        assert len(found) == 3
        assert {status.job for status in found} == set(names)

    def test_non_last_files_ignored(self, tmp_path):
        """.json and other extensions are ignored."""
        (tmp_path / "other.json").write_text("{}")
        (tmp_path / "notes.txt").write_text("hello")
        assert scan_heartbeats(tmp_path) == []
|
||||
|
||||
|
||||
class TestHeartbeatReport:
    """Aggregation and rendering behavior of HeartbeatReport."""

    def _fresh_job(self, name="j"):
        """A healthy JobStatus fixture."""
        return JobStatus(
            job=name,
            path=Path(f"/tmp/{name}.last"),
            healthy=True,
            age_seconds=30,
            interval_seconds=300,
            staleness_ratio=0.1,
            last_timestamp=time.time() - 30,
            pid=1,
            raw_status="ok",
            message="Last beat 30s ago (ratio 0.1x)",
        )

    def _stale_job(self, name="s"):
        """A stale JobStatus fixture."""
        return JobStatus(
            job=name,
            path=Path(f"/tmp/{name}.last"),
            healthy=False,
            age_seconds=700,
            interval_seconds=300,
            staleness_ratio=2.33,
            last_timestamp=time.time() - 700,
            pid=1,
            raw_status="stale",
            message="Silent for 11m 40s (2.3x interval of 5m 0s)",
        )

    def _report(self, jobs):
        """Wrap *jobs* in a HeartbeatReport rooted at /tmp."""
        return HeartbeatReport(
            timestamp=time.time(),
            heartbeat_dir=Path("/tmp"),
            jobs=jobs,
        )

    def test_overall_healthy(self):
        assert self._report([self._fresh_job()]).overall_healthy is True

    def test_overall_unhealthy(self):
        report = self._report([self._fresh_job(), self._stale_job()])
        assert report.overall_healthy is False
        assert len(report.stale_jobs) == 1

    def test_panel_markdown_contains_table(self):
        panel = self._report(
            [self._fresh_job("alpha"), self._stale_job("beta")]
        ).to_panel_markdown()
        for needle in (
            "## Heartbeat Panel",
            "| `alpha` |",
            "| `beta` |",
            "STALE",
            "OK",
            "**Overall:** ALERT",
        ):
            assert needle in panel

    def test_panel_markdown_no_jobs(self):
        panel = self._report([]).to_panel_markdown()
        assert "no heartbeat files found" in panel

    def test_panel_overall_ok(self):
        panel = self._report([self._fresh_job()]).to_panel_markdown()
        assert "**Overall:** OK" in panel

    def test_alert_body_lists_stale_jobs(self):
        body = self._report([self._stale_job("slow")]).to_alert_body()
        assert "slow" in body
        lowered = body.lower()
        assert "STALE" in body.upper() or "stale" in lowered or "silent" in lowered
        assert "crontab" in lowered

    def test_to_json(self):
        data = self._report([self._fresh_job()]).to_json()
        assert data["healthy"] is True
        assert len(data["jobs"]) == 1
        assert data["jobs"][0]["job"] == "j"
|
||||
|
||||
|
||||
class TestFmtDuration:
    """_fmt_duration renders a second count as a short human string."""

    def _check(self, seconds, expected):
        assert _fmt_duration(seconds) == expected

    def test_seconds(self):
        self._check(45, "45s")

    def test_minutes(self):
        self._check(90, "1m 30s")

    def test_hours(self):
        self._check(3661, "1h 1m")
|
||||
|
||||
|
||||
class TestBuildReport:
    """End-to-end: build_report scans a directory into a report."""

    def test_build_report_with_dir(self, tmp_path):
        """build_report() uses the given directory."""
        beat = {
            "job": "myjob",
            "timestamp": time.time(),
            "interval_seconds": 300,
            "pid": 1,
            "status": "ok",
        }
        (tmp_path / "myjob.last").write_text(json.dumps(beat))
        report = build_report(directory=tmp_path)
        assert len(report.jobs) == 1
        assert report.overall_healthy is True

    def test_build_report_empty_dir(self, tmp_path):
        report = build_report(directory=tmp_path)
        assert report.jobs == []
        # Nothing stale counts as healthy.
        assert report.overall_healthy is True
|
||||
|
||||
|
||||
# ── Integration: nexus_watchdog writes its own heartbeat ─────────────
|
||||
|
||||
class TestWatchdogHeartbeatIntegration:
    """Verify nexus_watchdog.py writes a cron heartbeat after run_once()."""

    def test_watchdog_writes_cron_heartbeat(self, tmp_path, monkeypatch):
        """After run_once, nexus_watchdog writes nexus_watchdog.last."""
        monkeypatch.setenv("BEZALEL_HEARTBEAT_DIR", str(tmp_path))

        # Load the watchdog script as a throwaway module.
        spec = importlib.util.spec_from_file_location(
            "_watchdog_hb_test",
            PROJECT_ROOT / "bin" / "nexus_watchdog.py",
        )
        watchdog = importlib.util.module_from_spec(spec)
        sys.modules["_watchdog_hb_test"] = watchdog

        # Keep the network out of it: closed WS port, failing subprocess.
        with patch("socket.socket") as fake_socket, \
                patch("subprocess.run") as fake_run:
            fake_socket.return_value.connect_ex.return_value = 111  # port closed
            fake_run.return_value = MagicMock(returncode=1, stdout="")

            spec.loader.exec_module(watchdog)

            args = MagicMock()
            args.ws_host = "localhost"
            args.ws_port = 8765
            args.heartbeat_path = str(tmp_path / "nexus_heartbeat.json")
            args.stale_threshold = 300
            args.dry_run = True  # don't touch Gitea

            watchdog.run_once(args)

            beat_file = tmp_path / "nexus_watchdog.last"
            assert beat_file.exists(), "nexus_watchdog.last was not written"
            payload = json.loads(beat_file.read_text())
            assert payload["job"] == "nexus_watchdog"
            assert payload["interval_seconds"] == 300
|
||||
239
tests/test_mempalace_fleet_api.py
Normal file
239
tests/test_mempalace_fleet_api.py
Normal file
@@ -0,0 +1,239 @@
|
||||
"""
|
||||
Tests for mempalace/fleet_api.py — Alpha-side HTTP fleet memory API.
|
||||
|
||||
Refs: #1078, #1075
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Import handler directly so we can test without running a server process.
|
||||
from mempalace.fleet_api import FleetAPIHandler, _handle_health, _handle_search, _handle_wings, make_server
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _FakeSocket:
|
||||
"""Minimal socket stub for BaseHTTPRequestHandler."""
|
||||
|
||||
def makefile(self, mode: str, *args, **kwargs): # noqa: ANN001
|
||||
return io.BytesIO(b"")
|
||||
|
||||
|
||||
def _make_handler(path: str = "/health") -> tuple[FleetAPIHandler, io.BytesIO]:
    """Construct a handler pointed at *path*, capture wfile output."""
    out = io.BytesIO()

    # Bypass __init__ (which would try to service a real connection)
    # and wire up the attributes the request handlers touch.
    handler = FleetAPIHandler.__new__(FleetAPIHandler)
    handler.path = path
    handler.request = _FakeSocket()
    handler.client_address = ("127.0.0.1", 0)
    handler.server = MagicMock()
    handler.wfile = out
    handler.rfile = io.BytesIO(b"")
    handler.command = "GET"
    handler._headers_buffer = []

    # Replace the HTTP plumbing: record the status code, discard headers.
    handler._response_code = None

    def _record_status(code, message=None):  # noqa: ANN001
        handler._response_code = code

    handler.send_response = _record_status
    handler.send_header = lambda k, v: None
    handler.end_headers = lambda: None

    return handler, out
|
||||
|
||||
|
||||
def _parse_response(buf: io.BytesIO) -> dict:
|
||||
buf.seek(0)
|
||||
return json.loads(buf.read())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /health
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_health_returns_ok(tmp_path, monkeypatch):
    """/health reports ok and palace_exists=True for an existing palace dir."""
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    handler, captured = _make_handler("/health")
    _handle_health(handler)
    body = _parse_response(captured)
    assert body["status"] == "ok"
    assert body["palace_exists"] is True
|
||||
|
||||
|
||||
def test_health_missing_palace(tmp_path, monkeypatch):
    """/health still answers ok, but palace_exists=False, for a missing path."""
    absent = tmp_path / "nonexistent"
    monkeypatch.setenv("FLEET_PALACE_PATH", str(absent))
    handler, captured = _make_handler("/health")
    _handle_health(handler)
    body = _parse_response(captured)
    assert body["status"] == "ok"
    assert body["palace_exists"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /search
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _mock_search_fleet(results):
|
||||
"""Return a patch target that returns *results*."""
|
||||
mock = MagicMock(return_value=results)
|
||||
return mock
|
||||
|
||||
|
||||
def _make_result(text="hello", room="forge", wing="bezalel", score=0.9):
    """Build a MemPalaceResult fixture with overridable fields."""
    from nexus.mempalace.searcher import MemPalaceResult

    fields = dict(text=text, room=room, wing=wing, score=score)
    return MemPalaceResult(**fields)
|
||||
|
||||
|
||||
def test_search_missing_q_param():
    """Omitting ?q= yields a 400 with an error body."""
    handler, captured = _make_handler("/search")
    _handle_search(handler, {})
    body = _parse_response(captured)
    assert "error" in body
    assert handler._response_code == 400
|
||||
|
||||
|
||||
def test_search_returns_results(tmp_path, monkeypatch):
    """A successful search serializes results and answers HTTP 200.

    Cleaned up: the original patched FleetAPIHandler for no effect, patched
    search_fleet twice at the same target (patch.object(s_module, ...) and
    patch("nexus.mempalace.searcher.search_fleet", ...) resolve to the same
    attribute), and imported importlib / mempalace.fleet_api without using
    them. One patch is sufficient.
    """
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    (tmp_path / "chroma.sqlite3").touch()
    result = _make_result(text="CI green", room="forge", wing="bezalel", score=0.95)

    handler, buf = _make_handler("/search?q=CI")

    with patch("nexus.mempalace.searcher.search_fleet", return_value=[result]):
        _handle_search(handler, {"q": ["CI"]})

    data = _parse_response(buf)
    assert data["count"] == 1
    assert data["results"][0]["text"] == "CI green"
    assert data["results"][0]["room"] == "forge"
    assert data["results"][0]["wing"] == "bezalel"
    assert data["results"][0]["score"] == 0.95
    assert handler._response_code == 200
|
||||
|
||||
|
||||
def test_search_with_room_filter(tmp_path, monkeypatch):
    """The room query parameter is forwarded to search_fleet."""
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    fixture = _make_result()

    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", return_value=[fixture]) as fake_search:
        _handle_search(MagicMock(), {"q": ["test"], "room": ["hermes"]})

    # Verify room was passed through unchanged.
    fake_search.assert_called_once_with("test", room="hermes", n_results=10)
|
||||
|
||||
|
||||
def test_search_invalid_n_param():
    """A non-numeric n parameter is rejected with 400."""
    handler, captured = _make_handler("/search?q=test&n=bad")
    _handle_search(handler, {"q": ["test"], "n": ["bad"]})
    body = _parse_response(captured)
    assert "error" in body
    assert handler._response_code == 400
|
||||
|
||||
|
||||
def test_search_palace_unavailable():
    """MemPalaceUnavailable from search_fleet maps to HTTP 503.

    The original declared a `monkeypatch` fixture parameter it never used;
    dropped (pytest discovers tests by name, so the signature change is safe).
    """
    from nexus.mempalace.searcher import MemPalaceUnavailable

    handler, buf = _make_handler("/search?q=test")

    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", side_effect=MemPalaceUnavailable("no palace")):
        _handle_search(handler, {"q": ["test"]})

    data = _parse_response(buf)
    assert "error" in data
    assert handler._response_code == 503
|
||||
|
||||
|
||||
def test_search_n_clamped_to_max():
    """n > MAX_RESULTS is silently clamped."""
    import nexus.mempalace.searcher as s_module
    with patch.object(s_module, "search_fleet", return_value=[]) as fake_search:
        _handle_search(MagicMock(), {"q": ["test"], "n": ["9999"]})

    # 9999 must have been clamped down to the 50-result ceiling.
    fake_search.assert_called_once_with("test", room=None, n_results=50)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /wings
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_wings_returns_list(tmp_path, monkeypatch):
    """/wings lists only the palace's subdirectories."""
    monkeypatch.setenv("FLEET_PALACE_PATH", str(tmp_path))
    for wing in ("bezalel", "timmy"):
        (tmp_path / wing).mkdir()
    # A plain file must not be reported as a wing.
    (tmp_path / "README.txt").touch()

    handler, captured = _make_handler("/wings")
    _handle_wings(handler)
    body = _parse_response(captured)
    assert set(body["wings"]) == {"bezalel", "timmy"}
    assert handler._response_code == 200
|
||||
|
||||
|
||||
def test_wings_missing_palace(tmp_path, monkeypatch):
    """/wings answers 503 when the palace path does not exist."""
    absent = tmp_path / "nonexistent"
    monkeypatch.setenv("FLEET_PALACE_PATH", str(absent))
    handler, captured = _make_handler("/wings")
    _handle_wings(handler)
    body = _parse_response(captured)
    assert "error" in body
    assert handler._response_code == 503
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 404 unknown endpoint
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_unknown_endpoint():
    """An unrecognized path yields 404 plus a list of valid endpoints."""
    handler, captured = _make_handler("/foobar")
    handler.do_GET()
    body = _parse_response(captured)
    assert "error" in body
    assert handler._response_code == 404
    assert "/search" in body["endpoints"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# audit fixture smoke test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_audit_fixture_is_clean():
    """Ensure tests/fixtures/fleet_palace/ passes privacy audit (no violations).

    Fix: the original failure message used an f-string prefix with no
    placeholders (ruff F541); the message is now assembled explicitly.
    """
    from mempalace.audit_privacy import audit_palace

    fixture_dir = Path(__file__).parent / "fixtures" / "fleet_palace"
    assert fixture_dir.exists(), f"Fixture directory missing: {fixture_dir}"

    result = audit_palace(fixture_dir)
    details = "\n".join(
        f"  [{v.rule}] {v.path}: {v.detail}" for v in result.violations
    )
    assert result.clean, "Privacy violations found in CI fixture:\n" + details
|
||||
Reference in New Issue
Block a user