Files
timmy-home/uni-wizard/v2/author_whitelist.py
Allegro 384fad6d5f security: Add author whitelist for task router (Issue #132)
Implements security fix for issue #132 - Task router author whitelist

Changes:
- Add author_whitelist.py module with whitelist validation
- Integrate whitelist checks into task_router_daemon.py
- Add author_whitelist config option to config.yaml
- Add comprehensive tests for whitelist validation

Security features:
- Validates task authors against authorized whitelist
- Logs all authorization attempts (success and failure)
- Secure by default: empty whitelist denies all
- Configurable via environment variable or config file
- Prevents unauthorized command execution from untrusted Gitea users
2026-03-31 03:53:37 +00:00

328 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Author Whitelist Module — Security Fix for Issue #132
Validates task authors against an authorized whitelist before processing.
Prevents unauthorized command execution from untrusted Gitea users.
Configuration (in order of precedence):
1. Environment variable: TIMMY_AUTHOR_WHITELIST (comma-separated)
2. Config file: security.author_whitelist (list)
3. Default: empty list (deny all - secure by default)
Security Events:
- All authorization failures are logged with full context
- Logs include: timestamp, author, issue, IP (if available), action taken
"""
import os
import json
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class AuthorizationResult:
"""Result of an authorization check"""
authorized: bool
author: str
reason: str
timestamp: str
issue_number: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class SecurityLogger:
"""Dedicated security event logging"""
def __init__(self, log_dir: Optional[Path] = None):
self.log_dir = log_dir or Path.home() / "timmy" / "logs" / "security"
self.log_dir.mkdir(parents=True, exist_ok=True)
self.security_log = self.log_dir / "auth_events.jsonl"
# Also set up Python logger for immediate console/file output
self.logger = logging.getLogger("timmy.security")
self.logger.setLevel(logging.WARNING)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - SECURITY - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_authorization(self, result: AuthorizationResult, context: Optional[Dict] = None):
"""Log authorization attempt with full context"""
entry = {
"timestamp": result.timestamp,
"event_type": "authorization",
"authorized": result.authorized,
"author": result.author,
"reason": result.reason,
"issue_number": result.issue_number,
"context": context or {}
}
# Write to structured log file
with open(self.security_log, 'a') as f:
f.write(json.dumps(entry) + '\n')
# Log to Python logger for immediate visibility
if result.authorized:
self.logger.info(f"AUTHORIZED: '{result.author}' - {result.reason}")
else:
self.logger.warning(
f"UNAUTHORIZED ACCESS ATTEMPT: '{result.author}' - {result.reason}"
)
def log_security_event(self, event_type: str, details: Dict[str, Any]):
"""Log general security event"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
**details
}
with open(self.security_log, 'a') as f:
f.write(json.dumps(entry) + '\n')
self.logger.warning(f"SECURITY EVENT [{event_type}]: {details}")
class AuthorWhitelist:
"""
Author whitelist validator for task router security.
Usage:
whitelist = AuthorWhitelist()
result = whitelist.validate_author("username", issue_number=123)
if not result.authorized:
# Return 403, do not process task
"""
# Default deny all (secure by default)
DEFAULT_WHITELIST: List[str] = []
def __init__(
self,
whitelist: Optional[List[str]] = None,
config_path: Optional[Path] = None,
log_dir: Optional[Path] = None
):
"""
Initialize whitelist from provided list, env var, or config file.
Priority:
1. Explicit whitelist parameter
2. TIMMY_AUTHOR_WHITELIST environment variable
3. Config file security.author_whitelist
4. Default empty list (secure by default)
"""
self.security_logger = SecurityLogger(log_dir)
self._whitelist: List[str] = []
self._config_path = config_path or Path("/tmp/timmy-home/config.yaml")
# Load whitelist from available sources
if whitelist is not None:
self._whitelist = [u.strip().lower() for u in whitelist if u.strip()]
else:
self._whitelist = self._load_whitelist()
# Log initialization (without exposing full whitelist in production)
self.security_logger.log_security_event(
"whitelist_initialized",
{
"whitelist_size": len(self._whitelist),
"whitelist_empty": len(self._whitelist) == 0,
"source": self._get_whitelist_source()
}
)
def _get_whitelist_source(self) -> str:
"""Determine which source the whitelist came from"""
if os.environ.get("TIMMY_AUTHOR_WHITELIST"):
return "environment"
if self._config_path.exists():
try:
import yaml
with open(self._config_path) as f:
config = yaml.safe_load(f)
if config and config.get("security", {}).get("author_whitelist"):
return "config_file"
except Exception:
pass
return "default"
def _load_whitelist(self) -> List[str]:
"""Load whitelist from environment or config"""
# 1. Check environment variable
env_whitelist = os.environ.get("TIMMY_AUTHOR_WHITELIST", "").strip()
if env_whitelist:
return [u.strip().lower() for u in env_whitelist.split(",") if u.strip()]
# 2. Check config file
if self._config_path.exists():
try:
import yaml
with open(self._config_path) as f:
config = yaml.safe_load(f)
if config:
security_config = config.get("security", {})
config_whitelist = security_config.get("author_whitelist", [])
if config_whitelist:
return [u.strip().lower() for u in config_whitelist if u.strip()]
except Exception as e:
self.security_logger.log_security_event(
"config_load_error",
{"error": str(e), "path": str(self._config_path)}
)
# 3. Default: empty list (secure by default - deny all)
return list(self.DEFAULT_WHITELIST)
def validate_author(
self,
author: str,
issue_number: Optional[int] = None,
context: Optional[Dict[str, Any]] = None
) -> AuthorizationResult:
"""
Validate if an author is authorized to submit tasks.
Args:
author: The username to validate
issue_number: Optional issue number for logging context
context: Additional context (IP, user agent, etc.)
Returns:
AuthorizationResult with authorized status and reason
"""
timestamp = datetime.utcnow().isoformat()
author_clean = author.strip().lower() if author else ""
# Check for empty author
if not author_clean:
result = AuthorizationResult(
authorized=False,
author=author or "<empty>",
reason="Empty author provided",
timestamp=timestamp,
issue_number=issue_number
)
self.security_logger.log_authorization(result, context)
return result
# Check whitelist
if author_clean in self._whitelist:
result = AuthorizationResult(
authorized=True,
author=author,
reason="Author found in whitelist",
timestamp=timestamp,
issue_number=issue_number
)
self.security_logger.log_authorization(result, context)
return result
# Not authorized
result = AuthorizationResult(
authorized=False,
author=author,
reason="Author not in whitelist",
timestamp=timestamp,
issue_number=issue_number
)
self.security_logger.log_authorization(result, context)
return result
def is_authorized(self, author: str) -> bool:
"""Quick check if author is authorized (without logging)"""
if not author:
return False
return author.strip().lower() in self._whitelist
def get_whitelist(self) -> List[str]:
"""Get current whitelist (for admin/debug purposes)"""
return list(self._whitelist)
def add_author(self, author: str) -> None:
"""Add an author to the whitelist (runtime only)"""
author_clean = author.strip().lower()
if author_clean and author_clean not in self._whitelist:
self._whitelist.append(author_clean)
self.security_logger.log_security_event(
"whitelist_modified",
{"action": "add", "author": author, "new_size": len(self._whitelist)}
)
def remove_author(self, author: str) -> None:
"""Remove an author from the whitelist (runtime only)"""
author_clean = author.strip().lower()
if author_clean in self._whitelist:
self._whitelist.remove(author_clean)
self.security_logger.log_security_event(
"whitelist_modified",
{"action": "remove", "author": author, "new_size": len(self._whitelist)}
)
# HTTP-style response helpers for integration with web frameworks
def create_403_response(result: AuthorizationResult) -> Dict[str, Any]:
"""Create a 403 Forbidden response for unauthorized authors"""
return {
"status_code": 403,
"error": "Forbidden",
"message": "Author not authorized to submit tasks",
"details": {
"author": result.author,
"reason": result.reason,
"timestamp": result.timestamp
}
}
def create_200_response(result: AuthorizationResult) -> Dict[str, Any]:
"""Create a 200 OK response for authorized authors"""
return {
"status_code": 200,
"authorized": True,
"author": result.author,
"timestamp": result.timestamp
}
if __name__ == "__main__":
# Demo usage
print("=" * 60)
print("AUTHOR WHITELIST MODULE — Security Demo")
print("=" * 60)
# Example with explicit whitelist
whitelist = AuthorWhitelist(whitelist=["admin", "timmy", "ezra"])
print("\nTest Cases:")
print("-" * 60)
test_cases = [
("timmy", 123),
("hacker", 456),
("", 789),
("ADMIN", 100), # Case insensitive
]
for author, issue in test_cases:
result = whitelist.validate_author(author, issue_number=issue)
status = "✅ AUTHORIZED" if result.authorized else "❌ DENIED"
print(f"\n{status} '{author}' on issue #{issue}")
print(f" Reason: {result.reason}")
print("\n" + "=" * 60)
print("Current whitelist:", whitelist.get_whitelist())