Replace shell=True with list-based subprocess execution to prevent command injection via malicious user input. Changes: - tools/transcription_tools.py: Use shlex.split() + shell=False - tools/environments/docker.py: List-based commands with container ID validation Fixes CVE-level vulnerability where malicious file paths or container IDs could inject arbitrary commands. CVSS: 9.8 (Critical) Refs: V-001 in SECURITY_AUDIT_REPORT.md
567 lines
14 KiB
Markdown
567 lines
14 KiB
Markdown
# SECURE CODING GUIDELINES
|
|
|
|
## Hermes Agent Development Security Standards
|
|
**Version:** 1.0
|
|
**Effective Date:** March 30, 2026
|
|
|
|
---
|
|
|
|
## 1. GENERAL PRINCIPLES
|
|
|
|
### 1.1 Security-First Mindset
|
|
- Every feature must be designed with security in mind
|
|
- Assume all input is malicious until proven otherwise
|
|
- Defense in depth: multiple layers of security controls
|
|
- Fail securely: when security controls fail, default to denial
|
|
|
|
### 1.2 Threat Model
|
|
Primary threats to consider:
|
|
- Malicious user prompts
|
|
- Compromised or malicious skills
|
|
- Supply chain attacks
|
|
- Insider threats
|
|
- Accidental data exposure
|
|
|
|
---
|
|
|
|
## 2. INPUT VALIDATION
|
|
|
|
### 2.1 Validate All Input
|
|
```python
|
|
# ❌ INCORRECT
|
|
def process_file(path: str):
|
|
with open(path) as f:
|
|
return f.read()
|
|
|
|
# ✅ CORRECT
|
|
from pydantic import BaseModel, validator
|
|
import re
|
|
|
|
class FileRequest(BaseModel):
|
|
path: str
|
|
max_size: int = 1000000
|
|
|
|
@validator('path')
|
|
def validate_path(cls, v):
|
|
# Block path traversal
|
|
if '..' in v or v.startswith('/'):
|
|
raise ValueError('Invalid path characters')
|
|
# Allowlist safe characters
|
|
if not re.match(r'^[\w\-./]+$', v):
|
|
raise ValueError('Invalid characters in path')
|
|
return v
|
|
|
|
@validator('max_size')
|
|
def validate_size(cls, v):
|
|
if v < 0 or v > 10000000:
|
|
raise ValueError('Size out of range')
|
|
return v
|
|
|
|
def process_file(request: FileRequest):
|
|
# Now safe to use request.path
|
|
pass
|
|
```
|
|
|
|
### 2.2 Length Limits
|
|
Always enforce maximum lengths:
|
|
```python
|
|
MAX_INPUT_LENGTH = 10000
|
|
MAX_FILENAME_LENGTH = 255
|
|
MAX_PATH_LENGTH = 4096
|
|
|
|
def validate_length(value: str, max_len: int, field_name: str):
|
|
if len(value) > max_len:
|
|
raise ValueError(f"{field_name} exceeds maximum length of {max_len}")
|
|
```
|
|
|
|
### 2.3 Type Safety
|
|
Use type hints and enforce them:
|
|
```python
|
|
from typing import Union
|
|
|
|
def safe_function(user_id: int, message: str) -> dict:
|
|
if not isinstance(user_id, int):
|
|
raise TypeError("user_id must be an integer")
|
|
if not isinstance(message, str):
|
|
raise TypeError("message must be a string")
|
|
# ... function logic
|
|
```
|
|
|
|
---
|
|
|
|
## 3. COMMAND EXECUTION
|
|
|
|
### 3.1 Never Use shell=True
|
|
```python
|
|
import subprocess
|
|
import shlex
|
|
|
|
# ❌ NEVER DO THIS
|
|
subprocess.run(f"ls {user_input}", shell=True)
|
|
|
|
# ❌ NEVER DO THIS EITHER
|
|
cmd = f"cat {filename}"
|
|
os.system(cmd)
|
|
|
|
# ✅ CORRECT - Use list arguments
|
|
subprocess.run(["ls", user_input], shell=False)
|
|
|
|
# ✅ CORRECT - Use shlex for complex cases
|
|
cmd_parts = shlex.split(user_input)
|
|
subprocess.run(["ls"] + cmd_parts, shell=False)
|
|
```
|
|
|
|
### 3.2 Command Allowlisting
|
|
```python
|
|
ALLOWED_COMMANDS = frozenset([
|
|
"ls", "cat", "grep", "find", "git", "python", "pip"
|
|
])
|
|
|
|
def validate_command(command: str):
|
|
parts = shlex.split(command)
|
|
if parts[0] not in ALLOWED_COMMANDS:
|
|
raise SecurityError(f"Command '{parts[0]}' not allowed")
|
|
```
|
|
|
|
### 3.3 Input Sanitization
|
|
```python
|
|
import re
|
|
|
|
def sanitize_shell_input(value: str) -> str:
|
|
"""Remove dangerous shell metacharacters."""
|
|
# Block shell metacharacters
|
|
dangerous = re.compile(r'[;&|`$(){}[\]\\]')
|
|
if dangerous.search(value):
|
|
raise ValueError("Shell metacharacters not allowed")
|
|
return value
|
|
```
|
|
|
|
---
|
|
|
|
## 4. FILE OPERATIONS
|
|
|
|
### 4.1 Path Validation
|
|
```python
|
|
from pathlib import Path
|
|
|
|
class FileSandbox:
|
|
def __init__(self, root: Path):
|
|
self.root = root.resolve()
|
|
|
|
def validate_path(self, user_path: str) -> Path:
|
|
"""Validate and resolve user-provided path within sandbox."""
|
|
# Expand user home
|
|
expanded = Path(user_path).expanduser()
|
|
|
|
# Resolve to absolute path
|
|
try:
|
|
resolved = expanded.resolve()
|
|
except (OSError, ValueError) as e:
|
|
raise SecurityError(f"Invalid path: {e}")
|
|
|
|
# Ensure path is within sandbox
|
|
try:
|
|
resolved.relative_to(self.root)
|
|
except ValueError:
|
|
raise SecurityError("Path outside sandbox")
|
|
|
|
return resolved
|
|
|
|
def safe_open(self, user_path: str, mode: str = 'r'):
|
|
safe_path = self.validate_path(user_path)
|
|
return open(safe_path, mode)
|
|
```
|
|
|
|
### 4.2 Prevent Symlink Attacks
|
|
```python
|
|
import os
|
|
|
|
def safe_read_file(filepath: Path):
|
|
"""Read file, following symlinks only within allowed directories."""
|
|
# Resolve symlinks
|
|
real_path = filepath.resolve()
|
|
|
|
# Verify still in allowed location after resolution
|
|
if not str(real_path).startswith(str(SAFE_ROOT)):
|
|
raise SecurityError("Symlink escape detected")
|
|
|
|
# Verify it's a regular file
|
|
if not real_path.is_file():
|
|
raise SecurityError("Not a regular file")
|
|
|
|
return real_path.read_text()
|
|
```
|
|
|
|
### 4.3 Temporary Files
|
|
```python
|
|
import tempfile
|
|
import os
|
|
|
|
def create_secure_temp_file():
|
|
"""Create temp file with restricted permissions."""
|
|
# Create with restrictive permissions
|
|
fd, path = tempfile.mkstemp(prefix="hermes_", suffix=".tmp")
|
|
try:
|
|
# Set owner-read/write only
|
|
os.chmod(path, 0o600)
|
|
return fd, path
|
|
except:
|
|
os.close(fd)
|
|
os.unlink(path)
|
|
raise
|
|
```
|
|
|
|
---
|
|
|
|
## 5. SECRET MANAGEMENT
|
|
|
|
### 5.1 Environment Variables
|
|
```python
|
|
import os
|
|
|
|
# ❌ NEVER DO THIS
|
|
def execute_command(command: str):
|
|
# Child inherits ALL environment
|
|
subprocess.run(command, shell=True, env=os.environ)
|
|
|
|
# ✅ CORRECT - Explicit whitelisting
|
|
_ALLOWED_ENV = frozenset([
|
|
"PATH", "HOME", "USER", "LANG", "TERM", "SHELL"
|
|
])
|
|
|
|
def get_safe_environment():
|
|
return {k: v for k, v in os.environ.items()
|
|
if k in _ALLOWED_ENV}
|
|
|
|
def execute_command(command: str):
|
|
subprocess.run(
|
|
command,
|
|
shell=False,
|
|
env=get_safe_environment()
|
|
)
|
|
```
|
|
|
|
### 5.2 Secret Detection
|
|
```python
|
|
import re
|
|
|
|
_SECRET_PATTERNS = [
|
|
re.compile(r'sk-[a-zA-Z0-9]{20,}'), # OpenAI-style keys
|
|
re.compile(r'ghp_[a-zA-Z0-9]{36}'), # GitHub PAT
|
|
re.compile(r'[a-zA-Z0-9]{40}'), # Generic high-entropy strings
|
|
]
|
|
|
|
def detect_secrets(text: str) -> list:
|
|
"""Detect potential secrets in text."""
|
|
findings = []
|
|
for pattern in _SECRET_PATTERNS:
|
|
matches = pattern.findall(text)
|
|
findings.extend(matches)
|
|
return findings
|
|
|
|
def redact_secrets(text: str) -> str:
|
|
"""Redact detected secrets."""
|
|
for pattern in _SECRET_PATTERNS:
|
|
text = pattern.sub('***REDACTED***', text)
|
|
return text
|
|
```
|
|
|
|
### 5.3 Secure Logging
|
|
```python
|
|
import logging
|
|
from agent.redact import redact_sensitive_text
|
|
|
|
class SecureLogger:
|
|
def __init__(self, logger: logging.Logger):
|
|
self.logger = logger
|
|
|
|
def debug(self, msg: str, *args, **kwargs):
|
|
self.logger.debug(redact_sensitive_text(msg), *args, **kwargs)
|
|
|
|
def info(self, msg: str, *args, **kwargs):
|
|
self.logger.info(redact_sensitive_text(msg), *args, **kwargs)
|
|
|
|
def warning(self, msg: str, *args, **kwargs):
|
|
self.logger.warning(redact_sensitive_text(msg), *args, **kwargs)
|
|
|
|
def error(self, msg: str, *args, **kwargs):
|
|
self.logger.error(redact_sensitive_text(msg), *args, **kwargs)
|
|
```
|
|
|
|
---
|
|
|
|
## 6. NETWORK SECURITY
|
|
|
|
### 6.1 URL Validation
|
|
```python
|
|
from urllib.parse import urlparse
|
|
import ipaddress
|
|
|
|
_BLOCKED_SCHEMES = frozenset(['file', 'ftp', 'gopher'])
|
|
_BLOCKED_HOSTS = frozenset([
|
|
'localhost', '127.0.0.1', '0.0.0.0',
|
|
'169.254.169.254', # AWS metadata
|
|
'[::1]', '[::]'
|
|
])
|
|
_PRIVATE_NETWORKS = [
|
|
ipaddress.ip_network('10.0.0.0/8'),
|
|
ipaddress.ip_network('172.16.0.0/12'),
|
|
ipaddress.ip_network('192.168.0.0/16'),
|
|
ipaddress.ip_network('127.0.0.0/8'),
|
|
ipaddress.ip_network('169.254.0.0/16'), # Link-local
|
|
]
|
|
|
|
def validate_url(url: str) -> bool:
|
|
"""Validate URL is safe to fetch."""
|
|
parsed = urlparse(url)
|
|
|
|
# Check scheme
|
|
if parsed.scheme not in ('http', 'https'):
|
|
raise ValueError(f"Scheme '{parsed.scheme}' not allowed")
|
|
|
|
# Check hostname
|
|
hostname = parsed.hostname
|
|
if not hostname:
|
|
raise ValueError("No hostname in URL")
|
|
|
|
if hostname.lower() in _BLOCKED_HOSTS:
|
|
raise ValueError("Host not allowed")
|
|
|
|
# Check IP addresses
|
|
try:
|
|
ip = ipaddress.ip_address(hostname)
|
|
for network in _PRIVATE_NETWORKS:
|
|
if ip in network:
|
|
raise ValueError("Private IP address not allowed")
|
|
except ValueError:
|
|
pass # Not an IP, continue
|
|
|
|
return True
|
|
```
|
|
|
|
### 6.2 Redirect Handling
|
|
```python
|
|
import requests
|
|
|
|
def safe_get(url: str, max_redirects: int = 5):
|
|
"""GET URL with redirect validation."""
|
|
session = requests.Session()
|
|
session.max_redirects = max_redirects
|
|
|
|
# Validate initial URL
|
|
validate_url(url)
|
|
|
|
# Custom redirect handler
|
|
response = session.get(
|
|
url,
|
|
allow_redirects=True,
|
|
hooks={'response': lambda r, *args, **kwargs: validate_url(r.url)}
|
|
)
|
|
|
|
return response
|
|
```
|
|
|
|
---
|
|
|
|
## 7. AUTHENTICATION & AUTHORIZATION
|
|
|
|
### 7.1 API Key Validation
|
|
```python
|
|
import secrets
|
|
import hmac
|
|
import hashlib
|
|
|
|
def constant_time_compare(val1: str, val2: str) -> bool:
|
|
"""Compare strings in constant time to prevent timing attacks."""
|
|
return hmac.compare_digest(val1.encode(), val2.encode())
|
|
|
|
def validate_api_key(provided_key: str, expected_key: str) -> bool:
|
|
"""Validate API key using constant-time comparison."""
|
|
if not provided_key or not expected_key:
|
|
return False
|
|
return constant_time_compare(provided_key, expected_key)
|
|
```
|
|
|
|
### 7.2 Session Management
|
|
```python
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
|
|
class SessionManager:
|
|
SESSION_TIMEOUT = timedelta(hours=24)
|
|
|
|
def create_session(self, user_id: str) -> str:
|
|
"""Create secure session token."""
|
|
token = secrets.token_urlsafe(32)
|
|
expires = datetime.utcnow() + self.SESSION_TIMEOUT
|
|
# Store in database with expiration
|
|
return token
|
|
|
|
def validate_session(self, token: str) -> bool:
|
|
"""Validate session token."""
|
|
# Lookup in database
|
|
# Check expiration
|
|
# Validate token format
|
|
return True
|
|
```
|
|
|
|
---
|
|
|
|
## 8. ERROR HANDLING
|
|
|
|
### 8.1 Secure Error Messages
|
|
```python
|
|
import logging
|
|
|
|
# Internal detailed logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class UserFacingError(Exception):
|
|
"""Error safe to show to users."""
|
|
pass
|
|
|
|
def process_request(data: dict):
|
|
try:
|
|
result = internal_operation(data)
|
|
return result
|
|
except ValueError as e:
|
|
# Log full details internally
|
|
logger.error(f"Validation error: {e}", exc_info=True)
|
|
# Return safe message to user
|
|
raise UserFacingError("Invalid input provided")
|
|
except Exception as e:
|
|
# Log full details internally
|
|
logger.error(f"Unexpected error: {e}", exc_info=True)
|
|
# Generic message to user
|
|
raise UserFacingError("An error occurred")
|
|
```
|
|
|
|
### 8.2 Exception Handling
|
|
```python
|
|
def safe_operation():
|
|
try:
|
|
risky_operation()
|
|
except Exception as e:
|
|
# Always clean up resources
|
|
cleanup_resources()
|
|
# Log securely
|
|
logger.error(f"Operation failed: {redact_sensitive_text(str(e))}")
|
|
# Re-raise or convert
|
|
raise
|
|
```
|
|
|
|
---
|
|
|
|
## 9. CRYPTOGRAPHY
|
|
|
|
### 9.1 Password Hashing
|
|
```python
|
|
import bcrypt
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""Hash password using bcrypt."""
|
|
salt = bcrypt.gensalt(rounds=12)
|
|
hashed = bcrypt.hashpw(password.encode(), salt)
|
|
return hashed.decode()
|
|
|
|
def verify_password(password: str, hashed: str) -> bool:
|
|
"""Verify password against hash."""
|
|
return bcrypt.checkpw(password.encode(), hashed.encode())
|
|
```
|
|
|
|
### 9.2 Secure Random
|
|
```python
|
|
import secrets
|
|
|
|
def generate_token(length: int = 32) -> str:
|
|
"""Generate cryptographically secure token."""
|
|
return secrets.token_urlsafe(length)
|
|
|
|
def generate_pin(length: int = 6) -> str:
|
|
"""Generate secure numeric PIN."""
|
|
return ''.join(str(secrets.randbelow(10)) for _ in range(length))
|
|
```
|
|
|
|
---
|
|
|
|
## 10. CODE REVIEW CHECKLIST
|
|
|
|
### Before Submitting Code:
|
|
- [ ] All user inputs validated
|
|
- [ ] No shell=True in subprocess calls
|
|
- [ ] All file paths validated and sandboxed
|
|
- [ ] Secrets not logged or exposed
|
|
- [ ] URLs validated before fetching
|
|
- [ ] Error messages don't leak sensitive info
|
|
- [ ] No hardcoded credentials
|
|
- [ ] Proper exception handling
|
|
- [ ] Security tests included
|
|
- [ ] Documentation updated
|
|
|
|
### Security-Focused Review Questions:
|
|
1. What happens if this receives malicious input?
|
|
2. Can this leak sensitive data?
|
|
3. Are there privilege escalation paths?
|
|
4. What if the external service is compromised?
|
|
5. Is the error handling secure?
|
|
|
|
---
|
|
|
|
## 11. TESTING SECURITY
|
|
|
|
### 11.1 Security Unit Tests
|
|
```python
|
|
def test_path_traversal_blocked():
|
|
sandbox = FileSandbox(Path("/safe/path"))
|
|
with pytest.raises(SecurityError):
|
|
sandbox.validate_path("../../../etc/passwd")
|
|
|
|
def test_command_injection_blocked():
|
|
with pytest.raises(SecurityError):
|
|
validate_command("ls; rm -rf /")
|
|
|
|
def test_secret_redaction():
|
|
text = "Key: sk-test123456789"
|
|
redacted = redact_secrets(text)
|
|
assert "sk-test" not in redacted
|
|
```
|
|
|
|
### 11.2 Fuzzing
|
|
```python
|
|
import hypothesis.strategies as st
|
|
from hypothesis import given
|
|
|
|
@given(st.text())
|
|
def test_input_validation(input_text):
|
|
# Should never crash, always validate or reject
|
|
try:
|
|
result = process_input(input_text)
|
|
assert isinstance(result, ExpectedType)
|
|
except ValidationError:
|
|
pass # Expected for invalid input
|
|
```
|
|
|
|
---
|
|
|
|
## 12. INCIDENT RESPONSE
|
|
|
|
### Security Incident Procedure:
|
|
1. **Stop** - Halt the affected system/process
|
|
2. **Assess** - Determine scope and impact
|
|
3. **Contain** - Prevent further damage
|
|
4. **Investigate** - Gather evidence
|
|
5. **Remediate** - Fix the vulnerability
|
|
6. **Recover** - Restore normal operations
|
|
7. **Learn** - Document and improve
|
|
|
|
### Emergency Contacts:
|
|
- Security Team: security@example.com
|
|
- On-call: +1-XXX-XXX-XXXX
|
|
- Slack: #security-incidents
|
|
|
|
---
|
|
|
|
**Document Owner:** Security Team
|
|
**Review Cycle:** Quarterly
|
|
**Last Updated:** March 30, 2026
|