Compare commits

...

1 Commits

Author SHA1 Message Date
Timmy (AI Agent)
368cda55c7 feat(security): implement PrivacyFilter for remote API calls (#283)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m15s
Add agent/privacy_filter.py — PII redaction layer that strips sensitive
data from messages before they leave the local machine and hit remote
LLM providers.

Detects and redacts:
- Email addresses
- Phone numbers (E.164, US formats)
- US Social Security Numbers
- Crypto wallet addresses (Bitcoin, Ethereum)
- Private file paths (/home/*, /Users/*, C:\Users\*)
- PEM private key blocks

Key API:
- filter_text(text) — string-level redaction
- filter_messages(messages) — deep-copy message list filter
- has_sensitive_content(messages) — category detection
- should_route_local(messages, base_url) — routing decision
- prepare_for_remote(messages, base_url) — drop-in filter hook

Provider-aware: skips filtering for localhost/127.0.0.1 endpoints.
Config: set HERMES_PRIVACY_FILTER=0 to disable the filter; set HERMES_PRIVACY_FILTER_FORCE=1 to force filtering even for local endpoints.

59 tests covering all redaction categories, message formats (string content,
multimodal parts, tool call arguments in both direct and OpenAI function
format), provider routing, and integration scenarios.

Closes #283
2026-04-13 17:47:43 -04:00
2 changed files with 841 additions and 0 deletions

426
agent/privacy_filter.py Normal file
View File

@@ -0,0 +1,426 @@
"""Privacy filter for remote API calls — PII redaction before wire transit.
Strips personally identifiable information (PII) from messages before they
leave the local machine and hit a remote LLM provider. Designed to sit
between the message list and the API client so local model routing can
bypass it entirely.
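Sensitive categories detected:
- Email addresses
- Phone numbers (E.164 and common US formats)
- Private file paths under user home directories (physical street addresses
  are not detected)
- Crypto wallet addresses (Bitcoin, Ethereum, generic EVM)
- US Social Security Numbers (no other government ID formats)
- PEM private key blocks
- Real names: a ``name`` field pattern is defined, but it is not part of the
  active rule list yet (no config switch exists for it)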
Integration point: call ``filter_messages()`` on the ``api_messages`` list
inside ``_build_api_kwargs()`` or just before ``_interruptible_api_call()``
when the active provider is a remote endpoint (not localhost).
"""
from __future__ import annotations

import copy
import ipaddress
import json
import logging
import os
import re
from typing import Any
from urllib.parse import urlsplit
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration — snapshot at import time
# ---------------------------------------------------------------------------
_ENV = os.getenv
#: If True, privacy filtering is enabled by default. Can be toggled via
#: ``HERMES_PRIVACY_FILTER=0`` to disable.
_PRIVACY_FILTER_ENABLED: bool = _ENV("HERMES_PRIVACY_FILTER", "").lower() not in (
"0",
"false",
"no",
"off",
)
#: If True, filter is on even when the provider looks local (for testing).
_FORCE_FILTER: bool = _ENV("HERMES_PRIVACY_FILTER_FORCE", "").lower() in (
"1",
"true",
"yes",
"on",
)
#: Tokens shorter than this are fully masked; longer ones get prefix+suffix.
_MASK_THRESHOLD = 8
# ---------------------------------------------------------------------------
# Pattern catalogue — PII and sensitive data detectors
# ---------------------------------------------------------------------------
#: RFC 5322-lite email pattern (covers 99% of real addresses).
_EMAIL_RE = re.compile(
r"""(?<![A-Za-z0-9._%+\-])"""
r"""([A-Za-z0-9._%+\-]+)@([A-Za-z0-9.\-]+\.[A-Za-z]{2,})"""
r"""(?![A-Za-z0-9._%+\-])"""
)
#: E.164 phone numbers: +1… through +9…, 7-15 digits.
#: Also catches common US formats like (555) 123-4567 and 555-123-4567.
_PHONE_E164_RE = re.compile(r"(\+[1-9]\d{6,14})(?![\d])")
_PHONE_US_RE = re.compile(
r"""(?:\+?1[\s.-]?)?""" # optional country code
r"""(?:\(?[2-9]\d{2}\)?[\s.-]?)""" # area code
r"""(?:[2-9]\d{2}[\s.-]?)""" # exchange
r"""(?:\d{4})""" # subscriber
r"""(?![\d])"""
)
#: US Social Security Number: XXX-XX-XXXX (with exclusion of 000/666/9xx area).
_SSN_RE = re.compile(
r"""(?<!\d)"""
r"""(?!000|666|9\d{2})\d{3}"""
r"""[\s-]"""
r"""(?!00)\d{2}"""
r"""[\s-]"""
r"""(?!0000)\d{4}"""
r"""(?!\d)"""
)
#: Crypto wallet addresses.
#: Bitcoin: starts with 1, 3, or bc1 — 25-39 chars (legacy) or 42-62 (bech32).
_BITCOIN_RE = re.compile(r"\b([13][a-km-zA-HJ-NP-Z1-9]{25,35}|bc1[a-zA-HJ-NP-Z0-9]{25,49})\b")
#: Ethereum / EVM: 0x + 40 hex chars.
_ETHEREUM_RE = re.compile(r"\b(0x[a-fA-F0-9]{40})\b")
#: Generic long hex that looks like a wallet (>= 32 hex chars, not git hashes
#: which are usually short or have context clues).
_GENERIC_WALLET_RE = re.compile(r"\b(0x[a-fA-F0-9]{32,})\b")
#: Unix home paths: /home/user, /Users/username, /root
_UNIX_HOME_PATH_RE = re.compile(
r"""(?:/home/[\w.\-]+|/Users/[\w.\-]+|/root)(?:/[\w.\-]+)*"""
)
#: Windows user profile paths: C:\Users\username
_WIN_HOME_PATH_RE = re.compile(
r"""[A-Z]:\\Users\\[\w.\-]+(?:\\[\w.\-]+)*""", re.IGNORECASE
)
#: SSH keys, GPG keys, PEM private keys — entire blocks.
_PRIVATE_KEY_BLOCK_RE = re.compile(
r"""-----BEGIN[A-Z ]*PRIVATE KEY-----[\s\S]*?-----END[A-Z ]*PRIVATE KEY-----"""
)
#: Common "name:" patterns in structured input (YAML, JSON, form data).
#: Only matches when followed by a plausible 2+ word name.
_NAME_FIELD_RE = re.compile(
r"""(?:\"name\"\s*:\s*\"|name:\s*)([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)"""
)
# ---------------------------------------------------------------------------
# Masking helpers
# ---------------------------------------------------------------------------
def _mask_value(value: str, visible: int = 4) -> str:
    """Mask a string, keeping at most *visible* chars at each end.

    Values of ``_MASK_THRESHOLD`` characters or fewer are replaced entirely
    with ``"[REDACTED]"``. Longer values keep a short prefix and suffix
    (between 2 and *visible* characters each, capped at a quarter of the
    length) joined by an ellipsis, so the result is visibly a masked value
    rather than a shorter, plausible-looking original.
    """
    if len(value) <= _MASK_THRESHOLD:
        return "[REDACTED]"
    keep = max(2, min(visible, len(value) // 4))
    return f"{value[:keep]}…{value[-keep:]}"
def _mask_email(m: re.Match) -> str:
user, domain = m.group(1), m.group(2)
masked_user = user[0] + "" if len(user) > 1 else ""
return f"{masked_user}@{domain}"
def _mask_phone(m: re.Match) -> str:
raw = m.group(0)
digits = re.sub(r"\D", "", raw)
if len(digits) <= 6:
return "[REDACTED-PHONE]"
return f"+{'*' * (len(digits) - 4)}{digits[-4:]}"
def _mask_wallet(m: re.Match) -> str:
addr = m.group(1)
if addr.startswith("0x"):
return f"0x{'*' * 6}{addr[-4:]}"
if addr.startswith("bc1"):
return f"bc1{'*' * 4}{addr[-4:]}"
# Legacy Bitcoin
return f"{addr[:4]}{'*' * 4}{addr[-4:]}"
def _mask_path(m: re.Match) -> str:
raw = m.group(0)
parts = raw.replace("\\", "/").split("/")
if len(parts) >= 3:
return f"{parts[0]}/{parts[1]}/[REDACTED-PATH]"
return "[REDACTED-PATH]"
# ---------------------------------------------------------------------------
# Core filtering — string level
# ---------------------------------------------------------------------------
def _mask_ssn(m: re.Match) -> str:
    """Mask an SSN match, keeping only the last four digits."""
    # Area and group numbers are hidden completely; only the serial survives.
    return f"***-**-{m.group(0)[-4:]}"


#: Ordered list of ``(pattern, replacement)`` pairs applied to every string.
#: A replacement is either a literal string or a callable taking the match.
_FILTER_RULES: list[tuple[re.Pattern, Any]] = [
    # 1. Private key blocks — must run first (multi-line)
    (_PRIVATE_KEY_BLOCK_RE, "[REDACTED-PRIVATE-KEY]"),
    # 2. Emails
    (_EMAIL_RE, _mask_email),
    # 3. Phone numbers — E.164 first, then US format
    (_PHONE_E164_RE, _mask_phone),
    (_PHONE_US_RE, _mask_phone),
    # 4. SSN
    (_SSN_RE, _mask_ssn),
    # 5. Crypto wallets — Bitcoin then Ethereum then generic
    (_BITCOIN_RE, _mask_wallet),
    (_ETHEREUM_RE, _mask_wallet),
    (_GENERIC_WALLET_RE, _mask_wallet),
    # 6. File paths with user dirs
    (_UNIX_HOME_PATH_RE, _mask_path),
    (_WIN_HOME_PATH_RE, _mask_path),
]
def filter_text(text: str) -> str:
    """Apply all privacy filter rules to a single string.

    Rules from ``_FILTER_RULES`` are applied in order, each on the output of
    the previous one. Non-matching text passes through unchanged.

    Returns ``""`` for ``None`` and the input itself for an empty string.
    """
    if text is None:
        return ""
    if not text:
        return text
    for pattern, replacement in _FILTER_RULES:
        # ``Pattern.sub`` accepts both literal strings and callables, so no
        # branching on the replacement type is needed.
        text = pattern.sub(replacement, text)
    return text
# ---------------------------------------------------------------------------
# Detection — is this content sensitive?
# ---------------------------------------------------------------------------
#: Patterns whose mere presence indicates "route to local model only".
#: Each entry pairs the category name reported to callers with the compiled
#: pattern that detects it. The generic wallet pattern used for redaction
#: has no entry here, so it never produces a category on its own.
_SENSITIVE_DETECTION_RULES: list[tuple[str, re.Pattern]] = [
    ("email", _EMAIL_RE),
    ("phone", _PHONE_E164_RE),
    ("phone_us", _PHONE_US_RE),
    ("ssn", _SSN_RE),
    ("bitcoin_wallet", _BITCOIN_RE),
    ("ethereum_wallet", _ETHEREUM_RE),
    ("private_key", _PRIVATE_KEY_BLOCK_RE),
    ("user_path_unix", _UNIX_HOME_PATH_RE),
    ("user_path_win", _WIN_HOME_PATH_RE),
]
def detect_sensitive(text: str) -> list[str]:
    """List the sensitive categories whose pattern occurs in *text*.

    Categories come back in the order of ``_SENSITIVE_DETECTION_RULES``.
    An empty list (also returned for empty or ``None`` input) means nothing
    was detected; a non-empty list means the text contains PII and the
    caller should consider routing to a local model instead.
    """
    if not text:
        return []
    return [
        category
        for category, detector in _SENSITIVE_DETECTION_RULES
        if detector.search(text)
    ]
# ---------------------------------------------------------------------------
# Message-level filtering
# ---------------------------------------------------------------------------
def _extract_text_from_content(content: Any) -> str:
"""Extract plain text from OpenAI message content (str or list of parts)."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for part in content:
if isinstance(part, dict):
if part.get("type") == "text":
parts.append(part.get("text", ""))
elif part.get("type") == "tool_result":
# tool_result content can be nested
inner = part.get("content", "")
if isinstance(inner, str):
parts.append(inner)
elif isinstance(inner, list):
for p in inner:
if isinstance(p, dict) and p.get("type") == "text":
parts.append(p.get("text", ""))
elif isinstance(part, str):
parts.append(part)
return "\n".join(parts)
return str(content)
def _filter_content_part(part: Any) -> Any:
    """Return a redacted copy of one content part; unknown parts pass through."""
    if isinstance(part, str):
        return filter_text(part)
    if not isinstance(part, dict):
        return part
    kind = part.get("type")
    if kind == "text":
        text = part.get("text", "")
        return {**part, "text": filter_text(text) if isinstance(text, str) else text}
    if kind == "tool_result":
        inner = part.get("content", "")
        if isinstance(inner, str):
            return {**part, "content": filter_text(inner)}
        if isinstance(inner, list):
            # Nested tool_result content carries its own list of parts.
            return {**part, "content": [_filter_content_part(p) for p in inner]}
    return part


def _set_content_text(content: Any, filtered: str) -> Any:
    """Reconstruct the content structure with redacted text.

    * ``None`` stays ``None``.
    * String content is replaced by *filtered*.
    * List content is rebuilt part by part: every ``text`` part, bare string
      item and ``tool_result`` payload (string or nested list) is redacted
      individually with ``filter_text``. *filtered* is not used here, because
      it is the concatenation of all parts; writing it into the first text
      part would duplicate the other parts' text there and leave those parts
      themselves unredacted.
    * Any other content type is replaced by *filtered*.
    """
    if content is None:
        return None
    if isinstance(content, str):
        return filtered
    if isinstance(content, list):
        return [_filter_content_part(part) for part in content]
    return filtered
def filter_messages(messages: list[dict]) -> list[dict]:
    """Return a deep-copied message list with PII redacted.

    The input is never mutated. For each dict message, the ``content`` field
    is redacted, and so are string ``arguments`` of every tool call — both
    the direct ``tc["arguments"]`` form and the OpenAI function form
    ``tc["function"]["arguments"]``. ``name`` fields inside message dicts are
    left untouched (they are role labels, not PII). Non-dict entries are
    skipped, and an empty or ``None`` input is returned as-is.
    """
    if not messages:
        return messages

    redacted = copy.deepcopy(messages)
    for message in redacted:
        if not isinstance(message, dict):
            continue

        # Filter content
        if "content" in message:
            original_content = message["content"]
            plain = _extract_text_from_content(original_content)
            message["content"] = _set_content_text(original_content, filter_text(plain))

        # Filter tool call arguments (they arrive as JSON strings)
        calls = message.get("tool_calls")
        if not isinstance(calls, list):
            continue
        for call in calls:
            if not isinstance(call, dict):
                continue
            # Direct arguments field
            direct_args = call.get("arguments")
            if isinstance(direct_args, str):
                call["arguments"] = filter_text(direct_args)
            # OpenAI function format: tc["function"]["arguments"]
            function = call.get("function")
            if isinstance(function, dict):
                function_args = function.get("arguments")
                if isinstance(function_args, str):
                    function["arguments"] = filter_text(function_args)
    return redacted
def has_sensitive_content(messages: list[dict]) -> list[str]:
    """Scan messages and return all sensitive categories found.

    Message ``content`` and string tool-call arguments (direct and OpenAI
    function format) are scanned. Non-dict entries are skipped.

    Returns a sorted list of category names, or an empty list if no PII is
    detected (safe for remote). An empty or ``None`` message list yields an
    empty list, matching how ``filter_messages`` treats such input.
    """
    if not messages:
        return []
    categories: set[str] = set()
    for msg in messages:
        if not isinstance(msg, dict):
            continue
        raw = _extract_text_from_content(msg.get("content", ""))
        categories.update(detect_sensitive(raw))
        tool_calls = msg.get("tool_calls")
        if not isinstance(tool_calls, list):
            continue
        for tc in tool_calls:
            if not isinstance(tc, dict):
                continue
            args = tc.get("arguments", "")
            if isinstance(args, str):
                categories.update(detect_sensitive(args))
            # OpenAI function format
            func = tc.get("function")
            if isinstance(func, dict):
                fargs = func.get("arguments", "")
                if isinstance(fargs, str):
                    categories.update(detect_sensitive(fargs))
    return sorted(categories)
# ---------------------------------------------------------------------------
# Provider routing helpers
# ---------------------------------------------------------------------------
#: Host names / literal addresses that always count as "this machine".
_LOCAL_PATTERNS = (
    "localhost",
    "127.0.0.1",
    "::1",
    "0.0.0.0",
)


def is_remote_provider(base_url: str) -> bool:
    """Return True if *base_url* points to a remote (non-local) provider.

    The decision is made on the URL's host only, never on a substring of the
    whole URL. Remote URLs that merely mention a local name (for example
    ``https://localhost.evil.com`` or ``...?via=localhost``) are therefore
    still treated as remote and keep the privacy filter engaged.

    Local hosts are: the entries of ``_LOCAL_PATTERNS``, any ``*.localhost``
    name, and any loopback or unspecified IP address (including IPv4-mapped
    IPv6 forms). An empty *base_url* is assumed local. A non-empty value
    whose host cannot be determined is treated as remote, so that filtering
    errs on the safe side.
    """
    if not base_url:
        return False  # assume local if unset
    candidate = str(base_url).strip().lower()
    if "://" not in candidate:
        # Without a scheme, urlsplit would read "localhost:11434" as
        # scheme "localhost"; a leading "//" marks it as a network location.
        candidate = "//" + candidate
    try:
        host = urlsplit(candidate).hostname
    except ValueError:
        host = None
    if not host:
        return True
    host = host.rstrip(".")
    if host in _LOCAL_PATTERNS or host.endswith(".localhost"):
        return False
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        return True  # a regular DNS name that is not a local alias
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped
    return not (address.is_loopback or address.is_unspecified)
def should_route_local(messages: list[dict], base_url: str) -> tuple[bool, list[str]]:
    """Decide whether messages should stay on local models.

    Returns ``(should_local, reasons)`` where *reasons* lists the sensitive
    categories detected. The answer is ``(False, [])`` when *base_url* is
    already local (there is nothing to re-route), or when the filter is
    disabled and not forced.
    """
    filter_active = _PRIVACY_FILTER_ENABLED or _FORCE_FILTER
    if is_remote_provider(base_url) and filter_active:
        found = has_sensitive_content(messages)
        return bool(found), found
    return False, []
# ---------------------------------------------------------------------------
# Integration hook — drop-in replacement for the API call path
# ---------------------------------------------------------------------------
def prepare_for_remote(messages: list[dict], base_url: str) -> tuple[list[dict], list[str]]:
    """Filter messages for a remote API call.

    Returns ``(filtered_messages, detected_categories)``. When the endpoint
    is local, or the filter is disabled and not forced, the original list
    object is returned unchanged together with an empty category list.
    Otherwise the messages are scanned, a log line is emitted if anything
    sensitive was found, and a redacted deep copy is returned.

    NOTE(review): the local-endpoint check runs before the force flag is
    consulted, so forcing never filters local traffic — confirm intent.
    """
    filter_active = _PRIVACY_FILTER_ENABLED or _FORCE_FILTER
    if not is_remote_provider(base_url) or not filter_active:
        return messages, []

    found = has_sensitive_content(messages)
    if found:
        logger.info(
            "PrivacyFilter: redacting %d sensitive category match(es) before remote call: %s",
            len(found),
            ", ".join(found),
        )
    return filter_messages(messages), found

View File

@@ -0,0 +1,415 @@
"""Tests for agent.privacy_filter — PII redaction for remote API calls."""
import os
import pytest
# Ensure the filter is active for all tests
@pytest.fixture(autouse=True)
def _enable_filter(monkeypatch):
    """Clear the disabling env var and switch both module flags on."""
    monkeypatch.delenv("HERMES_PRIVACY_FILTER", raising=False)
    for flag_name in ("_PRIVACY_FILTER_ENABLED", "_FORCE_FILTER"):
        monkeypatch.setattr(f"agent.privacy_filter.{flag_name}", True)
from agent.privacy_filter import (
filter_text,
filter_messages,
detect_sensitive,
has_sensitive_content,
is_remote_provider,
should_route_local,
prepare_for_remote,
)
# ═══════════════════════════════════════════════════════════════════════════
# filter_text — string-level redaction
# ═══════════════════════════════════════════════════════════════════════════
class TestEmailRedaction:
    """Email addresses must not survive ``filter_text``."""

    def test_simple_email(self):
        redacted = filter_text("Contact me at alice@example.com for details.")
        assert "alice@example.com" not in redacted
        assert "a…@example.com" in redacted

    def test_email_with_dots(self):
        redacted = filter_text("john.doe+work@corp.co.uk")
        assert "john.doe+work@corp.co.uk" not in redacted

    def test_multiple_emails(self):
        redacted = filter_text("CC: first@test.io and second@test.io")
        for address in ("first@test.io", "second@test.io"):
            assert address not in redacted

    def test_email_in_code_block(self):
        redacted = filter_text("config: { email: 'dev@company.com' }")
        assert "dev@company.com" not in redacted
class TestPhoneRedaction:
    """Phone numbers in E.164 and US formats are masked; short numbers are not."""

    def test_e164_format(self):
        redacted = filter_text("Call me at +14155551234")
        assert "+14155551234" not in redacted
        assert "1234" in redacted  # last 4 visible

    def test_us_with_dashes(self):
        redacted = filter_text("Phone: 415-555-1234")
        assert "415-555-1234" not in redacted

    def test_us_with_parens(self):
        redacted = filter_text("Phone: (415) 555-1234")
        assert "415" not in redacted or "555-1234" not in redacted

    def test_international(self):
        redacted = filter_text("WhatsApp: +442071234567")
        assert "+442071234567" not in redacted

    def test_short_number_not_redacted(self):
        # 4-digit extension should pass through
        redacted = filter_text("Ext: 1234")
        assert "1234" in redacted
class TestSSNRedaction:
    """US Social Security Numbers are masked for dash and space separators."""

    def test_ssn(self):
        redacted = filter_text("SSN: 123-45-6789")
        assert "6789" in redacted or "[REDACTED" in redacted
        assert "123-45-6789" not in redacted

    def test_ssn_no_dashes(self):
        # Space-separated variant of the same number.
        redacted = filter_text("123 45 6789")
        assert "123 45 6789" not in redacted
class TestWalletRedaction:
    """Bitcoin and Ethereum addresses are masked, keeping recognisable ends."""

    def test_bitcoin_legacy(self):
        wallet = "1BvBMSEYstWetqTFn5Au4m4GFg7xJaNVN2"
        redacted = filter_text(f"Send to {wallet}")
        assert wallet not in redacted
        assert "1BvB" in redacted  # prefix preserved
        assert "NVN2" in redacted  # suffix preserved

    def test_bitcoin_bech32(self):
        wallet = "bc1qxy2kgdygjrsqtzq2n0yrf2493p83kkfjhx0wlh"
        redacted = filter_text(f"Wallet: {wallet}")
        assert wallet not in redacted
        assert "bc1" in redacted

    def test_ethereum(self):
        wallet = "0x742d35Cc6634C0532925a3b844Bc9e7595f8Ca39"
        redacted = filter_text(f"ETH: {wallet}")
        assert wallet not in redacted
        assert "0x" in redacted
        assert "Ca39" in redacted

    def test_multiple_wallets(self):
        bitcoin = "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"
        ethereum = "0x0000000000000000000000000000000000000000"
        redacted = filter_text(f"{bitcoin} and {ethereum}")
        for wallet in (bitcoin, ethereum):
            assert wallet not in redacted
class TestPathRedaction:
    """Home-directory paths are masked; relative and system paths are not."""

    def test_unix_home(self):
        redacted = filter_text("File at /home/alice/secrets/key.pem")
        assert "/home/alice/secrets" not in redacted
        assert "/home" in redacted

    def test_macos_home(self):
        redacted = filter_text("Path: /Users/bob/Documents/taxes.pdf")
        assert "/Users/bob/Documents" not in redacted

    def test_windows_path(self):
        redacted = filter_text("C:\\Users\\Charlie\\Desktop\\notes.txt")
        assert "Charlie" not in redacted

    def test_relative_path_unchanged(self):
        original = "File: ./src/main.py"
        assert filter_text(original) == original

    def test_system_path_unchanged(self):
        original = "Binary at /usr/local/bin/python"
        assert filter_text(original) == original
class TestPrivateKeyRedaction:
    """PEM private key blocks are replaced as a whole."""

    def test_pem_key(self):
        pem = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASC\n-----END PRIVATE KEY-----"
        redacted = filter_text(f"Key: {pem}")
        assert "MIIEvQIBADAN" not in redacted
        assert "[REDACTED" in redacted

    def test_rsa_key(self):
        pem = "-----BEGIN RSA PRIVATE KEY-----\ndata\n-----END RSA PRIVATE KEY-----"
        redacted = filter_text(pem)
        assert "data" not in redacted
class TestPassthrough:
    """Text without sensitive data must come back unchanged."""

    def test_normal_text(self):
        original = "Hello, please write a function that sorts a list."
        assert filter_text(original) == original

    def test_code(self):
        original = "def hello():\n print('world')\n return 42"
        assert filter_text(original) == original

    def test_empty_string(self):
        assert filter_text("") == ""

    def test_none(self):
        assert filter_text(None) == ""

    def test_technical_discussion(self):
        original = "The model uses CUDA 12.1 with tensor cores for FP16."
        assert filter_text(original) == original

    def test_api_url_unchanged(self):
        original = "Connect to https://api.openai.com/v1/chat/completions"
        assert filter_text(original) == original
# ═══════════════════════════════════════════════════════════════════════════
# detect_sensitive — category detection
# ═══════════════════════════════════════════════════════════════════════════
class TestDetection:
    """``detect_sensitive`` reports the category names found in a string."""

    def test_no_pii(self):
        assert detect_sensitive("Hello world") == []

    def test_detects_email(self):
        found = detect_sensitive("Email me at alice@example.com")
        assert "email" in found

    def test_detects_phone(self):
        found = detect_sensitive("Call +14155551234")
        assert "phone" in found

    def test_detects_wallet(self):
        found = detect_sensitive("My BTC: 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa")
        assert "bitcoin_wallet" in found

    def test_detects_eth(self):
        wallet = "0x742d35Cc6634C0532925a3b844Bc9e7595f8Ca39"
        found = detect_sensitive(f"ETH addr: {wallet}")
        assert "ethereum_wallet" in found

    def test_detects_multiple(self):
        found = detect_sensitive("alice@test.com +14155551234")
        for category in ("email", "phone"):
            assert category in found

    def test_empty(self):
        assert detect_sensitive("") == []

    def test_none(self):
        assert detect_sensitive(None) == []
# ═══════════════════════════════════════════════════════════════════════════
# filter_messages — message list level
# ═══════════════════════════════════════════════════════════════════════════
class TestMessageFiltering:
    """``filter_messages`` redacts a copy and leaves the input untouched."""

    def test_filters_content_string(self):
        conversation = [
            {"role": "user", "content": "My email is bob@example.com, please remember it."}
        ]
        redacted = filter_messages(conversation)
        assert "bob@example.com" not in redacted[0]["content"]
        # Original unchanged (deep copy)
        assert "bob@example.com" in conversation[0]["content"]

    def test_filters_content_parts(self):
        text_part = {"type": "text", "text": "Here's my SSN: 123-45-6789"}
        image_part = {"type": "image_url", "image_url": {"url": "https://img.com/a.png"}}
        conversation = [{"role": "user", "content": [text_part, image_part]}]
        redacted_parts = filter_messages(conversation)[0]["content"]
        redacted_text = [p for p in redacted_parts if p.get("type") == "text"][0]
        assert "123-45-6789" not in redacted_text["text"]
        # Image URL untouched
        redacted_image = [p for p in redacted_parts if p.get("type") == "image_url"][0]
        assert redacted_image["image_url"]["url"] == "https://img.com/a.png"

    def test_filters_tool_call_arguments(self):
        tool_call = {
            "id": "call_123",
            "type": "function",
            "function": {
                "name": "send_email",
                "arguments": '{"to": "alice@example.com", "body": "Hi Alice"}',
            },
        }
        conversation = [{"role": "assistant", "content": None, "tool_calls": [tool_call]}]
        redacted = filter_messages(conversation)
        serialized_args = redacted[0]["tool_calls"][0]["function"]["arguments"]
        assert "alice@example.com" not in serialized_args

    def test_preserves_system_message(self):
        conversation = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello!"},
        ]
        redacted = filter_messages(conversation)
        assert redacted[0]["content"] == "You are a helpful assistant."
        assert redacted[1]["content"] == "Hello!"

    def test_deep_copy_safety(self):
        conversation = [{"role": "user", "content": "test@example.com is my email"}]
        redacted = filter_messages(conversation)
        # Modifying result doesn't affect original
        redacted[0]["content"] = "modified"
        assert "test@example.com" in conversation[0]["content"]

    def test_handles_none_content(self):
        conversation = [{"role": "assistant", "content": None, "tool_calls": []}]
        redacted = filter_messages(conversation)
        assert redacted[0]["content"] is None

    def test_handles_empty_messages(self):
        assert filter_messages([]) == []

    def test_preserves_tool_result_content(self):
        # Home paths in a tool message are redacted; system paths are kept.
        conversation = [
            {
                "role": "tool",
                "content": "Found file at /usr/bin/secret but paths like /home/alice/x should be redacted",
                "tool_call_id": "call_123",
            }
        ]
        redacted_content = filter_messages(conversation)[0]["content"]
        assert "/home/alice" not in redacted_content
        assert "/usr/bin" in redacted_content  # system path preserved
# ═══════════════════════════════════════════════════════════════════════════
# has_sensitive_content — message-level detection
# ═══════════════════════════════════════════════════════════════════════════
class TestHasSensitiveContent:
    """``has_sensitive_content`` scans content and tool-call arguments."""

    def test_clean_messages(self):
        conversation = [{"role": "user", "content": "Write me a poem"}]
        assert has_sensitive_content(conversation) == []

    def test_email_detected(self):
        conversation = [{"role": "user", "content": "email me at a@b.com"}]
        assert "email" in has_sensitive_content(conversation)

    def test_tool_args_scanned(self):
        tool_call = {
            "function": {
                "name": "search",
                "arguments": '{"query": "user +14155551234"}',
            }
        }
        conversation = [{"role": "assistant", "content": None, "tool_calls": [tool_call]}]
        assert "phone" in has_sensitive_content(conversation)
# ═══════════════════════════════════════════════════════════════════════════
# Provider routing
# ═══════════════════════════════════════════════════════════════════════════
class TestProviderRouting:
    """Remote/local classification and the resulting routing decision."""

    def test_remote_openai(self):
        assert is_remote_provider("https://api.openai.com/v1") is True

    def test_remote_openrouter(self):
        assert is_remote_provider("https://openrouter.ai/api/v1") is True

    def test_local_localhost(self):
        assert is_remote_provider("http://localhost:11434/v1") is False

    def test_local_127(self):
        assert is_remote_provider("http://127.0.0.1:8080/v1") is False

    def test_empty_assumes_local(self):
        assert is_remote_provider("") is False

    def test_route_local_with_pii(self):
        conversation = [{"role": "user", "content": "My email: a@b.com"}]
        keep_local, why = should_route_local(conversation, "https://api.openai.com/v1")
        assert keep_local is True
        assert "email" in why

    def test_no_route_without_pii(self):
        conversation = [{"role": "user", "content": "Hello!"}]
        keep_local, why = should_route_local(conversation, "https://api.openai.com/v1")
        assert keep_local is False

    def test_no_route_for_local_provider(self):
        conversation = [{"role": "user", "content": "Email: a@b.com"}]
        keep_local, why = should_route_local(conversation, "http://localhost:11434/v1")
        assert keep_local is False
# ═══════════════════════════════════════════════════════════════════════════
# prepare_for_remote — integration hook
# ═══════════════════════════════════════════════════════════════════════════
class TestPrepareForRemote:
    """End-to-end behaviour of the ``prepare_for_remote`` hook."""

    def test_filters_remote_with_pii(self):
        conversation = [
            {"role": "user", "content": "Send to alice@test.com, wallet 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"},
        ]
        redacted, found = prepare_for_remote(conversation, "https://api.openai.com/v1")
        redacted_content = redacted[0]["content"]
        assert "alice@test.com" not in redacted_content
        assert "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa" not in redacted_content
        assert "email" in found
        assert "bitcoin_wallet" in found

    def test_passes_through_local(self):
        conversation = [{"role": "user", "content": "Email: a@b.com"}]
        redacted, found = prepare_for_remote(conversation, "http://localhost:11434/v1")
        assert redacted is conversation  # same object
        assert found == []

    def test_passes_through_clean_remote(self):
        conversation = [{"role": "user", "content": "Sort this list"}]
        redacted, found = prepare_for_remote(conversation, "https://api.openai.com/v1")
        assert found == []
        assert redacted[0]["content"] == "Sort this list"

    def test_realistic_conversation(self):
        """Full conversation with mixed sensitive and safe messages."""
        conversation = [
            {"role": "system", "content": "You are a helpful coding assistant."},
            {"role": "user", "content": "Help me write a Python HTTP server."},
            {"role": "assistant", "content": "Here's a simple example:\n```python\nimport http.server\n```"},
            {"role": "user", "content": "Great! Now deploy it to my server at /home/deploy/app. My email is admin@mycompany.com"},
        ]
        redacted, found = prepare_for_remote(conversation, "https://api.openai.com/v1")
        # Safe messages unchanged
        for index in (0, 1):
            assert redacted[index]["content"] == conversation[index]["content"]
        # Sensitive message filtered
        sensitive_content = redacted[3]["content"]
        assert "admin@mycompany.com" not in sensitive_content
        assert "/home/deploy" not in sensitive_content
        assert "email" in found
        assert "user_path_unix" in found