166 lines
4.7 KiB
Python
166 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Sanitize Hermes session exports before they enter local training pipelines."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Allowlists: a match that appears here is kept verbatim by sanitize_text
# instead of being replaced with a placeholder.
ALLOWED_EMAILS = {
    "alexpaynex@gmail.com",
    "alexander@alexanderwhitestone.com",
}
ALLOWED_IPS = {
    "143.198.27.163",
}
# NOTE: HOST_RE below only matches dotted names, so a bare "localhost" is
# never a candidate for redaction in the first place.
ALLOWED_HOSTS = {
    "localhost",
}

EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
# Four dot-separated groups of 1-3 digits; octet ranges are not validated.
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# Dotted names that are not directly preceded by "@" or "/" and not directly
# followed by "/".
HOST_RE = re.compile(r"(?<![@/])\b(?:[A-Z0-9-]+\.)+[A-Z]{2,}\b(?!/)", re.IGNORECASE)

# (pattern, replacement) pairs that collapse a per-user home prefix to "~/".
PATH_REPLACEMENTS = [
    (re.compile(r"/Users/[^/\s]+/"), "~/"),
    (re.compile(r"/home/[^/\s]+/"), "~/"),
]

# (pattern, replacement) pairs for `name = value` / `name: value` secrets in
# free text.
#
# The keyword is anchored with "not preceded by a letter or digit" rather than
# `\b`: `_` counts as a word character, so `\b` never fires inside names such
# as OPENAI_API_KEY, access_token or db_password and those assignments would
# pass through unredacted. Everything `\b` used to match is still matched with
# the same span; the prefix before the keyword (e.g. "OPENAI_") is left as is.
INLINE_SECRET_PATTERNS = [
    (
        re.compile(r"(?i)(?<![A-Za-z0-9])api[_-]?key\s*[:=]\s*\S+"),
        "api_key=[REDACTED_API_KEY]",
    ),
    (
        re.compile(r"(?i)(?<![A-Za-z0-9])token\s*[:=]\s*\S+"),
        "token=[REDACTED_TOKEN]",
    ),
    (
        re.compile(r"(?i)(?<![A-Za-z0-9])password\s*[:=]\s*\S+"),
        "password=[REDACTED_PASSWORD]",
    ),
]

# Lower-cased mapping key -> placeholder. sanitize_payload uses this to replace
# string values stored under these keys.
DIRECT_KEY_REDACTIONS = {
    "api_key": "[REDACTED_API_KEY]",
    "apikey": "[REDACTED_API_KEY]",
    "secret_key": "[REDACTED_API_KEY]",
    "token": "[REDACTED_TOKEN]",
    "access_token": "[REDACTED_TOKEN]",
    "auth_token": "[REDACTED_TOKEN]",
    "password": "[REDACTED_PASSWORD]",
    "passwd": "[REDACTED_PASSWORD]",
    "pwd": "[REDACTED_PASSWORD]",
}
|
|
|
|
|
|
def normalize_paths(text: str) -> str:
    """Apply every PATH_REPLACEMENTS rule to *text*, in list order.

    Args:
        text: Free-form text that may contain absolute home-directory paths.

    Returns:
        The text after each (pattern, replacement) substitution has run.
    """
    normalized = text
    for rule in PATH_REPLACEMENTS:
        matcher, home_alias = rule
        normalized = matcher.sub(home_alias, normalized)
    return normalized
|
|
|
|
|
|
def sanitize_text(text: str) -> str:
    """Redact secrets, e-mail addresses, IPs and host names in free text.

    Passes run in a fixed order: home-path normalisation, inline
    `name=value` secrets, e-mail addresses, IP addresses, host names.
    Matches found in the corresponding ALLOWED_* set are kept verbatim.

    Args:
        text: The string to sanitize.

    Returns:
        The sanitized string.
    """

    def keep_or_mask(allowed, placeholder, fold_case):
        # Build a re.sub callback: allow-listed matches survive unchanged,
        # everything else becomes the placeholder.
        def substitute(match):
            found = match.group(0)
            probe = found.lower() if fold_case else found
            if probe in allowed:
                return found
            return placeholder

        return substitute

    cleaned = normalize_paths(text)

    for secret_re, masked in INLINE_SECRET_PATTERNS:
        cleaned = secret_re.sub(masked, cleaned)

    # E-mail and host comparisons are case-insensitive; IPs are compared as-is.
    cleaned = EMAIL_RE.sub(keep_or_mask(ALLOWED_EMAILS, "[REDACTED_EMAIL]", True), cleaned)
    cleaned = IP_RE.sub(keep_or_mask(ALLOWED_IPS, "[REDACTED_IP]", False), cleaned)
    cleaned = HOST_RE.sub(keep_or_mask(ALLOWED_HOSTS, "[REDACTED_HOST]", True), cleaned)

    return cleaned
|
|
|
|
|
|
|
|
def sanitize_payload(payload: Any) -> Any:
    """Recursively sanitize a decoded JSON value.

    Strings go through sanitize_text, lists and dicts are rebuilt element by
    element, and any other value is returned unchanged. Inside a dict, a
    string value whose lower-cased key is in DIRECT_KEY_REDACTIONS is replaced
    outright by that key's placeholder.

    Args:
        payload: Any JSON-compatible value.

    Returns:
        A sanitized copy for str/list/dict input; the same object otherwise.
    """
    if isinstance(payload, str):
        return sanitize_text(payload)

    if isinstance(payload, list):
        return [sanitize_payload(entry) for entry in payload]

    if not isinstance(payload, dict):
        return payload

    cleaned: dict[str, Any] = {}
    for name, item in payload.items():
        lowered = name.lower()
        if isinstance(item, str) and lowered in DIRECT_KEY_REDACTIONS:
            # The key itself marks the value as secret: drop it wholesale.
            cleaned[name] = DIRECT_KEY_REDACTIONS[lowered]
        else:
            cleaned[name] = sanitize_payload(item)
    return cleaned
|
|
|
|
|
|
|
|
def sanitize_json_file(input_path: Path, output_path: Path) -> None:
    """Sanitize one JSON document and write it to *output_path*.

    The file is read and written as UTF-8 explicitly: JSON interchange is
    UTF-8, and relying on the locale's preferred encoding can mis-decode or
    fail on non-ASCII text on platforms whose default is not UTF-8.

    Args:
        input_path: JSON file to read.
        output_path: Destination file; missing parent directories are created.

    Raises:
        json.JSONDecodeError: If the input is not valid JSON.
        OSError: If the input cannot be read or the output cannot be written.
    """
    payload = json.loads(input_path.read_text(encoding="utf-8"))
    rendered = json.dumps(sanitize_payload(payload), indent=2, sort_keys=True)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(rendered + "\n", encoding="utf-8")
|
|
|
|
|
|
|
|
def sanitize_jsonl_file(input_path: Path, output_path: Path) -> None:
    """Sanitize a JSON Lines file record by record.

    Blank lines are skipped. Each remaining line is parsed, sanitized and
    re-serialised with sorted keys; the output has one record per line and is
    empty when there are no records.

    Records are split by iterating the file, which breaks on real newlines
    only. `str.splitlines()` additionally splits on U+0085, U+2028 and U+2029,
    all of which may appear unescaped inside a JSON string, so it could cut a
    valid record in two. Reading and writing use UTF-8 explicitly rather than
    the locale's default encoding.

    Args:
        input_path: JSONL file to read.
        output_path: Destination file; missing parent directories are created.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        OSError: If the input cannot be read or the output cannot be written.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    rows = []
    with input_path.open(encoding="utf-8") as handle:
        for raw_line in handle:
            record = raw_line.strip()
            if not record:
                continue
            rows.append(json.dumps(sanitize_payload(json.loads(record)), sort_keys=True))
    # All rows are collected first so a parse error never leaves a partial file.
    output_path.write_text("".join(f"{row}\n" for row in rows), encoding="utf-8")
|
|
|
|
|
|
|
|
def sanitize_file(input_path: Path, output_dir: Path) -> Path:
    """Sanitize one export into *output_dir*, keeping its file name.

    Files with the exact suffix ".jsonl" are handled line by line; every
    other file is treated as a single JSON document.

    Args:
        input_path: File to sanitize.
        output_dir: Directory that receives the sanitized copy.

    Returns:
        The path of the file that was written.
    """
    destination = output_dir / input_path.name
    handler = sanitize_jsonl_file if input_path.suffix == ".jsonl" else sanitize_json_file
    handler(input_path, destination)
    return destination
|
|
|
|
|
|
|
|
def iter_input_files(input_path: Path) -> list[Path]:
    """List the files to sanitize for *input_path*.

    Args:
        input_path: A single file, or a directory to search recursively.

    Returns:
        `[input_path]` when it is a regular file (whatever its suffix);
        otherwise the sorted list of files below it whose suffix is exactly
        ".json" or ".jsonl".
    """
    if not input_path.is_file():
        wanted_suffixes = {".json", ".jsonl"}
        matches = [
            candidate
            for candidate in input_path.rglob("*")
            if candidate.is_file() and candidate.suffix in wanted_suffixes
        ]
        matches.sort()
        return matches
    return [input_path]
|
|
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser.

    Returns:
        A parser with two required options, `--input` and `--output`, whose
        description is the module docstring.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    required_options = (
        ("--input", "Session file or directory to sanitize"),
        ("--output", "Output directory for sanitized files"),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)
    return cli
|
|
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Run the sanitizer from the command line.

    Args:
        argv: Argument list to parse; `None` means use `sys.argv[1:]`.

    Returns:
        0 on success.

    Raises:
        SystemExit: On invalid arguments or when `--input` does not exist.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    input_path = Path(args.input).expanduser()
    output_dir = Path(args.output).expanduser()

    # A mistyped path used to report "Sanitized 0 file(s)" with exit status 0.
    if not input_path.exists():
        parser.error(f"input path does not exist: {input_path}")

    files = iter_input_files(input_path)
    mirror_tree = input_path.is_dir()
    for path in files:
        # Directory input is searched recursively, so two files in different
        # subdirectories can share a name. Writing both to output_dir/name
        # would silently overwrite one of them; mirror the relative
        # subdirectory instead. Files directly under the input directory
        # still land directly in output_dir.
        if mirror_tree:
            target_dir = output_dir / path.parent.relative_to(input_path)
        else:
            target_dir = output_dir
        sanitize_file(path, target_dir)

    print(f"Sanitized {len(files)} file(s) into {output_dir}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|