#!/usr/bin/env python3
"""Sanitize Hermes session exports before they enter local training pipelines."""
from __future__ import annotations

import argparse
import json
import re
from pathlib import Path
from typing import Any

# Allowlists: identifiers known to be local/intentional and safe to keep verbatim.
ALLOWED_EMAILS = {
    "alexpaynex@gmail.com",
    "alexander@alexanderwhitestone.com",
}
ALLOWED_IPS = {
    "143.198.27.163",
}
ALLOWED_HOSTS = {
    "localhost",
}

EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# NOTE(review): the original HOST_RE pattern was corrupted in this export and is
# reconstructed here — confirm against the original source. The lookbehind
# excludes '@' so hostnames inside allowlisted e-mail addresses (which survive
# the earlier EMAIL_RE pass) are not mangled by host redaction.
HOST_RE = re.compile(
    r"(?<![\w.@-])((?:[a-z0-9-]+\.)+(?:com|net|org|io|dev|local))(?![\w.-])",
    re.IGNORECASE,
)

# NOTE(review): the following three tables were lost in the corrupted span of
# the export. They are referenced by normalize_paths / sanitize_text /
# sanitize_payload below; these are plausible reconstructions — TODO restore
# the originals before relying on this script for real redaction.

# Path prefixes rewritten to neutral placeholders (user home directories).
PATH_REPLACEMENTS: list[tuple[re.Pattern[str], str]] = [
    (re.compile(r"/home/[A-Za-z0-9._-]+"), "/home/[USER]"),
    (re.compile(r"/Users/[A-Za-z0-9._-]+"), "/Users/[USER]"),
]

# Inline secret shapes ("token=...", "api_key: ..." etc.) -> redaction markers.
INLINE_SECRET_PATTERNS: list[tuple[re.Pattern[str], str]] = [
    (
        re.compile(r"(?i)\b(api[_-]?key|token|secret|password)\s*[=:]\s*\S+"),
        r"\1=[REDACTED_SECRET]",
    ),
]

# JSON keys whose string values are replaced wholesale, regardless of content.
DIRECT_KEY_REDACTIONS: dict[str, str] = {
    "api_key": "[REDACTED_SECRET]",
    "token": "[REDACTED_SECRET]",
    "password": "[REDACTED_SECRET]",
}


def normalize_paths(text: str) -> str:
    """Rewrite user-specific filesystem paths in *text* to neutral placeholders."""
    for pattern, replacement in PATH_REPLACEMENTS:
        text = pattern.sub(replacement, text)
    return text


def sanitize_text(text: str) -> str:
    """Scrub one string: paths first, then inline secrets, emails, IPs, hosts.

    Matches on the allowlists are kept verbatim; every other match is replaced
    with the corresponding ``[REDACTED_*]`` marker. Order matters: e-mail
    redaction runs before host redaction so whole addresses are handled as a
    unit rather than piecemeal by the host pattern.
    """
    text = normalize_paths(text)
    for pattern, replacement in INLINE_SECRET_PATTERNS:
        text = pattern.sub(replacement, text)
    text = EMAIL_RE.sub(
        lambda match: match.group(0)
        if match.group(0).lower() in ALLOWED_EMAILS
        else "[REDACTED_EMAIL]",
        text,
    )
    text = IP_RE.sub(
        lambda match: match.group(0)
        if match.group(0) in ALLOWED_IPS
        else "[REDACTED_IP]",
        text,
    )
    text = HOST_RE.sub(
        lambda match: match.group(0)
        if match.group(0).lower() in ALLOWED_HOSTS
        else "[REDACTED_HOST]",
        text,
    )
    return text


def sanitize_payload(payload: Any) -> Any:
    """Recursively sanitize a JSON-compatible payload.

    String values under sensitive dict keys (case-insensitive) are replaced
    wholesale via DIRECT_KEY_REDACTIONS; all other strings go through
    sanitize_text. Lists recurse element-wise; other scalars pass through.
    """
    if isinstance(payload, dict):
        sanitized: dict[str, Any] = {}
        for key, value in payload.items():
            normalized_key = key.lower()
            if isinstance(value, str) and normalized_key in DIRECT_KEY_REDACTIONS:
                # Replace the whole value; do not recurse into it.
                sanitized[key] = DIRECT_KEY_REDACTIONS[normalized_key]
                continue
            sanitized[key] = sanitize_payload(value)
        return sanitized
    if isinstance(payload, list):
        return [sanitize_payload(item) for item in payload]
    if isinstance(payload, str):
        return sanitize_text(payload)
    return payload


def sanitize_json_file(input_path: Path, output_path: Path) -> None:
    """Sanitize one .json file, writing pretty-printed, key-sorted output."""
    payload = json.loads(input_path.read_text(encoding="utf-8"))
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(sanitize_payload(payload), indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )


def sanitize_jsonl_file(input_path: Path, output_path: Path) -> None:
    """Sanitize one .jsonl file record-by-record, dropping blank lines."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    rows: list[str] = []
    for line in input_path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        rows.append(json.dumps(sanitize_payload(json.loads(line)), sort_keys=True))
    # Trailing newline only when there is at least one record.
    output_path.write_text("\n".join(rows) + ("\n" if rows else ""), encoding="utf-8")


def sanitize_file(input_path: Path, output_dir: Path) -> Path:
    """Sanitize one file into *output_dir*, choosing the writer by suffix.

    Returns the path of the sanitized output file.
    """
    output_path = output_dir / input_path.name
    if input_path.suffix == ".jsonl":
        sanitize_jsonl_file(input_path, output_path)
    else:
        sanitize_json_file(input_path, output_path)
    return output_path


def iter_input_files(input_path: Path) -> list[Path]:
    """Return the sorted .json/.jsonl files under *input_path* (or itself if a file)."""
    if input_path.is_file():
        return [input_path]
    return sorted(
        path
        for path in input_path.rglob("*")
        if path.is_file() and path.suffix in {".json", ".jsonl"}
    )


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--input", required=True, help="Session file or directory to sanitize")
    parser.add_argument("--output", required=True, help="Output directory for sanitized files")
    return parser


def main(argv: list[str] | None = None) -> int:
    """Sanitize every input file into the output directory; return exit status."""
    args = build_parser().parse_args(argv)
    input_path = Path(args.input).expanduser()
    output_dir = Path(args.output).expanduser()
    files = iter_input_files(input_path)
    for path in files:
        sanitize_file(path, output_dir)
    print(f"Sanitized {len(files)} file(s) into {output_dir}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())