From 3acd15aaaf30b18f8d6fe771334212cac984290b Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Sat, 28 Mar 2026 01:04:13 -0400 Subject: [PATCH] feat: add trajectory sanitization utility --- scripts/trajectory_sanitize.py | 165 ++++++++++++++++++++++++++++++ tests/test_trajectory_sanitize.py | 107 +++++++++++++++++++ 2 files changed, 272 insertions(+) create mode 100644 scripts/trajectory_sanitize.py create mode 100644 tests/test_trajectory_sanitize.py diff --git a/scripts/trajectory_sanitize.py b/scripts/trajectory_sanitize.py new file mode 100644 index 0000000..4531e83 --- /dev/null +++ b/scripts/trajectory_sanitize.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +"""Sanitize Hermes session exports before they enter local training pipelines.""" + +from __future__ import annotations + +import argparse +import json +import re +from pathlib import Path +from typing import Any + +ALLOWED_EMAILS = { + "alexpaynex@gmail.com", + "alexander@alexanderwhitestone.com", +} +ALLOWED_IPS = { + "143.198.27.163", +} +ALLOWED_HOSTS = { + "localhost", +} + +EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE) +IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b") +HOST_RE = re.compile(r"(? str: + for pattern, replacement in PATH_REPLACEMENTS: + text = pattern.sub(replacement, text) + return text + + +def sanitize_text(text: str) -> str: + text = normalize_paths(text) + + for pattern, replacement in INLINE_SECRET_PATTERNS: + text = pattern.sub(replacement, text) + + text = EMAIL_RE.sub( + lambda match: match.group(0) + if match.group(0).lower() in ALLOWED_EMAILS + else "[REDACTED_EMAIL]", + text, + ) + + text = IP_RE.sub( + lambda match: match.group(0) + if match.group(0) in ALLOWED_IPS + else "[REDACTED_IP]", + text, + ) + + text = HOST_RE.sub( + lambda match: match.group(0) + if match.group(0).lower() in ALLOWED_HOSTS + else "[REDACTED_HOST]", + text, + ) + + return text + + + +def sanitize_payload(payload: Any) -> Any: + if isinstance(payload, dict): + sanitized: dict[str, Any] = {} + for key, value in payload.items(): + normalized_key = key.lower() + if isinstance(value, str) and normalized_key in DIRECT_KEY_REDACTIONS: + sanitized[key] = DIRECT_KEY_REDACTIONS[normalized_key] + continue + sanitized[key] = sanitize_payload(value) + return sanitized + + if isinstance(payload, list): + return [sanitize_payload(item) for item in payload] + + if isinstance(payload, str): + return sanitize_text(payload) + + return payload + + + +def sanitize_json_file(input_path: Path, output_path: Path) -> None: + payload = json.loads(input_path.read_text()) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(sanitize_payload(payload), indent=2, sort_keys=True) + "\n") + + + +def sanitize_jsonl_file(input_path: Path, output_path: Path) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + rows = [] + for line in input_path.read_text().splitlines(): + line = line.strip() + if not line: + continue + rows.append(json.dumps(sanitize_payload(json.loads(line)), sort_keys=True)) + output_path.write_text("\n".join(rows) + ("\n" if rows else "")) + + + +def sanitize_file(input_path: Path, output_dir: Path) -> Path: + output_path = output_dir / input_path.name + if input_path.suffix == ".jsonl": + sanitize_jsonl_file(input_path, output_path) + else: + sanitize_json_file(input_path, output_path) + return output_path + + + +def iter_input_files(input_path: Path) -> list[Path]: + if input_path.is_file(): + return [input_path] + return sorted( + path for path in input_path.rglob("*") if path.is_file() and path.suffix in {".json", ".jsonl"} + ) + + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--input", required=True, help="Session file or directory to sanitize") + parser.add_argument("--output", required=True, help="Output directory for sanitized files") + return parser + + + +def main(argv: list[str] | None = None) -> int: + args = build_parser().parse_args(argv) + input_path = Path(args.input).expanduser() + output_dir = Path(args.output).expanduser() + files = iter_input_files(input_path) + for path in files: + sanitize_file(path, output_dir) + print(f"Sanitized {len(files)} file(s) into {output_dir}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_trajectory_sanitize.py b/tests/test_trajectory_sanitize.py new file mode 100644 index 0000000..ab8be39 --- /dev/null +++ b/tests/test_trajectory_sanitize.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import json +import subprocess +import sys +from pathlib import Path + +from scripts.trajectory_sanitize import sanitize_payload + + +def test_sanitize_payload_redacts_secrets_and_normalizes_paths() -> None: + payload = { + "api_key": "sk-secret1234567890ABCDEF", + "token": "ghp_12345ABCDEfghijk67890lmnoPQRST", + "password": "supersecret", + "notes": ( + "email user@example.com keep alexpaynex@gmail.com and " + "alexander@alexanderwhitestone.com; " + "ip 10.0.0.8 keep 143.198.27.163; " + "host app.internal.local; " + "path /Users/apayne/projects/timmy/file.txt and /home/runner/work/app.py" + ), + "messages": [{"content": "Contact admin@corp.com and token=abc123"}], + } + + sanitized = sanitize_payload(payload) + + assert sanitized["api_key"] == "[REDACTED_API_KEY]" + assert sanitized["token"] == "[REDACTED_TOKEN]" + assert sanitized["password"] == "[REDACTED_PASSWORD]" + + notes = sanitized["notes"] + assert "user@example.com" not in notes + assert "admin@corp.com" not in sanitized["messages"][0]["content"] + assert "[REDACTED_EMAIL]" in notes + assert "alexpaynex@gmail.com" in notes + assert "alexander@alexanderwhitestone.com" in notes + assert "10.0.0.8" not in notes + assert "143.198.27.163" in notes + assert "[REDACTED_IP]" in notes + assert "app.internal.local" not in notes + assert "[REDACTED_HOST]" in notes + assert "~/projects/timmy/file.txt" in notes + assert "~/work/app.py" in notes + assert "/Users/apayne/" not in notes + assert "/home/runner/" not in notes + + +def test_sanitize_payload_is_idempotent() -> None: + payload = { + "api_key": "sk-secret1234567890ABCDEF", + "notes": "email user@example.com host app.internal.local ip 10.0.0.8", + } + + once = sanitize_payload(payload) + twice = sanitize_payload(once) + + assert twice == once + + +def test_cli_sanitizes_json_and_jsonl_files(tmp_path: Path) -> None: + input_dir = tmp_path / "input" + output_dir = tmp_path / "output" + input_dir.mkdir() + + session_json = input_dir / "session_a.json" + session_json.write_text( + json.dumps( + { + "email": "private@example.com", + "path": "/Users/alice/project/file.txt", + "host": "app.internal.local", + "ip": "10.0.0.8", + } + ) + ) + + session_jsonl = input_dir / "session_b.jsonl" + session_jsonl.write_text( + json.dumps({"token": "ghp_12345ABCDEfghijk67890lmnoPQRST"}) + "\n" + ) + + result = subprocess.run( + [ + sys.executable, + "-m", + "scripts.trajectory_sanitize", + "--input", + str(input_dir), + "--output", + str(output_dir), + ], + capture_output=True, + text=True, + check=True, + ) + + assert "Sanitized 2 file(s)" in result.stdout + + sanitized_json = json.loads((output_dir / "session_a.json").read_text()) + assert sanitized_json["email"] == "[REDACTED_EMAIL]" + assert sanitized_json["path"] == "~/project/file.txt" + assert sanitized_json["host"] == "[REDACTED_HOST]" + assert sanitized_json["ip"] == "[REDACTED_IP]" + + sanitized_jsonl = (output_dir / "session_b.jsonl").read_text().strip().splitlines() + assert json.loads(sanitized_jsonl[0])["token"] == "[REDACTED_TOKEN]"