#!/usr/bin/env python3
"""
audit_privacy.py — Weekly privacy audit for the shared fleet palace.

Scans a palace directory (typically the shared Alpha fleet palace) and
reports any files that violate the closet-only sync policy:

  1. Raw drawer files (.drawer.json) — must never exist in fleet palace.
  2. Closet files containing full-text content (> threshold characters).
  3. Closet files exposing private source_file paths.

Exit codes:
    0 — clean, no violations found.
    1 — one or more violations found.
    2 — the palace directory does not exist or is not a directory.

Usage:
    python mempalace/audit_privacy.py [--max-text N] [fleet_palace_dir]

Default: /var/lib/mempalace/fleet

Refs: #1083, #1075
"""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path

# Closets should be compressed summaries, not full text.
# Flag any text field exceeding this character count as suspicious.
MAX_CLOSET_TEXT_CHARS = 2000

# Private path indicators — if a source_file starts with any of these,
# it is considered a private VPS path that should not be in the fleet palace.
PRIVATE_PATH_PREFIXES = [
    "/root/",
    "/home/",
    "/Users/",
    "/var/home/",
]


@dataclass
class Violation:
    """A single policy violation found in one file."""

    path: Path   # file in which the violation was found
    rule: str    # rule identifier, e.g. "RAW_DRAWER" or "PARSE_ERROR"
    detail: str  # human-readable explanation printed in the report


@dataclass
class AuditResult:
    """Aggregate outcome of auditing a palace directory."""

    scanned: int = 0  # number of *.json files visited
    violations: list[Violation] = field(default_factory=list)

    @property
    def clean(self) -> bool:
        """Return True when no violations were recorded."""
        return not self.violations


def _is_private_path(path_str: str) -> bool:
    """Return True if *path_str* starts with one of PRIVATE_PATH_PREFIXES."""
    # str.startswith accepts a tuple of candidates; an empty tuple yields False.
    return path_str.startswith(tuple(PRIVATE_PATH_PREFIXES))


def audit_file(path: Path, max_text_chars: int | None = None) -> list[Violation]:
    """Audit a single file against the closet-only sync policy.

    Args:
        path: File to inspect. Only names ending in ``.drawer.json`` or
            ``.closet.json`` are checked; anything else yields no violations.
        max_text_chars: Maximum allowed length of a drawer's ``text`` field.
            ``None`` (the default) means use ``MAX_CLOSET_TEXT_CHARS``.

    Returns:
        The violations found in this file (empty if none). A closet that
        cannot be read, decoded as UTF-8, or parsed as JSON produces a single
        ``PARSE_ERROR`` violation instead of raising.
    """
    # Resolve the default at call time so changes to the module constant
    # are still honoured by callers that do not pass an explicit limit.
    limit = MAX_CLOSET_TEXT_CHARS if max_text_chars is None else max_text_chars

    # Rule 1: raw drawer files must not exist in fleet palace
    if path.name.endswith(".drawer.json"):
        return [
            Violation(
                path=path,
                rule="RAW_DRAWER",
                detail="Raw drawer file present — only closets allowed in fleet palace.",
            )
        ]  # no further checks needed

    if not path.name.endswith(".closet.json"):
        return []  # not a palace file, skip

    try:
        # Read explicitly as UTF-8 rather than the locale encoding, and treat
        # undecodable bytes like any other unparseable file rather than
        # letting UnicodeDecodeError abort the whole audit.
        data = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError, OSError) as exc:
        return [
            Violation(
                path=path,
                rule="PARSE_ERROR",
                detail=f"Could not parse file: {exc}",
            )
        ]

    # Anything that is not {"drawers": [...]} is treated as having no drawers.
    drawers = data.get("drawers", []) if isinstance(data, dict) else []
    if not isinstance(drawers, list):
        drawers = []

    violations: list[Violation] = []
    for i, drawer in enumerate(drawers):
        if not isinstance(drawer, dict):
            continue

        # Rule 2: closets must not contain full-text content.
        # Only string values are measured; null or other JSON types would
        # otherwise crash len() or measure something other than characters.
        text = drawer.get("text", "")
        if isinstance(text, str) and len(text) > limit:
            violations.append(
                Violation(
                    path=path,
                    rule="FULL_TEXT_IN_CLOSET",
                    detail=(
                        f"Drawer [{i}] text is {len(text)} chars "
                        f"(limit {limit}). "
                        "Closets must be compressed summaries, not raw content."
                    ),
                )
            )

        # Rule 3: private source_file paths must not appear in fleet data.
        # Non-string values cannot be a path prefix match, so skip them.
        source_file = drawer.get("source_file", "")
        if isinstance(source_file, str) and source_file and _is_private_path(source_file):
            violations.append(
                Violation(
                    path=path,
                    rule="PRIVATE_SOURCE_PATH",
                    detail=f"Drawer [{i}] exposes private source_file: {source_file!r}",
                )
            )

    return violations


def audit_palace(palace_dir: Path, max_text_chars: int | None = None) -> AuditResult:
    """Recursively audit every ``*.json`` file under *palace_dir*.

    Args:
        palace_dir: Root directory of the palace to scan.
        max_text_chars: Text-length limit forwarded to ``audit_file``;
            ``None`` means use ``MAX_CLOSET_TEXT_CHARS``.

    Returns:
        An ``AuditResult`` whose ``scanned`` counts every ``*.json`` file
        visited (palace file or not) and whose ``violations`` are listed in
        sorted path order.
    """
    result = AuditResult()
    for json_path in sorted(palace_dir.rglob("*.json")):
        result.scanned += 1
        result.violations.extend(audit_file(json_path, max_text_chars))
    return result


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit code: 0 when clean, 1 when violations were found,
        2 when the palace directory is missing or is not a directory.
    """
    parser = argparse.ArgumentParser(
        description="Audit the fleet palace for privacy violations."
    )
    parser.add_argument(
        "palace_dir",
        nargs="?",
        default="/var/lib/mempalace/fleet",
        help="Path to the fleet palace directory (default: /var/lib/mempalace/fleet)",
    )
    parser.add_argument(
        "--max-text",
        type=int,
        default=MAX_CLOSET_TEXT_CHARS,
        metavar="N",
        help=f"Maximum closet text length (default: {MAX_CLOSET_TEXT_CHARS})",
    )
    args = parser.parse_args(argv)

    palace_dir = Path(args.palace_dir)
    # is_dir() rather than exists(): a regular file would otherwise be
    # "scanned" as an empty palace and misreported as clean.
    if not palace_dir.is_dir():
        print(f"[audit_privacy] ERROR: palace directory not found: {palace_dir}", file=sys.stderr)
        return 2

    print(f"[audit_privacy] Scanning: {palace_dir}")
    # Forward the CLI threshold so --max-text actually takes effect.
    result = audit_palace(palace_dir, max_text_chars=args.max_text)

    if result.clean:
        print(f"[audit_privacy] OK — {result.scanned} file(s) scanned, no violations.")
        return 0

    print(
        f"[audit_privacy] FAIL — {len(result.violations)} violation(s) in {result.scanned} file(s):",
        file=sys.stderr,
    )
    for v in result.violations:
        print(f" [{v.rule}] {v.path}", file=sys.stderr)
        print(f" {v.detail}", file=sys.stderr)
    return 1


if __name__ == "__main__":
    sys.exit(main())