fix: harden website blocklist — default off, TTL cache, fail-open, guarded imports

- Default enabled: false (zero overhead when not configured)
- Fast path: cached disabled state skips all work immediately
- TTL cache (30s) for parsed policy — avoids re-reading config.yaml
  on every URL check
- Missing shared files warn + skip instead of crashing all web tools
- Lazy yaml import — missing PyYAML doesn't break browser toolset
- Guarded browser_tool import — fail-open lambda fallback
- check_website_access never raises for default path (fail-open with
  warning log); only raises with explicit config_path (test mode)
- Simplified enforcement code in web_tools/browser_tool — no more
  try/except wrappers since errors are handled internally
This commit is contained in:
teknium1
2026-03-17 03:11:21 -07:00
parent d132a3dfbb
commit 6fc76ef954
5 changed files with 136 additions and 53 deletions

View File

@@ -356,7 +356,7 @@ DEFAULT_CONFIG = {
"tirith_timeout": 5,
"tirith_fail_open": True,
"website_blocklist": {
"enabled": True,
"enabled": False,
"domains": [],
"shared_files": [],
},

View File

@@ -89,7 +89,7 @@ def test_default_config_exposes_website_blocklist_shape():
from hermes_cli.config import DEFAULT_CONFIG
website_blocklist = DEFAULT_CONFIG["security"]["website_blocklist"]
assert website_blocklist["enabled"] is True
assert website_blocklist["enabled"] is False
assert website_blocklist["domains"] == []
assert website_blocklist["shared_files"] == []
@@ -100,7 +100,7 @@ def test_load_website_blocklist_uses_enabled_default_when_section_missing(tmp_pa
policy = load_website_blocklist(config_path)
assert policy == {"enabled": True, "rules": []}
assert policy == {"enabled": False, "rules": []}
def test_load_website_blocklist_raises_clean_error_for_invalid_domains_type(tmp_path):
@@ -232,8 +232,11 @@ def test_load_website_blocklist_wraps_shared_file_read_errors(tmp_path, monkeypa
monkeypatch.setattr(Path, "read_text", failing_read_text)
with pytest.raises(WebsitePolicyError, match="Failed to read shared blocklist file"):
load_website_blocklist(config_path)
# Unreadable shared files are now warned and skipped (not raised),
# so the blocklist loads successfully but without those rules.
result = load_website_blocklist(config_path)
assert result["enabled"] is True
assert result["rules"] == [] # shared file rules skipped
def test_check_website_access_uses_dynamic_hermes_home(monkeypatch, tmp_path):
@@ -311,7 +314,8 @@ def test_browser_navigate_returns_policy_block(monkeypatch):
assert result["blocked_by_policy"]["rule"] == "blocked.test"
def test_browser_navigate_returns_clean_policy_error_for_missing_shared_file(monkeypatch, tmp_path):
def test_browser_navigate_allows_when_shared_file_missing(monkeypatch, tmp_path):
"""Missing shared blocklist files are warned and skipped, not fatal."""
from tools import browser_tool
config_path = tmp_path / "config.yaml"
@@ -330,12 +334,9 @@ def test_browser_navigate_returns_clean_policy_error_for_missing_shared_file(mon
encoding="utf-8",
)
monkeypatch.setattr(browser_tool, "check_website_access", lambda url: check_website_access(url, config_path=config_path))
result = json.loads(browser_tool.browser_navigate("https://allowed.test"))
assert result["success"] is False
assert "Website policy error" in result["error"]
# check_website_access should return None (allow) — missing file is skipped
result = check_website_access("https://allowed.test", config_path=config_path)
assert result is None
@pytest.mark.asyncio
@@ -365,20 +366,23 @@ async def test_web_extract_short_circuits_blocked_url(monkeypatch):
assert "Blocked by website policy" in result["results"][0]["error"]
@pytest.mark.asyncio
async def test_web_extract_returns_clean_policy_error_for_malformed_config(monkeypatch, tmp_path):
from tools import web_tools
def test_check_website_access_fails_open_on_malformed_config(tmp_path, monkeypatch):
"""Malformed config with default path should fail open (return None), not crash."""
config_path = tmp_path / "config.yaml"
config_path.write_text("security: [oops\n", encoding="utf-8")
monkeypatch.setattr(web_tools, "check_website_access", lambda url: check_website_access(url, config_path=config_path))
monkeypatch.setattr("tools.interrupt.is_interrupted", lambda: False)
# With explicit config_path (test mode), errors propagate
with pytest.raises(WebsitePolicyError):
check_website_access("https://example.com", config_path=config_path)
result = json.loads(await web_tools.web_extract_tool(["https://allowed.test"], use_llm_processing=False))
# Simulate default path by pointing HERMES_HOME to tmp_path
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools import website_policy
website_policy.invalidate_cache()
assert result["results"][0]["url"] == "https://allowed.test"
assert "Website policy error" in result["results"][0]["error"]
# With default path, errors are caught and fail open
result = check_website_access("https://example.com")
assert result is None # allowed, not crashed
@pytest.mark.asyncio

View File

@@ -65,7 +65,11 @@ import requests
from typing import Dict, Any, Optional, List
from pathlib import Path
from agent.auxiliary_client import call_llm
from tools.website_policy import check_website_access
try:
from tools.website_policy import check_website_access
except Exception:
check_website_access = lambda url: None # noqa: E731 — fail-open if policy module unavailable
from tools.browser_providers.base import CloudBrowserProvider
from tools.browser_providers.browserbase import BrowserbaseProvider
from tools.browser_providers.browser_use import BrowserUseProvider
@@ -903,12 +907,8 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
JSON string with navigation result (includes stealth features info on first nav)
"""
# Website policy check — block before navigating
try:
blocked = check_website_access(url)
except Exception as _policy_err:
return json.dumps({"success": False, "error": f"Website policy error: {_policy_err}"})
blocked = check_website_access(url)
if blocked:
logger.info("Blocked browser_navigate to %s by rule %s", blocked["host"], blocked["rule"])
return json.dumps({
"success": False,
"error": blocked["message"],

View File

@@ -49,7 +49,7 @@ from typing import List, Dict, Any, Optional
from firecrawl import Firecrawl
from agent.auxiliary_client import async_call_llm
from tools.debug_helpers import DebugSession
from tools.website_policy import WebsitePolicyError, check_website_access
from tools.website_policy import check_website_access
logger = logging.getLogger(__name__)
@@ -618,11 +618,7 @@ async def web_extract_tool(
continue
# Website policy check — block before fetching
try:
blocked = check_website_access(url)
except WebsitePolicyError as policy_err:
results.append({"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"})
continue
blocked = check_website_access(url)
if blocked:
logger.info("Blocked web_extract for %s by rule %s", blocked["host"], blocked["rule"])
results.append({
@@ -687,10 +683,7 @@ async def web_extract_tool(
# Re-check final URL after redirect
final_url = metadata.get("sourceURL", url)
try:
final_blocked = check_website_access(final_url)
except WebsitePolicyError:
final_blocked = None
final_blocked = check_website_access(final_url)
if final_blocked:
logger.info("Blocked redirected web_extract for %s by rule %s", final_blocked["host"], final_blocked["rule"])
results.append({
@@ -903,10 +896,7 @@ async def web_crawl_tool(
logger.info("Crawling %s%s", url, instructions_text)
# Website policy check — block before crawling
try:
blocked = check_website_access(url)
except WebsitePolicyError as policy_err:
return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": f"Website policy error: {policy_err}"}]}, ensure_ascii=False)
blocked = check_website_access(url)
if blocked:
logger.info("Blocked web_crawl for %s by rule %s", blocked["host"], blocked["rule"])
return json.dumps({"results": [{"url": url, "title": "", "content": "", "error": blocked["message"],
@@ -1018,10 +1008,7 @@ async def web_crawl_tool(
title = metadata.get("title", "")
# Re-check crawled page URL against policy
try:
page_blocked = check_website_access(page_url)
except WebsitePolicyError:
page_blocked = None
page_blocked = check_website_access(page_url)
if page_blocked:
logger.info("Blocked crawled page %s by rule %s", page_blocked["host"], page_blocked["rule"])
pages.append({

View File

@@ -3,25 +3,38 @@
This module loads a user-managed website blocklist from ~/.hermes/config.yaml
and optional shared list files. It is intentionally lightweight so web/browser
tools can enforce URL policy without pulling in the heavier CLI config stack.
Policy is cached in memory with a short TTL so config changes take effect
quickly without re-reading the file on every URL check.
"""
from __future__ import annotations
import fnmatch
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
import yaml
logger = logging.getLogger(__name__)
_DEFAULT_WEBSITE_BLOCKLIST = {
"enabled": True,
"enabled": False,
"domains": [],
"shared_files": [],
}
# Cache: parsed policy + timestamp. Avoids re-reading config.yaml on every
# URL check (a web_crawl with 50 pages would otherwise mean 51 YAML parses).
_CACHE_TTL_SECONDS = 30.0
_cache_lock = threading.Lock()
_cached_policy: Optional[Dict[str, Any]] = None
_cached_policy_path: Optional[str] = None
_cached_policy_time: float = 0.0
def _get_hermes_home() -> Path:
return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
@@ -55,12 +68,19 @@ def _normalize_rule(rule: Any) -> Optional[str]:
def _iter_blocklist_file_rules(path: Path) -> List[str]:
"""Load rules from a shared blocklist file.
Missing or unreadable files log a warning and return an empty list
rather than raising — a bad file path should not disable all web tools.
"""
try:
raw = path.read_text(encoding="utf-8")
except FileNotFoundError as exc:
raise WebsitePolicyError(f"Shared blocklist file not found: {path}") from exc
except FileNotFoundError:
logger.warning("Shared blocklist file not found (skipping): %s", path)
return []
except (OSError, UnicodeDecodeError) as exc:
raise WebsitePolicyError(f"Failed to read shared blocklist file {path}: {exc}") from exc
logger.warning("Failed to read shared blocklist file %s (skipping): %s", path, exc)
return []
rules: List[str] = []
for line in raw.splitlines():
@@ -77,6 +97,13 @@ def _load_policy_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
config_path = config_path or _get_default_config_path()
if not config_path.exists():
return dict(_DEFAULT_WEBSITE_BLOCKLIST)
try:
import yaml
except ImportError:
logger.debug("PyYAML not installed — website blocklist disabled")
return dict(_DEFAULT_WEBSITE_BLOCKLIST)
try:
with open(config_path, encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
@@ -105,6 +132,27 @@ def _load_policy_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
def load_website_blocklist(config_path: Optional[Path] = None) -> Dict[str, Any]:
"""Load and return the parsed website blocklist policy.
Results are cached for ``_CACHE_TTL_SECONDS`` to avoid re-reading
config.yaml on every URL check. Pass an explicit ``config_path``
to bypass the cache (used by tests).
"""
global _cached_policy, _cached_policy_path, _cached_policy_time
resolved_path = str(config_path) if config_path else "__default__"
now = time.monotonic()
# Return cached policy if still fresh and same path
if config_path is None:
with _cache_lock:
if (
_cached_policy is not None
and _cached_policy_path == resolved_path
and (now - _cached_policy_time) < _CACHE_TTL_SECONDS
):
return _cached_policy
config_path = config_path or _get_default_config_path()
policy = _load_policy_config(config_path)
@@ -142,7 +190,23 @@ def load_website_blocklist(config_path: Optional[Path] = None) -> Dict[str, Any]
rules.append({"pattern": normalized, "source": str(path)})
seen.add(key)
return {"enabled": enabled, "rules": rules}
result = {"enabled": enabled, "rules": rules}
# Cache the result (only for the default path — explicit paths are tests)
if config_path == _get_default_config_path():
with _cache_lock:
_cached_policy = result
_cached_policy_path = "__default__"
_cached_policy_time = now
return result
def invalidate_cache() -> None:
    """Force the next ``check_website_access`` call to re-read config."""
    # Clearing only _cached_policy is sufficient: every cache consumer
    # first checks `_cached_policy is not None`, so the leftover
    # _cached_policy_path / _cached_policy_time values can never be
    # observed as a stale hit.
    global _cached_policy
    with _cache_lock:
        _cached_policy = None
def _match_host_against_rule(host: str, pattern: str) -> bool:
@@ -169,17 +233,45 @@ def _extract_host_from_urlish(url: str) -> str:
def check_website_access(url: str, config_path: Optional[Path] = None) -> Optional[Dict[str, str]]:
"""Check whether a URL is allowed by the website blocklist policy.
Returns ``None`` if access is allowed, or a dict with block metadata
(``host``, ``rule``, ``source``, ``message``) if blocked.
Never raises on policy errors — logs a warning and returns ``None``
(fail-open) so a config typo doesn't break all web tools. Pass
``config_path`` explicitly (tests) to get strict error propagation.
"""
# Fast path: if no explicit config_path and the cached policy is disabled
# or empty, skip all work (no YAML read, no host extraction).
if config_path is None:
with _cache_lock:
if _cached_policy is not None and not _cached_policy.get("enabled"):
return None
host = _extract_host_from_urlish(url)
if not host:
return None
policy = load_website_blocklist(config_path)
try:
policy = load_website_blocklist(config_path)
except WebsitePolicyError as exc:
if config_path is not None:
raise # Tests pass explicit paths — let errors propagate
logger.warning("Website policy config error (failing open): %s", exc)
return None
except Exception as exc:
logger.warning("Unexpected error loading website policy (failing open): %s", exc)
return None
if not policy.get("enabled"):
return None
for rule in policy.get("rules", []):
pattern = rule.get("pattern", "")
if _match_host_against_rule(host, pattern):
logger.info("Blocked URL %s — matched rule '%s' from %s",
url, pattern, rule.get("source", "config"))
return {
"url": url,
"host": host,