#!/usr/bin/env python3
"""Tests for webhook_runner.py — GEMINI-HARDEN-04

Covers:
  • Signature verification (valid/invalid/missing)
  • Event allowlist (push, PR, issue, unknown)
  • Repository allowlist
  • Branch allowlist (push)
  • Action allowlist (PR/issue)
  • Idempotency (duplicate delivery IDs)
  • Invalid JSON rejection
  • Safe dispatch enforcement (pre-approved actions only)
  • Structured logging output
"""

import hashlib
import hmac
import json
import re
import sys
import tempfile
import time
from pathlib import Path

import pytest

# The module under test lives in ../scripts relative to this file's directory;
# put it on sys.path before importing it.
SCRIPT_DIR = Path(__file__).resolve().parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))

from webhook_runner import (  # noqa: E402 - must follow the sys.path tweak
    Policy,
    SafeDispatcher,
    IdempotencyStore,
    StructuredLogger,
    verify_signature,
    _infer_event_type,
)

FIXTURES_DIR = Path(__file__).parent / "fixtures" / "webhook"


# ---------------------------------------------------------------------------
# Signature Verification
# ---------------------------------------------------------------------------

def _sign(payload: bytes, secret: str) -> str:
    """Return the ``sha256=<hexdigest>`` HMAC-SHA256 header value for *payload*."""
    digest = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def test_verify_signature_valid():
    """A correctly computed HMAC-SHA256 signature is accepted."""
    payload = b'{"test": 1}'
    secret = "my-secret"
    assert verify_signature(payload, _sign(payload, secret), secret) is True


def test_verify_signature_invalid():
    """A signature that does not match the payload is rejected."""
    payload = b'{"test": 1}'
    assert verify_signature(payload, "sha256=deadbeef", "my-secret") is False


def test_verify_signature_header_with_sha256_prefix():
    """The header form carrying the ``sha256=`` prefix is accepted."""
    payload = b'{"test": 1}'
    secret = "my-secret"
    header = _sign(payload, secret)
    assert header.startswith("sha256=")
    assert verify_signature(payload, header, secret) is True


def test_verify_signature_missing_header():
    """An empty signature header is rejected when a secret is configured."""
    payload = b'{"test": 1}'
    assert verify_signature(payload, "", "secret") is False


def test_verify_signature_dev_mode_when_secret_unset():
    """With no secret (``None``), any signature value is accepted."""
    payload = b'{"test": 1}'
    assert verify_signature(payload, "anything", None) is True


# ---------------------------------------------------------------------------
# Event Type
# Inference (fallback when header missing)
# ---------------------------------------------------------------------------

def test_infer_event_type_push():
    """A payload with ``ref`` and ``commits`` is inferred as a push."""
    payload = {"ref": "refs/heads/main", "commits": [{"id": "123"}]}
    assert _infer_event_type(payload) == "push"


def test_infer_event_type_pr():
    """A payload with a ``pull_request`` object is inferred as pull_request."""
    payload = {"pull_request": {"number": 1}, "action": "opened"}
    assert _infer_event_type(payload) == "pull_request"


def test_infer_event_type_issue():
    """A payload with an ``issue`` object is inferred as issues."""
    payload = {"issue": {"number": 2}, "action": "opened"}
    assert _infer_event_type(payload) == "issues"


def test_infer_event_type_unknown():
    """A payload with no recognised keys is inferred as unknown."""
    payload = {"foo": "bar"}
    assert _infer_event_type(payload) == "unknown"


# ---------------------------------------------------------------------------
# Policy Validation
# ---------------------------------------------------------------------------

def make_policy(allowed_events=None, allowed_repos=None, allowed_branches=None,
                allowed_actions=None, dispatch=None):
    """Build a ``Policy`` with test defaults for any argument left unset.

    NOTE: the fallback uses ``or``, so any falsy value — including an explicit
    empty list or dict — is replaced by the default as well. Callers that pass
    ``[]`` therefore get the default allowlist, not an empty one.
    """
    return Policy(
        allowed_events=allowed_events or ["push", "pull_request", "issues"],
        allowed_repos=allowed_repos or ["Timmy_Foundation/*"],
        allowed_branches=allowed_branches or ["main", "master", "develop"],
        allowed_actions=allowed_actions or ["opened", "closed", "synchronize"],
        dispatch_rules=dispatch or {},
    )


def test_policy_allows_configured_event():
    """An event type on the allowlist passes validation."""
    policy = make_policy(allowed_events=["push", "pull_request"])
    payload = {
        "commits": [],
        "ref": "refs/heads/main",
        "repository": {"full_name": "Timmy_Foundation/test"},
    }
    ok, _reason = policy.validate_event("push", payload)
    assert ok is True


def test_policy_denies_unknown_event():
    """An event type missing from the allowlist is rejected with a reason."""
    policy = make_policy(allowed_events=["push"])
    payload = {"repository": {"full_name": "Timmy_Foundation/test"}}
    ok, reason = policy.validate_event("spam", payload)
    assert ok is False
    assert "event type not allowed" in reason


def test_policy_allows_repo_pattern():
    """A repository matching a wildcard pattern passes validation."""
    # A valid ref is supplied too (push requires a ref); the focus here is
    # repo pattern matching.
    policy = make_policy(allowed_repos=["Timmy_Foundation/*", "SomeOrg/special"])
    payload = {
        "repository": {"full_name": "Timmy_Foundation/timmy-config"},
        "ref": "refs/heads/main",
    }
    ok, _reason = policy.validate_event("push", payload)
    assert ok is True


def test_policy_denies_disallowed_repo():
    """A repository outside every allowed pattern is rejected."""
    policy = make_policy(allowed_repos=["Timmy_Foundation/*"])
    payload = {"repository": {"full_name": "OtherOrg/other"}}
    ok, reason = policy.validate_event("push", payload)
    assert ok is False
    assert "repository not allowed" in reason


def test_policy_allows_branch():
    """A push to an allowlisted branch passes validation."""
    policy = make_policy(allowed_branches=["main", "develop"], allowed_repos=["*/*"])
    payload = {"ref": "refs/heads/main", "repository": {"full_name": "AnyOrg/anyrepo"}}
    ok, _reason = policy.validate_event("push", payload)
    assert ok is True


def test_policy_denies_branch():
    """A push to a branch outside the allowlist is rejected."""
    # The repo pattern matches anything; the branch under test is feature/x,
    # which is not in ["main"].
    policy = make_policy(allowed_branches=["main"], allowed_repos=["*/*"])
    payload = {
        "ref": "refs/heads/feature/x",
        "repository": {"full_name": "AnyOrg/anyrepo"},
    }
    ok, reason = policy.validate_event("push", payload)
    assert ok is False
    assert "branch not allowed" in reason


def test_policy_allows_action():
    """An issue event with an allowlisted action passes validation."""
    # allowed_repos is overridden so that it matches the payload's repo.
    policy = make_policy(
        allowed_actions=["opened", "closed"],
        allowed_repos=["T/*", "Timmy_Foundation/*"],
    )
    payload = {
        "action": "opened",
        "issue": {"number": 1},
        "repository": {"full_name": "T/T"},
    }
    ok, _reason = policy.validate_event("issues", payload)
    assert ok is True


def test_policy_denies_action():
    """An issue event with a non-allowlisted action is rejected."""
    policy = make_policy(allowed_actions=["opened"], allowed_repos=["*/*"])
    payload = {
        "action": "deleted",
        "issue": {"number": 1},
        "repository": {"full_name": "AnyOrg/any"},
    }
    ok, reason = policy.validate_event("issues", payload)
    assert ok is False
    assert "action not allowed" in reason


# ---------------------------------------------------------------------------
# Dispatch Rules
# ---------------------------------------------------------------------------

def make_policy_with_dispatch():
    """Build a ``Policy`` carrying a small dispatch table used by the tests below."""
    dispatch = {
        "push": {
            "refs/heads/main": {"allowed": True, "action": "trigger_deploy", "comment": "Main deploy"},
            "refs/heads/": {"allowed": True, "action": "log_and_ack", "comment": "Branch push"},
        },
        "pull_request": {
            "opened": {"allowed": True, "action": "log_and_ack", "comment": "PR opened"},
        },
        "issues": {
            "opened": {"allowed": False, "action": "ignore", "comment": "Issues disabled"},
        },
    }
    return Policy(
        allowed_events=["push", "pull_request", "issues"],
        allowed_repos=["Timmy_Foundation/*"],
        allowed_branches=["main", "develop"],
        allowed_actions=["opened", "closed"],
        dispatch_rules=dispatch,
    )


def test_dispatch_main_push():
    """A push to main resolves to the exact-match ``trigger_deploy`` rule."""
    policy = make_policy_with_dispatch()
    payload = {"ref": "refs/heads/main", "repository": {"full_name": "Timmy_Foundation/test"}}
    allowed, action, _comment = policy.get_dispatch_action("push", payload)
    assert allowed is True
    assert action == "trigger_deploy"


def test_dispatch_feature_branch():
    """A push to another branch resolves to the ``refs/heads/`` rule."""
    policy = make_policy_with_dispatch()
    payload = {"ref": "refs/heads/feature/x", "repository": {"full_name": "Timmy_Foundation/test"}}
    allowed, action, _comment = policy.get_dispatch_action("push", payload)
    assert allowed is True
    assert action == "log_and_ack"


def test_dispatch_pr_opened():
    """An opened pull request resolves to ``log_and_ack``."""
    policy = make_policy_with_dispatch()
    payload = {"action": "opened", "pull_request": {"number": 1}, "repository": {"full_name": "T/T"}}
    allowed, action, _comment = policy.get_dispatch_action("pull_request", payload)
    assert allowed is True
    assert action == "log_and_ack"


def test_dispatch_issue_opened_denied():
    """An opened issue resolves to a rule that is explicitly not allowed."""
    policy = make_policy_with_dispatch()
    payload = {"action": "opened", "issue": {"number": 1}, "repository": {"full_name": "T/T"}}
    allowed, action, _comment = policy.get_dispatch_action("issues", payload)
    assert allowed is False
    assert action == "ignore"


# ---------------------------------------------------------------------------
# Idempotency Store
# ---------------------------------------------------------------------------

def test_idempotency_store_seen_and_mark():
    """Marked delivery IDs are reported as seen, including after a reload."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "idemp.json"
        store = IdempotencyStore(path, max_entries=10, ttl_days=1)
        assert store.seen("never-seen") is False
        store.mark_seen("deliv-1", "push", "Timmy_Foundation/test")
        assert store.seen("deliv-1") is True
        # Persistence check: a second store on the same file sees the entry.
        store2 = IdempotencyStore(path, max_entries=10, ttl_days=1)
        assert store2.seen("deliv-1") is True


def test_idempotency_store_prunes_expired():
    """Entries older than the TTL are no longer seen after a reload."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "idemp.json"
        store = IdempotencyStore(path, max_entries=10, ttl_days=1/24)  # 1 hour TTL
        store.mark_seen("old", "push", "T/T")
        # Manually backdate the entry to more than one hour ago.
        store._data["old"]["timestamp"] = time.time() - 4000
        store._save_locked()
        # Reload triggers prune
        store2 = IdempotencyStore(path, max_entries=10, ttl_days=1/24)
        assert store2.seen("old") is False


def test_idempotency_store_size_cap():
    """The store keeps at most ``max_entries`` and drops the oldest first."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "idemp.json"
        store = IdempotencyStore(path, max_entries=5, ttl_days=1)
        for i in range(10):
            store.mark_seen(f"id-{i}", "push", "T/T")
        assert len(store._data) == 5
        # Ensure oldest entries dropped
        assert "id-0" not in store._data
        assert "id-9" in store._data


# ---------------------------------------------------------------------------
# Structured Logger
# ---------------------------------------------------------------------------

def test_structured_logger_emits_json_lines():
    """Each log call writes one JSON object per line with the given fields."""
    with tempfile.NamedTemporaryFile(mode="w+", delete=False) as tmp_file:
        log_path = tmp_file.name
    try:
        logger = StructuredLogger(log_file=log_path, level="INFO")
        logger.info(event_type="push", delivery_id="abc", msg="test message", repo="test/repo")
        logger.close()
        with open(log_path) as fh:
            lines = [
                json.loads(line)
                for line in fh.read().strip().split("\n")
                if line.strip()
            ]
        assert len(lines) == 1
        assert lines[0]["level"] == "INFO"
        assert lines[0]["event_type"] == "push"
        assert lines[0]["delivery_id"] == "abc"
        assert lines[0]["msg"] == "test message"
    finally:
        # delete=False above, so the file must be removed by hand.
        Path(log_path).unlink(missing_ok=True)


def test_structured_logger_respects_level():
    """Messages below the configured level are not written."""
    with tempfile.NamedTemporaryFile(mode="w+", delete=False) as tmp_file:
        log_path = tmp_file.name
    try:
        logger = StructuredLogger(log_file=log_path, level="WARNING")
        logger.info(msg="should be filtered")
        logger.warning(msg="should appear")
        logger.close()
        with open(log_path) as fh:
            content = fh.read()
        assert "should be filtered" not in content
        assert "should appear" in content
    finally:
        Path(log_path).unlink(missing_ok=True)


# ---------------------------------------------------------------------------
# Safe Dispatcher
# ---------------------------------------------------------------------------

def test_dispatcher_log_and_ack():
    """A push matching the ``refs/heads/`` rule dispatches ``log_and_ack``."""
    policy = make_policy_with_dispatch()
    # Override the repo pattern to accept any repo; 'develop' is an allowed
    # branch and falls under the 'refs/heads/' rule.
    policy.allowed_repo_patterns = [re.compile(".*")]
    logger = StructuredLogger(log_file=None, level="INFO")
    dispatcher = SafeDispatcher(policy, logger)
    # 'refs/heads/' maps to log_and_ack, so no external script is needed.
    success, msg = dispatcher.dispatch(
        "push",
        {"ref": "refs/heads/develop", "repository": {"full_name": "AnyOrg/anyrepo"}},
        "deliv-1",
    )
    assert success is True  # log_and_ack succeeded
    assert msg  # non-empty message


def test_dispatcher_denied_by_policy():
    """A rule with ``allowed: False`` fails dispatch and returns its comment."""
    policy = make_policy_with_dispatch()
    # Override the repo pattern to accept any repo so policy validation passes.
    policy.allowed_repo_patterns = [re.compile(".*")]
    logger = StructuredLogger(log_file=None, level="INFO")
    dispatcher = SafeDispatcher(policy, logger)
    # Issue opened is denied by the dispatch rules (allowed=False) with the
    # comment "Issues disabled".
    success, msg = dispatcher.dispatch(
        "issues",
        {"action": "opened", "repository": {"full_name": "Any/any"}},
        "deliv-2",
    )
    assert success is False
    assert msg == "Issues disabled"


def test_dispatcher_unknown_action():
    """A rule naming an action that is not pre-approved must not succeed."""
    policy = Policy(
        allowed_events=[],
        allowed_repos=[],
        allowed_branches=[],
        allowed_actions=[],
        dispatch_rules={"push": {}},
    )
    logger = StructuredLogger(log_file=None, level="INFO")
    dispatcher = SafeDispatcher(policy, logger)
    # Install a rule that matches the ref below but names an unknown action.
    policy.dispatch_rules = {
        "push": {"refs/heads/unknown": {"allowed": True, "action": "does_not_exist"}},
    }
    success, _msg = dispatcher.dispatch("push", {"ref": "refs/heads/unknown"}, "deliv-3")
    # Whichever layer rejects it (policy validation or the dispatcher's own
    # action lookup), an unapproved action must never be reported as a
    # successful dispatch.
    # NOTE(review): previously this test ended in `pass` and asserted nothing;
    # confirm the exact failure message against SafeDispatcher if a stricter
    # assertion is wanted.
    assert success is False


# ---------------------------------------------------------------------------
# Fixture-based integration tests for webhook payloads
# ---------------------------------------------------------------------------

def load_fixture(name):
    """Load a JSON fixture by file name, skipping the test if it is absent."""
    path = FIXTURES_DIR / name
    if not path.exists():
        pytest.skip(f"fixture {name} not found")
    return json.loads(path.read_text())


@pytest.mark.parametrize("fixture_name,expected_ok,expected_reason_contains", [
    ("push_main.json", True, None),
    ("pr_opened.json", True, None),
    ("issue_created.json", True, None),
    ("unknown_event.json", False, "event type not allowed"),
    ("disallowed_repo.json", False, "repository not allowed"),
    ("disallowed_branch.json", False, "branch not allowed"),
])
def test_policy_with_fixtures(fixture_name, expected_ok, expected_reason_contains):
    """Validate each fixture payload against a representative policy."""
    policy = make_policy(
        allowed_events=["push", "pull_request", "issues", "issue_comment"],
        allowed_repos=["Timmy_Foundation/*"],
        allowed_branches=["main", "master", "develop"],
        allowed_actions=["opened", "closed", "synchronize", "reopened", "created", "edited"],
        dispatch={
            "push": {"refs/heads/": {"allowed": True, "action": "log_and_ack"}},
            "pull_request": {"opened": {"allowed": True, "action": "log_and_ack"}},
            "issues": {"opened": {"allowed": True, "action": "log_and_ack"}},
        },
    )
    payload = load_fixture(fixture_name)
    # Prefer an explicit "event" key in the fixture; otherwise infer the type.
    event_type = payload.get("event") or _infer_event_type(payload)
    ok, reason = policy.validate_event(event_type, payload)
    assert ok is expected_ok
    if expected_reason_contains:
        assert expected_reason_contains in (reason or "")


# ---------------------------------------------------------------------------
# End-to-end: local test mode (via test_payload_file logic)
# ---------------------------------------------------------------------------

def test_local_test_mode_valid_push():
    """Local test mode exits with code 0 for an allowed push payload."""
    # Exercises the local test workflow without starting a server.
    # Imported here under an alias — presumably so pytest does not pick up
    # the imported `test_payload_file` as a test of this module.
    from webhook_runner import test_payload_file as run_test

    payload_path = FIXTURES_DIR / "push_main.json"
    assert payload_path.exists()
    policy = make_policy(
        allowed_events=["push"],
        allowed_repos=["Timmy_Foundation/*"],
        allowed_branches=["main", "master", "develop"],
        allowed_actions=[],
        dispatch={"push": {"refs/heads/": {"allowed": True, "action": "log_and_ack"}}},
    )
    policy._secret = None  # dev mode
    logger = StructuredLogger(log_file=None, level="INFO")
    # Keep idempotency state in a per-test directory so delivery IDs recorded
    # by an earlier run cannot leak into this one.
    with tempfile.TemporaryDirectory() as tmp:
        idemp_store = IdempotencyStore(
            str(Path(tmp) / "test_idemp.json"), max_entries=100, ttl_days=1
        )
        dispatcher = SafeDispatcher(policy, logger)
        # Should exit 0 (returning without SystemExit is accepted as well).
        try:
            run_test(str(payload_path), None, policy, dispatcher, idemp_store, logger)
        except SystemExit as e:
            assert e.code == 0


def test_local_test_mode_disallowed_repo():
    """Local test mode exits with code 1 for a disallowed repository."""
    from webhook_runner import test_payload_file as run_test

    payload_path = FIXTURES_DIR / "disallowed_repo.json"
    policy = make_policy(
        allowed_events=["push"],
        allowed_repos=["Timmy_Foundation/*"],
        allowed_branches=["main"],
        dispatch={},
    )
    policy._secret = None
    logger = StructuredLogger(log_file=None, level="INFO")
    # Per-test directory: no idempotency state shared with other tests or runs.
    with tempfile.TemporaryDirectory() as tmp:
        idemp_store = IdempotencyStore(
            str(Path(tmp) / "test_idemp2.json"), max_entries=100, ttl_days=1
        )
        dispatcher = SafeDispatcher(policy, logger)
        with pytest.raises(SystemExit) as exc:
            run_test(str(payload_path), None, policy, dispatcher, idemp_store, logger)
        assert exc.value.code == 1
# ---------------------------------------------------------------------------
# Config loading defaults
# ---------------------------------------------------------------------------

def test_load_config_has_sensible_defaults():
    """``load_config()`` with no arguments yields the expected webhook defaults."""
    from webhook_runner import load_config

    cfg = load_config()
    assert "webhook" in cfg
    wc = cfg["webhook"]
    assert wc["host"] == "127.0.0.1"
    assert wc["port"] == 7777
    assert "push" in wc["allowed_events"]
    assert "pull_request" in wc["allowed_events"]


# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------

def test_policy_validates_missing_repository_field():
    """A push payload with no ``repository`` field is rejected."""
    # make_policy substitutes its default repo allowlist for a falsy value,
    # so passing [] here still yields a policy with repo patterns configured.
    policy = make_policy(allowed_repos=[])
    payload = {}  # no repo field
    ok, reason = policy.validate_event("push", payload)
    # With patterns configured and no repository name to match against them,
    # validation is expected to fail with a "... not allowed" reason.
    assert ok is False
    assert "not allowed" in reason.lower()


def test_dispatch_rule_fallback_to_pattern():
    """A ref with no exact rule resolves to the ``refs/heads/`` rule."""
    policy = Policy(
        allowed_events=["push"],
        allowed_repos=[],
        allowed_branches=[],
        allowed_actions=[],
        dispatch_rules={
            "push": {
                "refs/heads/": {"allowed": True, "action": "log_and_ack", "comment": "any branch"},
            }
        },
    )
    payload = {"ref": "refs/heads/feature/foo", "repository": {"full_name": "T/T"}}
    allowed, action, _comment = policy.get_dispatch_action("push", payload)
    assert allowed is True
    assert action == "log_and_ack"


if __name__ == "__main__":
    # Propagate pytest's exit code so running this file directly fails the
    # process when any test fails.
    raise SystemExit(pytest.main([__file__, "-v"]))