From 30c417fe7092cf0f4b69fc2f1b029d056d9a5e7e Mon Sep 17 00:00:00 2001
From: teknium1
Date: Tue, 17 Mar 2026 02:59:28 -0700
Subject: [PATCH] feat: add website blocklist enforcement for web/browser tools (#1064)

Adds security.website_blocklist config for user-managed domain blocking
across URL-capable tools. Enforced at the tool level (not monkey-patching)
so it's safe and predictable.

- tools/website_policy.py: shared policy loader with domain normalization,
  wildcard support (*.tracking.example), shared file imports, and
  structured block metadata
- web_extract: pre-fetch URL check + post-redirect recheck
- web_crawl: pre-crawl URL check + per-page URL recheck
- browser_navigate: pre-navigation URL check
- Blocked responses include blocked_by_policy metadata so the agent can
  explain exactly what was denied

Config:

    security:
      website_blocklist:
        enabled: true
        domains: ["evil.com", "*.tracking.example"]
        shared_files: ["team-blocklist.txt"]

Salvaged from PR #1086 by @kshitijk4poor. Browser post-redirect checks
deferred (browser_tool was fully rewritten since the PR branched).
Co-authored-by: kshitijk4poor --- hermes_cli/config.py | 5 + tests/tools/test_website_policy.py | 486 +++++++++++++++++++++++++++++ tools/browser_tool.py | 14 + tools/web_tools.py | 62 +++- tools/website_policy.py | 193 ++++++++++++ 5 files changed, 758 insertions(+), 2 deletions(-) create mode 100644 tests/tools/test_website_policy.py create mode 100644 tools/website_policy.py diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 62d8a19a..8f8d90c4 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -354,6 +354,11 @@ DEFAULT_CONFIG = { "tirith_path": "tirith", "tirith_timeout": 5, "tirith_fail_open": True, + "website_blocklist": { + "enabled": True, + "domains": [], + "shared_files": [], + }, }, # Config schema version - bump this when adding new required fields diff --git a/tests/tools/test_website_policy.py b/tests/tools/test_website_policy.py new file mode 100644 index 00000000..e66ce9d8 --- /dev/null +++ b/tests/tools/test_website_policy.py @@ -0,0 +1,486 @@ +import json +from pathlib import Path + +import pytest +import yaml + +from tools.website_policy import WebsitePolicyError, check_website_access, load_website_blocklist + + +def test_load_website_blocklist_merges_config_and_shared_file(tmp_path): + shared = tmp_path / "community-blocklist.txt" + shared.write_text("# comment\nexample.org\nsub.bad.net\n", encoding="utf-8") + + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "domains": ["example.com", "https://www.evil.test/path"], + "shared_files": [str(shared)], + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + policy = load_website_blocklist(config_path) + + assert policy["enabled"] is True + assert {rule["pattern"] for rule in policy["rules"]} == { + "example.com", + "evil.test", + "example.org", + "sub.bad.net", + } + + +def test_check_website_access_matches_parent_domain_subdomains(tmp_path): + config_path = tmp_path 
/ "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "domains": ["example.com"], + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + blocked = check_website_access("https://docs.example.com/page", config_path=config_path) + + assert blocked is not None + assert blocked["host"] == "docs.example.com" + assert blocked["rule"] == "example.com" + + +def test_check_website_access_supports_wildcard_subdomains_only(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "domains": ["*.tracking.example"], + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + assert check_website_access("https://a.tracking.example", config_path=config_path) is not None + assert check_website_access("https://www.tracking.example", config_path=config_path) is not None + assert check_website_access("https://tracking.example", config_path=config_path) is None + + +def test_default_config_exposes_website_blocklist_shape(): + from hermes_cli.config import DEFAULT_CONFIG + + website_blocklist = DEFAULT_CONFIG["security"]["website_blocklist"] + assert website_blocklist["enabled"] is True + assert website_blocklist["domains"] == [] + assert website_blocklist["shared_files"] == [] + + +def test_load_website_blocklist_uses_enabled_default_when_section_missing(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text(yaml.safe_dump({"display": {"tool_progress": "all"}}, sort_keys=False), encoding="utf-8") + + policy = load_website_blocklist(config_path) + + assert policy == {"enabled": True, "rules": []} + + +def test_load_website_blocklist_raises_clean_error_for_invalid_domains_type(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "domains": "example.com", + } + } + }, + 
sort_keys=False, + ), + encoding="utf-8", + ) + + with pytest.raises(WebsitePolicyError, match="security.website_blocklist.domains must be a list"): + load_website_blocklist(config_path) + + +def test_load_website_blocklist_raises_clean_error_for_invalid_shared_files_type(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "shared_files": "community-blocklist.txt", + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with pytest.raises(WebsitePolicyError, match="security.website_blocklist.shared_files must be a list"): + load_website_blocklist(config_path) + + +def test_load_website_blocklist_raises_clean_error_for_invalid_top_level_config_type(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text(yaml.safe_dump(["not", "a", "mapping"], sort_keys=False), encoding="utf-8") + + with pytest.raises(WebsitePolicyError, match="config root must be a mapping"): + load_website_blocklist(config_path) + + +def test_load_website_blocklist_raises_clean_error_for_invalid_security_type(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text(yaml.safe_dump({"security": []}, sort_keys=False), encoding="utf-8") + + with pytest.raises(WebsitePolicyError, match="security must be a mapping"): + load_website_blocklist(config_path) + + +def test_load_website_blocklist_raises_clean_error_for_invalid_website_blocklist_type(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": "block everything", + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with pytest.raises(WebsitePolicyError, match="security.website_blocklist must be a mapping"): + load_website_blocklist(config_path) + + +def test_load_website_blocklist_raises_clean_error_for_invalid_enabled_type(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text( + 
yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": "false", + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + with pytest.raises(WebsitePolicyError, match="security.website_blocklist.enabled must be a boolean"): + load_website_blocklist(config_path) + + +def test_load_website_blocklist_raises_clean_error_for_malformed_yaml(tmp_path): + config_path = tmp_path / "config.yaml" + config_path.write_text("security: [oops\n", encoding="utf-8") + + with pytest.raises(WebsitePolicyError, match="Invalid config YAML"): + load_website_blocklist(config_path) + + +def test_load_website_blocklist_wraps_shared_file_read_errors(tmp_path, monkeypatch): + shared = tmp_path / "community-blocklist.txt" + shared.write_text("example.org\n", encoding="utf-8") + + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "shared_files": [str(shared)], + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + def failing_read_text(self, *args, **kwargs): + raise PermissionError("no permission") + + monkeypatch.setattr(Path, "read_text", failing_read_text) + + with pytest.raises(WebsitePolicyError, match="Failed to read shared blocklist file"): + load_website_blocklist(config_path) + + +def test_check_website_access_uses_dynamic_hermes_home(monkeypatch, tmp_path): + hermes_home = tmp_path / "hermes-home" + hermes_home.mkdir() + (hermes_home / "config.yaml").write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "domains": ["dynamic.example"], + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + blocked = check_website_access("https://dynamic.example/path") + + assert blocked is not None + assert blocked["rule"] == "dynamic.example" + + +def test_check_website_access_blocks_scheme_less_urls(tmp_path): + config_path = tmp_path / "config.yaml" + 
config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "domains": ["blocked.test"], + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + blocked = check_website_access("www.blocked.test/path", config_path=config_path) + + assert blocked is not None + assert blocked["host"] == "www.blocked.test" + assert blocked["rule"] == "blocked.test" + + +def test_browser_navigate_returns_policy_block(monkeypatch): + from tools import browser_tool + + monkeypatch.setattr( + browser_tool, + "check_website_access", + lambda url: { + "host": "blocked.test", + "rule": "blocked.test", + "source": "config", + "message": "Blocked by website policy", + }, + ) + monkeypatch.setattr( + browser_tool, + "_run_browser_command", + lambda *args, **kwargs: pytest.fail("browser command should not run for blocked URL"), + ) + + result = json.loads(browser_tool.browser_navigate("https://blocked.test")) + + assert result["success"] is False + assert result["blocked_by_policy"]["rule"] == "blocked.test" + + +def test_browser_navigate_returns_clean_policy_error_for_missing_shared_file(monkeypatch, tmp_path): + from tools import browser_tool + + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump( + { + "security": { + "website_blocklist": { + "enabled": True, + "shared_files": ["missing-blocklist.txt"], + } + } + }, + sort_keys=False, + ), + encoding="utf-8", + ) + + monkeypatch.setattr(browser_tool, "check_website_access", lambda url: check_website_access(url, config_path=config_path)) + + result = json.loads(browser_tool.browser_navigate("https://allowed.test")) + + assert result["success"] is False + assert "Website policy error" in result["error"] + + +@pytest.mark.asyncio +async def test_web_extract_short_circuits_blocked_url(monkeypatch): + from tools import web_tools + + monkeypatch.setattr( + web_tools, + "check_website_access", + lambda url: { + "host": "blocked.test", + "rule": "blocked.test", + 
"source": "config", + "message": "Blocked by website policy", + }, + ) + monkeypatch.setattr( + web_tools, + "_get_firecrawl_client", + lambda: pytest.fail("firecrawl should not run for blocked URL"), + ) + monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False) + + result = json.loads(await web_tools.web_extract_tool(["https://blocked.test"], use_llm_processing=False)) + + assert result["results"][0]["url"] == "https://blocked.test" + assert "Blocked by website policy" in result["results"][0]["error"] + + +@pytest.mark.asyncio +async def test_web_extract_returns_clean_policy_error_for_malformed_config(monkeypatch, tmp_path): + from tools import web_tools + + config_path = tmp_path / "config.yaml" + config_path.write_text("security: [oops\n", encoding="utf-8") + + monkeypatch.setattr(web_tools, "check_website_access", lambda url: check_website_access(url, config_path=config_path)) + monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False) + + result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False)) + + assert result["results"][0]["url"] == "https://allowed.test" + assert "Website policy error" in result["results"][0]["error"] + + +@pytest.mark.asyncio +async def test_web_extract_blocks_redirected_final_url(monkeypatch): + from tools import web_tools + + def fake_check(url): + if url == "https://allowed.test": + return None + if url == "https://blocked.test/final": + return { + "host": "blocked.test", + "rule": "blocked.test", + "source": "config", + "message": "Blocked by website policy", + } + pytest.fail(f"unexpected URL checked: {url}") + + class FakeFirecrawlClient: + def scrape(self, url, formats): + return { + "markdown": "secret content", + "metadata": { + "title": "Redirected", + "sourceURL": "https://blocked.test/final", + }, + } + + monkeypatch.setattr(web_tools, "check_website_access", fake_check) + monkeypatch.setattr(web_tools, "_get_firecrawl_client", lambda: 
FakeFirecrawlClient()) + monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False) + + result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False)) + + assert result["results"][0]["url"] == "https://blocked.test/final" + assert result["results"][0]["content"] == "" + assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test" + + +@pytest.mark.asyncio +async def test_web_crawl_short_circuits_blocked_url(monkeypatch): + from tools import web_tools + + monkeypatch.setattr( + web_tools, + "check_website_access", + lambda url: { + "host": "blocked.test", + "rule": "blocked.test", + "source": "config", + "message": "Blocked by website policy", + }, + ) + monkeypatch.setattr( + web_tools, + "_get_firecrawl_client", + lambda: pytest.fail("firecrawl should not run for blocked crawl URL"), + ) + monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False) + + result = json.loads(await web_tools.web_crawl_tool("https://blocked.test", use_llm_processing=False)) + + assert result["results"][0]["url"] == "https://blocked.test" + assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test" + + +@pytest.mark.asyncio +async def test_web_crawl_blocks_redirected_final_url(monkeypatch): + from tools import web_tools + + def fake_check(url): + if url == "https://allowed.test": + return None + if url == "https://blocked.test/final": + return { + "host": "blocked.test", + "rule": "blocked.test", + "source": "config", + "message": "Blocked by website policy", + } + pytest.fail(f"unexpected URL checked: {url}") + + class FakeCrawlClient: + def crawl(self, url, **kwargs): + return { + "data": [ + { + "markdown": "secret crawl content", + "metadata": { + "title": "Redirected crawl page", + "sourceURL": "https://blocked.test/final", + }, + } + ] + } + + monkeypatch.setattr(web_tools, "check_website_access", fake_check) + monkeypatch.setattr(web_tools, "_get_firecrawl_client", lambda: 
FakeCrawlClient()) + monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False) + + result = json.loads(await web_tools.web_crawl_tool("https://allowed.test", use_llm_processing=False)) + + assert result["results"][0]["content"] == "" + assert result["results"][0]["error"] == "Blocked by website policy" + assert result["results"][0]["blocked_by_policy"]["rule"] == "blocked.test" diff --git a/tools/browser_tool.py b/tools/browser_tool.py index d57eedee..c127f685 100644 --- a/tools/browser_tool.py +++ b/tools/browser_tool.py @@ -65,6 +65,7 @@ import requests from typing import Dict, Any, Optional, List from pathlib import Path from agent.auxiliary_client import call_llm +from tools.website_policy import check_website_access from tools.browser_providers.base import CloudBrowserProvider from tools.browser_providers.browserbase import BrowserbaseProvider from tools.browser_providers.browser_use import BrowserUseProvider @@ -901,6 +902,19 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str: Returns: JSON string with navigation result (includes stealth features info on first nav) """ + # Website policy check — block before navigating + try: + blocked = check_website_access(url) + except Exception as _policy_err: + return json.dumps({"success": False, "error": f"Website policy error: {_policy_err}"}) + if blocked: + logger.info("Blocked browser_navigate to %s by rule %s", blocked["host"], blocked["rule"]) + return json.dumps({ + "success": False, + "error": blocked["message"], + "blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]}, + }) + effective_task_id = task_id or "default" # Get session info to check if this is a new session diff --git a/tools/web_tools.py b/tools/web_tools.py index ede1adb0..c478f25f 100644 --- a/tools/web_tools.py +++ b/tools/web_tools.py @@ -49,6 +49,7 @@ from typing import List, Dict, Any, Optional from firecrawl import Firecrawl from agent.auxiliary_client import 
async_call_llm from tools.debug_helpers import DebugSession +from tools.website_policy import WebsitePolicyError, check_website_access logger = logging.getLogger(__name__) @@ -616,6 +617,21 @@ async def web_extract_tool( results.append({"url": url, "error": "Interrupted", "title": ""}) continue + # Website policy check — block before fetching + try: + blocked = check_website_access(url) + except WebsitePolicyError as policy_err: + results.append({"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"}) + continue + if blocked: + logger.info("Blocked web_extract for %s by rule %s", blocked["host"], blocked["rule"]) + results.append({ + "url": url, "title": "", "content": "", + "error": blocked["message"], + "blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]}, + }) + continue + try: logger.info("Scraping: %s", url) scrape_result = _get_firecrawl_client().scrape( @@ -669,11 +685,26 @@ async def web_extract_tool( # Get title from metadata title = metadata.get("title", "") + # Re-check final URL after redirect + final_url = metadata.get("sourceURL", url) + try: + final_blocked = check_website_access(final_url) + except WebsitePolicyError: + final_blocked = None + if final_blocked: + logger.info("Blocked redirected web_extract for %s by rule %s", final_blocked["host"], final_blocked["rule"]) + results.append({ + "url": final_url, "title": title, "content": "", "raw_content": "", + "error": final_blocked["message"], + "blocked_by_policy": {"host": final_blocked["host"], "rule": final_blocked["rule"], "source": final_blocked["source"]}, + }) + continue + # Choose content based on requested format chosen_content = content_markdown if (format == "markdown" or (format is None and content_markdown)) else content_html or content_markdown or "" results.append({ - "url": metadata.get("sourceURL", url), + "url": final_url, "title": title, "content": chosen_content, "raw_content": chosen_content, @@ 
-778,6 +809,7 @@ async def web_extract_tool( "title": r.get("title", ""), "content": r.get("content", ""), "error": r.get("error"), + **({ "blocked_by_policy": r["blocked_by_policy"]} if "blocked_by_policy" in r else {}), } for r in response.get("results", []) ] @@ -870,6 +902,16 @@ async def web_crawl_tool( instructions_text = f" with instructions: '{instructions}'" if instructions else "" logger.info("Crawling %s%s", url, instructions_text) + # Website policy check — block before crawling + try: + blocked = check_website_access(url) + except WebsitePolicyError as policy_err: + return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"}]}, ensure_ascii=False) + if blocked: + logger.info("Blocked web_crawl for %s by rule %s", blocked["host"], blocked["rule"]) + return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": blocked["message"], + "blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]}}]}, ensure_ascii=False) + # Use Firecrawl's v2 crawl functionality # Docs: https://docs.firecrawl.dev/features/crawl # The crawl() method automatically waits for completion and returns all data @@ -975,6 +1017,20 @@ async def web_crawl_tool( page_url = metadata.get("sourceURL", metadata.get("url", "Unknown URL")) title = metadata.get("title", "") + # Re-check crawled page URL against policy + try: + page_blocked = check_website_access(page_url) + except WebsitePolicyError: + page_blocked = None + if page_blocked: + logger.info("Blocked crawled page %s by rule %s", page_blocked["host"], page_blocked["rule"]) + pages.append({ + "url": page_url, "title": title, "content": "", "raw_content": "", + "error": page_blocked["message"], + "blocked_by_policy": {"host": page_blocked["host"], "rule": page_blocked["rule"], "source": page_blocked["source"]}, + }) + continue + # Choose content (prefer markdown) content = content_markdown or content_html or "" @@ 
-1070,9 +1126,11 @@ async def web_crawl_tool( # Trim output to minimal fields per entry: title, content, error trimmed_results = [ { + "url": r.get("url", ""), "title": r.get("title", ""), "content": r.get("content", ""), - "error": r.get("error") + "error": r.get("error"), + **({ "blocked_by_policy": r["blocked_by_policy"]} if "blocked_by_policy" in r else {}), } for r in response.get("results", []) ] diff --git a/tools/website_policy.py b/tools/website_policy.py new file mode 100644 index 00000000..21e8dad7 --- /dev/null +++ b/tools/website_policy.py @@ -0,0 +1,193 @@ +"""Website access policy helpers for URL-capable tools. + +This module loads a user-managed website blocklist from ~/.hermes/config.yaml +and optional shared list files. It is intentionally lightweight so web/browser +tools can enforce URL policy without pulling in the heavier CLI config stack. +""" + +from __future__ import annotations + +import fnmatch +import os +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +import yaml + + +_DEFAULT_WEBSITE_BLOCKLIST = { + "enabled": True, + "domains": [], + "shared_files": [], +} + + +def _get_hermes_home() -> Path: + return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) + + +def _get_default_config_path() -> Path: + return _get_hermes_home() / "config.yaml" + + +class WebsitePolicyError(Exception): + """Raised when a website policy file is malformed.""" + + +def _normalize_host(host: str) -> str: + return (host or "").strip().lower().rstrip(".") + + +def _normalize_rule(rule: Any) -> Optional[str]: + if not isinstance(rule, str): + return None + value = rule.strip().lower() + if not value or value.startswith("#"): + return None + if "://" in value: + parsed = urlparse(value) + value = parsed.netloc or parsed.path + value = value.split("/", 1)[0].strip().rstrip(".") + if value.startswith("www."): + value = value[4:] + return value or None + + +def _iter_blocklist_file_rules(path: 
Path) -> List[str]: + try: + raw = path.read_text(encoding="utf-8") + except FileNotFoundError as exc: + raise WebsitePolicyError(f"Shared blocklist file not found: {path}") from exc + except (OSError, UnicodeDecodeError) as exc: + raise WebsitePolicyError(f"Failed to read shared blocklist file {path}: {exc}") from exc + + rules: List[str] = [] + for line in raw.splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + normalized = _normalize_rule(stripped) + if normalized: + rules.append(normalized) + return rules + + +def _load_policy_config(config_path: Optional[Path] = None) -> Dict[str, Any]: + config_path = config_path or _get_default_config_path() + if not config_path.exists(): + return dict(_DEFAULT_WEBSITE_BLOCKLIST) + try: + with open(config_path, encoding="utf-8") as f: + config = yaml.safe_load(f) or {} + except yaml.YAMLError as exc: + raise WebsitePolicyError(f"Invalid config YAML at {config_path}: {exc}") from exc + except OSError as exc: + raise WebsitePolicyError(f"Failed to read config file {config_path}: {exc}") from exc + if not isinstance(config, dict): + raise WebsitePolicyError("config root must be a mapping") + + security = config.get("security", {}) + if security is None: + security = {} + if not isinstance(security, dict): + raise WebsitePolicyError("security must be a mapping") + + website_blocklist = security.get("website_blocklist", {}) + if website_blocklist is None: + website_blocklist = {} + if not isinstance(website_blocklist, dict): + raise WebsitePolicyError("security.website_blocklist must be a mapping") + + policy = dict(_DEFAULT_WEBSITE_BLOCKLIST) + policy.update(website_blocklist) + return policy + + +def load_website_blocklist(config_path: Optional[Path] = None) -> Dict[str, Any]: + config_path = config_path or _get_default_config_path() + policy = _load_policy_config(config_path) + + raw_domains = policy.get("domains", []) or [] + if not isinstance(raw_domains, list): + raise 
WebsitePolicyError("security.website_blocklist.domains must be a list") + + raw_shared_files = policy.get("shared_files", []) or [] + if not isinstance(raw_shared_files, list): + raise WebsitePolicyError("security.website_blocklist.shared_files must be a list") + + enabled = policy.get("enabled", True) + if not isinstance(enabled, bool): + raise WebsitePolicyError("security.website_blocklist.enabled must be a boolean") + + rules: List[Dict[str, str]] = [] + seen: set[Tuple[str, str]] = set() + + for raw_rule in raw_domains: + normalized = _normalize_rule(raw_rule) + if normalized and ("config", normalized) not in seen: + rules.append({"pattern": normalized, "source": "config"}) + seen.add(("config", normalized)) + + for shared_file in raw_shared_files: + if not isinstance(shared_file, str) or not shared_file.strip(): + continue + path = Path(shared_file).expanduser() + if not path.is_absolute(): + path = (_get_hermes_home() / path).resolve() + for normalized in _iter_blocklist_file_rules(path): + key = (str(path), normalized) + if key in seen: + continue + rules.append({"pattern": normalized, "source": str(path)}) + seen.add(key) + + return {"enabled": enabled, "rules": rules} + + +def _match_host_against_rule(host: str, pattern: str) -> bool: + if not host or not pattern: + return False + if pattern.startswith("*."): + return fnmatch.fnmatch(host, pattern) + return host == pattern or host.endswith(f".{pattern}") + + +def _extract_host_from_urlish(url: str) -> str: + parsed = urlparse(url) + host = _normalize_host(parsed.hostname or parsed.netloc) + if host: + return host + + if "://" not in url: + schemeless = urlparse(f"//{url}") + host = _normalize_host(schemeless.hostname or schemeless.netloc) + if host: + return host + + return "" + + +def check_website_access(url: str, config_path: Optional[Path] = None) -> Optional[Dict[str, str]]: + host = _extract_host_from_urlish(url) + if not host: + return None + + policy = load_website_blocklist(config_path) + if not 
policy.get("enabled"): + return None + + for rule in policy.get("rules", []): + pattern = rule.get("pattern", "") + if _match_host_against_rule(host, pattern): + return { + "url": url, + "host": host, + "rule": pattern, + "source": rule.get("source", "config"), + "message": ( + f"Blocked by website policy: '{host}' matched rule '{pattern}'" + f" from {rule.get('source', 'config')}" + ), + } + return None