Files
timmy-home/scripts/trajectory_sanitize.py

166 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""Sanitize Hermes session exports before they enter local training pipelines."""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
# Email addresses that sanitize_text leaves untouched (compared lower-cased).
ALLOWED_EMAILS = {
    "alexpaynex@gmail.com",
    "alexander@alexanderwhitestone.com",
}

# IP addresses that sanitize_text leaves untouched (compared verbatim).
ALLOWED_IPS = {
    "143.198.27.163",
}

# Host names that sanitize_text leaves untouched (compared lower-cased).
# NOTE: HOST_RE only matches dotted names, so a bare "localhost" is never
# seen by the host filter in the first place.
ALLOWED_HOSTS = {
    "localhost",
}

EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)

# Four dot-separated groups of 1-3 digits; octets are not range-checked.
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")

# Dotted names ending in an alphabetic label of 2+ letters.  The lookbehind
# skips names directly preceded by "@" or "/", and the lookahead skips names
# directly followed by "/".
HOST_RE = re.compile(r"(?<![@/])\b(?:[A-Z0-9-]+\.)+[A-Z]{2,}\b(?!/)", re.IGNORECASE)

# (pattern, replacement) pairs that collapse per-user home directories to "~/".
PATH_REPLACEMENTS = [
    (re.compile(r"/Users/[^/\s]+/"), "~/"),
    (re.compile(r"/home/[^/\s]+/"), "~/"),
]

# (pattern, replacement) pairs for `name=value` / `name: value` secrets that
# appear inside free text.
#
# The keyword must not be directly preceded by a letter or digit.  A plain
# `\b` is not enough here: "_" counts as a word character, so `\btoken`
# would skip `access_token=...` or `GITHUB_TOKEN=...` and leak the value.
# Any prefix (e.g. "access_") is left in place; only the keyword and its
# value are rewritten.
INLINE_SECRET_PATTERNS = [
    (
        re.compile(r"(?<![A-Z0-9])api[_-]?key\s*[:=]\s*\S+", re.IGNORECASE),
        "api_key=[REDACTED_API_KEY]",
    ),
    (
        re.compile(r"(?<![A-Z0-9])token\s*[:=]\s*\S+", re.IGNORECASE),
        "token=[REDACTED_TOKEN]",
    ),
    (
        re.compile(r"(?<![A-Z0-9])password\s*[:=]\s*\S+", re.IGNORECASE),
        "password=[REDACTED_PASSWORD]",
    ),
]

# Lower-cased dict keys whose string values are replaced wholesale by
# sanitize_payload, mapped to the placeholder that replaces them.
DIRECT_KEY_REDACTIONS = {
    "api_key": "[REDACTED_API_KEY]",
    "apikey": "[REDACTED_API_KEY]",
    "secret_key": "[REDACTED_API_KEY]",
    "token": "[REDACTED_TOKEN]",
    "access_token": "[REDACTED_TOKEN]",
    "auth_token": "[REDACTED_TOKEN]",
    "password": "[REDACTED_PASSWORD]",
    "passwd": "[REDACTED_PASSWORD]",
    "pwd": "[REDACTED_PASSWORD]",
}
def normalize_paths(text: str) -> str:
    """Return *text* with every PATH_REPLACEMENTS rule applied in order.

    Each rule is a ``(compiled_pattern, replacement)`` pair; the rules
    rewrite ``/Users/<name>/`` and ``/home/<name>/`` prefixes to ``~/``.
    """
    normalized = text
    for rule in PATH_REPLACEMENTS:
        home_pattern, placeholder = rule
        normalized = home_pattern.sub(placeholder, normalized)
    return normalized
def sanitize_text(text: str) -> str:
    """Redact secrets and identifying details from a free-text string.

    Steps, in order: home-directory paths are normalized, inline
    ``name=value`` secrets are masked, then emails, IP addresses and host
    names are replaced by placeholders unless they are on the matching
    allow-list.  Emails and hosts are compared lower-cased; IPs verbatim.
    """

    def keep_or_mask(allowed, placeholder, *, fold_case):
        """Build a ``re.sub`` callback that keeps allow-listed matches."""

        def substitute(match: re.Match[str]) -> str:
            found = match.group(0)
            probe = found.lower() if fold_case else found
            if probe in allowed:
                return found
            return placeholder

        return substitute

    cleaned = normalize_paths(text)
    for secret_pattern, masked in INLINE_SECRET_PATTERNS:
        cleaned = secret_pattern.sub(masked, cleaned)

    cleaned = EMAIL_RE.sub(
        keep_or_mask(ALLOWED_EMAILS, "[REDACTED_EMAIL]", fold_case=True),
        cleaned,
    )
    cleaned = IP_RE.sub(
        keep_or_mask(ALLOWED_IPS, "[REDACTED_IP]", fold_case=False),
        cleaned,
    )
    cleaned = HOST_RE.sub(
        keep_or_mask(ALLOWED_HOSTS, "[REDACTED_HOST]", fold_case=True),
        cleaned,
    )
    return cleaned
def sanitize_payload(payload: Any) -> Any:
    """Recursively sanitize a decoded JSON value and return the cleaned copy.

    Strings go through ``sanitize_text``; lists and dicts are rebuilt
    element by element; any other value is returned unchanged.  Inside a
    dict, a string value whose lower-cased key is in
    ``DIRECT_KEY_REDACTIONS`` is replaced wholesale by that key's
    placeholder instead of being text-sanitized.
    """
    if isinstance(payload, str):
        return sanitize_text(payload)
    if isinstance(payload, list):
        return [sanitize_payload(element) for element in payload]
    if not isinstance(payload, dict):
        return payload

    cleaned: dict[str, Any] = {}
    for field_name, field_value in payload.items():
        lookup = field_name.lower()
        is_secret = lookup in DIRECT_KEY_REDACTIONS and isinstance(field_value, str)
        if is_secret:
            cleaned[field_name] = DIRECT_KEY_REDACTIONS[lookup]
        else:
            cleaned[field_name] = sanitize_payload(field_value)
    return cleaned
def sanitize_json_file(input_path: Path, output_path: Path) -> None:
    """Sanitize a single JSON document and write the result to *output_path*.

    The input is parsed in full before anything is written.  The output is
    pretty-printed with two-space indentation, sorted keys and a trailing
    newline; missing parent directories of *output_path* are created.

    Both files are handled as UTF-8 explicitly.  Relying on the locale
    default would mis-decode (or fail on) non-ASCII session text on
    platforms whose default encoding is not UTF-8.

    Raises:
        json.JSONDecodeError: if the input is not valid JSON.
        UnicodeDecodeError: if the input is not valid UTF-8.
        OSError: if the input cannot be read or the output cannot be written.
    """
    payload = json.loads(input_path.read_text(encoding="utf-8"))
    output_path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(sanitize_payload(payload), indent=2, sort_keys=True)
    output_path.write_text(rendered + "\n", encoding="utf-8")
def sanitize_jsonl_file(input_path: Path, output_path: Path) -> None:
    """Sanitize a JSON Lines file record by record into *output_path*.

    Blank lines are skipped.  Each remaining line is parsed as JSON,
    sanitized, and re-serialized on one line with sorted keys.  The output
    ends with a newline unless there are no records, in which case it is
    empty.  Missing parent directories of *output_path* are created.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        UnicodeDecodeError: if the input is not valid UTF-8.
        OSError: if the input cannot be read or the output cannot be written.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    rows: list[str] = []
    # Iterate the file object rather than calling str.splitlines():
    # splitlines() also breaks on U+2028, U+2029 and U+0085, which are legal
    # unescaped inside JSON strings and would cut a valid record in two.
    # File iteration only splits on real newlines.  UTF-8 is explicit so the
    # result does not depend on the platform's locale encoding.
    with input_path.open(encoding="utf-8") as source:
        for raw_line in source:
            record = raw_line.strip()
            if not record:
                continue
            rows.append(json.dumps(sanitize_payload(json.loads(record)), sort_keys=True))
    # All input is consumed before writing, so input and output may be the
    # same path.
    output_path.write_text(
        "\n".join(rows) + ("\n" if rows else ""),
        encoding="utf-8",
    )
def sanitize_file(input_path: Path, output_dir: Path) -> Path:
    """Sanitize *input_path* into *output_dir* and return the written path.

    The output keeps the input's file name.  Files with a ``.jsonl`` suffix
    are processed line by line; every other file is treated as a single
    JSON document.
    """
    destination = output_dir / input_path.name
    writer = sanitize_jsonl_file if input_path.suffix == ".jsonl" else sanitize_json_file
    writer(input_path, destination)
    return destination
def iter_input_files(input_path: Path) -> list[Path]:
    """List the files to sanitize for *input_path*.

    A path that is a regular file is returned as a one-element list
    regardless of its suffix.  Otherwise the path is searched recursively
    and every regular file ending in ``.json`` or ``.jsonl`` is returned,
    in sorted order.
    """
    if not input_path.is_file():
        wanted_suffixes = {".json", ".jsonl"}
        matches = [
            candidate
            for candidate in input_path.rglob("*")
            if candidate.is_file() and candidate.suffix in wanted_suffixes
        ]
        matches.sort()
        return matches
    return [input_path]
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with required --input and --output."""
    parser = argparse.ArgumentParser(description=__doc__)
    required_options = (
        ("--input", "Session file or directory to sanitize"),
        ("--output", "Output directory for sanitized files"),
    )
    for flag, help_text in required_options:
        parser.add_argument(flag, required=True, help=help_text)
    return parser
def main(argv: list[str] | None = None) -> int:
    """Run the sanitizer from the command line.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` after all discovered files have been processed.

    When ``--input`` is a directory, each file is written under ``--output``
    at the same relative location it had under the input directory.
    Writing everything flat into ``--output`` would let files that share a
    name in different subdirectories silently overwrite one another.  For a
    single input file, or a directory without subdirectories, the output
    location is simply ``--output/<name>``.
    """
    args = build_parser().parse_args(argv)
    input_path = Path(args.input).expanduser()
    output_dir = Path(args.output).expanduser()
    files = iter_input_files(input_path)
    # A single input file has no tree to mirror; anchor at its parent so the
    # relative directory below is empty.
    source_root = input_path.parent if input_path.is_file() else input_path
    for path in files:
        relative_dir = path.parent.relative_to(source_root)
        sanitize_file(path, output_dir / relative_dir)
    print(f"Sanitized {len(files)} file(s) into {output_dir}")
    return 0


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())