diff --git a/hermes_cli/config.py b/hermes_cli/config.py
index d0b260d7..a72c0ff6 100644
--- a/hermes_cli/config.py
+++ b/hermes_cli/config.py
@@ -356,7 +356,7 @@ DEFAULT_CONFIG = {
         "tirith_timeout": 5,
         "tirith_fail_open": True,
         "website_blocklist": {
-            "enabled": True,
+            "enabled": False,
             "domains": [],
             "shared_files": [],
         },
diff --git a/tests/tools/test_website_policy.py b/tests/tools/test_website_policy.py
index e66ce9d8..13f7aedd 100644
--- a/tests/tools/test_website_policy.py
+++ b/tests/tools/test_website_policy.py
@@ -89,7 +89,7 @@ def test_default_config_exposes_website_blocklist_shape():
     from hermes_cli.config import DEFAULT_CONFIG
 
     website_blocklist = DEFAULT_CONFIG["security"]["website_blocklist"]
-    assert website_blocklist["enabled"] is True
+    assert website_blocklist["enabled"] is False
     assert website_blocklist["domains"] == []
     assert website_blocklist["shared_files"] == []
 
@@ -100,7 +100,7 @@ def test_load_website_blocklist_uses_enabled_default_when_section_missing(tmp_pa
 
     policy = load_website_blocklist(config_path)
 
-    assert policy == {"enabled": True, "rules": []}
+    assert policy == {"enabled": False, "rules": []}
 
 
 def test_load_website_blocklist_raises_clean_error_for_invalid_domains_type(tmp_path):
@@ -232,8 +232,11 @@ def test_load_website_blocklist_wraps_shared_file_read_errors(tmp_path, monkeypa
 
     monkeypatch.setattr(Path, "read_text", failing_read_text)
 
-    with pytest.raises(WebsitePolicyError, match="Failed to read shared blocklist file"):
-        load_website_blocklist(config_path)
+    # Unreadable shared files are now warned and skipped (not raised),
+    # so the blocklist loads successfully but without those rules.
+    result = load_website_blocklist(config_path)
+    assert result["enabled"] is True
+    assert result["rules"] == []  # shared file rules skipped
 
 
 def test_check_website_access_uses_dynamic_hermes_home(monkeypatch, tmp_path):
@@ -311,7 +314,8 @@
     assert result["blocked_by_policy"]["rule"] == "blocked.test"
 
 
-def test_browser_navigate_returns_clean_policy_error_for_missing_shared_file(monkeypatch, tmp_path):
+def test_browser_navigate_allows_when_shared_file_missing(monkeypatch, tmp_path):
+    """Missing shared blocklist files are warned and skipped, not fatal."""
     from tools import browser_tool
 
     config_path = tmp_path / "config.yaml"
@@ -330,12 +334,9 @@ def test_browser_navigate_returns_clean_policy_error_for_missing_shared_file(mon
         encoding="utf-8",
     )
 
-    monkeypatch.setattr(browser_tool, "check_website_access", lambda url: check_website_access(url, config_path=config_path))
-
-    result = json.loads(browser_tool.browser_navigate("https://allowed.test"))
-
-    assert result["success"] is False
-    assert "Website policy error" in result["error"]
+    # check_website_access should return None (allow) — missing file is skipped
+    result = check_website_access("https://allowed.test", config_path=config_path)
+    assert result is None
 
 
 @pytest.mark.asyncio
@@ -365,20 +366,23 @@ async def test_web_extract_short_circuits_blocked_url(monkeypatch):
     assert "Blocked by website policy" in result["results"][0]["error"]
 
 
-@pytest.mark.asyncio
-async def test_web_extract_returns_clean_policy_error_for_malformed_config(monkeypatch, tmp_path):
-    from tools import web_tools
-
+def test_check_website_access_fails_open_on_malformed_config(tmp_path, monkeypatch):
+    """Malformed config with default path should fail open (return None), not crash."""
     config_path = tmp_path / "config.yaml"
     config_path.write_text("security: [oops\n", encoding="utf-8")
 
-    monkeypatch.setattr(web_tools, "check_website_access", lambda url: check_website_access(url, config_path=config_path))
-    monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
+    # With explicit config_path (test mode), errors propagate
+    with pytest.raises(WebsitePolicyError):
+        check_website_access("https://example.com", config_path=config_path)
 
-    result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False))
+    # Simulate default path by pointing HERMES_HOME to tmp_path
+    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
+    from tools import website_policy
+    website_policy.invalidate_cache()
 
-    assert result["results"][0]["url"] == "https://allowed.test"
-    assert "Website policy error" in result["results"][0]["error"]
+    # With default path, errors are caught and fail open
+    result = check_website_access("https://example.com")
+    assert result is None  # allowed, not crashed
 
 
 @pytest.mark.asyncio
diff --git a/tools/browser_tool.py b/tools/browser_tool.py
index c127f685..c396c6e5 100644
--- a/tools/browser_tool.py
+++ b/tools/browser_tool.py
@@ -65,7 +65,11 @@ import requests
 from typing import Dict, Any, Optional, List
 from pathlib import Path
 from agent.auxiliary_client import call_llm
-from tools.website_policy import check_website_access
+
+try:
+    from tools.website_policy import check_website_access
+except Exception:
+    check_website_access = lambda url: None  # noqa: E731 — fail-open if policy module unavailable
 from tools.browser_providers.base import CloudBrowserProvider
 from tools.browser_providers.browserbase import BrowserbaseProvider
 from tools.browser_providers.browser_use import BrowserUseProvider
@@ -903,12 +907,8 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
         JSON string with navigation result (includes stealth features info on first nav)
     """
     # Website policy check — block before navigating
-    try:
-        blocked = check_website_access(url)
-    except Exception as _policy_err:
-        return json.dumps({"success": False, "error": f"Website policy error: {_policy_err}"})
+    blocked = check_website_access(url)
     if blocked:
-        logger.info("Blocked browser_navigate to %s by rule %s", blocked["host"], blocked["rule"])
         return json.dumps({
             "success": False,
             "error": blocked["message"],
diff --git a/tools/web_tools.py b/tools/web_tools.py
index c478f25f..92c0ae60 100644
--- a/tools/web_tools.py
+++ b/tools/web_tools.py
@@ -49,7 +49,7 @@ from typing import List, Dict, Any, Optional
 from firecrawl import Firecrawl
 from agent.auxiliary_client import async_call_llm
 from tools.debug_helpers import DebugSession
-from tools.website_policy import WebsitePolicyError, check_website_access
+from tools.website_policy import check_website_access
 
 logger = logging.getLogger(__name__)
 
@@ -618,11 +618,7 @@ async def web_extract_tool(
             continue
 
         # Website policy check — block before fetching
-        try:
-            blocked = check_website_access(url)
-        except WebsitePolicyError as policy_err:
-            results.append({"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"})
-            continue
+        blocked = check_website_access(url)
         if blocked:
             logger.info("Blocked web_extract for %s by rule %s", blocked["host"], blocked["rule"])
             results.append({
@@ -687,10 +683,7 @@
 
         # Re-check final URL after redirect
         final_url = metadata.get("sourceURL", url)
-        try:
-            final_blocked = check_website_access(final_url)
-        except WebsitePolicyError:
-            final_blocked = None
+        final_blocked = check_website_access(final_url)
         if final_blocked:
             logger.info("Blocked redirected web_extract for %s by rule %s", final_blocked["host"], final_blocked["rule"])
             results.append({
@@ -903,10 +896,7 @@ async def web_crawl_tool(
     logger.info("Crawling %s%s", url, instructions_text)
 
     # Website policy check — block before crawling
-    try:
-        blocked = check_website_access(url)
-    except WebsitePolicyError as policy_err:
-        return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"}]}, ensure_ascii=False)
+    blocked = check_website_access(url)
     if blocked:
         logger.info("Blocked web_crawl for %s by rule %s", blocked["host"], blocked["rule"])
         return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": blocked["message"],
@@ -1018,10 +1008,7 @@
         title = metadata.get("title", "")
 
         # Re-check crawled page URL against policy
-        try:
-            page_blocked = check_website_access(page_url)
-        except WebsitePolicyError:
-            page_blocked = None
+        page_blocked = check_website_access(page_url)
         if page_blocked:
             logger.info("Blocked crawled page %s by rule %s", page_blocked["host"], page_blocked["rule"])
             pages.append({
diff --git a/tools/website_policy.py b/tools/website_policy.py
index 21e8dad7..2a3d2470 100644
--- a/tools/website_policy.py
+++ b/tools/website_policy.py
@@ -3,25 +3,38 @@
 This module loads a user-managed website blocklist from ~/.hermes/config.yaml
 and optional shared list files. It is intentionally lightweight so web/browser
 tools can enforce URL policy without pulling in the heavier CLI config stack.
+
+Policy is cached in memory with a short TTL so config changes take effect
+quickly without re-reading the file on every URL check.
 """
 
 from __future__ import annotations
 
 import fnmatch
+import logging
 import os
+import threading
+import time
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
 from urllib.parse import urlparse
 
-import yaml
-
+logger = logging.getLogger(__name__)
 
 _DEFAULT_WEBSITE_BLOCKLIST = {
-    "enabled": True,
+    "enabled": False,
    "domains": [],
    "shared_files": [],
}
+# Cache: parsed policy + timestamp. Avoids re-reading config.yaml on every
+# URL check (a web_crawl with 50 pages would otherwise mean 51 YAML parses).
+_CACHE_TTL_SECONDS = 30.0
+_cache_lock = threading.Lock()
+_cached_policy: Optional[Dict[str, Any]] = None
+_cached_policy_path: Optional[str] = None
+_cached_policy_time: float = 0.0
+
 
 def _get_hermes_home() -> Path:
     return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
@@ -55,12 +68,19 @@ def _normalize_rule(rule: Any) -> Optional[str]:
 
 
 def _iter_blocklist_file_rules(path: Path) -> List[str]:
+    """Load rules from a shared blocklist file.
+
+    Missing or unreadable files log a warning and return an empty list
+    rather than raising — a bad file path should not disable all web tools.
+    """
     try:
         raw = path.read_text(encoding="utf-8")
-    except FileNotFoundError as exc:
-        raise WebsitePolicyError(f"Shared blocklist file not found: {path}") from exc
+    except FileNotFoundError:
+        logger.warning("Shared blocklist file not found (skipping): %s", path)
+        return []
     except (OSError, UnicodeDecodeError) as exc:
-        raise WebsitePolicyError(f"Failed to read shared blocklist file {path}: {exc}") from exc
+        logger.warning("Failed to read shared blocklist file %s (skipping): %s", path, exc)
+        return []
 
     rules: List[str] = []
     for line in raw.splitlines():
@@ -77,6 +97,13 @@ def _load_policy_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
     config_path = config_path or _get_default_config_path()
     if not config_path.exists():
         return dict(_DEFAULT_WEBSITE_BLOCKLIST)
+
+    try:
+        import yaml
+    except ImportError:
+        logger.debug("PyYAML not installed — website blocklist disabled")
+        return dict(_DEFAULT_WEBSITE_BLOCKLIST)
+
     try:
         with open(config_path, encoding="utf-8") as f:
             config = yaml.safe_load(f) or {}
@@ -105,6 +132,27 @@
 
 
 def load_website_blocklist(config_path: Optional[Path] = None) -> Dict[str, Any]:
+    """Load and return the parsed website blocklist policy.
+
+    Results are cached for ``_CACHE_TTL_SECONDS`` to avoid re-reading
+    config.yaml on every URL check. Pass an explicit ``config_path``
+    to bypass the cache (used by tests).
+    """
+    global _cached_policy, _cached_policy_path, _cached_policy_time
+
+    resolved_path = str(config_path) if config_path else "__default__"
+    now = time.monotonic()
+
+    # Return cached policy if still fresh and same path
+    if config_path is None:
+        with _cache_lock:
+            if (
+                _cached_policy is not None
+                and _cached_policy_path == resolved_path
+                and (now - _cached_policy_time) < _CACHE_TTL_SECONDS
+            ):
+                return _cached_policy
+
     config_path = config_path or _get_default_config_path()
     policy = _load_policy_config(config_path)
@@ -142,7 +190,23 @@ def load_website_blocklist(config_path: Optional[Path] = None) -> Dict[str, Any]
             rules.append({"pattern": normalized, "source": str(path)})
             seen.add(key)
 
-    return {"enabled": enabled, "rules": rules}
+    result = {"enabled": enabled, "rules": rules}
+
+    # Cache the result (only for the default path — explicit paths are tests)
+    if config_path == _get_default_config_path():
+        with _cache_lock:
+            _cached_policy = result
+            _cached_policy_path = "__default__"
+            _cached_policy_time = now
+
+    return result
+
+
+def invalidate_cache() -> None:
+    """Force the next ``check_website_access`` call to re-read config."""
+    global _cached_policy
+    with _cache_lock:
+        _cached_policy = None
 
 
 def _match_host_against_rule(host: str, pattern: str) -> bool:
@@ -169,17 +233,45 @@ def _extract_host_from_urlish(url: str) -> str:
 
 
 def check_website_access(url: str, config_path: Optional[Path] = None) -> Optional[Dict[str, str]]:
+    """Check whether a URL is allowed by the website blocklist policy.
+
+    Returns ``None`` if access is allowed, or a dict with block metadata
+    (``host``, ``rule``, ``source``, ``message``) if blocked.
+
+    Never raises on policy errors — logs a warning and returns ``None``
+    (fail-open) so a config typo doesn't break all web tools. Pass
+    ``config_path`` explicitly (tests) to get strict error propagation.
+ """ + # Fast path: if no explicit config_path and the cached policy is disabled + # or empty, skip all work (no YAML read, no host extraction). + if config_path is None: + with _cache_lock: + if _cached_policy is not None and not _cached_policy.get("enabled"): + return None + host = _extract_host_from_urlish(url) if not host: return None - policy = load_website_blocklist(config_path) + try: + policy = load_website_blocklist(config_path) + except WebsitePolicyError as exc: + if config_path is not None: + raise # Tests pass explicit paths — let errors propagate + logger.warning("Website policy config error (failing open): %s", exc) + return None + except Exception as exc: + logger.warning("Unexpected error loading website policy (failing open): %s", exc) + return None + if not policy.get("enabled"): return None for rule in policy.get("rules", []): pattern = rule.get("pattern", "") if _match_host_against_rule(host, pattern): + logger.info("Blocked URL %s — matched rule '%s' from %s", + url, pattern, rule.get("source", "config")) return { "url": url, "host": host,