Files
hermes-agent/tools/website_policy.py
Teknium 77bcaba2d7 refactor: consolidate get_hermes_home() and parse_reasoning_effort() (#3062)
Centralizes two widely-duplicated patterns into hermes_constants.py:

1. get_hermes_home() — Path resolution for ~/.hermes (HERMES_HOME env var)
   - Was copy-pasted inline across 30+ files as:
     Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
   - Now defined once in hermes_constants.py (zero-dependency module)
   - hermes_cli/config.py re-exports it for backward compatibility
   - Removed local wrapper functions in honcho_integration/client.py,
     tools/website_policy.py, tools/tirith_security.py, hermes_cli/uninstall.py

2. parse_reasoning_effort() — Reasoning effort string validation
   - Was copy-pasted in cli.py, gateway/run.py, cron/scheduler.py
   - Same validation logic: check against (xhigh, high, medium, low, minimal, none)
   - Now defined once in hermes_constants.py, called from all 3 locations
   - Warning log for unknown values kept at call sites (context-specific)

31 files changed, net +31 lines (125 insertions, 94 deletions)
Full test suite: 6179 passed, 0 failed
2026-03-25 15:54:28 -07:00

284 lines
9.6 KiB
Python

"""Website access policy helpers for URL-capable tools.
This module loads a user-managed website blocklist from ~/.hermes/config.yaml
and optional shared list files. It is intentionally lightweight so web/browser
tools can enforce URL policy without pulling in the heavier CLI config stack.
Policy is cached in memory with a short TTL so config changes take effect
quickly without re-reading the file on every URL check.
"""
from __future__ import annotations
import fnmatch
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Baseline policy merged underneath whatever the config file provides:
# blocking disabled, no inline domain rules, no shared list files.
_DEFAULT_WEBSITE_BLOCKLIST = {
    "enabled": False,
    "domains": [],
    "shared_files": [],
}
# Cache: parsed policy + timestamp. Avoids re-reading config.yaml on every
# URL check (a web_crawl with 50 pages would otherwise mean 51 YAML parses).
# How long (in seconds, compared against time.monotonic()) a cached policy is
# considered fresh.
_CACHE_TTL_SECONDS = 30.0
# Guards reads and writes of the three _cached_policy* globals below.
_cache_lock = threading.Lock()
# Last parsed policy ({"enabled": ..., "rules": [...]}), or None if nothing
# has been cached yet or the cache was invalidated.
_cached_policy: Optional[Dict[str, Any]] = None
# Key identifying which config produced the cached policy; the loader stores
# the sentinel "__default__" for the default config path.
_cached_policy_path: Optional[str] = None
# time.monotonic() reading associated with the cached policy.
_cached_policy_time: float = 0.0
def _get_default_config_path() -> Path:
    """Return the path of ``config.yaml`` under the directory given by ``get_hermes_home()``."""
    hermes_home = get_hermes_home()
    return hermes_home / "config.yaml"
class WebsitePolicyError(Exception):
    """Signals that the website policy configuration is unreadable or malformed.

    Raised by the policy loaders in this module when the config file cannot
    be read or parsed, or when a policy section has the wrong type.
    """
def _normalize_host(host: str) -> str:
return (host or "").strip().lower().rstrip(".")
def _normalize_rule(rule: Any) -> Optional[str]:
if not isinstance(rule, str):
return None
value = rule.strip().lower()
if not value or value.startswith("#"):
return None
if "://" in value:
parsed = urlparse(value)
value = parsed.netloc or parsed.path
value = value.split("/", 1)[0].strip().rstrip(".")
if value.startswith("www."):
value = value[4:]
return value or None
def _iter_blocklist_file_rules(path: Path) -> List[str]:
    """Read a shared blocklist file and return its normalized rules.

    The file is decoded as UTF-8 and processed line by line; blank lines and
    lines starting with ``#`` are skipped, and every remaining line is passed
    through ``_normalize_rule``.

    A missing or unreadable file is not fatal: a warning is logged and an
    empty list is returned, so a bad path does not take down the web tools.
    """
    try:
        contents = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        logger.warning("Shared blocklist file not found (skipping): %s", path)
        return []
    except (OSError, UnicodeDecodeError) as exc:
        logger.warning("Failed to read shared blocklist file %s (skipping): %s", path, exc)
        return []

    entries = (raw_line.strip() for raw_line in contents.splitlines())
    normalized_entries = (
        _normalize_rule(entry)
        for entry in entries
        if entry and not entry.startswith("#")
    )
    return [pattern for pattern in normalized_entries if pattern]
def _load_policy_config(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """Read the raw ``security.website_blocklist`` mapping from a config file.

    The values found in the file are layered on top of
    ``_DEFAULT_WEBSITE_BLOCKLIST``; no validation of the individual fields is
    done here.

    Args:
        config_path: YAML file to read. Defaults to the path returned by
            ``_get_default_config_path()``.

    Returns:
        A new dict with at least the keys of ``_DEFAULT_WEBSITE_BLOCKLIST``.
        Pure defaults are returned when the file does not exist or PyYAML is
        not installed.

    Raises:
        WebsitePolicyError: The file cannot be read, is not valid YAML, or
            the root / ``security`` / ``security.website_blocklist`` node is
            not a mapping.
    """

    def _fresh_defaults() -> Dict[str, Any]:
        # Copy list values so a returned policy never aliases the lists held
        # by the module-level default.
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in _DEFAULT_WEBSITE_BLOCKLIST.items()
        }

    config_path = config_path or _get_default_config_path()
    if not config_path.exists():
        return _fresh_defaults()

    try:
        import yaml
    except ImportError:
        logger.debug("PyYAML not installed — website blocklist disabled")
        return _fresh_defaults()

    try:
        with open(config_path, encoding="utf-8") as f:
            config = yaml.safe_load(f) or {}
    except FileNotFoundError:
        # The file vanished between the existence check and the open call;
        # treat that exactly like a config that was never there.
        return _fresh_defaults()
    except yaml.YAMLError as exc:
        raise WebsitePolicyError(f"Invalid config YAML at {config_path}: {exc}") from exc
    except OSError as exc:
        raise WebsitePolicyError(f"Failed to read config file {config_path}: {exc}") from exc

    if not isinstance(config, dict):
        raise WebsitePolicyError("config root must be a mapping")

    # An explicit null ("security:" with no value) is treated like an absent key.
    security = config.get("security", {})
    if security is None:
        security = {}
    if not isinstance(security, dict):
        raise WebsitePolicyError("security must be a mapping")

    website_blocklist = security.get("website_blocklist", {})
    if website_blocklist is None:
        website_blocklist = {}
    if not isinstance(website_blocklist, dict):
        raise WebsitePolicyError("security.website_blocklist must be a mapping")

    policy = _fresh_defaults()
    policy.update(website_blocklist)
    return policy
def load_website_blocklist(config_path: Optional[Path] = None) -> Dict[str, Any]:
    """Build the effective website blocklist policy.

    The returned dict has the shape ``{"enabled": bool, "rules": [...]}``
    where every rule is ``{"pattern": <host pattern>, "source": <origin>}``.
    ``source`` is ``"config"`` for inline domains, or the resolved path of the
    shared list file a rule came from.

    When called without ``config_path`` the result is served from an
    in-memory cache for up to ``_CACHE_TTL_SECONDS``. Passing an explicit
    ``config_path`` skips the cache lookup (used by tests).

    Raises:
        WebsitePolicyError: ``domains`` / ``shared_files`` is not a list or
            ``enabled`` is not a boolean (plus anything raised while reading
            the config file itself).
    """
    global _cached_policy, _cached_policy_path, _cached_policy_time

    cache_key = str(config_path) if config_path else "__default__"
    started_at = time.monotonic()

    # Serve from cache when no explicit path was requested and the entry is
    # still within its TTL.
    if config_path is None:
        with _cache_lock:
            cache_hit = (
                _cached_policy is not None
                and _cached_policy_path == cache_key
                and (started_at - _cached_policy_time) < _CACHE_TTL_SECONDS
            )
            if cache_hit:
                return _cached_policy

    config_path = config_path or _get_default_config_path()
    policy = _load_policy_config(config_path)

    # Validate field types; a null list value is treated as empty.
    raw_domains = policy.get("domains", []) or []
    if not isinstance(raw_domains, list):
        raise WebsitePolicyError("security.website_blocklist.domains must be a list")

    raw_shared_files = policy.get("shared_files", []) or []
    if not isinstance(raw_shared_files, list):
        raise WebsitePolicyError("security.website_blocklist.shared_files must be a list")

    enabled = policy.get("enabled", True)
    if not isinstance(enabled, bool):
        raise WebsitePolicyError("security.website_blocklist.enabled must be a boolean")

    rules: List[Dict[str, str]] = []
    seen: set = set()

    def _add_rule(pattern: str, source: str) -> None:
        # De-duplicate per (source, pattern) pair while keeping first-seen order.
        key = (source, pattern)
        if key in seen:
            return
        seen.add(key)
        rules.append({"pattern": pattern, "source": source})

    for raw_rule in raw_domains:
        pattern = _normalize_rule(raw_rule)
        if pattern:
            _add_rule(pattern, "config")

    for entry in raw_shared_files:
        if not (isinstance(entry, str) and entry.strip()):
            continue
        list_path = Path(entry).expanduser()
        if not list_path.is_absolute():
            # Relative entries are anchored at the Hermes home directory.
            list_path = (get_hermes_home() / list_path).resolve()
        source = str(list_path)
        for pattern in _iter_blocklist_file_rules(list_path):
            _add_rule(pattern, source)

    result = {"enabled": enabled, "rules": rules}

    # Only the default config location populates the shared cache.
    if config_path == _get_default_config_path():
        with _cache_lock:
            _cached_policy = result
            _cached_policy_path = "__default__"
            _cached_policy_time = started_at

    return result
def invalidate_cache() -> None:
    """Discard the cached policy so the next lookup re-reads the config.

    Only the cached policy object is cleared; a ``None`` policy is enough for
    the loader to treat the cache as empty.
    """
    global _cached_policy

    with _cache_lock:
        _cached_policy = None
def _match_host_against_rule(host: str, pattern: str) -> bool:
if not host or not pattern:
return False
if pattern.startswith("*."):
return fnmatch.fnmatch(host, pattern)
return host == pattern or host.endswith(f".{pattern}")
def _extract_host_from_urlish(url: str) -> str:
    """Pull a normalized hostname out of a URL or URL-like string.

    The string is first parsed as given. If that yields no host and the
    string contains no ``://``, it is re-parsed with ``//`` prepended so that
    inputs such as ``example.com/path`` still produce a host.

    Returns the normalized host, or ``""`` when none can be determined.
    """
    primary = urlparse(url)
    candidate = _normalize_host(primary.hostname or primary.netloc)

    if not candidate and "://" not in url:
        # No scheme: force the leading component to be read as the authority.
        fallback = urlparse(f"//{url}")
        candidate = _normalize_host(fallback.hostname or fallback.netloc)

    return candidate or ""
def check_website_access(url: str, config_path: Optional[Path] = None) -> Optional[Dict[str, str]]:
    """Check whether a URL is allowed by the website blocklist policy.

    Args:
        url: URL (or scheme-less URL-like string) about to be accessed.
        config_path: Explicit config file to evaluate against. When omitted,
            the default config is used and results come from the shared cache.

    Returns:
        ``None`` if access is allowed, otherwise a dict with block metadata:
        ``url``, ``host``, ``rule``, ``source`` and ``message``.

    Raises:
        WebsitePolicyError: Only when ``config_path`` is given explicitly
            (tests) and the policy is malformed. With the default config,
            policy errors are logged and the check fails open (returns
            ``None``) so a config typo doesn't break all web tools.
    """
    # Fast path: if no explicit config_path and a *fresh* cached policy is
    # disabled, skip all work (no YAML read, no host extraction). The TTL
    # check matters: without it a cached "disabled" policy would short-circuit
    # forever and a later config change enabling the blocklist would never be
    # picked up.
    if config_path is None:
        with _cache_lock:
            cache_is_fresh = (
                _cached_policy is not None
                and (time.monotonic() - _cached_policy_time) < _CACHE_TTL_SECONDS
            )
            if cache_is_fresh and not _cached_policy.get("enabled"):
                return None

    host = _extract_host_from_urlish(url)
    if not host:
        return None

    try:
        policy = load_website_blocklist(config_path)
    except WebsitePolicyError as exc:
        if config_path is not None:
            raise  # Tests pass explicit paths — let errors propagate
        logger.warning("Website policy config error (failing open): %s", exc)
        return None
    except Exception as exc:
        logger.warning("Unexpected error loading website policy (failing open): %s", exc)
        return None

    if not policy.get("enabled"):
        return None

    for rule in policy.get("rules", []):
        pattern = rule.get("pattern", "")
        if not _match_host_against_rule(host, pattern):
            continue

        # First matching rule wins; report where it came from.
        source = rule.get("source", "config")
        logger.info("Blocked URL %s — matched rule '%s' from %s",
                    url, pattern, source)
        return {
            "url": url,
            "host": host,
            "rule": pattern,
            "source": source,
            "message": (
                f"Blocked by website policy: '{host}' matched rule '{pattern}'"
                f" from {source}"
            ),
        }
    return None
return None