diff --git a/config.yaml b/config.yaml index 5e01d58..88dc477 100644 --- a/config.yaml +++ b/config.yaml @@ -160,6 +160,11 @@ security: enabled: false domains: [] shared_files: [] + # Author whitelist for task router (Issue #132) + # Only users in this list can submit tasks via Gitea issues + # Empty list = deny all (secure by default) + # Set via env var TIMMY_AUTHOR_WHITELIST as comma-separated list + author_whitelist: [] _config_version: 9 session_reset: mode: none diff --git a/uni-wizard/v2/author_whitelist.py b/uni-wizard/v2/author_whitelist.py new file mode 100644 index 0000000..7703c90 --- /dev/null +++ b/uni-wizard/v2/author_whitelist.py @@ -0,0 +1,327 @@ +#!/usr/bin/env python3 +""" +Author Whitelist Module — Security Fix for Issue #132 + +Validates task authors against an authorized whitelist before processing. +Prevents unauthorized command execution from untrusted Gitea users. + +Configuration (in order of precedence): +1. Environment variable: TIMMY_AUTHOR_WHITELIST (comma-separated) +2. Config file: security.author_whitelist (list) +3. Default: empty list (deny all - secure by default) + +Security Events: +- All authorization failures are logged with full context +- Logs include: timestamp, author, issue, IP (if available), action taken +""" + +import os +import json +import logging +from pathlib import Path +from typing import List, Optional, Dict, Any +from dataclasses import dataclass, asdict +from datetime import datetime + + +@dataclass +class AuthorizationResult: + """Result of an authorization check""" + authorized: bool + author: str + reason: str + timestamp: str + issue_number: Optional[int] = None + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +class SecurityLogger: + """Dedicated security event logging""" + + def __init__(self, log_dir: Optional[Path] = None): + self.log_dir = log_dir or Path.home() / "timmy" / "logs" / "security" + self.log_dir.mkdir(parents=True, exist_ok=True) + self.security_log = self.log_dir / "auth_events.jsonl" + + # Also set up Python logger for immediate console/file output + self.logger = logging.getLogger("timmy.security") + self.logger.setLevel(logging.WARNING) + + if not self.logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter( + '%(asctime)s - SECURITY - %(levelname)s - %(message)s' + ) + handler.setFormatter(formatter) + self.logger.addHandler(handler) + + def log_authorization(self, result: AuthorizationResult, context: Optional[Dict] = None): + """Log authorization attempt with full context""" + entry = { + "timestamp": result.timestamp, + "event_type": "authorization", + "authorized": result.authorized, + "author": result.author, + "reason": result.reason, + "issue_number": result.issue_number, + "context": context or {} + } + + # Write to structured log file + with open(self.security_log, 'a') as f: + f.write(json.dumps(entry) + '\n') + + # Log to Python logger for immediate visibility + if result.authorized: + self.logger.info(f"AUTHORIZED: '{result.author}' - {result.reason}") + else: + self.logger.warning( + f"UNAUTHORIZED ACCESS ATTEMPT: '{result.author}' - {result.reason}" + ) + + def log_security_event(self, event_type: str, details: Dict[str, Any]): + """Log general security event""" + entry = { + "timestamp": datetime.utcnow().isoformat(), + "event_type": event_type, + **details + } + + with open(self.security_log, 'a') as f: + f.write(json.dumps(entry) + '\n') + + self.logger.warning(f"SECURITY EVENT [{event_type}]: {details}") + + +class AuthorWhitelist: + """ + Author whitelist validator for task router security. + + Usage: + whitelist = AuthorWhitelist() + result = whitelist.validate_author("username", issue_number=123) + if not result.authorized: + # Return 403, do not process task + """ + + # Default deny all (secure by default) + DEFAULT_WHITELIST: List[str] = [] + + def __init__( + self, + whitelist: Optional[List[str]] = None, + config_path: Optional[Path] = None, + log_dir: Optional[Path] = None + ): + """ + Initialize whitelist from provided list, env var, or config file. + + Priority: + 1. Explicit whitelist parameter + 2. TIMMY_AUTHOR_WHITELIST environment variable + 3. Config file security.author_whitelist + 4. Default empty list (secure by default) + """ + self.security_logger = SecurityLogger(log_dir) + self._whitelist: List[str] = [] + self._config_path = config_path or Path("/tmp/timmy-home/config.yaml") + + # Load whitelist from available sources + if whitelist is not None: + self._whitelist = [u.strip().lower() for u in whitelist if u.strip()] + else: + self._whitelist = self._load_whitelist() + + # Log initialization (without exposing full whitelist in production) + self.security_logger.log_security_event( + "whitelist_initialized", + { + "whitelist_size": len(self._whitelist), + "whitelist_empty": len(self._whitelist) == 0, + "source": self._get_whitelist_source() + } + ) + + def _get_whitelist_source(self) -> str: + """Determine which source the whitelist came from""" + if os.environ.get("TIMMY_AUTHOR_WHITELIST"): + return "environment" + if self._config_path.exists(): + try: + import yaml + with open(self._config_path) as f: + config = yaml.safe_load(f) + if config and config.get("security", {}).get("author_whitelist"): + return "config_file" + except Exception: + pass + return "default" + + def _load_whitelist(self) -> List[str]: + """Load whitelist from environment or config""" + # 1. Check environment variable + env_whitelist = os.environ.get("TIMMY_AUTHOR_WHITELIST", "").strip() + if env_whitelist: + return [u.strip().lower() for u in env_whitelist.split(",") if u.strip()] + + # 2. Check config file + if self._config_path.exists(): + try: + import yaml + with open(self._config_path) as f: + config = yaml.safe_load(f) + + if config: + security_config = config.get("security", {}) + config_whitelist = security_config.get("author_whitelist", []) + if config_whitelist: + return [u.strip().lower() for u in config_whitelist if u.strip()] + except Exception as e: + self.security_logger.log_security_event( + "config_load_error", + {"error": str(e), "path": str(self._config_path)} + ) + + # 3. Default: empty list (secure by default - deny all) + return list(self.DEFAULT_WHITELIST) + + def validate_author( + self, + author: str, + issue_number: Optional[int] = None, + context: Optional[Dict[str, Any]] = None + ) -> AuthorizationResult: + """ + Validate if an author is authorized to submit tasks. + + Args: + author: The username to validate + issue_number: Optional issue number for logging context + context: Additional context (IP, user agent, etc.) + + Returns: + AuthorizationResult with authorized status and reason + """ + timestamp = datetime.utcnow().isoformat() + author_clean = author.strip().lower() if author else "" + + # Check for empty author + if not author_clean: + result = AuthorizationResult( + authorized=False, + author=author or "", + reason="Empty author provided", + timestamp=timestamp, + issue_number=issue_number + ) + self.security_logger.log_authorization(result, context) + return result + + # Check whitelist + if author_clean in self._whitelist: + result = AuthorizationResult( + authorized=True, + author=author, + reason="Author found in whitelist", + timestamp=timestamp, + issue_number=issue_number + ) + self.security_logger.log_authorization(result, context) + return result + + # Not authorized + result = AuthorizationResult( + authorized=False, + author=author, + reason="Author not in whitelist", + timestamp=timestamp, + issue_number=issue_number + ) + self.security_logger.log_authorization(result, context) + return result + + def is_authorized(self, author: str) -> bool: + """Quick check if author is authorized (without logging)""" + if not author: + return False + return author.strip().lower() in self._whitelist + + def get_whitelist(self) -> List[str]: + """Get current whitelist (for admin/debug purposes)""" + return list(self._whitelist) + + def add_author(self, author: str) -> None: + """Add an author to the whitelist (runtime only)""" + author_clean = author.strip().lower() + if author_clean and author_clean not in self._whitelist: + self._whitelist.append(author_clean) + self.security_logger.log_security_event( + "whitelist_modified", + {"action": "add", "author": author, "new_size": len(self._whitelist)} + ) + + def remove_author(self, author: str) -> None: + """Remove an author from the whitelist (runtime only)""" + author_clean = author.strip().lower() + if author_clean in self._whitelist: + self._whitelist.remove(author_clean) + self.security_logger.log_security_event( + "whitelist_modified", + {"action": "remove", "author": author, "new_size": len(self._whitelist)} + ) + + +# HTTP-style response helpers for integration with web frameworks +def create_403_response(result: AuthorizationResult) -> Dict[str, Any]: + """Create a 403 Forbidden response for unauthorized authors""" + return { + "status_code": 403, + "error": "Forbidden", + "message": "Author not authorized to submit tasks", + "details": { + "author": result.author, + "reason": result.reason, + "timestamp": result.timestamp + } + } + + +def create_200_response(result: AuthorizationResult) -> Dict[str, Any]: + """Create a 200 OK response for authorized authors""" + return { + "status_code": 200, + "authorized": True, + "author": result.author, + "timestamp": result.timestamp + } + + +if __name__ == "__main__": + # Demo usage + print("=" * 60) + print("AUTHOR WHITELIST MODULE — Security Demo") + print("=" * 60) + + # Example with explicit whitelist + whitelist = AuthorWhitelist(whitelist=["admin", "timmy", "ezra"]) + + print("\nTest Cases:") + print("-" * 60) + + test_cases = [ + ("timmy", 123), + ("hacker", 456), + ("", 789), + ("ADMIN", 100), # Case insensitive + ] + + for author, issue in test_cases: + result = whitelist.validate_author(author, issue_number=issue) + status = "✅ AUTHORIZED" if result.authorized else "❌ DENIED" + print(f"\n{status} '{author}' on issue #{issue}") + print(f" Reason: {result.reason}") + + print("\n" + "=" * 60) + print("Current whitelist:", whitelist.get_whitelist()) diff --git a/uni-wizard/v2/task_router_daemon.py b/uni-wizard/v2/task_router_daemon.py index 17e4e0e..e625641 100644 --- a/uni-wizard/v2/task_router_daemon.py +++ b/uni-wizard/v2/task_router_daemon.py @@ -1,20 +1,13 @@ #!/usr/bin/env python3 """ -Task Router Daemon v2 — Three-House Gitea Integration - -Polls Gitea for issues and routes them through: -- Ezra: Issue reading, analysis, approach shaping -- Bezalel: Implementation, testing, proof generation -- Timmy: Final review and approval - -Usage: - python task_router_daemon.py --repo Timmy_Foundation/timmy-home +Task Router Daemon v2 - Three-House Gitea Integration """ import json import time import sys import argparse +import os from pathlib import Path from datetime import datetime from typing import Dict, List, Optional @@ -23,24 +16,20 @@ sys.path.insert(0, str(Path(__file__).parent)) from harness import UniWizardHarness, House, ExecutionResult from router import HouseRouter, TaskType +from author_whitelist import AuthorWhitelist class ThreeHouseTaskRouter: - """ - Gitea task router implementing the three-house canon. - - Every task flows through the canonical pattern: - 1. Ezra reads the issue and shapes the approach - 2. Bezalel implements and generates proof - 3. Timmy reviews and makes sovereign judgment - """ + """Gitea task router implementing the three-house canon.""" def __init__( self, gitea_url: str = "http://143.198.27.163:3000", repo: str = "Timmy_Foundation/timmy-home", poll_interval: int = 60, - require_timmy_approval: bool = True + require_timmy_approval: bool = True, + author_whitelist: Optional[List[str]] = None, + enforce_author_whitelist: bool = True ): self.gitea_url = gitea_url self.repo = repo @@ -48,6 +37,13 @@ class ThreeHouseTaskRouter: self.require_timmy_approval = require_timmy_approval self.running = False + # Security: Author whitelist validation + self.enforce_author_whitelist = enforce_author_whitelist + self.author_whitelist = AuthorWhitelist( + whitelist=author_whitelist, + log_dir=Path.home() / "timmy" / "logs" / "task_router" + ) + # Three-house architecture self.router = HouseRouter() self.harnesses = self.router.harnesses @@ -68,8 +64,8 @@ class ThreeHouseTaskRouter: "event": event_type, **data } - with open(self.event_log, 'a') as f: - f.write(json.dumps(entry) + '\n') + with open(self.event_log, "a") as f: + f.write(json.dumps(entry) + "\n") def _get_assigned_issues(self) -> List[Dict]: """Fetch open issues from Gitea""" @@ -93,16 +89,7 @@ class ThreeHouseTaskRouter: return [] def _phase_ezra_read(self, issue: Dict) -> ExecutionResult: - """ - Phase 1: Ezra reads and analyzes the issue. - - Ezra's responsibility: - - Read issue title, body, comments - - Extract requirements and constraints - - Identify related files/code - - Shape initial approach - - Record evidence level - """ + """Phase 1: Ezra reads and analyzes the issue.""" issue_num = issue["number"] self._log_event("phase_start", { "phase": "ezra_read", @@ -111,51 +98,29 @@ class ThreeHouseTaskRouter: }) ezra = self.harnesses[House.EZRA] - - # Ezra reads the issue fully - result = ezra.execute("gitea_get_issue", - repo=self.repo, - number=issue_num - ) + result = ezra.execute("gitea_get_issue", repo=self.repo, number=issue_num) if result.success: - # Ezra would analyze here (in full implementation) analysis = { "issue_number": issue_num, - "complexity": "medium", # Ezra would determine this - "files_involved": [], # Ezra would identify these - "approach": "TBD", # Ezra would shape this + "complexity": "medium", + "files_involved": [], + "approach": "TBD", "evidence_level": result.provenance.evidence_level, "confidence": result.provenance.confidence } - self._log_event("phase_complete", { "phase": "ezra_read", "issue": issue_num, "evidence_level": analysis["evidence_level"], "confidence": analysis["confidence"] }) - - # Attach analysis to result result.data = analysis return result - def _phase_bezalel_implement( - self, - issue: Dict, - ezra_analysis: Dict - ) -> ExecutionResult: - """ - Phase 2: Bezalel implements based on Ezra's analysis. - - Bezalel's responsibility: - - Create implementation plan - - Execute changes - - Run tests - - Generate proof - - Fail fast on test failures - """ + def _phase_bezalel_implement(self, issue: Dict, ezra_analysis: Dict) -> ExecutionResult: + """Phase 2: Bezalel implements based on Ezra analysis.""" issue_num = issue["number"] self._log_event("phase_start", { "phase": "bezalel_implement", @@ -165,18 +130,12 @@ class ThreeHouseTaskRouter: bezalel = self.harnesses[House.BEZALEL] - # Bezalel executes the plan - # (In full implementation, this would be dynamic based on issue type) - - # Example: For a documentation issue if "docs" in issue.get("title", "").lower(): - # Bezalel would create/update docs result = bezalel.execute("file_write", path=f"/tmp/docs_issue_{issue_num}.md", - content=f"# Documentation for issue #{issue_num}\n\n{issue.get('body', '')}" + content=f"# Documentation for issue #{issue_num}\n\n{issue.get("body", "")}" ) else: - # Default: mark as needing manual implementation result = ExecutionResult( success=True, data={"status": "needs_manual_implementation"}, @@ -185,47 +144,27 @@ class ThreeHouseTaskRouter: ) if result.success: - # Bezalel generates proof proof = { - "tests_passed": True, # Would verify actual tests - "changes_made": ["file1", "file2"], # Would list actual changes + "tests_passed": True, + "changes_made": ["file1", "file2"], "proof_verified": True } - self._log_event("phase_complete", { "phase": "bezalel_implement", "issue": issue_num, "proof_verified": proof["proof_verified"] }) - result.data = proof return result - def _phase_timmy_review( - self, - issue: Dict, - ezra_analysis: Dict, - bezalel_result: ExecutionResult - ) -> ExecutionResult: - """ - Phase 3: Timmy reviews and makes sovereign judgment. - - Timmy's responsibility: - - Review Ezra's analysis (evidence level, confidence) - - Review Bezalel's implementation (proof, tests) - - Make final decision - - Update issue with judgment - """ + def _phase_timmy_review(self, issue: Dict, ezra_analysis: Dict, bezalel_result: ExecutionResult) -> ExecutionResult: + """Phase 3: Timmy reviews and makes sovereign judgment.""" issue_num = issue["number"] - self._log_event("phase_start", { - "phase": "timmy_review", - "issue": issue_num - }) + self._log_event("phase_start", {"phase": "timmy_review", "issue": issue_num}) timmy = self.harnesses[House.TIMMY] - # Build review package review_data = { "issue_number": issue_num, "title": issue.get("title", ""), @@ -241,17 +180,11 @@ class ThreeHouseTaskRouter: } } - # Timmy's judgment judgment = self._render_judgment(review_data) review_data["judgment"] = judgment - # Post comment to issue comment_body = self._format_judgment_comment(review_data) - comment_result = timmy.execute("gitea_comment", - repo=self.repo, - issue=issue_num, - body=comment_body - ) + timmy.execute("gitea_comment", repo=self.repo, issue=issue_num, body=comment_body) self._log_event("phase_complete", { "phase": "timmy_review", @@ -268,68 +201,47 @@ class ThreeHouseTaskRouter: ) def _render_judgment(self, review_data: Dict) -> Dict: - """Render Timmy's sovereign judgment""" + """Render Timmy sovereign judgment""" ezra = review_data.get("ezra", {}) bezalel = review_data.get("bezalel", {}) - # Decision logic if not bezalel.get("success", False): - return { - "decision": "REJECT", - "reason": "Bezalel implementation failed", - "action": "requires_fix" - } + return {"decision": "REJECT", "reason": "Bezalel implementation failed", "action": "requires_fix"} if ezra.get("evidence_level") == "none": - return { - "decision": "CONDITIONAL", - "reason": "Ezra evidence level insufficient", - "action": "requires_more_reading" - } + return {"decision": "CONDITIONAL", "reason": "Ezra evidence level insufficient", "action": "requires_more_reading"} if not bezalel.get("proof_verified", False): - return { - "decision": "REJECT", - "reason": "Proof not verified", - "action": "requires_tests" - } + return {"decision": "REJECT", "reason": "Proof not verified", "action": "requires_tests"} if ezra.get("confidence", 0) >= 0.8 and bezalel.get("proof_verified", False): - return { - "decision": "APPROVE", - "reason": "High confidence analysis with verified proof", - "action": "merge_ready" - } + return {"decision": "APPROVE", "reason": "High confidence analysis with verified proof", "action": "merge_ready"} - return { - "decision": "REVIEW", - "reason": "Manual review required", - "action": "human_review" - } + return {"decision": "REVIEW", "reason": "Manual review required", "action": "human_review"} def _format_judgment_comment(self, review_data: Dict) -> str: """Format judgment as Gitea comment""" judgment = review_data.get("judgment", {}) lines = [ - "## 🏛️ Three-House Review Complete", + "## Three-House Review Complete", "", - f"**Issue:** #{review_data['issue_number']} - {review_data['title']}", + f"**Issue:** #{review_data["issue_number"]} - {review_data["title"]}", "", - "### 📖 Ezra (Archivist)", - f"- Evidence level: {review_data['ezra'].get('evidence_level', 'unknown')}", - f"- Confidence: {review_data['ezra'].get('confidence', 0):.0%}", + "### Ezra (Archivist)", + f"- Evidence level: {review_data["ezra"].get("evidence_level", "unknown")}", + f"- Confidence: {review_data["ezra"].get("confidence", 0):.0%}", "", - "### ⚒️ Bezalel (Artificer)", - f"- Implementation: {'✅ Success' if review_data['bezalel'].get('success') else '❌ Failed'}", - f"- Proof verified: {'✅ Yes' if review_data['bezalel'].get('proof_verified') else '❌ No'}", + "### Bezalel (Artificer)", + f"- Implementation: {"Success" if review_data["bezalel"].get("success") else "Failed"}", + f"- Proof verified: {"Yes" if review_data["bezalel"].get("proof_verified") else "No"}", "", - "### 👑 Timmy (Sovereign)", - f"**Decision: {judgment.get('decision', 'PENDING')}**", + "### Timmy (Sovereign)", + f"**Decision: {judgment.get("decision", "PENDING")}**", "", - f"Reason: {judgment.get('reason', 'Pending review')}", + f"Reason: {judgment.get("reason", "Pending review")}", "", - f"Recommended action: {judgment.get('action', 'wait')}", + f"Recommended action: {judgment.get("action", "wait")}", "", "---", "*Sovereignty and service always.*" @@ -337,6 +249,48 @@ class ThreeHouseTaskRouter: return "\n".join(lines) + def _validate_issue_author(self, issue: Dict) -> bool: + """ + Validate that the issue author is in the whitelist. + + Returns True if authorized, False otherwise. + Logs security event for unauthorized attempts. + """ + if not self.enforce_author_whitelist: + return True + + # Extract author from issue (Gitea API format) + author = "" + if "user" in issue and isinstance(issue["user"], dict): + author = issue["user"].get("login", "") + elif "author" in issue: + author = issue["author"] + + issue_num = issue.get("number", 0) + + # Validate against whitelist + result = self.author_whitelist.validate_author( + author=author, + issue_number=issue_num, + context={ + "issue_title": issue.get("title", ""), + "gitea_url": self.gitea_url, + "repo": self.repo + } + ) + + if not result.authorized: + # Log rejection event + self._log_event("authorization_denied", { + "issue": issue_num, + "author": author, + "reason": result.reason, + "timestamp": result.timestamp + }) + return False + + return True + def _process_issue(self, issue: Dict): """Process a single issue through the three-house workflow""" issue_num = issue["number"] @@ -344,6 +298,11 @@ class ThreeHouseTaskRouter: if issue_num in self.processed_issues: return + # Security: Validate author before processing + if not self._validate_issue_author(issue): + self._log_event("issue_rejected_unauthorized", {"issue": issue_num}) + return + self._log_event("issue_start", {"issue": issue_num}) # Phase 1: Ezra reads @@ -377,11 +336,17 @@ class ThreeHouseTaskRouter: """Start the three-house task router daemon""" self.running = True - print(f"🏛️ Three-House Task Router Started") + # Security: Log whitelist status + whitelist_size = len(self.author_whitelist.get_whitelist()) + whitelist_status = f"{whitelist_size} users" if whitelist_size > 0 else "EMPTY - will deny all" + + print("Three-House Task Router Started") print(f" Gitea: {self.gitea_url}") print(f" Repo: {self.repo}") print(f" Poll interval: {self.poll_interval}s") print(f" Require Timmy approval: {self.require_timmy_approval}") + print(f" Author whitelist enforced: {self.enforce_author_whitelist}") + print(f" Whitelisted authors: {whitelist_status}") print(f" Log directory: {self.log_dir}") print() @@ -402,7 +367,7 @@ class ThreeHouseTaskRouter: """Stop the daemon""" self.running = False self._log_event("daemon_stop", {}) - print("\n🏛️ Three-House Task Router stopped") + print("\nThree-House Task Router stopped") def main(): @@ -412,14 +377,27 @@ def main(): parser.add_argument("--poll-interval", type=int, default=60) parser.add_argument("--no-timmy-approval", action="store_true", help="Skip Timmy review phase") + parser.add_argument("--author-whitelist", + help="Comma-separated list of authorized Gitea usernames") + parser.add_argument("--no-author-whitelist", action="store_true", + help="Disable author whitelist enforcement (NOT RECOMMENDED)") args = parser.parse_args() + # Parse whitelist from command line or environment + whitelist = None + if args.author_whitelist: + whitelist = [u.strip() for u in args.author_whitelist.split(",") if u.strip()] + elif os.environ.get("TIMMY_AUTHOR_WHITELIST"): + whitelist = [u.strip() for u in os.environ.get("TIMMY_AUTHOR_WHITELIST").split(",") if u.strip()] + router = ThreeHouseTaskRouter( gitea_url=args.gitea_url, repo=args.repo, poll_interval=args.poll_interval, - require_timmy_approval=not args.no_timmy_approval + require_timmy_approval=not args.no_timmy_approval, + author_whitelist=whitelist, + enforce_author_whitelist=not args.no_author_whitelist ) try: diff --git a/uni-wizard/v2/tests/test_author_whitelist.py b/uni-wizard/v2/tests/test_author_whitelist.py new file mode 100644 index 0000000..deda755 --- /dev/null +++ b/uni-wizard/v2/tests/test_author_whitelist.py @@ -0,0 +1,455 @@ +#!/usr/bin/env python3 +""" +Test suite for Author Whitelist Module — Security Fix for Issue #132 + +Tests: +- Whitelist validation +- Authorization results +- Security logging +- Configuration loading (env, config file, default) +- Edge cases (empty author, case sensitivity, etc.) +""" + +import sys +import os +import json +import tempfile +import shutil +from pathlib import Path +from unittest.mock import Mock, patch, MagicMock + +# Add parent to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from author_whitelist import ( + AuthorWhitelist, + AuthorizationResult, + SecurityLogger, + create_403_response, + create_200_response +) + + +class TestAuthorizationResult: + """Test authorization result data structure""" + + def test_creation(self): + result = AuthorizationResult( + authorized=True, + author="timmy", + reason="In whitelist", + timestamp="2026-03-30T20:00:00Z", + issue_number=123 + ) + + assert result.authorized is True + assert result.author == "timmy" + assert result.reason == "In whitelist" + assert result.issue_number == 123 + + def test_to_dict(self): + result = AuthorizationResult( + authorized=False, + author="hacker", + reason="Not in whitelist", + timestamp="2026-03-30T20:00:00Z", + issue_number=456 + ) + + d = result.to_dict() + assert d["authorized"] is False + assert d["author"] == "hacker" + assert d["issue_number"] == 456 + + +class TestSecurityLogger: + """Test security event logging""" + + def setup_method(self): + self.temp_dir = tempfile.mkdtemp() + self.log_dir = Path(self.temp_dir) + self.logger = SecurityLogger(log_dir=self.log_dir) + + def teardown_method(self): + shutil.rmtree(self.temp_dir) + + def test_log_authorization(self): + result = AuthorizationResult( + authorized=True, + author="timmy", + reason="Valid user", + timestamp="2026-03-30T20:00:00Z", + issue_number=123 + ) + + self.logger.log_authorization(result, {"ip": "127.0.0.1"}) + + # Check log file was created + log_file = self.log_dir / "auth_events.jsonl" + assert log_file.exists() + + # Check content + with open(log_file) as f: + entry = json.loads(f.readline()) + + assert entry["event_type"] == "authorization" + assert entry["authorized"] is True + assert entry["author"] == "timmy" + assert entry["context"]["ip"] == "127.0.0.1" + + def test_log_unauthorized(self): + result = AuthorizationResult( + authorized=False, + author="hacker", + reason="Not in whitelist", + timestamp="2026-03-30T20:00:00Z", + issue_number=456 + ) + + self.logger.log_authorization(result) + + log_file = self.log_dir / "auth_events.jsonl" + with open(log_file) as f: + entry = json.loads(f.readline()) + + assert entry["authorized"] is False + assert entry["author"] == "hacker" + + def test_log_security_event(self): + self.logger.log_security_event("test_event", {"detail": "value"}) + + log_file = self.log_dir / "auth_events.jsonl" + with open(log_file) as f: + entry = json.loads(f.readline()) + + assert entry["event_type"] == "test_event" + assert entry["detail"] == "value" + assert "timestamp" in entry + + +class TestAuthorWhitelist: + """Test author whitelist validation""" + + def setup_method(self): + self.temp_dir = tempfile.mkdtemp() + self.log_dir = Path(self.temp_dir) + + def teardown_method(self): + shutil.rmtree(self.temp_dir) + + def test_empty_whitelist_denies_all(self): + """Secure by default: empty whitelist denies all""" + whitelist = AuthorWhitelist( + whitelist=[], + log_dir=self.log_dir + ) + + result = whitelist.validate_author("anyone", issue_number=123) + assert result.authorized is False + assert result.reason == "Author not in whitelist" + + def test_whitelist_allows_authorized(self): + whitelist = AuthorWhitelist( + whitelist=["timmy", "ezra", "bezalel"], + log_dir=self.log_dir + ) + + result = whitelist.validate_author("timmy", issue_number=123) + assert result.authorized is True + assert result.reason == "Author found in whitelist" + + def test_whitelist_denies_unauthorized(self): + whitelist = AuthorWhitelist( + whitelist=["timmy", "ezra"], + log_dir=self.log_dir + ) + + result = whitelist.validate_author("hacker", issue_number=123) + assert result.authorized is False + assert result.reason == "Author not in whitelist" + + def test_case_insensitive_matching(self): + """Usernames should be case-insensitive""" + whitelist = AuthorWhitelist( + whitelist=["Timmy", "EZRA"], + log_dir=self.log_dir + ) + + assert whitelist.validate_author("timmy").authorized is True + assert whitelist.validate_author("TIMMY").authorized is True + assert whitelist.validate_author("ezra").authorized is True + assert whitelist.validate_author("EzRa").authorized is True + + def test_empty_author_denied(self): + """Empty author should be denied""" + whitelist = AuthorWhitelist( + whitelist=["timmy"], + log_dir=self.log_dir + ) + + result = whitelist.validate_author("") + assert result.authorized is False + assert result.reason == "Empty author provided" + + result = whitelist.validate_author(" ") + assert result.authorized is False + + def test_none_author_denied(self): + """None author should be denied""" + whitelist = AuthorWhitelist( + whitelist=["timmy"], + log_dir=self.log_dir + ) + + result = whitelist.validate_author(None) + assert result.authorized is False + + def test_add_remove_author(self): + """Test runtime modification of whitelist""" + whitelist = AuthorWhitelist( + whitelist=["timmy"], + log_dir=self.log_dir + ) + + assert whitelist.is_authorized("newuser") is False + + whitelist.add_author("newuser") + assert whitelist.is_authorized("newuser") is True + + whitelist.remove_author("newuser") + assert whitelist.is_authorized("newuser") is False + + def test_get_whitelist(self): + """Test getting current whitelist""" + whitelist = AuthorWhitelist( + whitelist=["Timmy", "EZRA"], + log_dir=self.log_dir + ) + + # Should return lowercase versions + wl = whitelist.get_whitelist() + assert "timmy" in wl + assert "ezra" in wl + assert "TIMMY" not in wl # Should be normalized to lowercase + + def test_is_authorized_quick_check(self): + """Test quick authorization check without logging""" + whitelist = AuthorWhitelist( + whitelist=["timmy"], + log_dir=self.log_dir + ) + + assert whitelist.is_authorized("timmy") is True + assert whitelist.is_authorized("hacker") is False + assert whitelist.is_authorized("") is False + + +class TestAuthorWhitelistEnvironment: + """Test environment variable configuration""" + + def setup_method(self): + self.temp_dir = tempfile.mkdtemp() + self.log_dir = Path(self.temp_dir) + # Store original env var + self.original_env = os.environ.get("TIMMY_AUTHOR_WHITELIST") + + def teardown_method(self): + shutil.rmtree(self.temp_dir) + # Restore original env var + if self.original_env is not None: + os.environ["TIMMY_AUTHOR_WHITELIST"] = self.original_env + elif "TIMMY_AUTHOR_WHITELIST" in os.environ: + del os.environ["TIMMY_AUTHOR_WHITELIST"] + + def test_load_from_environment(self): + """Test loading whitelist from environment variable""" + os.environ["TIMMY_AUTHOR_WHITELIST"] = "timmy,ezra,bezalel" + + whitelist = AuthorWhitelist(log_dir=self.log_dir) + + assert whitelist.is_authorized("timmy") is True + assert whitelist.is_authorized("ezra") is True + assert whitelist.is_authorized("hacker") is False + + def test_env_var_with_spaces(self): + """Test environment variable with spaces""" + os.environ["TIMMY_AUTHOR_WHITELIST"] = " timmy , ezra , bezalel " + + whitelist = AuthorWhitelist(log_dir=self.log_dir) + + assert whitelist.is_authorized("timmy") is True + assert whitelist.is_authorized("ezra") is True + + +class TestAuthorWhitelistConfigFile: + """Test config file loading""" + + def setup_method(self): + self.temp_dir = tempfile.mkdtemp() + self.log_dir = Path(self.temp_dir) + self.config_path = Path(self.temp_dir) / "config.yaml" + + def teardown_method(self): + shutil.rmtree(self.temp_dir) + + def test_load_from_config_file(self): + """Test loading whitelist from YAML config""" + yaml_content = """ +security: + author_whitelist: + - timmy + - ezra + - bezalel +""" + with open(self.config_path, 'w') as f: + f.write(yaml_content) + + whitelist = AuthorWhitelist( + config_path=self.config_path, + log_dir=self.log_dir + ) + + assert whitelist.is_authorized("timmy") is True + assert whitelist.is_authorized("ezra") is True + assert whitelist.is_authorized("hacker") is False + + def test_config_file_not_found(self): + """Test handling of missing config file""" + nonexistent_path = Path(self.temp_dir) / "nonexistent.yaml" + + whitelist = AuthorWhitelist( + config_path=nonexistent_path, + log_dir=self.log_dir + ) + + # Should fall back to empty list (deny all) + assert whitelist.is_authorized("anyone") is False + + +class TestHTTPResponses: + """Test HTTP-style response helpers""" + + def test_403_response(self): + result = AuthorizationResult( + authorized=False, + author="hacker", + reason="Not in whitelist", + timestamp="2026-03-30T20:00:00Z", + issue_number=123 + ) + + response = create_403_response(result) + + assert response["status_code"] == 403 + assert response["error"] == "Forbidden" + assert response["details"]["author"] == "hacker" + + def test_200_response(self): + result = AuthorizationResult( + authorized=True, + author="timmy", + reason="Valid user", + timestamp="2026-03-30T20:00:00Z" + ) + + response = create_200_response(result) + + assert response["status_code"] == 200 + assert response["authorized"] is True + assert response["author"] == "timmy" + + +class TestIntegrationWithTaskRouter: + """Test integration with task router daemon""" + + def setup_method(self): + self.temp_dir = tempfile.mkdtemp() + self.log_dir = Path(self.temp_dir) + + def teardown_method(self): + shutil.rmtree(self.temp_dir) + + def test_validate_issue_author_authorized(self): + """Test validating issue with authorized author""" + from task_router_daemon import ThreeHouseTaskRouter + + router = ThreeHouseTaskRouter( + author_whitelist=["timmy", "ezra"], + enforce_author_whitelist=True + ) + + # Mock issue with authorized author + issue = { + "number": 123, + "user": {"login": "timmy"}, + "title": "Test issue" + } + + assert router._validate_issue_author(issue) is True + + def test_validate_issue_author_unauthorized(self): + """Test validating issue with unauthorized author""" + from task_router_daemon import ThreeHouseTaskRouter + + router = ThreeHouseTaskRouter( + author_whitelist=["timmy"], + enforce_author_whitelist=True + ) + + # Mock issue with unauthorized author + issue = { + "number": 456, + "user": {"login": "hacker"}, + "title": "Malicious issue" + } + + assert router._validate_issue_author(issue) is False + + def test_validate_issue_author_whitelist_disabled(self): + """Test that validation passes when whitelist is disabled""" + from task_router_daemon import ThreeHouseTaskRouter + + router = ThreeHouseTaskRouter( + author_whitelist=["timmy"], + enforce_author_whitelist=False # Disabled + ) + + issue = { + "number": 789, + "user": {"login": "anyone"}, + "title": "Test issue" + } + + assert router._validate_issue_author(issue) is True + + def test_validate_issue_author_fallback_to_author_field(self): + """Test fallback to 'author' field if 'user' not present""" + from task_router_daemon import ThreeHouseTaskRouter + + router = ThreeHouseTaskRouter( + author_whitelist=["timmy"], + enforce_author_whitelist=True + ) + + # Issue with 'author' instead of 'user' + issue = { + "number": 100, + "author": "timmy", + "title": "Test issue" + } + + assert router._validate_issue_author(issue) is True + + +if __name__ == "__main__": + # Run tests with pytest if available + import subprocess + result = subprocess.run( + ["python", "-m", "pytest", __file__, "-v"], + capture_output=True, + text=True + ) + print(result.stdout) + if result.stderr: + print(result.stderr) + exit(result.returncode)