From 878b1d3d33490ae53e4e30f92de78de2315044f9 Mon Sep 17 00:00:00 2001 From: Awsh1 Date: Mon, 6 Apr 2026 12:12:45 -0700 Subject: [PATCH] fix(cron): harden scheduler against path traversal and env leaks Cherry-picked from PR #5503 by Awsh1. - Validate ALL script paths (absolute, relative, tilde) against scripts_dir boundary - Add API-boundary validation in cronjob_tools.py - Move os.environ injections inside try block so finally cleanup always runs - Comprehensive regression tests for path containment bypass --- cron/scheduler.py | 52 ++++--- tests/cron/test_cron_script.py | 275 +++++++++++++++++++++++++++++++-- tools/cronjob_tools.py | 49 ++++++ 3 files changed, 349 insertions(+), 27 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 606a9ba7b..63018d6ff 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -297,8 +297,15 @@ _SCRIPT_TIMEOUT = 120 # seconds def _run_job_script(script_path: str) -> tuple[bool, str]: """Execute a cron job's data-collection script and capture its output. + Scripts must reside within HERMES_HOME/scripts/. Both relative and + absolute paths are resolved and validated against this directory to + prevent arbitrary script execution via path traversal or absolute + path injection. + Args: - script_path: Path to a Python script (resolved via HERMES_HOME/scripts/ or absolute). + script_path: Path to a Python script. Relative paths are resolved + against HERMES_HOME/scripts/. Absolute and ~-prefixed paths + are also validated to ensure they stay within the scripts dir. Returns: (success, output) — on failure *output* contains the error message so the @@ -306,16 +313,25 @@ def _run_job_script(script_path: str) -> tuple[bool, str]: """ from hermes_constants import get_hermes_home - path = Path(script_path).expanduser() - if not path.is_absolute(): - # Resolve relative paths against HERMES_HOME/scripts/ - scripts_dir = get_hermes_home() / "scripts" - path = (scripts_dir / path).resolve() - # Guard against path traversal (e.g. "../../etc/passwd") - try: - path.relative_to(scripts_dir.resolve()) - except ValueError: - return False, f"Script path escapes the scripts directory: {script_path!r}" + scripts_dir = get_hermes_home() / "scripts" + scripts_dir.mkdir(parents=True, exist_ok=True) + scripts_dir_resolved = scripts_dir.resolve() + + raw = Path(script_path).expanduser() + if raw.is_absolute(): + path = raw.resolve() + else: + path = (scripts_dir / raw).resolve() + + # Guard against path traversal, absolute path injection, and symlink + # escape — scripts MUST reside within HERMES_HOME/scripts/. + try: + path.relative_to(scripts_dir_resolved) + except ValueError: + return False, ( + f"Blocked: script path resolves outside the scripts directory " + f"({scripts_dir_resolved}): {script_path!r}" + ) if not path.exists(): return False, f"Script not found: {path}" @@ -473,14 +489,14 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) - # Inject origin context so the agent's send_message tool knows the chat - if origin: - os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"] - os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"]) - if origin.get("chat_name"): - os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"] - try: + # Inject origin context so the agent's send_message tool knows the chat. + # Must be INSIDE the try block so the finally cleanup always runs. + if origin: + os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"] + os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"]) + if origin.get("chat_name"): + os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"] # Re-read .env and config.yaml fresh every run so provider/key # changes take effect without a gateway restart. from dotenv import load_dotenv diff --git a/tests/cron/test_cron_script.py b/tests/cron/test_cron_script.py index e83396354..d7f278aa9 100644 --- a/tests/cron/test_cron_script.py +++ b/tests/cron/test_cron_script.py @@ -114,7 +114,7 @@ class TestRunJobScript: def test_script_not_found(self, cron_env): from cron.scheduler import _run_job_script - success, output = _run_job_script("/nonexistent/script.py") + success, output = _run_job_script("nonexistent_script.py") assert success is False assert "not found" in output.lower() @@ -198,7 +198,7 @@ class TestBuildJobPromptWithScript: job = { "prompt": "Report status.", - "script": "/nonexistent/script.py", + "script": "nonexistent_monitor.py", } prompt = _build_job_prompt(job) assert "## Script Error" in prompt @@ -239,10 +239,10 @@ class TestCronjobToolScript: action="create", schedule="every 1h", prompt="Monitor things", - script="/home/user/monitor.py", + script="monitor.py", )) assert result["success"] is True - assert result["job"]["script"] == "/home/user/monitor.py" + assert result["job"]["script"] == "monitor.py" def test_update_script(self, cron_env, monkeypatch): monkeypatch.setenv("HERMES_INTERACTIVE", "1") @@ -258,10 +258,10 @@ class TestCronjobToolScript: update_result = json.loads(cronjob( action="update", job_id=job_id, - script="/new/script.py", + script="new_script.py", )) assert update_result["success"] is True - assert update_result["job"]["script"] == "/new/script.py" + assert update_result["job"]["script"] == "new_script.py" def test_clear_script(self, cron_env, monkeypatch): monkeypatch.setenv("HERMES_INTERACTIVE", "1") @@ -271,7 +271,7 @@ class TestCronjobToolScript: action="create", schedule="every 1h", prompt="Monitor things", - script="/some/script.py", + script="some_script.py", )) job_id = create_result["job_id"] @@ -291,10 +291,267 @@ class TestCronjobToolScript: action="create", schedule="every 1h", prompt="Monitor things", - script="/path/to/script.py", + script="data_collector.py", ) list_result = json.loads(cronjob(action="list")) assert list_result["success"] is True assert len(list_result["jobs"]) == 1 - assert list_result["jobs"][0]["script"] == "/path/to/script.py" + assert list_result["jobs"][0]["script"] == "data_collector.py" + + +class TestScriptPathContainment: + """Regression tests for path containment bypass in _run_job_script(). + + Prior to the fix, absolute paths and ~-prefixed paths bypassed the + scripts_dir containment check entirely, allowing arbitrary script + execution through the cron system. + """ + + def test_absolute_path_outside_scripts_dir_blocked(self, cron_env): + """Absolute paths outside ~/.hermes/scripts/ must be rejected.""" + from cron.scheduler import _run_job_script + + # Create a script outside the scripts dir + outside_script = cron_env / "outside.py" + outside_script.write_text('print("should not run")\n') + + success, output = _run_job_script(str(outside_script)) + assert success is False + assert "blocked" in output.lower() or "outside" in output.lower() + + def test_absolute_path_tmp_blocked(self, cron_env): + """Absolute paths to /tmp must be rejected.""" + from cron.scheduler import _run_job_script + + success, output = _run_job_script("/tmp/evil.py") + assert success is False + assert "blocked" in output.lower() or "outside" in output.lower() + + def test_tilde_path_blocked(self, cron_env): + """~ prefixed paths must be rejected (expanduser bypasses check).""" + from cron.scheduler import _run_job_script + + success, output = _run_job_script("~/evil.py") + assert success is False + assert "blocked" in output.lower() or "outside" in output.lower() + + def test_tilde_traversal_blocked(self, cron_env): + """~/../../../tmp/evil.py must be rejected.""" + from cron.scheduler import _run_job_script + + success, output = _run_job_script("~/../../../tmp/evil.py") + assert success is False + assert "blocked" in output.lower() or "outside" in output.lower() + + def test_relative_traversal_still_blocked(self, cron_env): + """../../etc/passwd style traversal must still be blocked.""" + from cron.scheduler import _run_job_script + + success, output = _run_job_script("../../etc/passwd") + assert success is False + assert "blocked" in output.lower() or "outside" in output.lower() + + def test_relative_path_inside_scripts_dir_allowed(self, cron_env): + """Relative paths within the scripts dir should still work.""" + from cron.scheduler import _run_job_script + + script = cron_env / "scripts" / "good.py" + script.write_text('print("ok")\n') + + success, output = _run_job_script("good.py") + assert success is True + assert output == "ok" + + def test_subdirectory_inside_scripts_dir_allowed(self, cron_env): + """Relative paths to subdirectories within scripts/ should work.""" + from cron.scheduler import _run_job_script + + subdir = cron_env / "scripts" / "monitors" + subdir.mkdir() + script = subdir / "check.py" + script.write_text('print("sub ok")\n') + + success, output = _run_job_script("monitors/check.py") + assert success is True + assert output == "sub ok" + + def test_absolute_path_inside_scripts_dir_allowed(self, cron_env): + """Absolute paths that resolve WITHIN scripts/ should work.""" + from cron.scheduler import _run_job_script + + script = cron_env / "scripts" / "abs_ok.py" + script.write_text('print("abs ok")\n') + + success, output = _run_job_script(str(script)) + assert success is True + assert output == "abs ok" + + @pytest.mark.skipif( + sys.platform == "win32", + reason="Symlinks require elevated privileges on Windows", + ) + def test_symlink_escape_blocked(self, cron_env, tmp_path): + """Symlinks pointing outside scripts/ must be rejected.""" + from cron.scheduler import _run_job_script + + # Create a script outside the scripts dir + outside = tmp_path / "outside_evil.py" + outside.write_text('print("escaped")\n') + + # Create a symlink inside scripts/ pointing outside + link = cron_env / "scripts" / "sneaky.py" + link.symlink_to(outside) + + success, output = _run_job_script("sneaky.py") + assert success is False + assert "blocked" in output.lower() or "outside" in output.lower() + + +class TestCronjobToolScriptValidation: + """Test API-boundary validation of cron script paths in cronjob_tools.""" + + def test_create_with_absolute_script_rejected(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="/home/user/evil.py", + )) + assert result["success"] is False + assert "relative" in result["error"].lower() or "absolute" in result["error"].lower() + + def test_create_with_tilde_script_rejected(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="~/monitor.py", + )) + assert result["success"] is False + assert "relative" in result["error"].lower() or "absolute" in result["error"].lower() + + def test_create_with_traversal_script_rejected(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="../../etc/passwd", + )) + assert result["success"] is False + assert "escapes" in result["error"].lower() or "traversal" in result["error"].lower() + + def test_create_with_relative_script_allowed(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="monitor.py", + )) + assert result["success"] is True + assert result["job"]["script"] == "monitor.py" + + def test_update_with_absolute_script_rejected(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + create_result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + )) + job_id = create_result["job_id"] + + update_result = json.loads(cronjob( + action="update", + job_id=job_id, + script="/tmp/evil.py", + )) + assert update_result["success"] is False + assert "relative" in update_result["error"].lower() or "absolute" in update_result["error"].lower() + + def test_update_clear_script_allowed(self, cron_env, monkeypatch): + """Clearing a script (empty string) should always be permitted.""" + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + create_result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="monitor.py", + )) + job_id = create_result["job_id"] + + update_result = json.loads(cronjob( + action="update", + job_id=job_id, + script="", + )) + assert update_result["success"] is True + assert "script" not in update_result["job"] + + def test_windows_absolute_path_rejected(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="C:\\Users\\evil\\script.py", + )) + assert result["success"] is False + + +class TestRunJobEnvVarCleanup: + """Test that run_job() env vars are cleaned up even on early failure.""" + + def test_env_vars_cleaned_on_early_error(self, cron_env, monkeypatch): + """Origin env vars must be cleaned up even if run_job fails early.""" + # Ensure env vars are clean before test + for key in ( + "HERMES_SESSION_PLATFORM", + "HERMES_SESSION_CHAT_ID", + "HERMES_SESSION_CHAT_NAME", + ): + monkeypatch.delenv(key, raising=False) + + # Build a job with origin info that will fail during execution + # (no valid model, no API key — will raise inside try block) + job = { + "id": "test-envleak", + "name": "env-leak-test", + "prompt": "test", + "schedule_display": "every 1h", + "origin": { + "platform": "telegram", + "chat_id": "12345", + "chat_name": "Test Chat", + }, + } + + from cron.scheduler import run_job + + # Expect it to fail (no model/API key), but env vars must be cleaned + try: + run_job(job) + except Exception: + pass + + # Verify env vars were cleaned up by the finally block + assert os.environ.get("HERMES_SESSION_PLATFORM") is None + assert os.environ.get("HERMES_SESSION_CHAT_ID") is None + assert os.environ.get("HERMES_SESSION_CHAT_NAME") is None diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 965cfe130..eb13240b1 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -112,6 +112,45 @@ def _normalize_optional_job_value(value: Optional[Any], *, strip_trailing_slash: return text or None +def _validate_cron_script_path(script: Optional[str]) -> Optional[str]: + """Validate a cron job script path at the API boundary. + + Scripts must be relative paths that resolve within HERMES_HOME/scripts/. + Absolute paths and ~ expansion are rejected to prevent arbitrary script + execution via prompt injection. + + Returns an error string if blocked, else None (valid). + """ + if not script or not script.strip(): + return None # empty/None = clearing the field, always OK + + from pathlib import Path + from hermes_constants import get_hermes_home + + raw = script.strip() + + # Reject absolute paths and ~ expansion at the API boundary. + # Only relative paths within ~/.hermes/scripts/ are allowed. + if raw.startswith(("/", "~")) or (len(raw) >= 2 and raw[1] == ":"): + return ( + f"Script path must be relative to ~/.hermes/scripts/. " + f"Got absolute or home-relative path: {raw!r}. " + f"Place scripts in ~/.hermes/scripts/ and use just the filename." + ) + + # Validate containment after resolution + scripts_dir = get_hermes_home() / "scripts" + scripts_dir.mkdir(parents=True, exist_ok=True) + resolved = (scripts_dir / raw).resolve() + try: + resolved.relative_to(scripts_dir.resolve()) + except ValueError: + return ( + f"Script path escapes the scripts directory via traversal: {raw!r}" + ) + + return None + def _format_job(job: Dict[str, Any]) -> Dict[str, Any]: prompt = job.get("prompt", "") @@ -176,6 +215,12 @@ def cronjob( if scan_error: return json.dumps({"success": False, "error": scan_error}, indent=2) + # Validate script path before storing + if script: + script_error = _validate_cron_script_path(script) + if script_error: + return json.dumps({"success": False, "error": script_error}, indent=2) + job = create_job( prompt=prompt or "", schedule=schedule, @@ -272,6 +317,10 @@ def cronjob( updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True) if script is not None: # Pass empty string to clear an existing script + if script: + script_error = _validate_cron_script_path(script) + if script_error: + return json.dumps({"success": False, "error": script_error}, indent=2) updates["script"] = _normalize_optional_job_value(script) if script else None if repeat is not None: # Normalize: treat 0 or negative as None (infinite)