feat(bezalel): MemPalace ecosystem — validation, audit, sync, auto-revert, Evennia integration

Bezalel
2026-04-07 14:47:04 +00:00
committed by Bezalel
parent 34ec13bc29
commit a0ee7858ff
16 changed files with 1438 additions and 0 deletions

scripts/audit_mempalace_privacy.py Normal file

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
Audit the fleet shared palace for privacy violations.
Ensures no raw drawers, full source paths, or private workspace leaks exist.
Usage:
python audit_mempalace_privacy.py /path/to/fleet/palace
Exit codes:
0 = clean
1 = violations found
"""
import sys
from pathlib import Path
try:
import chromadb
except ImportError:
print("ERROR: chromadb not installed")
sys.exit(1)
VIOLATION_KEYWORDS = [
"/root/wizards/",
"/home/",
"/Users/",
"private_key",
"-----BEGIN",
"GITEA_TOKEN=",
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
]
def audit(palace_path: Path):
violations = []
client = chromadb.PersistentClient(path=str(palace_path))
try:
col = client.get_collection("mempalace_drawers")
except Exception as e:
print(f"ERROR: Could not open collection: {e}")
sys.exit(1)
all_data = col.get(include=["documents", "metadatas"])
docs = all_data["documents"]
metas = all_data["metadatas"]
    for doc, meta in zip(docs, metas):
        meta = meta or {}  # chromadb can return None for entries stored without metadata
        source = meta.get("source_file", "")
        doc_type = meta.get("type", "")
# Rule 1: Fleet palace should only contain closets or explicitly typed entries
if doc_type not in ("closet", "summary", "fleet"):
violations.append(
f"VIOLATION: Document type is '{doc_type}' (expected closet/summary/fleet). "
f"Source: {source}"
)
# Rule 2: No full absolute paths from private workspaces
if any(abs_path in source for abs_path in ["/root/wizards/", "/home/", "/Users/"]):
violations.append(
f"VIOLATION: Source contains absolute path: {source}"
)
# Rule 3: No raw secrets in document text
for kw in VIOLATION_KEYWORDS:
if kw in doc:
violations.append(
f"VIOLATION: Document contains sensitive keyword '{kw}'. Source: {source}"
)
break # one violation per doc is enough
return violations
def main():
import argparse
parser = argparse.ArgumentParser(description="Audit fleet palace privacy")
parser.add_argument("palace", default="/var/lib/mempalace/fleet", nargs="?", help="Path to fleet palace")
args = parser.parse_args()
violations = audit(Path(args.palace))
if violations:
print(f"FAIL: {len(violations)} privacy violation(s) found")
for v in violations:
print(f" {v}")
sys.exit(1)
else:
print("PASS: No privacy violations detected")
sys.exit(0)
if __name__ == "__main__":
main()
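A quick way to exercise the audit end to end is to seed a throwaway palace with one deliberately leaky drawer and confirm it gets flagged. A minimal sketch, assuming chromadb is installed and the script above is importable as audit_mempalace_privacy; the /tmp path and ids are hypothetical:

import chromadb
from pathlib import Path
from audit_mempalace_privacy import audit

# Seed a scratch palace with a drawer that violates rules 2 and 3
client = chromadb.PersistentClient(path="/tmp/test_palace")
col = client.get_or_create_collection("mempalace_drawers")
col.add(
    ids=["bad-1"],
    documents=["GITEA_TOKEN=abc123 accidentally mined into a drawer"],
    metadatas=[{"type": "closet", "source_file": "/root/wizards/bezalel/notes.md"}],
)

violations = audit(Path("/tmp/test_palace"))
assert violations, "expected the seeded leak to be flagged"
for v in violations:
    print(v)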

scripts/backup_databases.sh Executable file

@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Bezalel Database Backup — MemPalace + Evennia + Fleet
# Runs nightly after re-mine completes. Keeps 7 days of rolling backups.
set -euo pipefail
BACKUP_BASE="/root/wizards/bezalel/home/backups"
DATE=$(date +%Y%m%d_%H%M%S)
LOG="/var/log/bezalel_db_backup.log"
# Sources
LOCAL_PALACE="/root/wizards/bezalel/.mempalace/palace"
FLEET_PALACE="/var/lib/mempalace/fleet"
EVENNIA_DB="/root/wizards/bezalel/evennia/bezalel_world/server/evennia.db3"
# Destinations
LOCAL_BACKUP="${BACKUP_BASE}/mempalace/mempalace_${DATE}.tar.gz"
FLEET_BACKUP="${BACKUP_BASE}/fleet/fleet_${DATE}.tar.gz"
EVENNIA_BACKUP="${BACKUP_BASE}/evennia/evennia_${DATE}.db3.gz"
# Destination dirs must exist or set -e aborts the first tar on a fresh host
mkdir -p "${BACKUP_BASE}/mempalace" "${BACKUP_BASE}/fleet" "${BACKUP_BASE}/evennia"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
}
log "Starting database backup cycle..."
# 1. Backup local MemPalace
tar -czf "$LOCAL_BACKUP" -C "$(dirname "$LOCAL_PALACE")" "$(basename "$LOCAL_PALACE")"
log "Local palace backed up: ${LOCAL_BACKUP} ($(du -h "$LOCAL_BACKUP" | cut -f1))"
# 2. Backup fleet MemPalace
tar -czf "$FLEET_BACKUP" -C "$(dirname "$FLEET_PALACE")" "$(basename "$FLEET_PALACE")"
log "Fleet palace backed up: ${FLEET_BACKUP} ($(du -h "$FLEET_BACKUP" | cut -f1))"
# 3. Backup Evennia DB (gzip for space)
gzip -c "$EVENNIA_DB" > "$EVENNIA_BACKUP"
log "Evennia DB backed up: ${EVENNIA_BACKUP} ($(du -h "$EVENNIA_BACKUP" | cut -f1))"
# 4. Prune backups older than 7 days
find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' -mtime +7 -delete
find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' -mtime +7 -delete
find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' -mtime +7 -delete
log "Pruned backups older than 7 days"
# 5. Report counts
MP_COUNT=$(find "${BACKUP_BASE}/mempalace" -name 'mempalace_*.tar.gz' | wc -l)
FL_COUNT=$(find "${BACKUP_BASE}/fleet" -name 'fleet_*.tar.gz' | wc -l)
EV_COUNT=$(find "${BACKUP_BASE}/evennia" -name 'evennia_*.db3.gz' | wc -l)
log "Backup cycle complete. Retained: mempalace=${MP_COUNT}, fleet=${FL_COUNT}, evennia=${EV_COUNT}"
touch /var/lib/bezalel/heartbeats/db_backup.last
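Restores are the inverse of steps 1 and 3. A hypothetical restore of one night's snapshot (timestamps are illustrative; stop Evennia before swapping its database back in):

# Local palace: the archive stores the 'palace' directory, so extract into its parent
tar -xzf /root/wizards/bezalel/home/backups/mempalace/mempalace_20260407_033000.tar.gz \
    -C /root/wizards/bezalel/.mempalace/

# Evennia DB: plain gzip, decompress over the live file while the server is stopped
gunzip -c /root/wizards/bezalel/home/backups/evennia/evennia_20260407_033000.db3.gz \
    > /root/wizards/bezalel/evennia/bezalel_world/server/evennia.db3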

scripts/ci_auto_revert.py Normal file

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""
CI Auto-Revert — Poka-yoke for broken merges.
Monitors the main branch post-merge and auto-reverts via local git if CI fails.
Usage:
python ci_auto_revert.py <repo_owner>/<repo_name>
python ci_auto_revert.py Timmy_Foundation/hermes-agent
Recommended cron: */10 * * * *
"""
import os
import sys
import json
import subprocess
import tempfile
from datetime import datetime, timedelta, timezone
from urllib import request, error
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
REVERT_WINDOW_MINUTES = 10
def api_call(method, path):
url = f"{GITEA_URL}/api/v1{path}"
headers = {"Authorization": f"token {GITEA_TOKEN}"}
req = request.Request(url, method=method, headers=headers)
try:
with request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except error.HTTPError as e:
return {"error": e.read().decode(), "status": e.code}
def get_recent_commits(owner, repo, since):
since_iso = since.strftime("%Y-%m-%dT%H:%M:%SZ")
return api_call("GET", f"/repos/{owner}/{repo}/commits?sha=main&since={since_iso}&limit=20")
def get_commit_status(owner, repo, sha):
return api_call("GET", f"/repos/{owner}/{repo}/commits/{sha}/status")
def revert_via_git(clone_url, sha, msg):
with tempfile.TemporaryDirectory() as tmpdir:
# Clone with token
auth_url = clone_url.replace("https://", f"https://bezalel:{GITEA_TOKEN}@")
subprocess.run(["git", "clone", "--depth", "10", auth_url, tmpdir], check=True, capture_output=True)
# Configure git
subprocess.run(["git", "-C", tmpdir, "config", "user.email", "bezalel@timmy.foundation"], check=True, capture_output=True)
subprocess.run(["git", "-C", tmpdir, "config", "user.name", "Bezalel"], check=True, capture_output=True)
        # Revert with the default message, then amend it: `git revert -m` selects
        # a mainline parent (an integer), it does not set a message. Reverting a
        # merge commit would additionally need `-m 1`.
        revert_msg = f"[auto-revert] {msg}\n\nOriginal commit {sha} failed CI."
        result = subprocess.run(
            ["git", "-C", tmpdir, "revert", "--no-edit", sha],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            return {"error": f"git revert failed: {result.stderr}"}
        subprocess.run(
            ["git", "-C", tmpdir, "commit", "--amend", "-m", revert_msg],
            check=True, capture_output=True, text=True,
        )
# Push
push_result = subprocess.run(
["git", "-C", tmpdir, "push", "origin", "main"],
capture_output=True,
text=True,
)
if push_result.returncode != 0:
return {"error": f"git push failed: {push_result.stderr}"}
return {"ok": True, "reverted_sha": sha}
def main():
    if not GITEA_TOKEN:
        print("ERROR: GITEA_TOKEN environment variable not set")
        sys.exit(1)
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <owner/repo>")
        sys.exit(1)
repo_full = sys.argv[1]
owner, repo = repo_full.split("/", 1)
since = datetime.now(timezone.utc) - timedelta(minutes=REVERT_WINDOW_MINUTES + 5)
commits = get_recent_commits(owner, repo, since)
if not isinstance(commits, list):
print(f"ERROR fetching commits: {commits}")
sys.exit(1)
reverted = 0
for commit in commits:
sha = commit.get("sha", "")
msg = commit.get("commit", {}).get("message", "").split("\n")[0]
commit_time = commit.get("commit", {}).get("committer", {}).get("date", "")
if not commit_time:
continue
commit_dt = datetime.fromisoformat(commit_time.replace("Z", "+00:00"))
age_min = (datetime.now(timezone.utc) - commit_dt).total_seconds() / 60
if age_min > REVERT_WINDOW_MINUTES:
continue
status = get_commit_status(owner, repo, sha)
state = status.get("state", "")
if state == "failure":
print(f"ALERT: Commit {sha[:8]} '{msg}' failed CI ({age_min:.1f}m old). Initiating revert...")
repo_info = api_call("GET", f"/repos/{owner}/{repo}")
clone_url = repo_info.get("clone_url", "")
if not clone_url:
print(f" Cannot find clone URL")
continue
            result = revert_via_git(clone_url, sha, msg)
if "error" in result:
print(f" Revert failed: {result['error']}")
else:
print(f" Reverted successfully.")
reverted += 1
elif state == "success":
print(f"OK: Commit {sha[:8]} '{msg}' passed CI.")
elif state == "pending":
print(f"PENDING: Commit {sha[:8]} '{msg}' still running CI.")
else:
print(f"UNKNOWN: Commit {sha[:8]} '{msg}' has CI state '{state}'.")
    if reverted == 0:
        print(f"No broken merges found in the last {REVERT_WINDOW_MINUTES} minutes.")
if __name__ == "__main__":
main()
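The only field this script reads from GET /repos/{owner}/{repo}/commits/{sha}/status is the combined state. A sketch of the payload shape it assumes, with illustrative values; the statuses entries are shown for context only:

{
  "state": "failure",
  "sha": "a1b2c3d4e5f6...",
  "total_count": 1,
  "statuses": [
    {"context": "ci/build", "status": "failure", "description": "tests failed"}
  ]
}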

scripts/mempalace_export.py Normal file

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
Export closets from a local MemPalace wing for fleet-wide sharing.
Privacy rule: only summaries/closets are exported. No raw source_file paths.
Source filenames are anonymized to just the basename.
"""
import json
from pathlib import Path
import chromadb
PALACE_PATH = "/root/wizards/bezalel/.mempalace/palace"
FLEET_INCOMING = "/var/lib/mempalace/fleet/incoming"
WING = "bezalel"
DOCS_PER_ROOM = 5
def main():
client = chromadb.PersistentClient(path=PALACE_PATH)
col = client.get_collection("mempalace_drawers")
# Discover rooms in this wing
all_meta = col.get(include=["metadatas"])["metadatas"]
rooms = set()
for m in all_meta:
if m.get("wing") == WING:
rooms.add(m.get("room", "general"))
Path(FLEET_INCOMING).mkdir(parents=True, exist_ok=True)
closets = []
for room in sorted(rooms):
results = col.query(
query_texts=[room],
n_results=DOCS_PER_ROOM,
where={"$and": [{"wing": WING}, {"room": room}]},
include=["documents", "metadatas"],
)
docs = results["documents"][0]
metas = results["metadatas"][0]
entries = []
for doc, meta in zip(docs, metas):
            # Sanitize content: strip absolute workspace paths first, then
            # truncate, so the cut can never expose a partial path
            sanitized = doc.replace("/root/wizards/bezalel/", "~/")
            sanitized = sanitized.replace("/root/wizards/", "~/")
            sanitized = sanitized.replace("/home/bezalel/", "~/")
            sanitized = sanitized.replace("/home/", "~/")
            sanitized = sanitized[:800]
entries.append({
"content": sanitized,
"source_basename": Path(meta.get("source_file", "?")).name,
})
closet = {
"wing": WING,
"room": room,
"type": "closet",
"entries": entries,
}
closets.append(closet)
out_file = Path(FLEET_INCOMING) / f"{WING}_closets.json"
with open(out_file, "w") as f:
json.dump(closets, f, indent=2)
print(f"Exported {len(closets)} closets to {out_file}")
for c in closets:
print(f" {c['wing']} / {c['room']} : {len(c['entries'])} entries")
if __name__ == "__main__":
main()
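For reference, the resulting bezalel_closets.json looks like the following (content and basenames are illustrative); fleet_import.py on the consuming side is expected to read this same shape:

[
  {
    "wing": "bezalel",
    "room": "forge",
    "type": "closet",
    "entries": [
      {
        "content": "Runner registration fix lives in ~/scripts/... (first 800 chars, paths rewritten to ~/)",
        "source_basename": "notes.md"
      }
    ]
  }
]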

scripts/mempalace_nightly.sh Executable file

@@ -0,0 +1,24 @@
#!/usr/bin/env bash
# Bezalel MemPalace Nightly Re-mine + Fleet Sync
set -euo pipefail
PALACE="/root/wizards/bezalel/.mempalace/palace"
MINER="/root/wizards/bezalel/hermes/venv/bin/mempalace"
WING_DIR="/root/wizards/bezalel"
LOG="/var/log/bezalel_mempalace.log"
EXPORTER="/root/wizards/bezalel/hermes/venv/bin/python /root/wizards/bezalel/mempalace_export.py"
IMPORTER="/root/wizards/bezalel/hermes/venv/bin/python /var/lib/mempalace/fleet_import.py"
echo "[$(date -Iseconds)] Starting mempalace re-mine" >> "$LOG"
cd "$WING_DIR"
"$MINER" --palace "$PALACE" mine "$WING_DIR" --agent bezalel >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Finished mempalace re-mine" >> "$LOG"
"$MINER" --palace "$PALACE" status >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Starting fleet closet export" >> "$LOG"
$EXPORTER >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Starting fleet closet import" >> "$LOG"
$IMPORTER >> "$LOG" 2>&1 || true
echo "[$(date -Iseconds)] Fleet sync complete" >> "$LOG"
touch /var/lib/bezalel/heartbeats/mempalace_nightly.last

scripts/meta_heartbeat.sh Executable file

@@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Meta-heartbeat — checks all Bezalel cron jobs for stale timestamps
set -euo pipefail
HEARTBEAT_DIR="/var/lib/bezalel/heartbeats"
ALERT_LOG="/var/log/bezalel_meta_heartbeat.log"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
mkdir -p "$HEARTBEAT_DIR"
# Define expected heartbeats: name => max_stale_minutes
HEARTBEATS=(
"nightly_watch:150" # 2.5h — runs at 02:00
"mempalace_nightly:150" # 2.5h — runs at 03:00
"db_backup:150" # 2.5h — runs at 03:30
"runner_health:15" # 15m — every 5 min
)
NOW_EPOCH=$(date +%s)
FAILURES=0
for entry in "${HEARTBEATS[@]}"; do
name="${entry%%:*}"
max_minutes="${entry##*:}"
file="${HEARTBEAT_DIR}/${name}.last"
if [[ ! -f "$file" ]]; then
log "MISSING: $name heartbeat file not found ($file)"
FAILURES=$((FAILURES + 1))
continue
fi
LAST_EPOCH=$(stat -c %Y "$file")
AGE_MIN=$(( (NOW_EPOCH - LAST_EPOCH) / 60 ))
if [[ $AGE_MIN -gt $max_minutes ]]; then
log "STALE: $name is ${AGE_MIN}m old (max ${max_minutes}m)"
FAILURES=$((FAILURES + 1))
else
log "OK: $name is ${AGE_MIN}m old"
fi
done
if [[ $FAILURES -gt 0 ]]; then
log "ALERT: $FAILURES stale/missing heartbeat(s) detected."
exit 1
else
log "ALL_OK: All heartbeats healthy."
fi
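The max-staleness values above encode a schedule roughly like the following crontab. The 02:00/03:00/03:30 slots come from the comments, but the script paths and the nightly_watch entry itself are assumptions:

0 2 * * *    /root/wizards/bezalel/scripts/nightly_watch.sh
0 3 * * *    /root/wizards/bezalel/scripts/mempalace_nightly.sh
30 3 * * *   /root/wizards/bezalel/scripts/backup_databases.sh
*/5 * * * *  /root/wizards/bezalel/scripts/runner_health_probe.sh
*/10 * * * * /root/wizards/bezalel/scripts/meta_heartbeat.sh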

scripts/runner_health_probe.sh Executable file

@@ -0,0 +1,46 @@
#!/usr/bin/env bash
# Gitea Runner Health Probe — Poka-yoke for unregistered runners
set -euo pipefail
GITEA_TOKEN="${GITEA_TOKEN:-}"
GITEA_URL="https://forge.alexanderwhitestone.com"
ALERT_LOG="/var/log/bezalel_runner_health.log"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
if [[ -z "$GITEA_TOKEN" ]]; then
log "ERROR: GITEA_TOKEN not set"
exit 1
fi
# A failed fetch or unparseable response counts as zero runners rather than
# killing the probe under set -euo pipefail
ACTIVE_RUNNERS=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
    "${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
    python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))" || echo 0)
log "Active runners: ${ACTIVE_RUNNERS}"
if [[ "$ACTIVE_RUNNERS" -eq 0 ]]; then
log "CRITICAL: Zero active runners detected. Attempting self-healing restart."
pkill -f "act_runner daemon" 2>/dev/null || true
sleep 2
cd /opt/gitea-runner && nohup ./act_runner daemon > /var/log/gitea-runner.log 2>&1 &
sleep 3
# Re-check
    ACTIVE_RUNNERS_AFTER=$(curl -s -H "Authorization: token ${GITEA_TOKEN}" \
        "${GITEA_URL}/api/v1/repos/Timmy_Foundation/hermes-agent/actions/runners" | \
        python3 -c "import sys,json; d=json.load(sys.stdin); print(len([r for r in d.get('runners',[]) if r.get('status')=='online']))" || echo 0)
log "Active runners after restart: ${ACTIVE_RUNNERS_AFTER}"
if [[ "$ACTIVE_RUNNERS_AFTER" -eq 0 ]]; then
log "CRITICAL: Self-healing failed. Runner still offline."
touch /var/lib/bezalel/heartbeats/runner_health.last
exit 1
else
log "RECOVERED: Runner back online."
fi
else
log "OK: ${ACTIVE_RUNNERS} runner(s) online."
fi
touch /var/lib/bezalel/heartbeats/runner_health.last
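The probe assumes the runners endpoint returns a payload of roughly this shape; only the runners array and each entry's status field are actually read, everything else is illustrative:

{
  "runners": [
    {"id": 3, "name": "hermes-runner-1", "status": "online", "labels": ["ubuntu-latest"]},
    {"id": 4, "name": "hermes-runner-2", "status": "offline", "labels": ["ubuntu-latest"]}
  ]
}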

scripts/secret_guard.sh Executable file

@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Secret Guard — Poka-yoke for world-readable credentials
set -euo pipefail
ALERT_LOG="/var/log/bezalel_secret_guard.log"
QUARANTINE_DIR="/root/wizards/bezalel/home/quarantine"
mkdir -p "$QUARANTINE_DIR"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
# Scan for world-readable files with sensitive keywords in /root, /home, /etc, /tmp, /var/log
# Exclude binary files, large files (>1MB), and known safe paths
# `|| true` keeps set -euo pipefail from aborting when nothing matches;
# note *.pyc must be excluded with -name, not -path
BAD_FILES=$(find /root /home /etc /tmp /var/log -maxdepth 4 -type f -perm /o+r 2>/dev/null \
    ! -path "*/.git/*" \
    ! -path "*/node_modules/*" \
    ! -path "*/venv/*" \
    ! -path "*/.venv/*" \
    ! -path "*/__pycache__/*" \
    ! -name "*.pyc" \
    ! -size +1M \
    -exec grep -l -i -E 'password|token|secret|nsec|api_key|private_key|aws_access_key_id|aws_secret_access_key' {} + 2>/dev/null | head -50 || true)
VIOLATIONS=0
# Read line by line so paths containing spaces survive intact
while IFS= read -r file; do
    [[ -z "$file" ]] && continue
# Skip if already quarantined
if [[ "$file" == "$QUARANTINE_DIR"* ]]; then
continue
fi
# Skip log files that are expected to be world-readable
if [[ "$file" == /var/log/* ]]; then
continue
fi
VIOLATIONS=$((VIOLATIONS + 1))
basename=$(basename "$file")
quarantine_path="${QUARANTINE_DIR}/${basename}.$(date +%s)"
cp "$file" "$quarantine_path"
chmod 600 "$quarantine_path"
chmod 600 "$file"
log "QUARANTINED: $file -> $quarantine_path (permissions fixed to 600)"
done <<< "$BAD_FILES"
if [[ $VIOLATIONS -gt 0 ]]; then
log "ALERT: $VIOLATIONS world-readable secret file(s) detected and quarantined."
else
log "OK: No world-readable secret files found."
fi
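A quick manual test, assuming the guard is run as root from the repo root (the fake secret and its location are hypothetical):

# Plant a world-readable fake secret under a scanned path, then run the guard
echo 'password=hunter2' > /tmp/fake_secret.txt
chmod 644 /tmp/fake_secret.txt
./scripts/secret_guard.sh
# The file should now be mode 600, with a copy in the quarantine dir
ls -l /tmp/fake_secret.txt /root/wizards/bezalel/home/quarantine/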

scripts/sync_fleet_to_alpha.sh Executable file

@@ -0,0 +1,30 @@
#!/usr/bin/env bash
# Sync Fleet MemPalace from Beta to Alpha
# Usage: ./sync_fleet_to_alpha.sh
set -euo pipefail
FLEET_DIR="/var/lib/mempalace/fleet"
ALPHA_HOST="167.99.126.228"
ALPHA_USER="root"
ALPHA_DEST="/var/lib/mempalace/fleet"
LOG="/var/log/bezalel_alpha_sync.log"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$LOG"
}
log "Starting fleet palace sync to Alpha (${ALPHA_HOST})..."
# Ensure Alpha destination exists (SSH must be configured key-based or agent-forwarded)
ssh -o ConnectTimeout=10 "${ALPHA_USER}@${ALPHA_HOST}" "mkdir -p ${ALPHA_DEST}" || {
log "ERROR: Cannot reach Alpha host. Aborting."
exit 1
}
# rsync the fleet palace directory (ChromaDB files + incoming closets)
rsync -avz --delete \
-e "ssh -o ConnectTimeout=10" \
"${FLEET_DIR}/" \
"${ALPHA_USER}@${ALPHA_HOST}:${ALPHA_DEST}/" >> "$LOG" 2>&1
log "Fleet palace sync complete."

scripts/validate_mempalace_taxonomy.py Normal file

@@ -0,0 +1,123 @@
#!/usr/bin/env python3
"""
Validate a wizard's mempalace.yaml against the fleet taxonomy standard.
Usage:
python validate_mempalace_taxonomy.py /path/to/mempalace.yaml
python validate_mempalace_taxonomy.py --ci /path/to/mempalace.yaml
Exit codes:
0 = valid
1 = missing required rooms or other violations
"""
import sys
from pathlib import Path
try:
import yaml
except ImportError:
print("ERROR: PyYAML not installed. Run: pip install pyyaml")
sys.exit(1)
REQUIRED_ROOMS = {
"forge",
"hermes",
"nexus",
"issues",
"experiments",
}
def load_standard():
# Try to find the fleet standard in the-nexus clone or local path
candidates = [
Path(__file__).parent.parent / "mempalace_taxonomy.yaml",
Path("/tmp/nexus_clone/docs/mempalace_taxonomy.yaml"),
Path(__file__).parent.parent.parent / "the-nexus" / "docs" / "mempalace_taxonomy.yaml",
]
for c in candidates:
if c.exists():
with open(c) as f:
return yaml.safe_load(f)
return None
def validate(path: Path):
errors = []
warnings = []
if not path.exists():
errors.append(f"File not found: {path}")
return errors, warnings
with open(path) as f:
data = yaml.safe_load(f)
if not data:
errors.append("Empty or invalid YAML")
return errors, warnings
rooms = data.get("rooms", data.get("wings", {}).get("bezalel", {}).get("rooms", []))
if isinstance(rooms, list) and rooms and isinstance(rooms[0], dict):
room_names = {r.get("name") for r in rooms if isinstance(r, dict)}
elif isinstance(rooms, dict):
room_names = set(rooms.keys())
else:
room_names = set()
missing = REQUIRED_ROOMS - room_names
if missing:
errors.append(f"Missing required rooms: {', '.join(sorted(missing))}")
    # Check for duplicate room names (only meaningful for the list form;
    # dict keys cannot duplicate, and non-dict list entries must not count)
    if isinstance(rooms, list):
        names = [r.get("name") for r in rooms if isinstance(r, dict)]
        if len(names) != len(set(names)):
            errors.append("Duplicate room names detected")
# Check for empty keywords
if isinstance(rooms, list):
for r in rooms:
if isinstance(r, dict):
kw = r.get("keywords", [])
if not kw:
warnings.append(f"Room '{r.get('name')}' has no keywords")
standard = load_standard()
if standard:
std_optional = set(standard.get("optional_rooms", {}).keys())
unknown = room_names - REQUIRED_ROOMS - std_optional
if unknown:
warnings.append(f"Non-standard rooms (OK but not in fleet spec): {', '.join(sorted(unknown))}")
return errors, warnings
def main():
import argparse
parser = argparse.ArgumentParser(description="Validate MemPalace taxonomy")
parser.add_argument("config", help="Path to mempalace.yaml")
parser.add_argument("--ci", action="store_true", help="CI mode: fail on warnings too")
args = parser.parse_args()
errors, warnings = validate(Path(args.config))
if warnings:
for w in warnings:
print(f"WARNING: {w}")
if errors:
for e in errors:
print(f"ERROR: {e}")
sys.exit(1)
if args.ci and warnings:
print("Validation failed in CI mode (warnings treated as errors)")
sys.exit(1)
print("OK: Taxonomy validation passed")
sys.exit(0)
if __name__ == "__main__":
main()
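A minimal mempalace.yaml that passes this validator, using the list-of-dicts form the script checks first; the keywords are illustrative:

rooms:
  - name: forge
    keywords: [gitea, ci, runners]
  - name: hermes
    keywords: [agent, tools]
  - name: nexus
    keywords: [fleet, docs]
  - name: issues
    keywords: [bugs, triage]
  - name: experiments
    keywords: [prototypes, spikes]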