feat: add website blocklist enforcement for web/browser tools (#1064)

Adds security.website_blocklist config for user-managed domain blocking
across URL-capable tools. Enforced at the tool level (not monkey-patching)
so it's safe and predictable.

- tools/website_policy.py: shared policy loader with domain normalization,
  wildcard support (*.tracking.example), shared file imports, and
  structured block metadata
- web_extract: pre-fetch URL check + post-redirect recheck
- web_crawl: pre-crawl URL check + per-page URL recheck
- browser_navigate: pre-navigation URL check
- Blocked responses include blocked_by_policy metadata so the agent
  can explain exactly what was denied

Config:
  security:
    website_blocklist:
      enabled: true
      domains: ["evil.com", "*.tracking.example"]
      shared_files: ["team-blocklist.txt"]

Salvaged from PR #1086 by @kshitijk4poor. Browser post-redirect checks
deferred (browser_tool was fully rewritten since the PR branched).

Co-authored-by: kshitijk4poor <kshitijk4poor@users.noreply.github.com>
This commit is contained in:
teknium1
2026-03-17 02:59:28 -07:00
parent 6020db0243
commit 30c417fe70
5 changed files with 758 additions and 2 deletions

View File

@@ -354,6 +354,11 @@ DEFAULT_CONFIG = {
"tirith_path": "tirith",
"tirith_timeout": 5,
"tirith_fail_open": True,
"website_blocklist": {
"enabled": True,
"domains": [],
"shared_files": [],
},
},
# Config schema version - bump this when adding new required fields

View File

@@ -0,0 +1,486 @@
import json
from pathlib import Path
import pytest
import yaml
from tools.website_policy import WebsitePolicyError, check_website_access, load_website_blocklist
def test_load_website_blocklist_merges_config_and_shared_file(tmp_path):
shared = tmp_path / "community-blocklist.txt"
shared.write_text("# comment\nexample.org\nsub.bad.net\n", encoding="utf-8")
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["example.com", "https://www.evil.test/path"],
"shared_files": [str(shared)],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
policy = load_website_blocklist(config_path)
assert policy["enabled"] is True
assert {rule["pattern"] for rule in policy["rules"]} == {
"example.com",
"evil.test",
"example.org",
"sub.bad.net",
}
def test_check_website_access_matches_parent_domain_subdomains(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["example.com"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
blocked = check_website_access("https://docs.example.com/page", config_path=config_path)
assert blocked is not None
assert blocked["host"] == "docs.example.com"
assert blocked["rule"] == "example.com"
def test_check_website_access_supports_wildcard_subdomains_only(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["*.tracking.example"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
assert check_website_access("https://a.tracking.example", config_path=config_path) is not None
assert check_website_access("https://www.tracking.example", config_path=config_path) is not None
assert check_website_access("https://tracking.example", config_path=config_path) is None
def test_default_config_exposes_website_blocklist_shape():
from hermes_cli.config import DEFAULT_CONFIG
website_blocklist = DEFAULT_CONFIG["security"]["website_blocklist"]
assert website_blocklist["enabled"] is True
assert website_blocklist["domains"] == []
assert website_blocklist["shared_files"] == []
def test_load_website_blocklist_uses_enabled_default_when_section_missing(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(yaml.safe_dump({"display": {"tool_progress": "all"}}, sort_keys=False), encoding="utf-8")
policy = load_website_blocklist(config_path)
assert policy == {"enabled": True, "rules": []}
def test_load_website_blocklist_raises_clean_error_for_invalid_domains_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": "example.com",
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.domains must be a list"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_shared_files_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"shared_files": "community-blocklist.txt",
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.shared_files must be a list"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_top_level_config_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(yaml.safe_dump(["not", "a", "mapping"], sort_keys=False), encoding="utf-8")
with pytest.raises(WebsitePolicyError, match="config root must be a mapping"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_security_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(yaml.safe_dump({"security": []}, sort_keys=False), encoding="utf-8")
with pytest.raises(WebsitePolicyError, match="security must be a mapping"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_website_blocklist_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": "block everything",
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist must be a mapping"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_invalid_enabled_type(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": "false",
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
with pytest.raises(WebsitePolicyError, match="security.website_blocklist.enabled must be a boolean"):
load_website_blocklist(config_path)
def test_load_website_blocklist_raises_clean_error_for_malformed_yaml(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text("security: [oops\n", encoding="utf-8")
with pytest.raises(WebsitePolicyError, match="Invalid config YAML"):
load_website_blocklist(config_path)
def test_load_website_blocklist_wraps_shared_file_read_errors(tmp_path, monkeypatch):
shared = tmp_path / "community-blocklist.txt"
shared.write_text("example.org\n", encoding="utf-8")
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"shared_files": [str(shared)],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
def failing_read_text(self, *args, **kwargs):
raise PermissionError("no permission")
monkeypatch.setattr(Path, "read_text", failing_read_text)
with pytest.raises(WebsitePolicyError, match="Failed to read shared blocklist file"):
load_website_blocklist(config_path)
def test_check_website_access_uses_dynamic_hermes_home(monkeypatch, tmp_path):
hermes_home = tmp_path / "hermes-home"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["dynamic.example"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
blocked = check_website_access("https://dynamic.example/path")
assert blocked is not None
assert blocked["rule"] == "dynamic.example"
def test_check_website_access_blocks_scheme_less_urls(tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"domains": ["blocked.test"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
blocked = check_website_access("www.blocked.test/path", config_path=config_path)
assert blocked is not None
assert blocked["host"] == "www.blocked.test"
assert blocked["rule"] == "blocked.test"
def test_browser_navigate_returns_policy_block(monkeypatch):
from tools import browser_tool
monkeypatch.setattr(
browser_tool,
"check_website_access",
lambda url: {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
},
)
monkeypatch.setattr(
browser_tool,
"_run_browser_command",
lambda *args, **kwargs: pytest.fail("browser command should not run for blocked URL"),
)
result = json.loads(browser_tool.browser_navigate("https://blocked.test"))
assert result["success"] is False
assert result["blocked_by_policy"]["rule"] == "blocked.test"
def test_browser_navigate_returns_clean_policy_error_for_missing_shared_file(monkeypatch, tmp_path):
from tools import browser_tool
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump(
{
"security": {
"website_blocklist": {
"enabled": True,
"shared_files": ["missing-blocklist.txt"],
}
}
},
sort_keys=False,
),
encoding="utf-8",
)
monkeypatch.setattr(browser_tool, "check_website_access", lambda url: check_website_access(url, config_path=config_path))
result = json.loads(browser_tool.browser_navigate("https://allowed.test"))
assert result["success"] is False
assert "Website policy error" in result["error"]
@pytest.mark.asyncio
async def test_web_extract_short_circuits_blocked_url(monkeypatch):
from tools import web_tools
monkeypatch.setattr(
web_tools,
"check_website_access",
lambda url: {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
},
)
monkeypatch.setattr(
web_tools,
"_get_firecrawl_client",
lambda: pytest.fail("firecrawl should not run for blocked URL"),
)
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_extract_tool(["https://blocked.test"], use_llm_processing=False))
assert result["results"][0]["url"] == "https://blocked.test"
assert "Blocked by website policy" in result["results"][0]["error"]
@pytest.mark.asyncio
async def test_web_extract_returns_clean_policy_error_for_malformed_config(monkeypatch, tmp_path):
from tools import web_tools
config_path = tmp_path / "config.yaml"
config_path.write_text("security: [oops\n", encoding="utf-8")
monkeypatch.setattr(web_tools, "check_website_access", lambda url: check_website_access(url, config_path=config_path))
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False))
assert result["results"][0]["url"] == "https://allowed.test"
assert "Website policy error" in result["results"][0]["error"]
@pytest.mark.asyncio
async def test_web_extract_blocks_redirected_final_url(monkeypatch):
from tools import web_tools
def fake_check(url):
if url == "https://allowed.test":
return None
if url == "https://blocked.test/final":
return {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
}
pytest.fail(f"unexpected URL checked: {url}")
class FakeFirecrawlClient:
def scrape(self, url, formats):
return {
"markdown": "secret content",
"metadata": {
"title": "Redirected",
"sourceURL": "https://blocked.test/final",
},
}
monkeypatch.setattr(web_tools, "check_website_access", fake_check)
monkeypatch.setattr(web_tools, "_get_firecrawl_client", lambda: FakeFirecrawlClient())
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False))
assert result["results"][0]["url"] == "https://blocked.test/final"
assert result["results"][0]["content"] == ""
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"
@pytest.mark.asyncio
async def test_web_crawl_short_circuits_blocked_url(monkeypatch):
from tools import web_tools
monkeypatch.setattr(
web_tools,
"check_website_access",
lambda url: {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
},
)
monkeypatch.setattr(
web_tools,
"_get_firecrawl_client",
lambda: pytest.fail("firecrawl should not run for blocked crawl URL"),
)
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_crawl_tool("https://blocked.test", use_llm_processing=False))
assert result["results"][0]["url"] == "https://blocked.test"
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"
@pytest.mark.asyncio
async def test_web_crawl_blocks_redirected_final_url(monkeypatch):
from tools import web_tools
def fake_check(url):
if url == "https://allowed.test":
return None
if url == "https://blocked.test/final":
return {
"host": "blocked.test",
"rule": "blocked.test",
"source": "config",
"message": "Blocked by website policy",
}
pytest.fail(f"unexpected URL checked: {url}")
class FakeCrawlClient:
def crawl(self, url, **kwargs):
return {
"data": [
{
"markdown": "secret crawl content",
"metadata": {
"title": "Redirected crawl page",
"sourceURL": "https://blocked.test/final",
},
}
]
}
monkeypatch.setattr(web_tools, "check_website_access", fake_check)
monkeypatch.setattr(web_tools, "_get_firecrawl_client", lambda: FakeCrawlClient())
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
result = json.loads(await web_tools.web_crawl_tool("https://allowed.test", use_llm_processing=False))
assert result["results"][0]["content"] == ""
assert result["results"][0]["error"] == "Blocked by website policy"
assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test"

View File

@@ -65,6 +65,7 @@ import requests
from typing import Dict, Any, Optional, List
from pathlib import Path
from agent.auxiliary_client import call_llm
from tools.website_policy import check_website_access
from tools.browser_providers.base import CloudBrowserProvider
from tools.browser_providers.browserbase import BrowserbaseProvider
from tools.browser_providers.browser_use import BrowserUseProvider
@@ -901,6 +902,19 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
Returns:
JSON string with navigation result (includes stealth features info on first nav)
"""
# Website policy check — block before navigating
try:
blocked = check_website_access(url)
except Exception as _policy_err:
return json.dumps({"success": False, "error": f"Website policy error: {_policy_err}"})
if blocked:
logger.info("Blocked browser_navigate to %s by rule %s", blocked["host"], blocked["rule"])
return json.dumps({
"success": False,
"error": blocked["message"],
"blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]},
})
effective_task_id = task_id or "default"
# Get session info to check if this is a new session

View File

@@ -49,6 +49,7 @@ from typing import List, Dict, Any, Optional
from firecrawl import Firecrawl
from agent.auxiliary_client import async_call_llm
from tools.debug_helpers import DebugSession
from tools.website_policy import WebsitePolicyError, check_website_access
logger = logging.getLogger(__name__)
@@ -616,6 +617,21 @@ async def web_extract_tool(
results.append({"url": url, "error": "Interrupted", "title": ""})
continue
# Website policy check — block before fetching
try:
blocked = check_website_access(url)
except WebsitePolicyError as policy_err:
results.append({"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"})
continue
if blocked:
logger.info("Blocked web_extract for %s by rule %s", blocked["host"], blocked["rule"])
results.append({
"url": url, "title": "", "content": "",
"error": blocked["message"],
"blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]},
})
continue
try:
logger.info("Scraping: %s", url)
scrape_result = _get_firecrawl_client().scrape(
@@ -669,11 +685,26 @@ async def web_extract_tool(
# Get title from metadata
title = metadata.get("title", "")
# Re-check final URL after redirect
final_url = metadata.get("sourceURL", url)
try:
final_blocked = check_website_access(final_url)
except WebsitePolicyError:
final_blocked = None
if final_blocked:
logger.info("Blocked redirected web_extract for %s by rule %s", final_blocked["host"], final_blocked["rule"])
results.append({
"url": final_url, "title": title, "content": "", "raw_content": "",
"error": final_blocked["message"],
"blocked_by_policy": {"host": final_blocked["host"], "rule": final_blocked["rule"], "source": final_blocked["source"]},
})
continue
# Choose content based on requested format
chosen_content = content_markdown if (format == "markdown" or (format is None and content_markdown)) else content_html or content_markdown or ""
results.append({
"url": metadata.get("sourceURL", url),
"url": final_url,
"title": title,
"content": chosen_content,
"raw_content": chosen_content,
@@ -778,6 +809,7 @@ async def web_extract_tool(
"title": r.get("title", ""),
"content": r.get("content", ""),
"error": r.get("error"),
**({ "blocked_by_policy": r["blocked_by_policy"]} if "blocked_by_policy" in r else {}),
}
for r in response.get("results", [])
]
@@ -870,6 +902,16 @@ async def web_crawl_tool(
instructions_text = f" with instructions: '{instructions}'" if instructions else ""
logger.info("Crawling %s%s", url, instructions_text)
# Website policy check — block before crawling
try:
blocked = check_website_access(url)
except WebsitePolicyError as policy_err:
return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"}]}, ensure_ascii=False)
if blocked:
logger.info("Blocked web_crawl for %s by rule %s", blocked["host"], blocked["rule"])
return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": blocked["message"],
"blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]}}]}, ensure_ascii=False)
# Use Firecrawl's v2 crawl functionality
# Docs: https://docs.firecrawl.dev/features/crawl
# The crawl() method automatically waits for completion and returns all data
@@ -975,6 +1017,20 @@ async def web_crawl_tool(
page_url = metadata.get("sourceURL", metadata.get("url", "Unknown URL"))
title = metadata.get("title", "")
# Re-check crawled page URL against policy
try:
page_blocked = check_website_access(page_url)
except WebsitePolicyError:
page_blocked = None
if page_blocked:
logger.info("Blocked crawled page %s by rule %s", page_blocked["host"], page_blocked["rule"])
pages.append({
"url": page_url, "title": title, "content": "", "raw_content": "",
"error": page_blocked["message"],
"blocked_by_policy": {"host": page_blocked["host"], "rule": page_blocked["rule"], "source": page_blocked["source"]},
})
continue
# Choose content (prefer markdown)
content = content_markdown or content_html or ""
@@ -1070,9 +1126,11 @@ async def web_crawl_tool(
# Trim output to minimal fields per entry: title, content, error
trimmed_results = [
{
"url": r.get("url", ""),
"title": r.get("title", ""),
"content": r.get("content", ""),
"error": r.get("error")
"error": r.get("error"),
**({ "blocked_by_policy": r["blocked_by_policy"]} if "blocked_by_policy" in r else {}),
}
for r in response.get("results", [])
]

193
tools/website_policy.py Normal file
View File

@@ -0,0 +1,193 @@
"""Website access policy helpers for URL-capable tools.
This module loads a user-managed website blocklist from ~/.hermes/config.yaml
and optional shared list files. It is intentionally lightweight so web/browser
tools can enforce URL policy without pulling in the heavier CLI config stack.
"""
from __future__ import annotations
import fnmatch
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
import yaml
_DEFAULT_WEBSITE_BLOCKLIST = {
"enabled": True,
"domains": [],
"shared_files": [],
}
def _get_hermes_home() -> Path:
return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
def _get_default_config_path() -> Path:
return _get_hermes_home() / "config.yaml"
class WebsitePolicyError(Exception):
"""Raised when a website policy file is malformed."""
def _normalize_host(host: str) -> str:
return (host or "").strip().lower().rstrip(".")
def _normalize_rule(rule: Any) -> Optional[str]:
if not isinstance(rule, str):
return None
value = rule.strip().lower()
if not value or value.startswith("#"):
return None
if "://" in value:
parsed = urlparse(value)
value = parsed.netloc or parsed.path
value = value.split("/", 1)[0].strip().rstrip(".")
if value.startswith("www."):
value = value[4:]
return value or None
def _iter_blocklist_file_rules(path: Path) -> List[str]:
try:
raw = path.read_text(encoding="utf-8")
except FileNotFoundError as exc:
raise WebsitePolicyError(f"Shared blocklist file not found: {path}") from exc
except (OSError, UnicodeDecodeError) as exc:
raise WebsitePolicyError(f"Failed to read shared blocklist file {path}: {exc}") from exc
rules: List[str] = []
for line in raw.splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
normalized = _normalize_rule(stripped)
if normalized:
rules.append(normalized)
return rules
def _load_policy_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
config_path = config_path or _get_default_config_path()
if not config_path.exists():
return dict(_DEFAULT_WEBSITE_BLOCKLIST)
try:
with open(config_path, encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
except yaml.YAMLError as exc:
raise WebsitePolicyError(f"Invalid config YAML at {config_path}: {exc}") from exc
except OSError as exc:
raise WebsitePolicyError(f"Failed to read config file {config_path}: {exc}") from exc
if not isinstance(config, dict):
raise WebsitePolicyError("config root must be a mapping")
security = config.get("security", {})
if security is None:
security = {}
if not isinstance(security, dict):
raise WebsitePolicyError("security must be a mapping")
website_blocklist = security.get("website_blocklist", {})
if website_blocklist is None:
website_blocklist = {}
if not isinstance(website_blocklist, dict):
raise WebsitePolicyError("security.website_blocklist must be a mapping")
policy = dict(_DEFAULT_WEBSITE_BLOCKLIST)
policy.update(website_blocklist)
return policy
def load_website_blocklist(config_path: Optional[Path] = None) -> Dict[str, Any]:
config_path = config_path or _get_default_config_path()
policy = _load_policy_config(config_path)
raw_domains = policy.get("domains", []) or []
if not isinstance(raw_domains, list):
raise WebsitePolicyError("security.website_blocklist.domains must be a list")
raw_shared_files = policy.get("shared_files", []) or []
if not isinstance(raw_shared_files, list):
raise WebsitePolicyError("security.website_blocklist.shared_files must be a list")
enabled = policy.get("enabled", True)
if not isinstance(enabled, bool):
raise WebsitePolicyError("security.website_blocklist.enabled must be a boolean")
rules: List[Dict[str, str]] = []
seen: set[Tuple[str, str]] = set()
for raw_rule in raw_domains:
normalized = _normalize_rule(raw_rule)
if normalized and ("config", normalized) not in seen:
rules.append({"pattern": normalized, "source": "config"})
seen.add(("config", normalized))
for shared_file in raw_shared_files:
if not isinstance(shared_file, str) or not shared_file.strip():
continue
path = Path(shared_file).expanduser()
if not path.is_absolute():
path = (_get_hermes_home() / path).resolve()
for normalized in _iter_blocklist_file_rules(path):
key = (str(path), normalized)
if key in seen:
continue
rules.append({"pattern": normalized, "source": str(path)})
seen.add(key)
return {"enabled": enabled, "rules": rules}
def _match_host_against_rule(host: str, pattern: str) -> bool:
if not host or not pattern:
return False
if pattern.startswith("*."):
return fnmatch.fnmatch(host, pattern)
return host == pattern or host.endswith(f".{pattern}")
def _extract_host_from_urlish(url: str) -> str:
parsed = urlparse(url)
host = _normalize_host(parsed.hostname or parsed.netloc)
if host:
return host
if "://" not in url:
schemeless = urlparse(f"//{url}")
host = _normalize_host(schemeless.hostname or schemeless.netloc)
if host:
return host
return ""
def check_website_access(url: str, config_path: Optional[Path] = None) -> Optional[Dict[str, str]]:
host = _extract_host_from_urlish(url)
if not host:
return None
policy = load_website_blocklist(config_path)
if not policy.get("enabled"):
return None
for rule in policy.get("rules", []):
pattern = rule.get("pattern", "")
if _match_host_against_rule(host, pattern):
return {
"url": url,
"host": host,
"rule": pattern,
"source": rule.get("source", "config"),
"message": (
f"Blocked by website policy: '{host}' matched rule '{pattern}'"
f" from {rule.get('source', 'config')}"
),
}
return None