diff --git a/.githooks/pre-commit b/.githooks/pre-commit
new file mode 100755
index 00000000..9e733867
--- /dev/null
+++ b/.githooks/pre-commit
@@ -0,0 +1,15 @@
+#!/bin/bash
+#
+# Pre-commit hook wrapper for secret leak detection.
+#
+# Installation:
+#   git config core.hooksPath .githooks
+#
+# To bypass temporarily:
+#   git commit --no-verify
+#
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+exec python3 "${SCRIPT_DIR}/pre-commit.py" "$@"
diff --git a/.githooks/pre-commit.py b/.githooks/pre-commit.py
new file mode 100755
index 00000000..a48ade0a
--- /dev/null
+++ b/.githooks/pre-commit.py
@@ -0,0 +1,413 @@
+#!/usr/bin/env python3
+"""
+Pre-commit hook for detecting secret leaks in staged files.
+
+Scans staged diffs and full file contents for common secret patterns,
+token file paths, private keys, and credential strings.
+
+Installation:
+    git config core.hooksPath .githooks
+
+To bypass:
+    git commit --no-verify
+"""
+
+from __future__ import annotations
+
+import os
+import re
+import subprocess
+import sys
+from pathlib import Path
+from typing import Callable, Iterable, List, Optional, Tuple, Union
+
+# ANSI color codes
+RED = "\033[0;31m"
+YELLOW = "\033[1;33m"
+GREEN = "\033[0;32m"
+NC = "\033[0m"
+
+
+class Finding:
+    """A potential secret leak: file name, 1-based line number and reason."""
+
+    def __init__(self, filename: str, line: int, message: str) -> None:
+        self.filename = filename
+        self.line = line
+        self.message = message
+
+    def _key(self) -> Tuple[str, int, str]:
+        """Return the tuple that defines this finding's identity."""
+        return (self.filename, self.line, self.message)
+
+    def __repr__(self) -> str:
+        return f"Finding({self.filename!r}, {self.line}, {self.message!r})"
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, Finding):
+            return NotImplemented
+        return self._key() == other._key()
+
+    def __hash__(self) -> int:
+        # Defining __eq__ alone makes instances unhashable; keep the two in
+        # step so findings can be de-duplicated with a set.
+        return hash(self._key())
+
+
+# ---------------------------------------------------------------------------
+# Regex patterns
+# ---------------------------------------------------------------------------
+
+# "\b" stops words such as "task-..." from matching; "_" and "-" are allowed
+# in the key body so multi-part keys like "sk-ant-..." are caught.
+_RE_SK_KEY = re.compile(r"\bsk-[a-zA-Z0-9_-]{20,}")
+_RE_BEARER = re.compile(r"Bearer\s+[a-zA-Z0-9_-]{20,}")
+
+# Leading whitespace is allowed so indented assignments are not missed.
+_RE_ENV_ASSIGN = re.compile(
+    r"^\s*(?:export\s+)?"
+    r"(OPENAI_API_KEY|GITEA_TOKEN|ANTHROPIC_API_KEY|KIMI_API_KEY"
+    r"|TELEGRAM_BOT_TOKEN|DISCORD_TOKEN)"
+    r"\s*=\s*(.+)$"
+)
+
+_RE_TOKEN_PATHS = re.compile(
+    r'(?:^|["\'\s])'
+    r"(\.(?:env)"
+    r"|(?:secrets|keystore|credentials|token|api_keys)\.json"
+    r"|~/\.hermes/credentials/"
+    r"|/root/nostr-relay/keystore\.json)"
+)
+
+_RE_PRIVATE_KEY = re.compile(
+    r"-----BEGIN (PRIVATE KEY|RSA PRIVATE KEY|OPENSSH PRIVATE KEY)-----"
+)
+
+# The "user:password@" part must directly follow the scheme and may not
+# contain whitespace, "/" or "@".  This keeps "host:8080/path?mail=a@b" and
+# text that merely follows a URL on the same line from being reported.
+_RE_URL_PASSWORD = re.compile(r"https?://[^\s:/@]+:[^\s/@]+@")
+
+_RE_RAW_TOKEN = re.compile(r'"token"\s*:\s*"([^"]{10,})"')
+_RE_RAW_API_KEY = re.compile(r'"api_key"\s*:\s*"([^"]{10,})"')
+
+# Safe patterns (placeholders)
+_SAFE_ENV_VALUES = {
+    "",
+    "***",
+    "REDACTED",
+}
+
+# Angle-bracket template values such as <YOUR_API_KEY>.
+_RE_ANGLE_PLACEHOLDER = re.compile(r"<[^<>]+>")
+
+_RE_DOC_EXAMPLE = re.compile(
+    r"\b(?:example|documentation|doc|readme)\b",
+    re.IGNORECASE,
+)
+
+_RE_OS_ENVIRON = re.compile(r"os\.environ(?:\.get|\[)")
+
+# A line ending in a shell-style variable reference, e.g. "...=$TOKEN".
+_RE_TRAILING_VAR_REF = re.compile(r"\$\w+\s*$")
+
+# A value that is nothing but one variable reference, e.g. "$TOKEN".
+_RE_VAR_REF = re.compile(r"\$\w+")
+
+# Unified-diff hunk header; group 1 is the first line number in the new file.
+_RE_HUNK_HEADER = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,\d+)? @@")
+
+# File name reported for diff lines seen before any "+++" header.
+_UNKNOWN_DIFF_FILE = "<diff>"
+
+# (pattern, message) rules tried before and after the env-assignment rule.
+# Order matters: the first rule that matches yields the line's only finding.
+_RULES_BEFORE_ENV = (
+    (_RE_SK_KEY, "Potential API key (sk-...) found"),
+    (_RE_BEARER, "Potential Bearer token found"),
+)
+_RULES_AFTER_ENV = (
+    (_RE_TOKEN_PATHS, "Potential token file path found"),
+    (_RE_PRIVATE_KEY, "Private key block found"),
+    (_RE_URL_PASSWORD, "Password in URL found"),
+    (_RE_RAW_TOKEN, 'Raw "token" string with long value'),
+    (_RE_RAW_API_KEY, 'Raw "api_key" string with long value'),
+)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def is_binary_content(content: Union[str, bytes]) -> bool:
+    """Return True if *content* is byte data containing a NUL byte.
+
+    ``str`` input is always treated as text.
+    """
+    if isinstance(content, str):
+        return False
+    return b"\x00" in content
+
+
+def _looks_like_safe_env_line(line: str) -> bool:
+    """Return True if *line* reads or forwards a variable, not a literal.
+
+    That is the case when it contains ``os.environ.get`` / ``os.environ[``
+    or ends in a ``$NAME`` reference.
+    """
+    if _RE_OS_ENVIRON.search(line):
+        return True
+    return _RE_TRAILING_VAR_REF.search(line.strip()) is not None
+
+
+def _is_placeholder(value: str) -> bool:
+    """Return True if *value* is empty or an obvious stand-in for a secret.
+
+    Surrounding whitespace and quotes are ignored.  Stand-ins are the entries
+    of ``_SAFE_ENV_VALUES``, ``<...>`` template values and a lone ``$NAME``.
+    """
+    stripped = value.strip().strip('"').strip("'")
+    if stripped in _SAFE_ENV_VALUES:
+        return True
+    if _RE_ANGLE_PLACEHOLDER.fullmatch(stripped):
+        return True
+    return _RE_VAR_REF.fullmatch(stripped) is not None
+
+
+def _is_doc_or_example(line: str, value: str | None = None) -> bool:
+    """Return True if *line* looks like documentation, not a real secret.
+
+    That is the case when *value* is given and is a placeholder, or when the
+    line contains a documentation keyword and assigns a placeholder to one
+    of the known secret variables.
+    """
+    if value is not None and _is_placeholder(value):
+        return True
+    if _RE_DOC_EXAMPLE.search(line):
+        m = _RE_ENV_ASSIGN.search(line)
+        if m and _is_placeholder(m.group(2)):
+            return True
+    return False
+
+
+def _first_message(rules, text: str) -> Optional[str]:
+    """Return the message of the first rule matching *text*, else None."""
+    for pattern, message in rules:
+        if pattern.search(text):
+            return message
+    return None
+
+
+# ---------------------------------------------------------------------------
+# Scanning
+# ---------------------------------------------------------------------------
+
+def scan_line(line: str, filename: str, line_no: int) -> Iterable[Finding]:
+    """Yield at most one finding for *line*.
+
+    Rules are tried in a fixed order and the first match wins.  A line that
+    assigns one of the known secret variables is judged by that rule alone:
+    it is either reported as a secret assignment or treated as safe.
+    """
+    stripped = line.rstrip("\n")
+    if not stripped:
+        return
+
+    message = _first_message(_RULES_BEFORE_ENV, stripped)
+    if message is None:
+        m = _RE_ENV_ASSIGN.search(stripped)
+        if m is None:
+            message = _first_message(_RULES_AFTER_ENV, stripped)
+        elif not (
+            _looks_like_safe_env_line(stripped)
+            or _is_doc_or_example(stripped, m.group(2))
+        ):
+            message = f"Potential secret assignment: {m.group(1)}=..."
+
+    if message is not None:
+        yield Finding(filename, line_no, message)
+
+
+def scan_content(content: Union[str, bytes], filename: str) -> List[Finding]:
+    """Scan every line of *content* and return all findings.
+
+    Bytes are decoded as UTF-8 with undecodable bytes replaced, so a file in
+    another encoding is still scanned instead of being silently skipped.
+    """
+    if isinstance(content, bytes):
+        text = content.decode("utf-8", errors="replace")
+    else:
+        text = content
+
+    findings: List[Finding] = []
+    for line_no, line in enumerate(text.splitlines(), start=1):
+        findings.extend(scan_line(line, filename, line_no))
+    return findings
+
+
+def scan_files(
+    files: List[str],
+    content_reader: Callable[[str], Union[str, bytes]],
+) -> List[Finding]:
+    """Scan each path in *files*, reading it through *content_reader*.
+
+    Content that ``is_binary_content`` reports as binary is skipped.
+    """
+    findings: List[Finding] = []
+    for filepath in files:
+        content = content_reader(filepath)
+        if is_binary_content(content):
+            continue
+        findings.extend(scan_content(content, filepath))
+    return findings
+
+
+def scan_diff(diff_text: str) -> List[Finding]:
+    """Scan the added lines of a unified diff and return all findings.
+
+    Each finding carries the path from the file's ``+++`` header and the
+    line number in the new file, derived from the ``@@`` hunk headers.
+    """
+    findings: List[Finding] = []
+    filename = _UNKNOWN_DIFF_FILE
+    new_line_no = 0
+    in_hunk = False
+    for raw in diff_text.splitlines():
+        if raw.startswith("diff --git "):
+            # A new file section: header lines follow until the next "@@".
+            in_hunk = False
+            filename = _UNKNOWN_DIFF_FILE
+            continue
+        hunk = _RE_HUNK_HEADER.match(raw)
+        if hunk:
+            in_hunk = True
+            new_line_no = int(hunk.group(1))
+            continue
+        if not in_hunk:
+            if raw.startswith("+++ "):
+                path = raw[4:]
+                filename = path[2:] if path.startswith("b/") else path
+            continue
+        # Inside a hunk, "+++x" is the added line "++x", not a file header.
+        if raw.startswith("+"):
+            findings.extend(scan_line(raw[1:], filename, new_line_no))
+            new_line_no += 1
+        elif raw.startswith(" "):
+            new_line_no += 1
+    return findings
+
+
+# ---------------------------------------------------------------------------
+# Git helpers
+# ---------------------------------------------------------------------------
+
+
+def _run_git(*args: str) -> Optional[bytes]:
+    """Run ``git`` with *args* and return its raw stdout.
+
+    Returns None, after printing a warning to stderr, if git exits non-zero.
+    """
+    result = subprocess.run(["git", *args], capture_output=True)
+    if result.returncode != 0:
+        command = " ".join(("git",) + args)
+        print(
+            f"{YELLOW}warning: '{command}' failed "
+            f"(exit code {result.returncode}){NC}",
+            file=sys.stderr,
+        )
+        return None
+    return result.stdout
+
+
+def get_staged_files() -> List[str]:
+    """Return a list of staged file paths (excluding deletions).
+
+    ``-z`` makes git print NUL-separated, unquoted paths.  Without it, paths
+    with unusual characters are quoted and cannot be handed to ``git show``.
+    """
+    output = _run_git(
+        "diff", "--cached", "--name-only", "--diff-filter=ACMR", "-z"
+    )
+    if output is None:
+        return []
+    return [os.fsdecode(name) for name in output.split(b"\0") if name]
+
+
+def get_staged_diff() -> str:
+    """Return the diff of staged changes, or "" if git fails.
+
+    The output is decoded as UTF-8 with replacement so that staged content
+    in another encoding cannot raise ``UnicodeDecodeError``.
+    """
+    output = _run_git("diff", "--cached", "--no-color", "-U0")
+    if output is None:
+        return ""
+    return output.decode("utf-8", errors="replace")
+
+
+def get_file_content_at_staged(filepath: str) -> bytes:
+    """Return the staged content of *filepath*, or b"" if git fails."""
+    output = _run_git("show", f":{filepath}")
+    return b"" if output is None else output
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+
+def main() -> int:
+    """Scan the staged changes; return 0 to allow the commit, 1 to block."""
+    print(f"{GREEN}🔍 Scanning for secret leaks in staged files...{NC}")
+
+    staged_files = get_staged_files()
+    if not staged_files:
+        print(f"{GREEN}✓ No files staged for commit{NC}")
+        return 0
+
+    # Scan both full staged file contents and the diff content
+    findings = scan_files(staged_files, get_file_content_at_staged)
+
+    # Added lines are normally part of the staged files scanned above, so a
+    # diff finding is reported only if the file scan did not produce it.
+    seen = set(findings)
+    for finding in scan_diff(get_staged_diff()):
+        if finding not in seen:
+            seen.add(finding)
+            findings.append(finding)
+
+    if not findings:
+        print(f"{GREEN}✓ No potential secret leaks detected{NC}")
+        return 0
+
+    print(f"{RED}✗ Potential secret leaks detected:{NC}\n")
+    for finding in findings:
+        loc = finding.filename
+        print(
+            f" {RED}[LEAK]{NC} {loc}:{finding.line} — {finding.message}"
+        )
+
+    top = "╔════════════════════════════════════════════════════════════╗"
+    bottom = "╚════════════════════════════════════════════════════════════╝"
+    banner = "COMMIT BLOCKED: Potential secrets detected!"
+    # Pad the text row so its right edge lines up with the borders.
+    text_width = len(top) - 3
+    print()
+    print(f"{RED}{top}{NC}")
+    print(f"{RED}║ {banner:<{text_width}}║{NC}")
+    print(f"{RED}{bottom}{NC}")
+    print()
+    print("Recommendations:")
+    print(" 1. Remove secrets from your code")
+    print(" 2. Use environment variables or a secrets manager")
+    print(" 3. Add sensitive files to .gitignore")
+    print(" 4. Rotate any exposed credentials immediately")
+    print()
+    print("If you are CERTAIN this is a false positive, you can bypass:")
+    print(" git commit --no-verify")
+    print()
+    return 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/tests/test_pre_commit_secret_leak.py b/tests/test_pre_commit_secret_leak.py
new file mode 100644
index 00000000..3b51e7c2
--- /dev/null
+++ b/tests/test_pre_commit_secret_leak.py
@@ -0,0 +1,340 @@
+"""
+Unit tests for the pre-commit secret leak scanner.
+
+Follows TDD: tests were written before implementation.
+"""
+
+import re
+import sys
+import unittest
+from pathlib import Path
+
+# Add .githooks to path so we can import pre-commit.py as a module
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent / ".githooks"))
+
+# The module name contains a hyphen, so we import via importlib
+import importlib.util
+
+_spec = importlib.util.spec_from_file_location(
+    "pre_commit_secret_leak",
+    str(Path(__file__).resolve().parent.parent / ".githooks" / "pre-commit.py"),
+)
+pre_commit = importlib.util.module_from_spec(_spec)
+_spec.loader.exec_module(pre_commit)
+
+
+class TestSecretPatterns(unittest.TestCase):
+    """Tests for individual secret detection patterns."""
+
+    # ------------------------------------------------------------------
+    # API keys
+    # ------------------------------------------------------------------
+    def test_detects_openai_sk_key(self):
+        line = 'api_key = "sk-abcdefghijklmnopqrstuvwxyz1234"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+        self.assertIn("sk-", findings[0].message)
+
+    def test_detects_hyphenated_sk_key(self):
+        line = 'client = Client("sk-ant-abcdefghij-klmnopqrstuvwxyz")'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+        self.assertIn("sk-", findings[0].message)
+
+    def test_word_ending_in_sk_is_not_a_key(self):
+        line = 'css_class = "task-abcdefghijklmnopqrstuvwxyz"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    def test_detects_bearer_token(self):
+        line = 'headers = {"Authorization": "Bearer abcdefghijklmnopqrstuvwxyz1234"}'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+        self.assertIn("Bearer", findings[0].message)
+
+    def test_short_bearer_ignored(self):
+        line = 'Authorization: Bearer short'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    # ------------------------------------------------------------------
+    # Environment variable assignments
+    # ------------------------------------------------------------------
+    def test_detects_openai_api_key_assignment(self):
+        line = 'OPENAI_API_KEY=sk-abcdefghijklmnopqrstuvwxyz1234'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_gitea_token_assignment(self):
+        line = 'GITEA_TOKEN=gtl_abcdefghijklmnopqrstuvwxyz1234'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_anthropic_key_assignment(self):
+        line = 'ANTHROPIC_API_KEY=sk-ant-abcdefghijklmnopqrstuvwxyz1234'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_kimi_key_assignment(self):
+        line = 'KIMI_API_KEY=abcdef1234567890abcdef1234567890'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_telegram_token_assignment(self):
+        line = 'TELEGRAM_BOT_TOKEN=123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_discord_token_assignment(self):
+        line = 'DISCORD_TOKEN=MzIwNDE5MzA1NjUyNDgzMjY0.DSDsdQ.oM6WmR2i_uIvJhMZZZz0'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_indented_assignment(self):
+        line = '    GITEA_TOKEN=gtl_abcdefghijklmnopqrstuvwxyz1234'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    # ------------------------------------------------------------------
+    # Safe env reads / placeholders
+    # ------------------------------------------------------------------
+    def test_os_environ_get_is_safe(self):
+        line = 'key = os.environ.get("OPENAI_API_KEY")'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    def test_placeholder_your_api_key_is_safe(self):
+        line = 'OPENAI_API_KEY=<YOUR_API_KEY>'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    def test_placeholder_stars_is_safe(self):
+        line = 'OPENAI_API_KEY=***'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    def test_placeholder_redacted_is_safe(self):
+        line = 'OPENAI_API_KEY=REDACTED'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    def test_env_var_reference_is_safe(self):
+        line = 'OPENAI_API_KEY=$OPENAI_API_KEY'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    def test_empty_env_assignment_is_safe(self):
+        line = 'OPENAI_API_KEY='
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    # ------------------------------------------------------------------
+    # Token file paths
+    # ------------------------------------------------------------------
+    def test_detects_dotenv_path(self):
+        line = 'load_dotenv(".env")'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_secrets_json_path(self):
+        line = 'with open("secrets.json") as f:'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_keystore_json_path(self):
+        line = 'keystore = "/root/nostr-relay/keystore.json"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_hermes_credentials_path(self):
+        line = 'creds_path = "~/.hermes/credentials/default.json"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_credentials_json(self):
+        line = 'with open("credentials.json") as f:'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_token_json(self):
+        line = 'token_file = "token.json"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_api_keys_json(self):
+        line = 'keys = "api_keys.json"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    # ------------------------------------------------------------------
+    # Private key blocks
+    # ------------------------------------------------------------------
+    def test_detects_begin_private_key(self):
+        line = '-----BEGIN PRIVATE KEY-----'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_begin_rsa_private_key(self):
+        line = '-----BEGIN RSA PRIVATE KEY-----'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_detects_begin_openssh_private_key(self):
+        line = '-----BEGIN OPENSSH PRIVATE KEY-----'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    # ------------------------------------------------------------------
+    # Passwords in URLs
+    # ------------------------------------------------------------------
+    def test_detects_password_in_https_url(self):
+        line = 'url = "https://user:secretpassword@example.com/repo.git"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+        self.assertIn("password", findings[0].message.lower())
+
+    def test_detects_password_in_http_url(self):
+        line = 'http://admin:password123@internal.local'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+
+    def test_url_with_port_and_at_sign_is_safe(self):
+        line = 'url = "https://example.com:8080/search?mail=me@example.com"'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    # ------------------------------------------------------------------
+    # Raw token patterns in strings
+    # ------------------------------------------------------------------
+    def test_detects_raw_token_in_json(self):
+        line = '{"token": "abcdefghijklmnopqrstuvwxyz"}'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+        self.assertIn("token", findings[0].message.lower())
+
+    def test_detects_raw_api_key_in_json(self):
+        line = '{"api_key": "1234567890abcdef"}'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertTrue(findings)
+        self.assertIn("api_key", findings[0].message.lower())
+
+    def test_short_token_ignored(self):
+        line = '{"token": "short"}'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    # ------------------------------------------------------------------
+    # Documentation / example safe patterns
+    # ------------------------------------------------------------------
+    def test_documentation_reference_is_safe(self):
+        line = 'See the documentation at https://docs.example.com'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        # No specific pattern should match a doc URL without a password
+        self.assertFalse(findings)
+
+    def test_example_code_comment_is_safe(self):
+        line = '# Example: OPENAI_API_KEY='
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+    def test_doc_string_with_placeholder_is_safe(self):
+        line = '"""Set ANTHROPIC_API_KEY to $ANTHROPIC_API_KEY in production."""'
+        findings = list(pre_commit.scan_line(line, "test.py", 1))
+        self.assertFalse(findings)
+
+
+class TestScanContent(unittest.TestCase):
+    """Tests for scanning multi-line content."""
+
+    def test_scan_content_finds_multiple_leaks(self):
+        content = """
+OPENAI_API_KEY=sk-12345678901234567890
+Some normal code here
+GITEA_TOKEN=gtl_12345678901234567890
+"""
+        findings = pre_commit.scan_content(content, "test.py")
+        self.assertEqual(len(findings), 2)
+        # Should have line numbers
+        self.assertIn(2, [f.line for f in findings])
+        self.assertIn(4, [f.line for f in findings])
+
+    def test_scan_content_returns_empty_when_clean(self):
+        content = "print('hello world')\n"
+        findings = pre_commit.scan_content(content, "test.py")
+        self.assertEqual(findings, [])
+
+    def test_scan_content_scans_non_utf8_bytes(self):
+        content = b"caf\xe9\nGITEA_TOKEN=gtl_12345678901234567890\n"
+        findings = pre_commit.scan_content(content, "test.py")
+        self.assertEqual([f.line for f in findings], [2])
+
+
+class TestScanFiles(unittest.TestCase):
+    """Tests for the file-list scanning entrypoint."""
+
+    def test_scan_files_skips_binary(self):
+        files = ["image.png", "test.py"]
+        content_map = {
+            "image.png": b"\x89PNG\r\n\x1a\n",
+            "test.py": "OPENAI_API_KEY=sk-12345678901234567890\n",
+        }
+        findings = pre_commit.scan_files(files, lambda f: content_map.get(f, b""))
+        self.assertEqual(len(findings), 1)
+        self.assertEqual(findings[0].filename, "test.py")
+
+    def test_scan_files_ignores_safe_lines(self):
+        files = ["test.py"]
+        content_map = {
+            "test.py": "key = os.environ.get('OPENAI_API_KEY')\n",
+        }
+        findings = pre_commit.scan_files(files, lambda f: content_map.get(f, b""))
+        self.assertEqual(findings, [])
+
+
+class TestScanDiff(unittest.TestCase):
+    """Tests for scanning the added lines of a unified diff."""
+
+    def test_scan_diff_reports_file_and_line(self):
+        diff = (
+            "diff --git a/app.py b/app.py\n"
+            "--- a/app.py\n"
+            "+++ b/app.py\n"
+            "@@ -4,0 +5,2 @@\n"
+            "+print('ok')\n"
+            "+GITEA_TOKEN=gtl_12345678901234567890\n"
+        )
+        findings = pre_commit.scan_diff(diff)
+        self.assertEqual(len(findings), 1)
+        self.assertEqual((findings[0].filename, findings[0].line), ("app.py", 6))
+
+    def test_scan_diff_scans_added_line_starting_with_plus_plus(self):
+        diff = (
+            "+++ b/notes.txt\n"
+            "@@ -0,0 +1 @@\n"
+            "+++ Bearer abcdefghijklmnopqrstuvwxyz1234\n"
+        )
+        findings = pre_commit.scan_diff(diff)
+        self.assertEqual(len(findings), 1)
+
+
+class TestCliHelpers(unittest.TestCase):
+    """Tests for CLI helper functions."""
+
+    def test_color_codes_present(self):
+        self.assertIn("\033[", pre_commit.RED)
+        self.assertIn("\033[", pre_commit.GREEN)
+
+    def test_is_binary_content_true(self):
+        self.assertTrue(pre_commit.is_binary_content(b"\x00\x01\x02"))
+
+    def test_is_binary_content_false(self):
+        self.assertFalse(pre_commit.is_binary_content(b"hello world\n"))
+
+    def test_finding_is_hashable(self):
+        first = pre_commit.Finding("a.py", 1, "msg")
+        second = pre_commit.Finding("a.py", 1, "msg")
+        self.assertEqual(len({first, second}), 1)
+
+
+if __name__ == "__main__":
+    unittest.main()