#!/usr/bin/env python3
"""
vulnerability_scanner.py — Check Python dependencies against CVE databases (Issue #108)

Scans requirements.txt (or any pip-compatible dependency file) and queries the
Open Source Vulnerability (OSV) database for known security issues.

OSV API: https://api.osv.dev/v1/query (free, no auth, PyPI ecosystem supported)

Output:
  - Human-readable summary on stdout
  - JSON report with full vulnerability details
  - Exit code: 0 if no vulnerabilities were found and every query succeeded,
    1 if critical/high vulnerabilities were found (or the requirements file
    is missing), 2 otherwise (lower-severity findings or an incomplete scan
    because some OSV queries failed)

Usage:
    python3 scripts/vulnerability_scanner.py
    python3 scripts/vulnerability_scanner.py --deps requirements.txt --output json
    python3 scripts/vulnerability_scanner.py --min-severity high
    python3 scripts/vulnerability_scanner.py --deps requirements.txt --report-format markdown
"""

import argparse
import json
import os
import re
import sys
import urllib.request
import urllib.error
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Set, Tuple

# --- Configuration ---

OSV_API_URL = "https://api.osv.dev/v1/query"
DEFAULT_REQUIREMENTS_PATH = "requirements.txt"
# Ordered from most to least severe; "unknown" sorts last.
SEVERITY_LEVELS = ["critical", "high", "medium", "low", "unknown"]

# Map OSV / advisory severity labels to our buckets.
# "MODERATE" is the label GitHub advisories use for medium.
CVSS_SEVERITY_MAP = {
    "CRITICAL": "critical",
    "HIGH": "high",
    "MEDIUM": "medium",
    "MODERATE": "medium",
    "LOW": "low",
    "NONE": "none",
}

_MAX_INCLUDE_DEPTH = 3      # maximum nesting of "-r" includes
_MAX_OSV_PAGES = 20         # safety bound on paginated OSV responses
_OSV_TIMEOUT_SECONDS = 15

# pip treats "#" as a comment only at line start or after whitespace, which
# keeps URL fragments such as "#egg=name" intact.
_COMMENT_RE = re.compile(r'(^|\s+)#.*$')
_INCLUDE_RE = re.compile(r'^(?:-r|--requirement)(?:\s+|=)\s*(\S.*)$')
_URL_RE = re.compile(r'^[A-Za-z][A-Za-z0-9+.-]*://')
# "===" must precede "==" so arbitrary-equality pins are not split as "==" + "=1.0".
_OPERATOR = r'(?:===|==|~=|!=|>=|<=|<|>)'
_CLAUSE = _OPERATOR + r'\s*[^\s,;#]+'
_REQUIREMENT_RE = re.compile(
    r'^(?P<name>[A-Za-z0-9](?:[A-Za-z0-9._-]*[A-Za-z0-9])?)'
    r'\s*(?:\[[^\]]*\])?'                       # optional extras: pkg[extra]
    r'\s*(?P<spec>' + _CLAUSE + r'(?:\s*,\s*' + _CLAUSE + r')*)?'
)
# An exact pin: bare "1.2.3", "==1.2.3" or "===1.2.3" (no wildcard, no extra clauses).
_PINNED_RE = re.compile(r'^\s*(?:===?)?\s*([0-9][^\s,;*]*)\s*$')

# CVSS v3.x base-metric weights (FIRST CVSS v3.1 specification).
_CVSS3_WEIGHTS = {
    "AV": {"N": 0.85, "A": 0.62, "L": 0.55, "P": 0.2},
    "AC": {"L": 0.77, "H": 0.44},
    "UI": {"N": 0.85, "R": 0.62},
    "C": {"H": 0.56, "L": 0.22, "N": 0.0},
    "I": {"H": 0.56, "L": 0.22, "N": 0.0},
    "A": {"H": 0.56, "L": 0.22, "N": 0.0},
}
# Privileges Required weights depend on whether Scope is changed.
_CVSS3_PR_WEIGHTS = {
    False: {"N": 0.85, "L": 0.62, "H": 0.27},
    True: {"N": 0.85, "L": 0.68, "H": 0.5},
}


# --- Data Structures ---

@dataclass
class Vulnerability:
    """A single vulnerability finding."""
    package: str                 # package name as parsed from the requirements file
    version: str                 # version spec as parsed (e.g. "==1.2.3"), may be ""
    vuln_id: str                 # OSV identifier
    severity: str                # bucket such as "critical" ... "unknown" (or "none")
    cvss_score: Optional[float]  # CVSS base score when it could be determined
    summary: str
    details_url: str
    fixed_versions: List[str]


@dataclass
class ScanResult:
    """Results from a vulnerability scan."""
    scanned_packages: int
    vulnerabilities: List[Vulnerability]
    errors: List[Tuple[str, str]]  # (package, error_message)


# --- Requirement Parsing ---

def _parse_requirement_lines(
    lines: Iterable[str],
    filename: str,
    depth: int,
    packages: Dict[str, str],
    seen: Set[str],
) -> None:
    """Parse the lines of one requirements file into *packages* (mutated in place)."""
    for raw_line in lines:
        line = _COMMENT_RE.sub('', raw_line).strip()
        if not line:
            continue

        include = _INCLUDE_RE.match(line)
        if include:
            _parse_included_file(include.group(1).strip(), filename, depth, packages, seen)
            continue

        # Skip editable installs, other pip options and bare URLs: none of
        # them carries a package name + version we could look up.
        if line.startswith('-') or _URL_RE.match(line):
            continue

        match = _REQUIREMENT_RE.match(line)
        if not match:
            continue
        # Store the spec without internal whitespace: "pkg >= 1.0" -> ">=1.0".
        spec = re.sub(r'\s+', '', match.group('spec') or '')
        packages[match.group('name').lower()] = spec


def _parse_included_file(
    include_path: str,
    including_file: str,
    depth: int,
    packages: Dict[str, str],
    seen: Set[str],
) -> None:
    """Follow one "-r" include, resolved relative to the including file."""
    if depth >= _MAX_INCLUDE_DEPTH:
        print(f"WARNING: Max include depth exceeded in {including_file}", file=sys.stderr)
        return

    base_dir = os.path.dirname(os.path.abspath(including_file))
    # normpath so "a/../b.txt" and "b.txt" count as the same include.
    full_path = os.path.normpath(os.path.join(base_dir, include_path))
    if full_path in seen:
        return
    seen.add(full_path)

    try:
        with open(full_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except OSError:
        print(f"WARNING: Could not read included file: {full_path}", file=sys.stderr)
        return
    _parse_requirement_lines(lines, full_path, depth + 1, packages, seen)


def parse_requirements_file(path: str) -> Dict[str, str]:
    """
    Parse a requirements.txt file into {package_name: version_spec}.

    Handles:
      - pkg==1.2.3, pkg===1.2.3, pkg>=1.0.0, pkg<2, pkg>=1.0,<2.0
      - whitespace around operators (pkg >= 1.0)
      - pkg[extra]==1.2.3 (extras are dropped from the name)
      - -e/--editable entries, other options and bare URLs (skipped)
      - -r/--requirement inclusions (recursive, limited depth, cycle-safe)
      - comments (full-line and inline), environment markers and blank lines

    Args:
        path: Path to the requirements file.

    Returns:
        Mapping of lower-cased package name to its version spec with
        whitespace removed (e.g. "==1.2.3"), or "" when unconstrained.
        A later entry for the same package overwrites an earlier one.

    Raises:
        SystemExit: With code 1 if *path* does not exist.
    """
    packages: Dict[str, str] = {}
    # Seed with the root file so an include that points back to it is ignored.
    seen: Set[str] = {os.path.normpath(os.path.abspath(path))}

    try:
        with open(path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except FileNotFoundError:
        print(f"ERROR: Requirements file not found: {path}", file=sys.stderr)
        sys.exit(1)

    _parse_requirement_lines(lines, path, 0, packages, seen)
    return packages


# --- OSV API Queries ---

def _pinned_version(version_spec: str) -> Optional[str]:
    """Return the exact version for an exact pin ("==1.2.3"), else None."""
    match = _PINNED_RE.match(version_spec or "")
    return match.group(1) if match else None


def _describe_error(exc: BaseException) -> str:
    """Render a query failure as a short message for warnings and reports."""
    if isinstance(exc, urllib.error.HTTPError):
        return f"HTTP {exc.code}"
    return str(exc) or type(exc).__name__


def _fetch_osv_pages(payload: dict, package: str) -> List[dict]:
    """POST *payload* to OSV and follow pagination; raises on transport/parse errors."""
    collected: List[dict] = []
    for _ in range(_MAX_OSV_PAGES):
        req = urllib.request.Request(
            OSV_API_URL,
            data=json.dumps(payload).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
            method='POST',
        )
        try:
            with urllib.request.urlopen(req, timeout=_OSV_TIMEOUT_SECONDS) as response:
                result = json.loads(response.read().decode('utf-8'))
        except urllib.error.HTTPError as e:
            if e.code == 404:
                return collected  # No vulnerabilities found
            raise
        if not isinstance(result, dict):
            raise ValueError("unexpected OSV response format")

        collected.extend(result.get('vulns') or [])
        collected.extend(result.get('vulnerabilities') or [])

        next_token = result.get('next_page_token')
        if not next_token:
            return collected
        payload['page_token'] = next_token

    print(
        f"WARNING: OSV results for {package} truncated after {_MAX_OSV_PAGES} pages",
        file=sys.stderr,
    )
    return collected


def query_osv(package: str, version: str, *, raise_on_error: bool = False) -> List[dict]:
    """
    Query the OSV API for vulnerabilities affecting a package.

    OSV expects one specific version, not a range. When *version* is an exact
    pin ("1.2.3", "==1.2.3" or "===1.2.3") that version is sent and OSV only
    returns vulnerabilities affecting it. For ranges or unconstrained
    packages the query is sent without a version, so OSV returns every known
    vulnerability for the package.

    Args:
        package: PyPI package name.
        version: Version spec as produced by parse_requirements_file.
        raise_on_error: If True, re-raise network/HTTP/decoding failures
            (OSError or ValueError subclasses) instead of swallowing them.

    Returns:
        List of raw OSV vulnerability dicts. With raise_on_error=False a
        failed query prints a warning to stderr and returns an empty list.
    """
    payload = {
        "package": {
            "name": package,
            "ecosystem": "PyPI",
        }
    }
    pinned = _pinned_version(version)
    if pinned:
        payload["version"] = pinned

    try:
        return _fetch_osv_pages(payload, package)
    except (OSError, ValueError) as exc:
        # URLError/HTTPError/timeouts are OSError; JSON and UTF-8 decoding
        # errors are ValueError.
        if raise_on_error:
            raise
        print(f"WARNING: OSV query failed for {package}: {_describe_error(exc)}", file=sys.stderr)
        return []


# --- OSV Response Parsing ---

def _cvss3_roundup(value: float) -> float:
    """Round up to one decimal place as defined by CVSS v3.1 (Appendix A)."""
    scaled = int(round(value * 100000))
    if scaled % 10000 == 0:
        return scaled / 100000.0
    return (scaled // 10000 + 1) / 10.0


def _cvss3_base_score(vector: object) -> Optional[float]:
    """Compute the CVSS v3.x base score from a vector string, or None if unparsable."""
    if not isinstance(vector, str) or not vector.startswith("CVSS:3"):
        return None

    metrics: Dict[str, str] = {}
    for part in vector.split("/")[1:]:
        key, sep, value = part.partition(":")
        if sep:
            metrics[key] = value

    try:
        scope_changed = {"U": False, "C": True}[metrics["S"]]
        exploitability = (
            8.22
            * _CVSS3_WEIGHTS["AV"][metrics["AV"]]
            * _CVSS3_WEIGHTS["AC"][metrics["AC"]]
            * _CVSS3_PR_WEIGHTS[scope_changed][metrics["PR"]]
            * _CVSS3_WEIGHTS["UI"][metrics["UI"]]
        )
        iss = 1 - (
            (1 - _CVSS3_WEIGHTS["C"][metrics["C"]])
            * (1 - _CVSS3_WEIGHTS["I"][metrics["I"]])
            * (1 - _CVSS3_WEIGHTS["A"][metrics["A"]])
        )
    except KeyError:
        return None  # missing or invalid base metric

    if scope_changed:
        impact = 7.52 * (iss - 0.029) - 3.25 * (iss - 0.02) ** 15
    else:
        impact = 6.42 * iss
    if impact <= 0:
        return 0.0

    total = impact + exploitability
    if scope_changed:
        total *= 1.08
    return _cvss3_roundup(min(total, 10.0))


def _severity_from_cvss3_score(score: object) -> str:
    """Map a CVSS v3 base score to its qualitative rating bucket."""
    if isinstance(score, bool) or not isinstance(score, (int, float)):
        return "unknown"
    if score <= 0:
        return "none"
    if score < 4.0:
        return "low"
    if score < 7.0:
        return "medium"
    if score < 9.0:
        return "high"
    return "critical"


def _label_severity(vuln: dict) -> str:
    """Fall back to a textual severity label published by the advisory database."""
    candidates = [vuln.get('database_specific')]
    for aff in vuln.get('affected') or []:
        if isinstance(aff, dict):
            candidates.append(aff.get('database_specific'))
            candidates.append(aff.get('ecosystem_specific'))
    for extra in candidates:
        if isinstance(extra, dict) and isinstance(extra.get('severity'), str):
            mapped = CVSS_SEVERITY_MAP.get(extra['severity'].upper())
            if mapped:
                return mapped
    return "unknown"


def _extract_severity(vuln: dict) -> Tuple[str, Optional[float]]:
    """Return (severity bucket, CVSS score) for one raw OSV vulnerability."""
    severity = "unknown"
    cvss_score = None

    for sev_info in vuln.get('severity') or []:
        if not isinstance(sev_info, dict):
            continue
        sev_type = sev_info.get('type')
        score = sev_info.get('score', '')
        if sev_type == 'CVSS_V3':
            if isinstance(score, dict):
                # Pre-computed form: {"baseScore": ..., "baseSeverity": ...}
                cvss_score = score.get('baseScore')
                sev_str = str(score.get('baseSeverity', '')).upper()
                severity = CVSS_SEVERITY_MAP.get(sev_str, 'unknown')
                if severity == 'unknown':
                    severity = _severity_from_cvss3_score(cvss_score)
            else:
                # OSV schema form: the score is a CVSS vector string.
                cvss_score = _cvss3_base_score(score)
                severity = _severity_from_cvss3_score(cvss_score)
            break
        if sev_type == 'CVSS_V2' and isinstance(score, dict) and cvss_score is None:
            # Fallback: keep the number, but v2 bands differ from v3 so no bucket.
            cvss_score = score.get('baseScore')

    if severity == "unknown":
        severity = _label_severity(vuln)
    return severity, cvss_score


def _normalize_name(name: str) -> str:
    """Normalize a package name for comparison (PEP 503 style)."""
    return re.sub(r'[-_.]+', '-', name).lower()


def _collect_fixed_versions(vuln: dict, package: str) -> List[str]:
    """Collect the 'fixed' versions that apply to *package*, sorted for stable output."""
    wanted = _normalize_name(package)
    fixed: Set[str] = set()
    for aff in vuln.get('affected') or []:
        if not isinstance(aff, dict):
            continue
        pkg_info = aff.get('package') or {}
        affected_name = pkg_info.get('name')
        ecosystem = pkg_info.get('ecosystem')
        # One advisory can list several packages/ecosystems; only keep ours.
        if affected_name and _normalize_name(affected_name) != wanted:
            continue
        if ecosystem and ecosystem != 'PyPI':
            continue
        for version_range in aff.get('ranges') or []:
            if version_range.get('type') == 'GIT':
                continue  # 'fixed' is a commit hash here, not a release
            for event in version_range.get('events') or []:
                if event.get('fixed'):
                    fixed.add(event['fixed'])
    return sorted(fixed)


def parse_osv_vuln(raw_vulns: List[dict], package: str, version_spec: str) -> List[Vulnerability]:
    """
    Parse raw OSV API responses into Vulnerability objects.

    Severity is taken from the CVSS v3 entry (vector string or pre-computed
    dict); when that is absent, from a textual ``severity`` label in
    ``database_specific`` / ``ecosystem_specific``; otherwise "unknown".

    Args:
        raw_vulns: Vulnerability dicts as returned by query_osv.
        package: Package name the query was made for.
        version_spec: Version spec of that package (stored on each finding).

    Returns:
        One Vulnerability per input dict, in input order.
    """
    vulns = []
    for raw in raw_vulns:
        vuln_id = raw.get('id', 'UNKNOWN')
        severity, cvss_score = _extract_severity(raw)
        vulns.append(Vulnerability(
            package=package,
            version=version_spec,
            vuln_id=vuln_id,
            severity=severity,
            cvss_score=cvss_score,
            # "or" also covers an explicit null summary
            summary=raw.get('summary') or 'No summary provided.',
            details_url=f"https://osv.dev/vulnerability/{vuln_id}",
            fixed_versions=_collect_fixed_versions(raw, package),
        ))
    return vulns


# --- Filtering & Reporting ---

def filter_by_severity(vulns: List[Vulnerability], min_severity: str) -> List[Vulnerability]:
    """
    Filter vulnerabilities to include only those at or above the given severity.

    A threshold of "low" or "unknown" reports everything, including findings
    whose severity could not be determined. For stricter thresholds, findings
    with an unknown or unrecognized severity are excluded.

    Args:
        vulns: Findings to filter.
        min_severity: One of SEVERITY_LEVELS (case-insensitive).

    Returns:
        The filtered list; *vulns* itself if min_severity is not recognized.
    """
    threshold = min_severity.lower()
    if threshold not in SEVERITY_LEVELS:
        return vulns  # No filtering if invalid
    if threshold in ("low", "unknown"):
        return list(vulns)

    min_idx = SEVERITY_LEVELS.index(threshold)
    unknown_idx = SEVERITY_LEVELS.index("unknown")

    def rank(severity: str) -> int:
        # lower index = more severe; unrecognized values rank like "unknown"
        sev = severity.lower()
        return SEVERITY_LEVELS.index(sev) if sev in SEVERITY_LEVELS else unknown_idx

    return [v for v in vulns if rank(v.severity) <= min_idx]


def generate_text_report(result: ScanResult, packages: Dict[str, str]) -> str:
    """
    Generate human-readable text report.

    Args:
        result: Scan outcome to render.
        packages: {package: version_spec} used to show each package's spec.

    Returns:
        The report as a single newline-joined string.
    """
    lines = [
        "=" * 60,
        "Vulnerability Scan Report",
        "=" * 60,
        f"Packages scanned: {result.scanned_packages}",
        f"Vulnerabilities found: {len(result.vulnerabilities)}",
    ]
    if result.errors:
        lines.append(f"Errors: {len(result.errors)}")

    # Group by severity
    by_severity: Dict[str, List[Vulnerability]] = {}
    for v in result.vulnerabilities:
        by_severity.setdefault(v.severity.upper(), []).append(v)

    known_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW", "UNKNOWN"]
    # Any other bucket (e.g. "NONE") is listed last so no finding is hidden.
    extra_order = sorted(sev for sev in by_severity if sev not in known_order)
    for sev in known_order + extra_order:
        vuln_list = by_severity.get(sev, [])
        if not vuln_list:
            continue
        lines.append(f"\n{sev}: {len(vuln_list)}")
        for v in vuln_list:
            lines.append(f"  [{v.package} {packages.get(v.package, '')}] {v.vuln_id}")
            lines.append(f"    {v.summary[:80]}")
            if v.cvss_score is not None:  # a 0.0 score is still a score
                lines.append(f"    CVSS: {v.cvss_score}")
            if v.fixed_versions:
                lines.append(f"    Fixed in: {', '.join(v.fixed_versions[:3])}")
            lines.append(f"    {v.details_url}")

    if result.errors:
        lines.append("\nERRORS:")
        for pkg, err in result.errors[:10]:
            lines.append(f"  {pkg}: {err}")

    lines.append("\n" + "=" * 60)
    return "\n".join(lines)


def generate_json_report(result: ScanResult, packages: Dict[str, str]) -> str:
    """
    Generate JSON report.

    Args:
        result: Scan outcome to render.
        packages: {package: version_spec}; falls back to the spec stored on
            the finding when a package is missing from this mapping.

    Returns:
        Pretty-printed JSON document (2-space indent).
    """
    findings = []
    for v in result.vulnerabilities:
        findings.append({
            "package": v.package,
            "version_spec": packages.get(v.package, v.version),
            "vulnerability_id": v.vuln_id,
            "severity": v.severity,
            "cvss_score": v.cvss_score,
            "summary": v.summary,
            "details_url": v.details_url,
            "fixed_versions": v.fixed_versions,
        })
    report = {
        "scanned_packages": result.scanned_packages,
        "vulnerabilities": findings,
        "errors": [{"package": p, "error": e} for p, e in result.errors],
    }
    return json.dumps(report, indent=2)


# --- Main Orchestration ---

def _scan_packages(
    packages: Dict[str, str],
    min_severity: str,
    query_osv_api: bool,
) -> ScanResult:
    """Query OSV for already-parsed *packages* and build the filtered ScanResult."""
    vulnerabilities: List[Vulnerability] = []
    errors: List[Tuple[str, str]] = []

    if query_osv_api:
        for pkg, version_spec in packages.items():
            try:
                raw_vulns = query_osv(pkg, version_spec or "", raise_on_error=True)
            except (OSError, ValueError) as exc:
                # Record the failure: an unreachable API must not look like
                # a clean scan.
                message = _describe_error(exc)
                print(f"WARNING: OSV query failed for {pkg}: {message}", file=sys.stderr)
                errors.append((pkg, message))
                continue
            vulnerabilities.extend(parse_osv_vuln(raw_vulns, pkg, version_spec or ""))

    return ScanResult(
        scanned_packages=len(packages),
        vulnerabilities=filter_by_severity(vulnerabilities, min_severity),
        errors=errors,
    )


def run_scan(
    deps_path: str,
    min_severity: str = "low",
    query_osv_api: bool = True
) -> ScanResult:
    """
    Execute the full vulnerability scan pipeline.

    Args:
        deps_path: Path to requirements-style file
        min_severity: Minimum severity to include in results
        query_osv_api: If False, skip API calls (for testing/dry-run)

    Returns:
        ScanResult with all findings; packages whose OSV query failed are
        listed in ``errors`` as (package, message).

    Raises:
        SystemExit: With code 1 if *deps_path* does not exist.
    """
    packages = parse_requirements_file(deps_path)
    return _scan_packages(packages, min_severity, query_osv_api)


def _md_cell(text: str) -> str:
    """Escape text so it cannot break out of a markdown table cell."""
    return str(text).replace("|", "\\|").replace("\r", " ").replace("\n", " ")


def main(argv: Optional[List[str]] = None) -> int:
    """
    Command-line entry point.

    Args:
        argv: Argument list to parse; defaults to sys.argv[1:].

    Returns:
        Process exit code: 1 if critical/high findings exist, 2 if there are
        other findings or some packages could not be checked, 0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Scan Python dependencies for known vulnerabilities using OSV database"
    )
    parser.add_argument(
        '--deps', '-d',
        default=DEFAULT_REQUIREMENTS_PATH,
        help='Path to requirements.txt (default: requirements.txt)'
    )
    parser.add_argument(
        # --report-format is accepted as an alias; the module usage documents it.
        '--output', '-o', '--report-format',
        choices=['text', 'json', 'markdown'],
        default='text',
        help='Output format (default: text)'
    )
    parser.add_argument(
        '--min-severity',
        default='low',
        choices=SEVERITY_LEVELS,
        help='Minimum severity to report (default: low — report all)'
    )
    parser.add_argument(
        '--json',
        action='store_true',
        help='Output JSON (shorthand for --output json)'
    )
    parser.add_argument(
        '--quiet', '-q',
        action='store_true',
        help='Only print summary, skip detailed vulnerability list'
    )
    args = parser.parse_args(argv)

    # Update output if --json flag is used
    if args.json:
        args.output = 'json'

    # Parse once and reuse the mapping for both the scan and the report.
    packages = parse_requirements_file(args.deps)
    result = _scan_packages(packages, args.min_severity, True)

    # Output
    if args.output == 'json':
        print(generate_json_report(result, packages))
    elif args.output == 'markdown':
        # Simple markdown table
        print("# Vulnerability Scan Report\n")
        print(f"**Packages scanned:** {result.scanned_packages}")
        print(f"**Vulnerabilities:** {len(result.vulnerabilities)}\n")
        if result.vulnerabilities:
            print("| Severity | Package | Version | Vuln ID | Summary |")
            print("|----------|---------|---------|---------|---------|")
            for v in result.vulnerabilities:
                print(
                    f"| {v.severity.upper()} | {_md_cell(v.package)} | {_md_cell(v.version)} "
                    f"| [{v.vuln_id}]({v.details_url}) | {_md_cell(v.summary[:50])} |"
                )
            print("\n")
    else:
        # text (default)
        if not args.quiet:
            print(generate_text_report(result, packages))
        else:
            crit = sum(1 for v in result.vulnerabilities if v.severity == 'critical')
            high = sum(1 for v in result.vulnerabilities if v.severity == 'high')
            med = sum(1 for v in result.vulnerabilities if v.severity == 'medium')
            print(f"CRITICAL={crit} HIGH={high} MEDIUM={med} TOTAL={len(result.vulnerabilities)}")

    # Exit code logic: 1 if critical/high found, 2 for other findings or an
    # incomplete scan (failed queries), 0 only for a complete, clean scan.
    if any(v.severity in ('critical', 'high') for v in result.vulnerabilities):
        return 1
    if result.vulnerabilities or result.errors:
        return 2
    return 0


if __name__ == '__main__':
    sys.exit(main())