feat: add trajectory sanitization utility (#27)

This commit was merged in pull request #27.
This commit is contained in:
2026-03-28 05:04:44 +00:00
parent b47e236b7b
commit 3be3a6ed4f
2 changed files with 272 additions and 0 deletions

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""Sanitize Hermes session exports before they enter local training pipelines."""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
ALLOWED_EMAILS = {
"alexpaynex@gmail.com",
"alexander@alexanderwhitestone.com",
}
ALLOWED_IPS = {
"143.198.27.163",
}
ALLOWED_HOSTS = {
"localhost",
}
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
HOST_RE = re.compile(r"(?<![@/])\b(?:[A-Z0-9-]+\.)+[A-Z]{2,}\b(?!/)", re.IGNORECASE)
PATH_REPLACEMENTS = [
(re.compile(r"/Users/[^/\s]+/"), "~/"),
(re.compile(r"/home/[^/\s]+/"), "~/"),
]
INLINE_SECRET_PATTERNS = [
(re.compile(r"(?i)\bapi[_-]?key\s*[:=]\s*\S+"), "api_key=[REDACTED_API_KEY]"),
(re.compile(r"(?i)\btoken\s*[:=]\s*\S+"), "token=[REDACTED_TOKEN]"),
(re.compile(r"(?i)\bpassword\s*[:=]\s*\S+"), "password=[REDACTED_PASSWORD]"),
]
DIRECT_KEY_REDACTIONS = {
"api_key": "[REDACTED_API_KEY]",
"apikey": "[REDACTED_API_KEY]",
"secret_key": "[REDACTED_API_KEY]",
"token": "[REDACTED_TOKEN]",
"access_token": "[REDACTED_TOKEN]",
"auth_token": "[REDACTED_TOKEN]",
"password": "[REDACTED_PASSWORD]",
"passwd": "[REDACTED_PASSWORD]",
"pwd": "[REDACTED_PASSWORD]",
}
def normalize_paths(text: str) -> str:
    """Rewrite user home-directory prefixes in *text* to ``~/``.

    Every ``(pattern, replacement)`` pair in ``PATH_REPLACEMENTS`` is
    applied in list order and the resulting string is returned.
    """
    result = text
    for entry in PATH_REPLACEMENTS:
        home_prefix_re, alias = entry
        result = home_prefix_re.sub(alias, result)
    return result
def sanitize_text(text: str) -> str:
    """Redact secrets and identifying details from a single string.

    Home-directory prefixes are normalized first, then every
    ``INLINE_SECRET_PATTERNS`` entry is substituted, and finally e-mail
    addresses, IP addresses and hostnames are replaced by placeholders
    unless the match is in the corresponding allow-list.  E-mail and host
    matches are compared lower-cased; IP matches are compared verbatim.
    """

    def keep_or_mask(allowed, placeholder, fold_case):
        # Build a re.sub callback that passes allow-listed matches through.
        def replace(match):
            found = match.group(0)
            probe = found.lower() if fold_case else found
            if probe in allowed:
                return found
            return placeholder

        return replace

    cleaned = normalize_paths(text)
    for secret_re, masked in INLINE_SECRET_PATTERNS:
        cleaned = secret_re.sub(masked, cleaned)

    # The passes run in this fixed order: e-mail, then IP, then host.
    passes = (
        (EMAIL_RE, keep_or_mask(ALLOWED_EMAILS, "[REDACTED_EMAIL]", True)),
        (IP_RE, keep_or_mask(ALLOWED_IPS, "[REDACTED_IP]", False)),
        (HOST_RE, keep_or_mask(ALLOWED_HOSTS, "[REDACTED_HOST]", True)),
    )
    for regex, callback in passes:
        cleaned = regex.sub(callback, cleaned)
    return cleaned
def sanitize_payload(payload: Any) -> Any:
    """Return a sanitized copy of a JSON-like structure.

    * ``dict``: rebuilt key by key.  A string value stored under a key
      whose lower-cased name is in ``DIRECT_KEY_REDACTIONS`` is replaced by
      that table's placeholder; every other value is sanitized
      recursively.  Keys themselves are kept unchanged.
    * ``list``: rebuilt with each item sanitized recursively.
    * ``str``: passed through ``sanitize_text``.
    * anything else (numbers, booleans, ``None``, ...): returned as-is.

    Non-string values under a secret-named key are not replaced wholesale;
    they are only sanitized recursively.
    """
    if isinstance(payload, str):
        return sanitize_text(payload)
    if isinstance(payload, list):
        return [sanitize_payload(item) for item in payload]
    if isinstance(payload, dict):
        sanitized: dict[Any, Any] = {}
        for key, value in payload.items():
            # Only string keys can name a secret field.  In-memory dicts may
            # carry int/None/tuple keys, on which ``.lower()`` would raise.
            placeholder = (
                DIRECT_KEY_REDACTIONS.get(key.lower()) if isinstance(key, str) else None
            )
            if placeholder is not None and isinstance(value, str):
                sanitized[key] = placeholder
            else:
                sanitized[key] = sanitize_payload(value)
        return sanitized
    return payload
def sanitize_json_file(input_path: Path, output_path: Path) -> None:
    """Sanitize one JSON document and write it to *output_path*.

    The input is decoded as UTF-8 explicitly rather than with the
    locale-dependent default, which can mis-decode or fail on non-ASCII
    content on systems whose default encoding is not UTF-8.  The parent
    directory of *output_path* is created if missing.  Output is
    pretty-printed with two-space indentation, sorted keys and a trailing
    newline.

    Raises:
        json.JSONDecodeError: if the input is not valid JSON.
        UnicodeDecodeError: if the input is not valid UTF-8.
    """
    payload = json.loads(input_path.read_text(encoding="utf-8"))
    output_path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(sanitize_payload(payload), indent=2, sort_keys=True)
    output_path.write_text(rendered + "\n", encoding="utf-8")
def sanitize_jsonl_file(input_path: Path, output_path: Path) -> None:
    """Sanitize a JSON Lines file record by record.

    Blank lines are dropped.  Each remaining line is parsed, sanitized and
    re-serialized with sorted keys, one record per output line.  An input
    with no records produces an empty output file.  The parent directory
    of *output_path* is created if missing.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        UnicodeDecodeError: if the input is not valid UTF-8.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # Decode as UTF-8 explicitly instead of using the locale default.
    content = input_path.read_text(encoding="utf-8")
    # Split on "\n" only.  ``str.splitlines`` also breaks on characters such
    # as U+2028/U+2029/U+0085, which may appear unescaped inside a JSON
    # string and would cut a record in two.  Text-mode reading has already
    # translated "\r\n" and "\r" to "\n".
    records = (raw_line.strip() for raw_line in content.split("\n"))
    rows = [
        json.dumps(sanitize_payload(json.loads(record)), sort_keys=True)
        for record in records
        if record
    ]
    output_path.write_text("".join(f"{row}\n" for row in rows), encoding="utf-8")
def sanitize_file(input_path: Path, output_dir: Path) -> Path:
    """Sanitize *input_path* into *output_dir* and return the written path.

    Files whose suffix is exactly ``.jsonl`` are handled line by line;
    everything else is treated as a single JSON document.  The output
    keeps only the input's file name, so inputs that share a name map to
    the same output path.
    """
    destination = output_dir / input_path.name
    is_json_lines = input_path.suffix == ".jsonl"
    writer = sanitize_jsonl_file if is_json_lines else sanitize_json_file
    writer(input_path, destination)
    return destination
def iter_input_files(input_path: Path) -> list[Path]:
    """List the files to sanitize for *input_path*.

    A path that is a regular file is returned as a one-element list,
    whatever its suffix.  Otherwise the path is searched recursively and
    all regular files with a ``.json`` or ``.jsonl`` suffix are returned in
    sorted order.
    """
    if not input_path.is_file():
        wanted_suffixes = {".json", ".jsonl"}
        matches = [
            candidate
            for candidate in input_path.rglob("*")
            if candidate.is_file() and candidate.suffix in wanted_suffixes
        ]
        matches.sort()
        return matches
    return [input_path]
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser.

    Both ``--input`` and ``--output`` are required options; the module
    docstring is used as the parser description.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    option_help = {
        "--input": "Session file or directory to sanitize",
        "--output": "Output directory for sanitized files",
    }
    for flag, help_text in option_help.items():
        cli.add_argument(flag, required=True, help=help_text)
    return cli
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: sanitize one file or a directory tree.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` after all discovered files have been sanitized.

    Raises:
        SystemExit: from argparse on invalid arguments, or when the
            ``--input`` path does not exist.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    input_path = Path(args.input).expanduser()
    output_dir = Path(args.output).expanduser()

    # A mistyped path used to fall through to an empty directory walk and
    # report "Sanitized 0 file(s)" with exit status 0; fail loudly instead.
    if not input_path.exists():
        parser.error(f"input path does not exist: {input_path}")

    files = iter_input_files(input_path)
    for path in files:
        sanitize_file(path, output_dir)
    print(f"Sanitized {len(files)} file(s) into {output_dir}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,107 @@
from __future__ import annotations
import json
import subprocess
import sys
from pathlib import Path
from scripts.trajectory_sanitize import sanitize_payload
def test_sanitize_payload_redacts_secrets_and_normalizes_paths() -> None:
    """Secret fields, e-mails, IPs, hosts and home paths are all sanitized."""
    raw = {
        "api_key": "sk-secret1234567890ABCDEF",
        "token": "ghp_12345ABCDEfghijk67890lmnoPQRST",
        "password": "supersecret",
        "notes": (
            "email user@example.com keep alexpaynex@gmail.com and "
            "alexander@alexanderwhitestone.com; "
            "ip 10.0.0.8 keep 143.198.27.163; "
            "host app.internal.local; "
            "path /Users/apayne/projects/timmy/file.txt and /home/runner/work/app.py"
        ),
        "messages": [{"content": "Contact admin@corp.com and token=abc123"}],
    }

    cleaned = sanitize_payload(raw)

    # Values stored under secret-named keys are replaced wholesale.
    expected_placeholders = {
        "api_key": "[REDACTED_API_KEY]",
        "token": "[REDACTED_TOKEN]",
        "password": "[REDACTED_PASSWORD]",
    }
    for field, placeholder in expected_placeholders.items():
        assert cleaned[field] == placeholder

    assert "admin@corp.com" not in cleaned["messages"][0]["content"]

    notes = cleaned["notes"]
    must_be_gone = (
        "user@example.com",
        "10.0.0.8",
        "app.internal.local",
        "/Users/apayne/",
        "/home/runner/",
    )
    must_be_present = (
        "[REDACTED_EMAIL]",
        "alexpaynex@gmail.com",
        "alexander@alexanderwhitestone.com",
        "143.198.27.163",
        "[REDACTED_IP]",
        "[REDACTED_HOST]",
        "~/projects/timmy/file.txt",
        "~/work/app.py",
    )
    for fragment in must_be_gone:
        assert fragment not in notes
    for fragment in must_be_present:
        assert fragment in notes
def test_sanitize_payload_is_idempotent() -> None:
    """Sanitizing already-sanitized output must not change it again."""
    first_pass = sanitize_payload(
        {
            "api_key": "sk-secret1234567890ABCDEF",
            "notes": "email user@example.com host app.internal.local ip 10.0.0.8",
        }
    )
    second_pass = sanitize_payload(first_pass)
    assert second_pass == first_pass
def test_cli_sanitizes_json_and_jsonl_files(tmp_path: Path) -> None:
    """Run the module as a CLI over a directory and check both outputs."""
    source_dir = tmp_path / "input"
    target_dir = tmp_path / "output"
    source_dir.mkdir()

    json_record = {
        "email": "private@example.com",
        "path": "/Users/alice/project/file.txt",
        "host": "app.internal.local",
        "ip": "10.0.0.8",
    }
    (source_dir / "session_a.json").write_text(json.dumps(json_record))

    jsonl_record = {"token": "ghp_12345ABCDEfghijk67890lmnoPQRST"}
    (source_dir / "session_b.jsonl").write_text(json.dumps(jsonl_record) + "\n")

    command = [
        sys.executable,
        "-m",
        "scripts.trajectory_sanitize",
        "--input",
        str(source_dir),
        "--output",
        str(target_dir),
    ]
    # check=True makes a non-zero exit status fail the test immediately.
    completed = subprocess.run(command, capture_output=True, text=True, check=True)
    assert "Sanitized 2 file(s)" in completed.stdout

    json_result = json.loads((target_dir / "session_a.json").read_text())
    expected_fields = (
        ("email", "[REDACTED_EMAIL]"),
        ("path", "~/project/file.txt"),
        ("host", "[REDACTED_HOST]"),
        ("ip", "[REDACTED_IP]"),
    )
    for field, expected in expected_fields:
        assert json_result[field] == expected

    jsonl_lines = (target_dir / "session_b.jsonl").read_text().strip().splitlines()
    first_record = json.loads(jsonl_lines[0])
    assert first_record["token"] == "[REDACTED_TOKEN]"