Compare commits
2 Commits
fix/format
...
fix/839-17
| Author | SHA1 | Date | |
|---|---|---|---|
| cb331da4f1 | |||
| fa892bfcb9 |
101
tests/test_credential_redact.py
Normal file
101
tests/test_credential_redact.py
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
"""
|
||||||
|
Tests for credential redaction
|
||||||
|
|
||||||
|
Issue: #839
|
||||||
|
"""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
from tools.credential_redact import (
|
||||||
|
CredentialRedactor,
|
||||||
|
redact_credentials,
|
||||||
|
redact_tool_output,
|
||||||
|
should_mask_file,
|
||||||
|
mask_sensitive_file,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCredentialRedaction(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_openai_key(self):
|
||||||
|
text = "api_key=sk-abc123def456ghi789jkl012mno"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertGreater(count, 0)
|
||||||
|
self.assertIn("REDACTED", redacted)
|
||||||
|
self.assertNotIn("sk-abc123", redacted)
|
||||||
|
|
||||||
|
def test_github_token(self):
|
||||||
|
text = "token: ghp_1234567890abcdef1234567890abcdef12345678"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertGreater(count, 0)
|
||||||
|
self.assertIn("REDACTED", redacted)
|
||||||
|
|
||||||
|
def test_bearer_token(self):
|
||||||
|
text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertGreater(count, 0)
|
||||||
|
self.assertIn("REDACTED", redacted)
|
||||||
|
|
||||||
|
def test_password(self):
|
||||||
|
text = "password: mySecretPassword123"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertGreater(count, 0)
|
||||||
|
self.assertIn("REDACTED", redacted)
|
||||||
|
|
||||||
|
def test_aws_key(self):
|
||||||
|
text = "AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertGreater(count, 0)
|
||||||
|
self.assertIn("REDACTED", redacted)
|
||||||
|
|
||||||
|
def test_database_url(self):
|
||||||
|
text = "DATABASE_URL=postgres://user:pass@localhost/db"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertGreater(count, 0)
|
||||||
|
self.assertIn("REDACTED", redacted)
|
||||||
|
|
||||||
|
def test_clean_text_unchanged(self):
|
||||||
|
text = "Hello world, this is a normal message"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertEqual(count, 0)
|
||||||
|
self.assertEqual(redacted, text)
|
||||||
|
|
||||||
|
def test_multiple_credentials(self):
|
||||||
|
text = "key1=sk-abc123def456ghi789jkl012mno and token: ghp_1234567890abcdef1234567890abcdef12345678"
|
||||||
|
redacted, count = redact_credentials(text)
|
||||||
|
self.assertGreaterEqual(count, 2)
|
||||||
|
|
||||||
|
|
||||||
|
class TestToolOutputRedaction(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_redaction_notice(self):
|
||||||
|
output = "Running with key sk-abc123def456ghi789jkl012mno"
|
||||||
|
redacted, notice = redact_tool_output("terminal", output)
|
||||||
|
self.assertIn("REDACTED", notice)
|
||||||
|
self.assertIn("terminal", notice)
|
||||||
|
|
||||||
|
def test_no_notice_when_clean(self):
|
||||||
|
output = "Hello world"
|
||||||
|
redacted, notice = redact_tool_output("terminal", output)
|
||||||
|
self.assertEqual(notice, "")
|
||||||
|
|
||||||
|
|
||||||
|
class TestSensitiveFileMasking(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_env_file_detected(self):
|
||||||
|
self.assertTrue(should_mask_file("/path/to/.env"))
|
||||||
|
self.assertTrue(should_mask_file("/path/to/.env.local"))
|
||||||
|
self.assertTrue(should_mask_file("/path/to/config.yaml"))
|
||||||
|
|
||||||
|
def test_normal_file_not_detected(self):
|
||||||
|
self.assertFalse(should_mask_file("/path/to/readme.md"))
|
||||||
|
self.assertFalse(should_mask_file("/path/to/code.py"))
|
||||||
|
|
||||||
|
def test_mask_env_file(self):
|
||||||
|
content = "API_KEY=sk-abc123\nDATABASE_URL=postgres://u:p@h/d\nNORMAL=value"
|
||||||
|
masked = mask_sensitive_file(content, ".env")
|
||||||
|
self.assertIn("[REDACTED]", masked)
|
||||||
|
self.assertIn("NORMAL=value", masked)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
183
tools/credential_redact.py
Normal file
183
tools/credential_redact.py
Normal file
@@ -0,0 +1,183 @@
|
|||||||
|
"""
|
||||||
|
Credential Redaction — Block silent credential exposure in tool outputs
|
||||||
|
|
||||||
|
Poka-yoke: Prevent API keys, tokens, passwords from leaking into context.
|
||||||
|
|
||||||
|
Issue: #839
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Tuple
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
HERMES_HOME = Path.home() / ".hermes"
|
||||||
|
AUDIT_DIR = HERMES_HOME / "audit"
|
||||||
|
|
||||||
|
# Credential patterns to detect and redact
|
||||||
|
CREDENTIAL_PATTERNS = [
|
||||||
|
# API keys
|
||||||
|
(r"sk-[a-zA-Z0-9]{20,}", "[REDACTED: OpenAI API key]"),
|
||||||
|
(r"sk-ant-[a-zA-Z0-9-]{20,}", "[REDACTED: Anthropic API key]"),
|
||||||
|
(r"ghp_[a-zA-Z0-9]{36}", "[REDACTED: GitHub token]"),
|
||||||
|
(r"gho_[a-zA-Z0-9]{36}", "[REDACTED: GitHub OAuth token]"),
|
||||||
|
(r"glpat-[a-zA-Z0-9-]{20,}", "[REDACTED: GitLab token]"),
|
||||||
|
|
||||||
|
# Bearer tokens
|
||||||
|
(r"Bearer\s+[a-zA-Z0-9._-]{20,}", "[REDACTED: Bearer token]"),
|
||||||
|
(r"bearer\s+[a-zA-Z0-9._-]{20,}", "[REDACTED: Bearer token]"),
|
||||||
|
|
||||||
|
# Generic tokens/passwords
|
||||||
|
(r"(?:token|TOKEN|Token)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Token]"),
|
||||||
|
(r"(?:password|PASSWORD|Password)[:=]\s*["']?[^\s"']{8,}["']?", "[REDACTED: Password]"),
|
||||||
|
(r"(?:secret|SECRET|Secret)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Secret]"),
|
||||||
|
(r"(?:api_key|API_KEY|apiKey|ApiKey)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: API key]"),
|
||||||
|
|
||||||
|
# AWS keys
|
||||||
|
(r"AKIA[0-9A-Z]{16}", "[REDACTED: AWS access key]"),
|
||||||
|
(r"(?:aws_secret_access_key|AWS_SECRET_ACCESS_KEY)[:=]\s*["']?[a-zA-Z0-9/+=]{40}["']?", "[REDACTED: AWS secret]"),
|
||||||
|
|
||||||
|
# Private keys
|
||||||
|
(r"-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----", "[REDACTED: Private key header]"),
|
||||||
|
|
||||||
|
# Connection strings
|
||||||
|
(r"(?:postgres|mysql|mongodb|redis)://[^:]+:[^@]+@[^\s]+", "[REDACTED: Database connection string]"),
|
||||||
|
]
|
||||||
|
|
||||||
|
# Files that should trigger auto-masking
|
||||||
|
SENSITIVE_FILE_PATTERNS = [
|
||||||
|
r"\.env$",
|
||||||
|
r"\.env\.",
|
||||||
|
r"\.secret",
|
||||||
|
r"credentials",
|
||||||
|
r"\.token",
|
||||||
|
r"config\.yaml$",
|
||||||
|
r"config\.yml$",
|
||||||
|
r"config\.json$",
|
||||||
|
r"\.netrc$",
|
||||||
|
r"\.pgpass$",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class CredentialRedactor:
|
||||||
|
"""Redact credentials from text."""
|
||||||
|
|
||||||
|
def __init__(self, audit_log: bool = True):
|
||||||
|
self.audit_log = audit_log
|
||||||
|
self._redaction_count = 0
|
||||||
|
|
||||||
|
def redact(self, text: str) -> Tuple[str, int]:
|
||||||
|
"""
|
||||||
|
Redact credentials from text.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (redacted_text, number_of_redactions)
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return text, 0
|
||||||
|
|
||||||
|
redacted = text
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
for pattern, replacement in CREDENTIAL_PATTERNS:
|
||||||
|
matches = re.findall(pattern, redacted, re.IGNORECASE)
|
||||||
|
if matches:
|
||||||
|
redacted = re.sub(pattern, replacement, redacted, flags=re.IGNORECASE)
|
||||||
|
count += len(matches)
|
||||||
|
|
||||||
|
if count > 0:
|
||||||
|
self._redaction_count += count
|
||||||
|
if self.audit_log:
|
||||||
|
self._log_redaction(count, text[:100])
|
||||||
|
|
||||||
|
return redacted, count
|
||||||
|
|
||||||
|
def redact_tool_output(self, tool_name: str, output: str) -> Tuple[str, str]:
|
||||||
|
"""
|
||||||
|
Redact tool output and return notice if redactions occurred.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (redacted_output, notice_or_empty)
|
||||||
|
"""
|
||||||
|
redacted, count = self.redact(output)
|
||||||
|
|
||||||
|
if count > 0:
|
||||||
|
notice = f"[REDACTED: {count} credential pattern{'s' if count > 1 else ''} found in {tool_name} output]"
|
||||||
|
return redacted, notice
|
||||||
|
|
||||||
|
return redacted, ""
|
||||||
|
|
||||||
|
def should_mask_file(self, file_path: str) -> bool:
|
||||||
|
"""Check if file should have credentials auto-masked."""
|
||||||
|
path_lower = file_path.lower()
|
||||||
|
return any(re.search(p, path_lower) for p in SENSITIVE_FILE_PATTERNS)
|
||||||
|
|
||||||
|
def mask_file_content(self, content: str, file_path: str) -> str:
|
||||||
|
"""Mask credentials in file content while preserving structure."""
|
||||||
|
if not self.should_mask_file(file_path):
|
||||||
|
return content
|
||||||
|
|
||||||
|
lines = content.split("\n")
|
||||||
|
masked_lines = []
|
||||||
|
|
||||||
|
for line in lines:
|
||||||
|
# Preserve key=value structure but mask values
|
||||||
|
if "=" in line and not line.strip().startswith("#"):
|
||||||
|
key, _, value = line.partition("=")
|
||||||
|
key_lower = key.strip().lower()
|
||||||
|
|
||||||
|
sensitive_keys = ["password", "secret", "token", "key", "api", "credential"]
|
||||||
|
if any(sk in key_lower for sk in sensitive_keys):
|
||||||
|
masked_lines.append(f"{key}=[REDACTED]")
|
||||||
|
else:
|
||||||
|
masked_lines.append(line)
|
||||||
|
else:
|
||||||
|
masked_lines.append(line)
|
||||||
|
|
||||||
|
return "\n".join(masked_lines)
|
||||||
|
|
||||||
|
def _log_redaction(self, count: int, preview: str):
|
||||||
|
"""Log redaction event to audit trail."""
|
||||||
|
try:
|
||||||
|
AUDIT_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
audit_file = AUDIT_DIR / "redactions.jsonl"
|
||||||
|
|
||||||
|
entry = {
|
||||||
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"redactions": count,
|
||||||
|
"preview_hash": hash(preview),
|
||||||
|
}
|
||||||
|
|
||||||
|
with open(audit_file, "a") as f:
|
||||||
|
f.write(json.dumps(entry) + "\n")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Audit log failed: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level redactor
|
||||||
|
_redactor = CredentialRedactor()
|
||||||
|
|
||||||
|
|
||||||
|
def redact_credentials(text: str) -> Tuple[str, int]:
|
||||||
|
"""Redact credentials from text."""
|
||||||
|
return _redactor.redact(text)
|
||||||
|
|
||||||
|
|
||||||
|
def redact_tool_output(tool_name: str, output: str) -> Tuple[str, str]:
|
||||||
|
"""Redact tool output and return notice."""
|
||||||
|
return _redactor.redact_tool_output(tool_name, output)
|
||||||
|
|
||||||
|
|
||||||
|
def should_mask_file(file_path: str) -> bool:
|
||||||
|
"""Check if file should be masked."""
|
||||||
|
return _redactor.should_mask_file(file_path)
|
||||||
|
|
||||||
|
|
||||||
|
def mask_sensitive_file(content: str, file_path: str) -> str:
|
||||||
|
"""Mask credentials in sensitive file."""
|
||||||
|
return _redactor.mask_file_content(content, file_path)
|
||||||
Reference in New Issue
Block a user