fix(security): add SSRF protection to vision_tools and web_tools (hardened)

* fix(security): add SSRF protection to vision_tools and web_tools

Both vision_analyze and web_extract/web_crawl accept arbitrary URLs
without checking if they target private/internal network addresses.
A prompt-injected or malicious skill could use this to access cloud
metadata endpoints (169.254.169.254), localhost services, or private
network hosts.

Adds a shared url_safety.is_safe_url() that resolves hostnames and
blocks private, loopback, link-local, and reserved IP ranges. Also
blocks known internal hostnames (metadata.google.internal).

Integrated at the URL validation layer in vision_tools and before
each website_policy check in web_tools (extract, crawl).

* test(vision): update localhost test to reflect SSRF protection

The existing test_valid_url_with_port asserted localhost URLs pass
validation. With SSRF protection, localhost is now correctly blocked.
Update the test to verify the block, and add a separate test for
valid URLs with ports using a public hostname.

* fix(security): harden SSRF protection — fail-closed, CGNAT, multicast, redirect guard

Follow-up hardening on top of dieutx's SSRF protection (PR #2630):

- Change fail-open to fail-closed: DNS errors and unexpected exceptions
  now block the request instead of allowing it (OWASP best practice)
- Block CGNAT range (100.64.0.0/10): Python's ipaddress.is_private
  does NOT cover this range (returns False for both is_private and
  is_global). Used by Tailscale/WireGuard and carrier infrastructure.
- Add is_multicast and is_unspecified checks: multicast (224.0.0.0/4)
  and unspecified (0.0.0.0) addresses were not caught by the original
  four-check chain
- Add redirect guard for vision_tools: httpx event hook re-validates
  each redirect target against SSRF checks, preventing the classic
  redirect-based SSRF bypass (302 to internal IP)
- Move SSRF filtering before backend dispatch in web_extract: now
  covers Parallel and Tavily backends, not just Firecrawl
- Extract _is_blocked_ip() helper for cleaner IP range checking
- Add 24 new tests (CGNAT, multicast, IPv4-mapped IPv6, fail-closed
  behavior, parametrized blocked/allowed IP lists)
- Fix existing tests to mock DNS resolution for test hostnames

---------

Co-authored-by: dieutx <dangtc94@gmail.com>
This commit is contained in:
Teknium
2026-03-23 15:40:42 -07:00
committed by GitHub
parent 934fbe3c06
commit 0791efe2c3
6 changed files with 472 additions and 124 deletions

View File

@@ -0,0 +1,176 @@
"""Tests for SSRF protection in url_safety module."""
import socket
from unittest.mock import patch
from tools.url_safety import is_safe_url, _is_blocked_ip
import ipaddress
import pytest
class TestIsSafeUrl:
def test_public_url_allowed(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("93.184.216.34", 0)),
]):
assert is_safe_url("https://example.com/image.png") is True
def test_localhost_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("127.0.0.1", 0)),
]):
assert is_safe_url("http://localhost:8080/secret") is False
def test_loopback_ip_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("127.0.0.1", 0)),
]):
assert is_safe_url("http://127.0.0.1/admin") is False
def test_private_10_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("10.0.0.1", 0)),
]):
assert is_safe_url("http://internal-service.local/api") is False
def test_private_172_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("172.16.0.1", 0)),
]):
assert is_safe_url("http://private.corp/data") is False
def test_private_192_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("192.168.1.1", 0)),
]):
assert is_safe_url("http://router.local") is False
def test_link_local_169_254_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("169.254.169.254", 0)),
]):
assert is_safe_url("http://169.254.169.254/latest/meta-data/") is False
def test_metadata_google_internal_blocked(self):
assert is_safe_url("http://metadata.google.internal/computeMetadata/v1/") is False
def test_ipv6_loopback_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(10, 1, 6, "", ("::1", 0, 0, 0)),
]):
assert is_safe_url("http://[::1]:8080/") is False
def test_dns_failure_blocked(self):
"""DNS failures now fail closed — block the request."""
with patch("socket.getaddrinfo", side_effect=socket.gaierror("Name resolution failed")):
assert is_safe_url("https://nonexistent.example.com") is False
def test_empty_url_blocked(self):
assert is_safe_url("") is False
def test_no_hostname_blocked(self):
assert is_safe_url("http://") is False
def test_public_ip_allowed(self):
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("93.184.216.34", 0)),
]):
assert is_safe_url("https://example.com") is True
# ── New tests for hardened SSRF protection ──
def test_cgnat_100_64_blocked(self):
"""100.64.0.0/10 (CGNAT/Shared Address Space) is NOT covered by
ipaddress.is_private — must be blocked explicitly."""
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("100.64.0.1", 0)),
]):
assert is_safe_url("http://some-cgnat-host.example/") is False
def test_cgnat_100_127_blocked(self):
"""Upper end of CGNAT range (100.127.255.255)."""
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("100.127.255.254", 0)),
]):
assert is_safe_url("http://tailscale-peer.example/") is False
def test_multicast_blocked(self):
"""Multicast addresses (224.0.0.0/4) not caught by is_private."""
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("224.0.0.251", 0)),
]):
assert is_safe_url("http://mdns-host.local/") is False
def test_multicast_ipv6_blocked(self):
with patch("socket.getaddrinfo", return_value=[
(10, 1, 6, "", ("ff02::1", 0, 0, 0)),
]):
assert is_safe_url("http://[ff02::1]/") is False
def test_ipv4_mapped_ipv6_loopback_blocked(self):
"""::ffff:127.0.0.1 — IPv4-mapped IPv6 loopback."""
with patch("socket.getaddrinfo", return_value=[
(10, 1, 6, "", ("::ffff:127.0.0.1", 0, 0, 0)),
]):
assert is_safe_url("http://[::ffff:127.0.0.1]/") is False
def test_ipv4_mapped_ipv6_metadata_blocked(self):
"""::ffff:169.254.169.254 — IPv4-mapped IPv6 cloud metadata."""
with patch("socket.getaddrinfo", return_value=[
(10, 1, 6, "", ("::ffff:169.254.169.254", 0, 0, 0)),
]):
assert is_safe_url("http://[::ffff:169.254.169.254]/") is False
def test_unspecified_address_blocked(self):
"""0.0.0.0 — unspecified address, can bind to all interfaces."""
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("0.0.0.0", 0)),
]):
assert is_safe_url("http://0.0.0.0/") is False
def test_unexpected_error_fails_closed(self):
"""Unexpected exceptions should block, not allow."""
with patch("tools.url_safety.urlparse", side_effect=ValueError("bad url")):
assert is_safe_url("http://evil.com/") is False
def test_metadata_goog_blocked(self):
assert is_safe_url("http://metadata.goog/computeMetadata/v1/") is False
def test_ipv6_unique_local_blocked(self):
"""fc00::/7 — IPv6 unique local addresses."""
with patch("socket.getaddrinfo", return_value=[
(10, 1, 6, "", ("fd12::1", 0, 0, 0)),
]):
assert is_safe_url("http://[fd12::1]/internal") is False
def test_non_cgnat_100_allowed(self):
"""100.0.0.1 is NOT in CGNAT range (100.64.0.0/10), should be allowed."""
with patch("socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("100.0.0.1", 0)),
]):
# 100.0.0.1 is a global IP, not in CGNAT range
assert is_safe_url("http://legit-host.example/") is True
class TestIsBlockedIp:
"""Direct tests for the _is_blocked_ip helper."""
@pytest.mark.parametrize("ip_str", [
"127.0.0.1", "10.0.0.1", "172.16.0.1", "192.168.1.1",
"169.254.169.254", "0.0.0.0", "224.0.0.1", "255.255.255.255",
"100.64.0.1", "100.100.100.100", "100.127.255.254",
"::1", "fe80::1", "fc00::1", "fd12::1", "ff02::1",
"::ffff:127.0.0.1", "::ffff:169.254.169.254",
])
def test_blocked_ips(self, ip_str):
ip = ipaddress.ip_address(ip_str)
assert _is_blocked_ip(ip) is True, f"{ip_str} should be blocked"
@pytest.mark.parametrize("ip_str", [
"8.8.8.8", "93.184.216.34", "1.1.1.1", "100.0.0.1",
"2606:4700::1", "2001:4860:4860::8888",
])
def test_allowed_ips(self, ip_str):
ip = ipaddress.ip_address(ip_str)
assert _is_blocked_ip(ip) is False, f"{ip_str} should be allowed"

View File

@@ -33,17 +33,30 @@ class TestValidateImageUrl:
assert _validate_image_url("https://example.com/image.jpg") is True
def test_valid_http_url(self):
assert _validate_image_url("http://cdn.example.org/photo.png") is True
with patch("tools.url_safety.socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("93.184.216.34", 0)),
]):
assert _validate_image_url("http://cdn.example.org/photo.png") is True
def test_valid_url_without_extension(self):
"""CDN endpoints that redirect to images should still pass."""
assert _validate_image_url("https://cdn.example.com/abcdef123") is True
with patch("tools.url_safety.socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("93.184.216.34", 0)),
]):
assert _validate_image_url("https://cdn.example.com/abcdef123") is True
def test_valid_url_with_query_params(self):
assert _validate_image_url("https://img.example.com/pic?w=200&h=200") is True
with patch("tools.url_safety.socket.getaddrinfo", return_value=[
(2, 1, 6, "", ("93.184.216.34", 0)),
]):
assert _validate_image_url("https://img.example.com/pic?w=200&h=200") is True
def test_localhost_url_blocked_by_ssrf(self):
"""localhost URLs are now blocked by SSRF protection."""
assert _validate_image_url("http://localhost:8080/image.png") is False
def test_valid_url_with_port(self):
assert _validate_image_url("http://localhost:8080/image.png") is True
assert _validate_image_url("http://example.com:8080/image.png") is True
def test_valid_url_with_path_only(self):
assert _validate_image_url("https://example.com/") is True

View File

@@ -343,6 +343,8 @@ def test_browser_navigate_allows_when_shared_file_missing(monkeypatch, tmp_path)
async def test_web_extract_short_circuits_blocked_url(monkeypatch):
from tools import web_tools
# Allow test URLs past SSRF check so website policy is what gets tested
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
monkeypatch.setattr(
web_tools,
"check_website_access",
@@ -389,6 +391,9 @@ def test_check_website_access_fails_open_on_malformed_config(tmp_path, monkeypat
async def test_web_extract_blocks_redirected_final_url(monkeypatch):
from tools import web_tools
# Allow test URLs past SSRF check so website policy is what gets tested
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
def fake_check(url):
if url == "https://allowed.test":
return None
@@ -428,6 +433,8 @@ async def test_web_crawl_short_circuits_blocked_url(monkeypatch):
# web_crawl_tool checks for Firecrawl env before website policy
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
# Allow test URLs past SSRF check so website policy is what gets tested
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
monkeypatch.setattr(
web_tools,
"check_website_access",
@@ -457,6 +464,8 @@ async def test_web_crawl_blocks_redirected_final_url(monkeypatch):
# web_crawl_tool checks for Firecrawl env before website policy
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
# Allow test URLs past SSRF check so website policy is what gets tested
monkeypatch.setattr(web_tools, "is_safe_url", lambda url: True)
def fake_check(url):
if url == "https://allowed.test":