feat: add trajectory sanitization utility
This commit is contained in:
165
scripts/trajectory_sanitize.py
Normal file
165
scripts/trajectory_sanitize.py
Normal file
@@ -0,0 +1,165 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Sanitize Hermes session exports before they enter local training pipelines."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
# Allow-lists: values in these sets are left readable by sanitize_text();
# every other match of the corresponding pattern is replaced by a placeholder.
ALLOWED_EMAILS = {"alexpaynex@gmail.com", "alexander@alexanderwhitestone.com"}
ALLOWED_IPS = {"143.198.27.163"}
# NOTE(review): HOST_RE requires at least one "label." prefix, so a dotless
# name such as "localhost" never reaches this lookup -- confirm this entry is
# intentional.
ALLOWED_HOSTS = {"localhost"}
|
||||
|
||||
# Email addresses; matched case-insensitively.
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", flags=re.I)
# Dotted quads of 1-3 digit groups (octet ranges are not validated).
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# Dotted host names. The lookarounds skip candidates that directly follow
# "@" or "/" or that are directly followed by "/".
HOST_RE = re.compile(r"(?<![@/])\b(?:[A-Z0-9-]+\.)+[A-Z]{2,}\b(?!/)", flags=re.I)
# "<root>/<name>/" prefixes that are collapsed to "~/".
PATH_REPLACEMENTS = [
    (re.compile(rf"{home_root}/[^/\s]+/"), "~/") for home_root in ("/Users", "/home")
]
|
||||
# Inline "name=value" / "name: value" secrets embedded in free text.
#
# The keyword must not be directly preceded by a letter or digit. A plain
# ``\b`` is not used here because "_" counts as a word character, which would
# let common spellings such as ``OPENAI_API_KEY=...``, ``access_token=...`` or
# ``DB_PASSWORD=...`` slip through unredacted.
#
# Each match (keyword, separator and value up to the next whitespace) is
# replaced by a normalized lowercase ``name=[PLACEHOLDER]`` form; any prefix
# before the keyword (e.g. ``OPENAI_``) is left in place.
INLINE_SECRET_PATTERNS = [
    (
        re.compile(r"(?i)(?<![A-Za-z0-9])api[_-]?key\s*[:=]\s*\S+"),
        "api_key=[REDACTED_API_KEY]",
    ),
    (
        re.compile(r"(?i)(?<![A-Za-z0-9])token\s*[:=]\s*\S+"),
        "token=[REDACTED_TOKEN]",
    ),
    (
        re.compile(r"(?i)(?<![A-Za-z0-9])password\s*[:=]\s*\S+"),
        "password=[REDACTED_PASSWORD]",
    ),
]
|
||||
|
||||
# Lower-cased dict keys whose string values are replaced wholesale by
# sanitize_payload(), mapped to the placeholder written in their place.
DIRECT_KEY_REDACTIONS = {
    **dict.fromkeys(("api_key", "apikey", "secret_key"), "[REDACTED_API_KEY]"),
    **dict.fromkeys(("token", "access_token", "auth_token"), "[REDACTED_TOKEN]"),
    **dict.fromkeys(("password", "passwd", "pwd"), "[REDACTED_PASSWORD]"),
}
|
||||
|
||||
|
||||
def normalize_paths(text: str) -> str:
    """Return *text* with every PATH_REPLACEMENTS pattern substituted.

    The (pattern, replacement) pairs are applied one after another, in list
    order, each operating on the result of the previous substitution.
    """
    normalized = text
    for home_pattern, tilde_prefix in PATH_REPLACEMENTS:
        normalized = home_pattern.sub(tilde_prefix, normalized)
    return normalized
|
||||
|
||||
|
||||
def sanitize_text(text: str) -> str:
    """Return *text* with paths normalized and sensitive values masked.

    Steps run in a fixed order: home-directory paths, inline secrets, email
    addresses, IP addresses, then host names. Matches found in the
    corresponding allow-list are kept verbatim; all others are replaced by a
    ``[REDACTED_*]`` placeholder.
    """

    def _keep_or_mask(allowed, placeholder, *, fold_case):
        # Build a re.sub callback that keeps allow-listed matches untouched.
        def _replace(found):
            candidate = found.group(0)
            probe = candidate.lower() if fold_case else candidate
            return candidate if probe in allowed else placeholder

        return _replace

    cleaned = normalize_paths(text)

    for secret_pattern, masked in INLINE_SECRET_PATTERNS:
        cleaned = secret_pattern.sub(masked, cleaned)

    # Emails and hosts are compared lower-cased; IPs are compared as written.
    cleaned = EMAIL_RE.sub(
        _keep_or_mask(ALLOWED_EMAILS, "[REDACTED_EMAIL]", fold_case=True), cleaned
    )
    cleaned = IP_RE.sub(
        _keep_or_mask(ALLOWED_IPS, "[REDACTED_IP]", fold_case=False), cleaned
    )
    cleaned = HOST_RE.sub(
        _keep_or_mask(ALLOWED_HOSTS, "[REDACTED_HOST]", fold_case=True), cleaned
    )

    return cleaned
|
||||
|
||||
|
||||
|
||||
def sanitize_payload(payload: Any) -> Any:
    """Recursively sanitize a decoded JSON-like value and return a new value.

    * dict: a new dict is built. A string value stored under a key whose
      lower-cased name is in DIRECT_KEY_REDACTIONS is replaced by that entry's
      placeholder; every other value is sanitized recursively.
    * list: a new list with each item sanitized recursively.
    * str: passed through sanitize_text().
    * anything else (numbers, booleans, None, ...): returned unchanged.

    The input object is not mutated.
    """
    if isinstance(payload, dict):
        sanitized: dict[Any, Any] = {}
        for key, value in payload.items():
            # Only string keys can name a sensitive field. Non-string keys
            # (possible when the caller passes an in-memory dict rather than
            # decoded JSON) are recursed into instead of failing on .lower().
            if isinstance(key, str) and isinstance(value, str):
                placeholder = DIRECT_KEY_REDACTIONS.get(key.lower())
                if placeholder is not None:
                    sanitized[key] = placeholder
                    continue
            sanitized[key] = sanitize_payload(value)
        return sanitized

    if isinstance(payload, list):
        return [sanitize_payload(item) for item in payload]

    if isinstance(payload, str):
        return sanitize_text(payload)

    return payload
|
||||
|
||||
|
||||
|
||||
def sanitize_json_file(input_path: Path, output_path: Path) -> None:
    """Sanitize one JSON document and write it to *output_path*.

    The document is parsed, passed through sanitize_payload(), and written
    pretty-printed (2-space indent, sorted keys, trailing newline). Missing
    parent directories of *output_path* are created.

    Raises:
        json.JSONDecodeError: if the input is not valid JSON.
        OSError: if the input cannot be read or the output cannot be written.
    """
    # Read and write as UTF-8 explicitly: relying on the locale default makes
    # non-ASCII session content fail to decode on platforms whose default
    # encoding is not UTF-8.
    payload = json.loads(input_path.read_text(encoding="utf-8"))
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(sanitize_payload(payload), indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
|
||||
|
||||
|
||||
|
||||
def sanitize_jsonl_file(input_path: Path, output_path: Path) -> None:
    """Sanitize a JSON Lines file record by record and write it to *output_path*.

    Blank lines are skipped. Each remaining line is parsed as JSON, passed
    through sanitize_payload(), and re-serialized with sorted keys on a single
    line. The output ends with a newline unless there were no records, in
    which case an empty file is written. Missing parent directories of
    *output_path* are created.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        OSError: if the input cannot be read or the output cannot be written.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    rows = []
    # Iterate the file handle instead of str.splitlines(): splitlines() also
    # breaks on characters such as U+2028, U+2029 and U+0085, which may appear
    # unescaped inside JSON strings and would split one record into invalid
    # fragments. File iteration only splits on real line endings.
    with input_path.open(encoding="utf-8") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            if not line:
                continue
            rows.append(json.dumps(sanitize_payload(json.loads(line)), sort_keys=True))
    # Written only after the input is fully read and closed, so pointing the
    # output at the input file does not truncate it mid-read.
    output_path.write_text("\n".join(rows) + ("\n" if rows else ""), encoding="utf-8")
|
||||
|
||||
|
||||
|
||||
def sanitize_file(input_path: Path, output_dir: Path) -> Path:
    """Sanitize *input_path* into *output_dir* and return the written path.

    The output keeps the input's file name. Files with the exact suffix
    ``.jsonl`` are processed line by line; every other file is treated as a
    single JSON document.
    """
    destination = output_dir / input_path.name
    handler = sanitize_jsonl_file if input_path.suffix == ".jsonl" else sanitize_json_file
    handler(input_path, destination)
    return destination
|
||||
|
||||
|
||||
|
||||
def iter_input_files(input_path: Path) -> list[Path]:
    """Return the files to sanitize for *input_path*.

    A path that is a regular file is returned as a one-element list,
    whatever its suffix. Otherwise the path is searched recursively and all
    regular files ending in ``.json`` or ``.jsonl`` are returned in sorted
    order.
    """
    if not input_path.is_file():
        wanted_suffixes = {".json", ".jsonl"}
        matches = [
            candidate
            for candidate in input_path.rglob("*")
            if candidate.is_file() and candidate.suffix in wanted_suffixes
        ]
        matches.sort()
        return matches
    return [input_path]
|
||||
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with the required --input/--output options."""
    cli = argparse.ArgumentParser(description=__doc__)
    required_options = (
        ("--input", "Session file or directory to sanitize"),
        ("--output", "Output directory for sanitized files"),
    )
    for flag, help_text in required_options:
        cli.add_argument(flag, required=True, help=help_text)
    return cli
|
||||
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
    """Run the sanitizer CLI and return the process exit status.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 after all discovered files have been sanitized.

    Raises:
        SystemExit: via argparse, for invalid arguments or a nonexistent
            ``--input`` path.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    input_path = Path(args.input).expanduser()
    output_dir = Path(args.output).expanduser()

    # A mistyped input path would otherwise report "Sanitized 0 file(s)" and
    # exit successfully, hiding the fact that nothing was processed.
    if not input_path.exists():
        parser.error(f"input path does not exist: {input_path}")

    files = iter_input_files(input_path)
    mirror_tree = input_path.is_dir()
    for path in files:
        # Directory inputs are searched recursively, so same-named files can
        # live in different subdirectories. Mirror each file's relative
        # directory under the output root so they do not overwrite each other.
        if mirror_tree:
            target_dir = output_dir / path.relative_to(input_path).parent
        else:
            target_dir = output_dir
        sanitize_file(path, target_dir)

    print(f"Sanitized {len(files)} file(s) into {output_dir}")
    return 0
|
||||
|
||||
|
||||
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
107
tests/test_trajectory_sanitize.py
Normal file
107
tests/test_trajectory_sanitize.py
Normal file
@@ -0,0 +1,107 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from scripts.trajectory_sanitize import sanitize_payload
|
||||
|
||||
|
||||
def test_sanitize_payload_redacts_secrets_and_normalizes_paths() -> None:
    """Check redaction, allow-listing and path normalization in one payload."""
    payload = {
        "api_key": "sk-secret1234567890ABCDEF",
        "token": "ghp_12345ABCDEfghijk67890lmnoPQRST",
        "password": "supersecret",
        "notes": (
            "email user@example.com keep alexpaynex@gmail.com and "
            "alexander@alexanderwhitestone.com; "
            "ip 10.0.0.8 keep 143.198.27.163; "
            "host app.internal.local; "
            "path /Users/apayne/projects/timmy/file.txt and /home/runner/work/app.py"
        ),
        "messages": [{"content": "Contact admin@corp.com and token=abc123"}],
    }

    sanitized = sanitize_payload(payload)

    # Values stored directly under sensitive keys are replaced wholesale.
    assert sanitized["api_key"] == "[REDACTED_API_KEY]"
    assert sanitized["token"] == "[REDACTED_TOKEN]"
    assert sanitized["password"] == "[REDACTED_PASSWORD]"

    notes = sanitized["notes"]
    content = sanitized["messages"][0]["content"]

    # Emails: unknown addresses are masked, allow-listed ones survive.
    assert "user@example.com" not in notes
    assert "admin@corp.com" not in content
    assert "[REDACTED_EMAIL]" in notes
    assert "alexpaynex@gmail.com" in notes
    assert "alexander@alexanderwhitestone.com" in notes

    # The inline secret in nested free text must be masked as well; the
    # fixture includes it, so assert on it rather than leaving it unchecked.
    assert "abc123" not in content
    assert "[REDACTED_TOKEN]" in content

    # IPs and hosts.
    assert "10.0.0.8" not in notes
    assert "143.198.27.163" in notes
    assert "[REDACTED_IP]" in notes
    assert "app.internal.local" not in notes
    assert "[REDACTED_HOST]" in notes

    # Home-directory prefixes collapse to "~/".
    assert "~/projects/timmy/file.txt" in notes
    assert "~/work/app.py" in notes
    assert "/Users/apayne/" not in notes
    assert "/home/runner/" not in notes
|
||||
|
||||
|
||||
def test_sanitize_payload_is_idempotent() -> None:
    """Sanitizing already-sanitized output must not change it again."""
    raw = {
        "api_key": "sk-secret1234567890ABCDEF",
        "notes": "email user@example.com host app.internal.local ip 10.0.0.8",
    }

    first_pass = sanitize_payload(raw)
    second_pass = sanitize_payload(first_pass)

    assert second_pass == first_pass
|
||||
|
||||
|
||||
def test_cli_sanitizes_json_and_jsonl_files(tmp_path: Path) -> None:
    """Run the CLI as a subprocess over a directory with one .json and one .jsonl file."""
    input_dir = tmp_path / "input"
    output_dir = tmp_path / "output"
    input_dir.mkdir()

    session_json = input_dir / "session_a.json"
    session_json.write_text(
        json.dumps(
            {
                "email": "private@example.com",
                "path": "/Users/alice/project/file.txt",
                "host": "app.internal.local",
                "ip": "10.0.0.8",
            }
        )
    )

    session_jsonl = input_dir / "session_b.jsonl"
    session_jsonl.write_text(
        json.dumps({"token": "ghp_12345ABCDEfghijk67890lmnoPQRST"}) + "\n"
    )

    # "-m scripts.trajectory_sanitize" is resolved relative to the child's
    # working directory, so pin it to the repository root (the parent of this
    # tests/ directory) instead of relying on where pytest was started.
    repo_root = Path(__file__).resolve().parents[1]

    result = subprocess.run(
        [
            sys.executable,
            "-m",
            "scripts.trajectory_sanitize",
            "--input",
            str(input_dir),
            "--output",
            str(output_dir),
        ],
        capture_output=True,
        text=True,
        check=True,
        cwd=repo_root,
    )

    assert "Sanitized 2 file(s)" in result.stdout

    sanitized_json = json.loads((output_dir / "session_a.json").read_text())
    assert sanitized_json["email"] == "[REDACTED_EMAIL]"
    assert sanitized_json["path"] == "~/project/file.txt"
    assert sanitized_json["host"] == "[REDACTED_HOST]"
    assert sanitized_json["ip"] == "[REDACTED_IP]"

    sanitized_jsonl = (output_dir / "session_b.jsonl").read_text().strip().splitlines()
    assert json.loads(sanitized_jsonl[0])["token"] == "[REDACTED_TOKEN]"
|
||||
Reference in New Issue
Block a user