Replace shell=True with list-based subprocess execution to prevent command injection via malicious user input. Changes: - tools/transcription_tools.py: Use shlex.split() + shell=False - tools/environments/docker.py: List-based commands with container ID validation Fixes CVE-level vulnerability where malicious file paths or container IDs could inject arbitrary commands. CVSS: 9.8 (Critical) Refs: V-001 in SECURITY_AUDIT_REPORT.md
14 KiB
14 KiB
SECURE CODING GUIDELINES
Hermes Agent Development Security Standards
Version: 1.0
Effective Date: March 30, 2026
1. GENERAL PRINCIPLES
1.1 Security-First Mindset
- Every feature must be designed with security in mind
- Assume all input is malicious until proven otherwise
- Defense in depth: multiple layers of security controls
- Fail securely: when security controls fail, default to denial
1.2 Threat Model
Primary threats to consider:
- Malicious user prompts
- Compromised or malicious skills
- Supply chain attacks
- Insider threats
- Accidental data exposure
2. INPUT VALIDATION
2.1 Validate All Input
# ❌ INCORRECT
def process_file(path: str):
with open(path) as f:
return f.read()
# ✅ CORRECT
from pydantic import BaseModel, validator
import re
class FileRequest(BaseModel):
path: str
max_size: int = 1000000
@validator('path')
def validate_path(cls, v):
# Block path traversal
if '..' in v or v.startswith('/'):
raise ValueError('Invalid path characters')
# Allowlist safe characters
if not re.match(r'^[\w\-./]+$', v):
raise ValueError('Invalid characters in path')
return v
@validator('max_size')
def validate_size(cls, v):
if v < 0 or v > 10000000:
raise ValueError('Size out of range')
return v
def process_file(request: FileRequest):
# Now safe to use request.path
pass
2.2 Length Limits
Always enforce maximum lengths:
MAX_INPUT_LENGTH = 10000
MAX_FILENAME_LENGTH = 255
MAX_PATH_LENGTH = 4096
def validate_length(value: str, max_len: int, field_name: str):
if len(value) > max_len:
raise ValueError(f"{field_name} exceeds maximum length of {max_len}")
2.3 Type Safety
Use type hints and enforce them:
from typing import Union
def safe_function(user_id: int, message: str) -> dict:
if not isinstance(user_id, int):
raise TypeError("user_id must be an integer")
if not isinstance(message, str):
raise TypeError("message must be a string")
# ... function logic
3. COMMAND EXECUTION
3.1 Never Use shell=True
import subprocess
import shlex
# ❌ NEVER DO THIS
subprocess.run(f"ls {user_input}", shell=True)
# ❌ NEVER DO THIS EITHER
cmd = f"cat {filename}"
os.system(cmd)
# ✅ CORRECT - Use list arguments
subprocess.run(["ls", user_input], shell=False)
# ✅ CORRECT - Use shlex for complex cases
cmd_parts = shlex.split(user_input)
subprocess.run(["ls"] + cmd_parts, shell=False)
3.2 Command Allowlisting
ALLOWED_COMMANDS = frozenset([
"ls", "cat", "grep", "find", "git", "python", "pip"
])
def validate_command(command: str):
parts = shlex.split(command)
if parts[0] not in ALLOWED_COMMANDS:
raise SecurityError(f"Command '{parts[0]}' not allowed")
3.3 Input Sanitization
import re
def sanitize_shell_input(value: str) -> str:
"""Remove dangerous shell metacharacters."""
# Block shell metacharacters
dangerous = re.compile(r'[;&|`$(){}[\]\\]')
if dangerous.search(value):
raise ValueError("Shell metacharacters not allowed")
return value
4. FILE OPERATIONS
4.1 Path Validation
from pathlib import Path
class FileSandbox:
def __init__(self, root: Path):
self.root = root.resolve()
def validate_path(self, user_path: str) -> Path:
"""Validate and resolve user-provided path within sandbox."""
# Expand user home
expanded = Path(user_path).expanduser()
# Resolve to absolute path
try:
resolved = expanded.resolve()
except (OSError, ValueError) as e:
raise SecurityError(f"Invalid path: {e}")
# Ensure path is within sandbox
try:
resolved.relative_to(self.root)
except ValueError:
raise SecurityError("Path outside sandbox")
return resolved
def safe_open(self, user_path: str, mode: str = 'r'):
safe_path = self.validate_path(user_path)
return open(safe_path, mode)
4.2 Prevent Symlink Attacks
import os
def safe_read_file(filepath: Path):
"""Read file, following symlinks only within allowed directories."""
# Resolve symlinks
real_path = filepath.resolve()
# Verify still in allowed location after resolution
if not str(real_path).startswith(str(SAFE_ROOT)):
raise SecurityError("Symlink escape detected")
# Verify it's a regular file
if not real_path.is_file():
raise SecurityError("Not a regular file")
return real_path.read_text()
4.3 Temporary Files
import tempfile
import os
def create_secure_temp_file():
"""Create temp file with restricted permissions."""
# Create with restrictive permissions
fd, path = tempfile.mkstemp(prefix="hermes_", suffix=".tmp")
try:
# Set owner-read/write only
os.chmod(path, 0o600)
return fd, path
except:
os.close(fd)
os.unlink(path)
raise
5. SECRET MANAGEMENT
5.1 Environment Variables
import os
# ❌ NEVER DO THIS
def execute_command(command: str):
# Child inherits ALL environment
subprocess.run(command, shell=True, env=os.environ)
# ✅ CORRECT - Explicit whitelisting
_ALLOWED_ENV = frozenset([
"PATH", "HOME", "USER", "LANG", "TERM", "SHELL"
])
def get_safe_environment():
return {k: v for k, v in os.environ.items()
if k in _ALLOWED_ENV}
def execute_command(command: str):
subprocess.run(
command,
shell=False,
env=get_safe_environment()
)
5.2 Secret Detection
import re
_SECRET_PATTERNS = [
re.compile(r'sk-[a-zA-Z0-9]{20,}'), # OpenAI-style keys
re.compile(r'ghp_[a-zA-Z0-9]{36}'), # GitHub PAT
re.compile(r'[a-zA-Z0-9]{40}'), # Generic high-entropy strings
]
def detect_secrets(text: str) -> list:
"""Detect potential secrets in text."""
findings = []
for pattern in _SECRET_PATTERNS:
matches = pattern.findall(text)
findings.extend(matches)
return findings
def redact_secrets(text: str) -> str:
"""Redact detected secrets."""
for pattern in _SECRET_PATTERNS:
text = pattern.sub('***REDACTED***', text)
return text
5.3 Secure Logging
import logging
from agent.redact import redact_sensitive_text
class SecureLogger:
def __init__(self, logger: logging.Logger):
self.logger = logger
def debug(self, msg: str, *args, **kwargs):
self.logger.debug(redact_sensitive_text(msg), *args, **kwargs)
def info(self, msg: str, *args, **kwargs):
self.logger.info(redact_sensitive_text(msg), *args, **kwargs)
def warning(self, msg: str, *args, **kwargs):
self.logger.warning(redact_sensitive_text(msg), *args, **kwargs)
def error(self, msg: str, *args, **kwargs):
self.logger.error(redact_sensitive_text(msg), *args, **kwargs)
6. NETWORK SECURITY
6.1 URL Validation
from urllib.parse import urlparse
import ipaddress
_BLOCKED_SCHEMES = frozenset(['file', 'ftp', 'gopher'])
_BLOCKED_HOSTS = frozenset([
'localhost', '127.0.0.1', '0.0.0.0',
'169.254.169.254', # AWS metadata
'[::1]', '[::]'
])
_PRIVATE_NETWORKS = [
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
ipaddress.ip_network('127.0.0.0/8'),
ipaddress.ip_network('169.254.0.0/16'), # Link-local
]
def validate_url(url: str) -> bool:
"""Validate URL is safe to fetch."""
parsed = urlparse(url)
# Check scheme
if parsed.scheme not in ('http', 'https'):
raise ValueError(f"Scheme '{parsed.scheme}' not allowed")
# Check hostname
hostname = parsed.hostname
if not hostname:
raise ValueError("No hostname in URL")
if hostname.lower() in _BLOCKED_HOSTS:
raise ValueError("Host not allowed")
# Check IP addresses
try:
ip = ipaddress.ip_address(hostname)
for network in _PRIVATE_NETWORKS:
if ip in network:
raise ValueError("Private IP address not allowed")
except ValueError:
pass # Not an IP, continue
return True
6.2 Redirect Handling
import requests
def safe_get(url: str, max_redirects: int = 5):
"""GET URL with redirect validation."""
session = requests.Session()
session.max_redirects = max_redirects
# Validate initial URL
validate_url(url)
# Custom redirect handler
response = session.get(
url,
allow_redirects=True,
hooks={'response': lambda r, *args, **kwargs: validate_url(r.url)}
)
return response
7. AUTHENTICATION & AUTHORIZATION
7.1 API Key Validation
import secrets
import hmac
import hashlib
def constant_time_compare(val1: str, val2: str) -> bool:
"""Compare strings in constant time to prevent timing attacks."""
return hmac.compare_digest(val1.encode(), val2.encode())
def validate_api_key(provided_key: str, expected_key: str) -> bool:
"""Validate API key using constant-time comparison."""
if not provided_key or not expected_key:
return False
return constant_time_compare(provided_key, expected_key)
7.2 Session Management
import secrets
from datetime import datetime, timedelta
class SessionManager:
SESSION_TIMEOUT = timedelta(hours=24)
def create_session(self, user_id: str) -> str:
"""Create secure session token."""
token = secrets.token_urlsafe(32)
expires = datetime.utcnow() + self.SESSION_TIMEOUT
# Store in database with expiration
return token
def validate_session(self, token: str) -> bool:
"""Validate session token."""
# Lookup in database
# Check expiration
# Validate token format
return True
8. ERROR HANDLING
8.1 Secure Error Messages
import logging
# Internal detailed logging
logger = logging.getLogger(__name__)
class UserFacingError(Exception):
"""Error safe to show to users."""
pass
def process_request(data: dict):
try:
result = internal_operation(data)
return result
except ValueError as e:
# Log full details internally
logger.error(f"Validation error: {e}", exc_info=True)
# Return safe message to user
raise UserFacingError("Invalid input provided")
except Exception as e:
# Log full details internally
logger.error(f"Unexpected error: {e}", exc_info=True)
# Generic message to user
raise UserFacingError("An error occurred")
8.2 Exception Handling
def safe_operation():
try:
risky_operation()
except Exception as e:
# Always clean up resources
cleanup_resources()
# Log securely
logger.error(f"Operation failed: {redact_sensitive_text(str(e))}")
# Re-raise or convert
raise
9. CRYPTOGRAPHY
9.1 Password Hashing
import bcrypt
def hash_password(password: str) -> str:
"""Hash password using bcrypt."""
salt = bcrypt.gensalt(rounds=12)
hashed = bcrypt.hashpw(password.encode(), salt)
return hashed.decode()
def verify_password(password: str, hashed: str) -> bool:
"""Verify password against hash."""
return bcrypt.checkpw(password.encode(), hashed.encode())
9.2 Secure Random
import secrets
def generate_token(length: int = 32) -> str:
"""Generate cryptographically secure token."""
return secrets.token_urlsafe(length)
def generate_pin(length: int = 6) -> str:
"""Generate secure numeric PIN."""
return ''.join(str(secrets.randbelow(10)) for _ in range(length))
10. CODE REVIEW CHECKLIST
Before Submitting Code:
- All user inputs validated
- No shell=True in subprocess calls
- All file paths validated and sandboxed
- Secrets not logged or exposed
- URLs validated before fetching
- Error messages don't leak sensitive info
- No hardcoded credentials
- Proper exception handling
- Security tests included
- Documentation updated
Security-Focused Review Questions:
- What happens if this receives malicious input?
- Can this leak sensitive data?
- Are there privilege escalation paths?
- What if the external service is compromised?
- Is the error handling secure?
11. TESTING SECURITY
11.1 Security Unit Tests
def test_path_traversal_blocked():
sandbox = FileSandbox(Path("/safe/path"))
with pytest.raises(SecurityError):
sandbox.validate_path("../../../etc/passwd")
def test_command_injection_blocked():
with pytest.raises(SecurityError):
validate_command("ls; rm -rf /")
def test_secret_redaction():
text = "Key: sk-test123456789"
redacted = redact_secrets(text)
assert "sk-test" not in redacted
11.2 Fuzzing
import hypothesis.strategies as st
from hypothesis import given
@given(st.text())
def test_input_validation(input_text):
# Should never crash, always validate or reject
try:
result = process_input(input_text)
assert isinstance(result, ExpectedType)
except ValidationError:
pass # Expected for invalid input
12. INCIDENT RESPONSE
Security Incident Procedure:
- Stop - Halt the affected system/process
- Assess - Determine scope and impact
- Contain - Prevent further damage
- Investigate - Gather evidence
- Remediate - Fix the vulnerability
- Recover - Restore normal operations
- Learn - Document and improve
Emergency Contacts:
- Security Team: security@example.com
- On-call: +1-XXX-XXX-XXXX
- Slack: #security-incidents
Document Owner: Security Team
Review Cycle: Quarterly
Last Updated: March 30, 2026