Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 1m25s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m23s
Tests / e2e (pull_request) Successful in 8m23s
Tests / test (pull_request) Failing after 47m8s
270 lines
8.8 KiB
Python
270 lines
8.8 KiB
Python
"""Credential Redaction — Poka-yoke for tool outputs.
|
|
|
|
Blocks silent credential exposure by redacting API keys, tokens, and
|
|
passwords from tool outputs before they enter agent context.
|
|
|
|
Issue #839: Poka-yoke: Block silent credential exposure in tool outputs
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Audit log path
|
|
_AUDIT_DIR = Path.home() / ".hermes" / "audit"
|
|
_AUDIT_LOG = _AUDIT_DIR / "redactions.jsonl"
|
|
|
|
# Credential patterns — order matters (most specific first)
|
|
_CREDENTIAL_PATTERNS = [
|
|
# API keys
|
|
(r'sk-[a-zA-Z0-9]{20,}', '[REDACTED: OpenAI-style API key]'),
|
|
(r'sk-ant-[a-zA-Z0-9-]{20,}', '[REDACTED: Anthropic API key]'),
|
|
(r'ghp_[a-zA-Z0-9]{36}', '[REDACTED: GitHub PAT]'),
|
|
(r'gho_[a-zA-Z0-9]{36}', '[REDACTED: GitHub OAuth token]'),
|
|
(r'github_pat_[a-zA-Z0-9_]{82}', '[REDACTED: GitHub fine-grained PAT]'),
|
|
(r'glpat-[a-zA-Z0-9-]{20,}', '[REDACTED: GitLab PAT]'),
|
|
(r'syt_[a-zA-Z0-9_-]{40,}', '[REDACTED: Matrix access token]'),
|
|
(r'xoxb-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack bot token]'),
|
|
(r'xoxp-[0-9]{10,}-[a-zA-Z0-9]{20,}', '[REDACTED: Slack user token]'),
|
|
|
|
# Bearer tokens
|
|
(r'Bearer\s+[a-zA-Z0-9_.-]{20,}', '[REDACTED: Bearer token]'),
|
|
|
|
# Generic tokens/passwords in assignments
|
|
(r'(?:token|api_key|api_key|secret|password|passwd|pwd)\s*[:=]\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: credential]'),
|
|
|
|
# Environment variable assignments
|
|
(r'(?:export\s+)?(?:TOKEN|KEY|SECRET|PASSWORD|API_KEY)\s*=\s*["\']?([a-zA-Z0-9_.-]{8,})["\']?', '[REDACTED: env credential]'),
|
|
|
|
# Base64 encoded credentials (high entropy strings)
|
|
(r'(?:authorization|auth)\s*[:=]\s*(?:basic|bearer)\s+[a-zA-Z0-9+/=]{20,}', '[REDACTED: auth header]'),
|
|
|
|
# AWS credentials
|
|
(r'AKIA[0-9A-Z]{16}', '[REDACTED: AWS access key]'),
|
|
(r'(?<![A-Z0-9])[A-Za-z0-9/+=]{40}(?![A-Z0-9])', None), # Only match near context
|
|
|
|
# Private keys
|
|
(r'-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----', '[REDACTED: private key block]'),
|
|
]
|
|
|
|
|
|
class RedactionResult:
|
|
"""Result of credential redaction."""
|
|
|
|
def __init__(self, text: str, original: str, redactions: List[Dict[str, Any]]):
|
|
self.text = text
|
|
self.original = original
|
|
self.redactions = redactions
|
|
|
|
@property
|
|
def was_redacted(self) -> bool:
|
|
return len(self.redactions) > 0
|
|
|
|
@property
|
|
def count(self) -> int:
|
|
return len(self.redactions)
|
|
|
|
def notice(self) -> str:
|
|
"""Generate compact redaction notice."""
|
|
if not self.was_redacted:
|
|
return ""
|
|
return f"[REDACTED: {self.count} credential pattern{'s' if self.count > 1 else ''} found]"
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"redacted": self.was_redacted,
|
|
"count": self.count,
|
|
"notice": self.notice(),
|
|
"patterns": [r["pattern_name"] for r in self.redactions],
|
|
}
|
|
|
|
|
|
def redact_credentials(text: str, source: str = "unknown") -> RedactionResult:
|
|
"""Redact credentials from text.
|
|
|
|
Args:
|
|
text: Text to redact
|
|
source: Source identifier for audit logging
|
|
|
|
Returns:
|
|
RedactionResult with redacted text and metadata
|
|
"""
|
|
if not text:
|
|
return RedactionResult(text, text, [])
|
|
|
|
redactions = []
|
|
result = text
|
|
|
|
for pattern, replacement in _CREDENTIAL_PATTERNS:
|
|
if replacement is None:
|
|
continue # Skip conditional patterns
|
|
|
|
matches = list(re.finditer(pattern, result, re.IGNORECASE))
|
|
for match in matches:
|
|
redactions.append({
|
|
"pattern_name": replacement,
|
|
"position": match.start(),
|
|
"length": len(match.group()),
|
|
"source": source,
|
|
"timestamp": time.time(),
|
|
})
|
|
|
|
result = re.sub(pattern, replacement, result, flags=re.IGNORECASE)
|
|
|
|
redaction_result = RedactionResult(result, text, redactions)
|
|
|
|
# Log to audit trail
|
|
if redaction_result.was_redacted:
|
|
_log_redaction(redaction_result, source)
|
|
|
|
return redaction_result
|
|
|
|
|
|
def _log_redaction(result: RedactionResult, source: str) -> None:
|
|
"""Log redaction event to audit trail."""
|
|
try:
|
|
_AUDIT_DIR.mkdir(parents=True, exist_ok=True)
|
|
entry = {
|
|
"timestamp": time.time(),
|
|
"source": source,
|
|
"count": result.count,
|
|
"patterns": [r["pattern_name"] for r in result.redactions],
|
|
}
|
|
with open(_AUDIT_LOG, "a") as f:
|
|
f.write(json.dumps(entry) + "\n")
|
|
except Exception as e:
|
|
logger.debug(f"Failed to log redaction: {e}")
|
|
|
|
|
|
def should_auto_mask(file_path: str) -> bool:
|
|
"""Check if file should have credentials auto-masked."""
|
|
path_lower = file_path.lower()
|
|
sensitive_patterns = [
|
|
".env", "config", "token", "secret", "credential",
|
|
"key", "auth", "password", ".pem", ".key",
|
|
]
|
|
return any(p in path_lower for p in sensitive_patterns)
|
|
|
|
|
|
def mask_config_values(text: str) -> str:
|
|
"""Mask credential values in config/env files while preserving structure.
|
|
|
|
Transforms:
|
|
API_KEY=sk-abc123 → API_KEY=[REDACTED]
|
|
token: "ghp_xyz" → token: "[REDACTED]"
|
|
"""
|
|
lines = text.split("\n")
|
|
result = []
|
|
|
|
for line in lines:
|
|
# Match KEY=VALUE patterns
|
|
match = re.match(r'^(\s*(?:export\s+)?[A-Z_][A-Z0-9_]*)\s*=\s*(.*)', line)
|
|
if match:
|
|
key = match.group(1)
|
|
value = match.group(2).strip()
|
|
|
|
# Check if key looks credential-like
|
|
key_lower = key.lower()
|
|
if any(p in key_lower for p in ["key", "token", "secret", "password", "auth"]):
|
|
if value and not value.startswith("[REDACTED]"):
|
|
# Preserve quotes
|
|
if value.startswith('"') and value.endswith('"'):
|
|
result.append(f'{key}="[REDACTED]"')
|
|
elif value.startswith("'") and value.endswith("'"):
|
|
result.append(f"{key}='[REDACTED]'")
|
|
else:
|
|
result.append(f"{key}=[REDACTED]")
|
|
continue
|
|
|
|
# Match YAML-style key: value
|
|
match = re.match(r'^(\s*[a-z_][a-z0-9_]*)\s*:\s*["\']?(.*?)["\']?\s*$', line)
|
|
if match:
|
|
key = match.group(1)
|
|
value = match.group(2).strip()
|
|
|
|
key_lower = key.lower()
|
|
if any(p in key_lower for p in ["key", "token", "secret", "password", "auth"]):
|
|
if value and not value.startswith("[REDACTED]"):
|
|
result.append(f'{key}: "[REDACTED]"')
|
|
continue
|
|
|
|
result.append(line)
|
|
|
|
return "\n".join(result)
|
|
|
|
|
|
def redact_tool_output(
|
|
tool_name: str,
|
|
output: Any,
|
|
source: str = None,
|
|
) -> Tuple[Any, Optional[str]]:
|
|
"""Redact credentials from tool output.
|
|
|
|
Args:
|
|
tool_name: Name of the tool
|
|
output: Tool output (string or dict)
|
|
source: Source identifier (defaults to tool_name)
|
|
|
|
Returns:
|
|
Tuple of (redacted_output, notice)
|
|
"""
|
|
source = source or tool_name
|
|
|
|
if isinstance(output, str):
|
|
result = redact_credentials(output, source)
|
|
if result.was_redacted:
|
|
return result.text, result.notice()
|
|
return output, None
|
|
|
|
if isinstance(output, dict):
|
|
# Redact string values in dict
|
|
redacted = {}
|
|
notices = []
|
|
for key, value in output.items():
|
|
if isinstance(value, str):
|
|
r, n = redact_tool_output(tool_name, value, f"{source}.{key}")
|
|
redacted[key] = r
|
|
if n:
|
|
notices.append(n)
|
|
else:
|
|
redacted[key] = value
|
|
|
|
notice = "; ".join(notices) if notices else None
|
|
return redacted, notice
|
|
|
|
# Non-string, non-dict: pass through
|
|
return output, None
|
|
|
|
|
|
def get_redaction_stats() -> Dict[str, Any]:
|
|
"""Get redaction statistics from audit log."""
|
|
stats = {
|
|
"total_redactions": 0,
|
|
"by_source": {},
|
|
"by_pattern": {},
|
|
}
|
|
|
|
if not _AUDIT_LOG.exists():
|
|
return stats
|
|
|
|
try:
|
|
with open(_AUDIT_LOG, "r") as f:
|
|
for line in f:
|
|
entry = json.loads(line.strip())
|
|
stats["total_redactions"] += entry.get("count", 0)
|
|
|
|
source = entry.get("source", "unknown")
|
|
stats["by_source"][source] = stats["by_source"].get(source, 0) + 1
|
|
|
|
for pattern in entry.get("patterns", []):
|
|
stats["by_pattern"][pattern] = stats["by_pattern"].get(pattern, 0) + 1
|
|
except Exception:
|
|
pass
|
|
|
|
return stats
|