Implements security fix for issue #132 - Task router author whitelist Changes: - Add author_whitelist.py module with whitelist validation - Integrate whitelist checks into task_router_daemon.py - Add author_whitelist config option to config.yaml - Add comprehensive tests for whitelist validation Security features: - Validates task authors against authorized whitelist - Logs all authorization attempts (success and failure) - Secure by default: empty whitelist denies all - Configurable via environment variable or config file - Prevents unauthorized command execution from untrusted Gitea users
328 lines
11 KiB
Python
328 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Author Whitelist Module — Security Fix for Issue #132
|
|
|
|
Validates task authors against an authorized whitelist before processing.
|
|
Prevents unauthorized command execution from untrusted Gitea users.
|
|
|
|
Configuration (in order of precedence):
|
|
1. Environment variable: TIMMY_AUTHOR_WHITELIST (comma-separated)
|
|
2. Config file: security.author_whitelist (list)
|
|
3. Default: empty list (deny all - secure by default)
|
|
|
|
Security Events:
|
|
- All authorization failures are logged with full context
|
|
- Logs include: timestamp, author, issue, IP (if available), action taken
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List, Optional, Dict, Any
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
|
|
|
|
@dataclass
|
|
class AuthorizationResult:
|
|
"""Result of an authorization check"""
|
|
authorized: bool
|
|
author: str
|
|
reason: str
|
|
timestamp: str
|
|
issue_number: Optional[int] = None
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return asdict(self)
|
|
|
|
|
|
class SecurityLogger:
|
|
"""Dedicated security event logging"""
|
|
|
|
def __init__(self, log_dir: Optional[Path] = None):
|
|
self.log_dir = log_dir or Path.home() / "timmy" / "logs" / "security"
|
|
self.log_dir.mkdir(parents=True, exist_ok=True)
|
|
self.security_log = self.log_dir / "auth_events.jsonl"
|
|
|
|
# Also set up Python logger for immediate console/file output
|
|
self.logger = logging.getLogger("timmy.security")
|
|
self.logger.setLevel(logging.WARNING)
|
|
|
|
if not self.logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - SECURITY - %(levelname)s - %(message)s'
|
|
)
|
|
handler.setFormatter(formatter)
|
|
self.logger.addHandler(handler)
|
|
|
|
def log_authorization(self, result: AuthorizationResult, context: Optional[Dict] = None):
|
|
"""Log authorization attempt with full context"""
|
|
entry = {
|
|
"timestamp": result.timestamp,
|
|
"event_type": "authorization",
|
|
"authorized": result.authorized,
|
|
"author": result.author,
|
|
"reason": result.reason,
|
|
"issue_number": result.issue_number,
|
|
"context": context or {}
|
|
}
|
|
|
|
# Write to structured log file
|
|
with open(self.security_log, 'a') as f:
|
|
f.write(json.dumps(entry) + '\n')
|
|
|
|
# Log to Python logger for immediate visibility
|
|
if result.authorized:
|
|
self.logger.info(f"AUTHORIZED: '{result.author}' - {result.reason}")
|
|
else:
|
|
self.logger.warning(
|
|
f"UNAUTHORIZED ACCESS ATTEMPT: '{result.author}' - {result.reason}"
|
|
)
|
|
|
|
def log_security_event(self, event_type: str, details: Dict[str, Any]):
|
|
"""Log general security event"""
|
|
entry = {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"event_type": event_type,
|
|
**details
|
|
}
|
|
|
|
with open(self.security_log, 'a') as f:
|
|
f.write(json.dumps(entry) + '\n')
|
|
|
|
self.logger.warning(f"SECURITY EVENT [{event_type}]: {details}")
|
|
|
|
|
|
class AuthorWhitelist:
|
|
"""
|
|
Author whitelist validator for task router security.
|
|
|
|
Usage:
|
|
whitelist = AuthorWhitelist()
|
|
result = whitelist.validate_author("username", issue_number=123)
|
|
if not result.authorized:
|
|
# Return 403, do not process task
|
|
"""
|
|
|
|
# Default deny all (secure by default)
|
|
DEFAULT_WHITELIST: List[str] = []
|
|
|
|
def __init__(
|
|
self,
|
|
whitelist: Optional[List[str]] = None,
|
|
config_path: Optional[Path] = None,
|
|
log_dir: Optional[Path] = None
|
|
):
|
|
"""
|
|
Initialize whitelist from provided list, env var, or config file.
|
|
|
|
Priority:
|
|
1. Explicit whitelist parameter
|
|
2. TIMMY_AUTHOR_WHITELIST environment variable
|
|
3. Config file security.author_whitelist
|
|
4. Default empty list (secure by default)
|
|
"""
|
|
self.security_logger = SecurityLogger(log_dir)
|
|
self._whitelist: List[str] = []
|
|
self._config_path = config_path or Path("/tmp/timmy-home/config.yaml")
|
|
|
|
# Load whitelist from available sources
|
|
if whitelist is not None:
|
|
self._whitelist = [u.strip().lower() for u in whitelist if u.strip()]
|
|
else:
|
|
self._whitelist = self._load_whitelist()
|
|
|
|
# Log initialization (without exposing full whitelist in production)
|
|
self.security_logger.log_security_event(
|
|
"whitelist_initialized",
|
|
{
|
|
"whitelist_size": len(self._whitelist),
|
|
"whitelist_empty": len(self._whitelist) == 0,
|
|
"source": self._get_whitelist_source()
|
|
}
|
|
)
|
|
|
|
def _get_whitelist_source(self) -> str:
|
|
"""Determine which source the whitelist came from"""
|
|
if os.environ.get("TIMMY_AUTHOR_WHITELIST"):
|
|
return "environment"
|
|
if self._config_path.exists():
|
|
try:
|
|
import yaml
|
|
with open(self._config_path) as f:
|
|
config = yaml.safe_load(f)
|
|
if config and config.get("security", {}).get("author_whitelist"):
|
|
return "config_file"
|
|
except Exception:
|
|
pass
|
|
return "default"
|
|
|
|
def _load_whitelist(self) -> List[str]:
|
|
"""Load whitelist from environment or config"""
|
|
# 1. Check environment variable
|
|
env_whitelist = os.environ.get("TIMMY_AUTHOR_WHITELIST", "").strip()
|
|
if env_whitelist:
|
|
return [u.strip().lower() for u in env_whitelist.split(",") if u.strip()]
|
|
|
|
# 2. Check config file
|
|
if self._config_path.exists():
|
|
try:
|
|
import yaml
|
|
with open(self._config_path) as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
if config:
|
|
security_config = config.get("security", {})
|
|
config_whitelist = security_config.get("author_whitelist", [])
|
|
if config_whitelist:
|
|
return [u.strip().lower() for u in config_whitelist if u.strip()]
|
|
except Exception as e:
|
|
self.security_logger.log_security_event(
|
|
"config_load_error",
|
|
{"error": str(e), "path": str(self._config_path)}
|
|
)
|
|
|
|
# 3. Default: empty list (secure by default - deny all)
|
|
return list(self.DEFAULT_WHITELIST)
|
|
|
|
def validate_author(
|
|
self,
|
|
author: str,
|
|
issue_number: Optional[int] = None,
|
|
context: Optional[Dict[str, Any]] = None
|
|
) -> AuthorizationResult:
|
|
"""
|
|
Validate if an author is authorized to submit tasks.
|
|
|
|
Args:
|
|
author: The username to validate
|
|
issue_number: Optional issue number for logging context
|
|
context: Additional context (IP, user agent, etc.)
|
|
|
|
Returns:
|
|
AuthorizationResult with authorized status and reason
|
|
"""
|
|
timestamp = datetime.utcnow().isoformat()
|
|
author_clean = author.strip().lower() if author else ""
|
|
|
|
# Check for empty author
|
|
if not author_clean:
|
|
result = AuthorizationResult(
|
|
authorized=False,
|
|
author=author or "<empty>",
|
|
reason="Empty author provided",
|
|
timestamp=timestamp,
|
|
issue_number=issue_number
|
|
)
|
|
self.security_logger.log_authorization(result, context)
|
|
return result
|
|
|
|
# Check whitelist
|
|
if author_clean in self._whitelist:
|
|
result = AuthorizationResult(
|
|
authorized=True,
|
|
author=author,
|
|
reason="Author found in whitelist",
|
|
timestamp=timestamp,
|
|
issue_number=issue_number
|
|
)
|
|
self.security_logger.log_authorization(result, context)
|
|
return result
|
|
|
|
# Not authorized
|
|
result = AuthorizationResult(
|
|
authorized=False,
|
|
author=author,
|
|
reason="Author not in whitelist",
|
|
timestamp=timestamp,
|
|
issue_number=issue_number
|
|
)
|
|
self.security_logger.log_authorization(result, context)
|
|
return result
|
|
|
|
def is_authorized(self, author: str) -> bool:
|
|
"""Quick check if author is authorized (without logging)"""
|
|
if not author:
|
|
return False
|
|
return author.strip().lower() in self._whitelist
|
|
|
|
def get_whitelist(self) -> List[str]:
|
|
"""Get current whitelist (for admin/debug purposes)"""
|
|
return list(self._whitelist)
|
|
|
|
def add_author(self, author: str) -> None:
|
|
"""Add an author to the whitelist (runtime only)"""
|
|
author_clean = author.strip().lower()
|
|
if author_clean and author_clean not in self._whitelist:
|
|
self._whitelist.append(author_clean)
|
|
self.security_logger.log_security_event(
|
|
"whitelist_modified",
|
|
{"action": "add", "author": author, "new_size": len(self._whitelist)}
|
|
)
|
|
|
|
def remove_author(self, author: str) -> None:
|
|
"""Remove an author from the whitelist (runtime only)"""
|
|
author_clean = author.strip().lower()
|
|
if author_clean in self._whitelist:
|
|
self._whitelist.remove(author_clean)
|
|
self.security_logger.log_security_event(
|
|
"whitelist_modified",
|
|
{"action": "remove", "author": author, "new_size": len(self._whitelist)}
|
|
)
|
|
|
|
|
|
# HTTP-style response helpers for integration with web frameworks
|
|
def create_403_response(result: AuthorizationResult) -> Dict[str, Any]:
|
|
"""Create a 403 Forbidden response for unauthorized authors"""
|
|
return {
|
|
"status_code": 403,
|
|
"error": "Forbidden",
|
|
"message": "Author not authorized to submit tasks",
|
|
"details": {
|
|
"author": result.author,
|
|
"reason": result.reason,
|
|
"timestamp": result.timestamp
|
|
}
|
|
}
|
|
|
|
|
|
def create_200_response(result: AuthorizationResult) -> Dict[str, Any]:
|
|
"""Create a 200 OK response for authorized authors"""
|
|
return {
|
|
"status_code": 200,
|
|
"authorized": True,
|
|
"author": result.author,
|
|
"timestamp": result.timestamp
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Demo usage
|
|
print("=" * 60)
|
|
print("AUTHOR WHITELIST MODULE — Security Demo")
|
|
print("=" * 60)
|
|
|
|
# Example with explicit whitelist
|
|
whitelist = AuthorWhitelist(whitelist=["admin", "timmy", "ezra"])
|
|
|
|
print("\nTest Cases:")
|
|
print("-" * 60)
|
|
|
|
test_cases = [
|
|
("timmy", 123),
|
|
("hacker", 456),
|
|
("", 789),
|
|
("ADMIN", 100), # Case insensitive
|
|
]
|
|
|
|
for author, issue in test_cases:
|
|
result = whitelist.validate_author(author, issue_number=issue)
|
|
status = "✅ AUTHORIZED" if result.authorized else "❌ DENIED"
|
|
print(f"\n{status} '{author}' on issue #{issue}")
|
|
print(f" Reason: {result.reason}")
|
|
|
|
print("\n" + "=" * 60)
|
|
print("Current whitelist:", whitelist.get_whitelist())
|