Cherry-picked from PR #5503 by Awsh1. - Validate ALL script paths (absolute, relative, tilde) against scripts_dir boundary - Add API-boundary validation in cronjob_tools.py - Move os.environ injections inside try block so finally cleanup always runs - Comprehensive regression tests for path containment bypass
558 lines
19 KiB
Python
558 lines
19 KiB
Python
"""Tests for cron job script injection feature.
|
|
|
|
Tests cover:
|
|
- Script field in job creation / storage / update
|
|
- Script execution and output injection into prompts
|
|
- Error handling (missing script, timeout, non-zero exit)
|
|
- Path resolution (absolute, relative to HERMES_HOME/scripts/)
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import stat
|
|
import sys
|
|
import textwrap
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
# Ensure project root is importable
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
|
|
@pytest.fixture
|
|
def cron_env(tmp_path, monkeypatch):
|
|
"""Isolated cron environment with temp HERMES_HOME."""
|
|
hermes_home = tmp_path / ".hermes"
|
|
hermes_home.mkdir()
|
|
(hermes_home / "cron").mkdir()
|
|
(hermes_home / "cron" / "output").mkdir()
|
|
(hermes_home / "scripts").mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
|
|
|
# Clear cached module-level paths
|
|
import cron.jobs as jobs_mod
|
|
monkeypatch.setattr(jobs_mod, "HERMES_DIR", hermes_home)
|
|
monkeypatch.setattr(jobs_mod, "CRON_DIR", hermes_home / "cron")
|
|
monkeypatch.setattr(jobs_mod, "JOBS_FILE", hermes_home / "cron" / "jobs.json")
|
|
monkeypatch.setattr(jobs_mod, "OUTPUT_DIR", hermes_home / "cron" / "output")
|
|
|
|
return hermes_home
|
|
|
|
|
|
class TestJobScriptField:
|
|
"""Test that the script field is stored and retrieved correctly."""
|
|
|
|
def test_create_job_with_script(self, cron_env):
|
|
from cron.jobs import create_job, get_job
|
|
|
|
job = create_job(
|
|
prompt="Analyze the data",
|
|
schedule="every 30m",
|
|
script="/path/to/monitor.py",
|
|
)
|
|
assert job["script"] == "/path/to/monitor.py"
|
|
|
|
loaded = get_job(job["id"])
|
|
assert loaded["script"] == "/path/to/monitor.py"
|
|
|
|
def test_create_job_without_script(self, cron_env):
|
|
from cron.jobs import create_job
|
|
|
|
job = create_job(prompt="Hello", schedule="every 1h")
|
|
assert job.get("script") is None
|
|
|
|
def test_create_job_empty_script_normalized_to_none(self, cron_env):
|
|
from cron.jobs import create_job
|
|
|
|
job = create_job(prompt="Hello", schedule="every 1h", script=" ")
|
|
assert job.get("script") is None
|
|
|
|
def test_update_job_add_script(self, cron_env):
|
|
from cron.jobs import create_job, update_job
|
|
|
|
job = create_job(prompt="Hello", schedule="every 1h")
|
|
assert job.get("script") is None
|
|
|
|
updated = update_job(job["id"], {"script": "/new/script.py"})
|
|
assert updated["script"] == "/new/script.py"
|
|
|
|
def test_update_job_clear_script(self, cron_env):
|
|
from cron.jobs import create_job, update_job
|
|
|
|
job = create_job(prompt="Hello", schedule="every 1h", script="/some/script.py")
|
|
assert job["script"] == "/some/script.py"
|
|
|
|
updated = update_job(job["id"], {"script": None})
|
|
assert updated.get("script") is None
|
|
|
|
|
|
class TestRunJobScript:
|
|
"""Test the _run_job_script() function."""
|
|
|
|
def test_successful_script(self, cron_env):
|
|
from cron.scheduler import _run_job_script
|
|
|
|
script = cron_env / "scripts" / "test.py"
|
|
script.write_text('print("hello from script")\n')
|
|
|
|
success, output = _run_job_script(str(script))
|
|
assert success is True
|
|
assert output == "hello from script"
|
|
|
|
def test_script_relative_path(self, cron_env):
|
|
from cron.scheduler import _run_job_script
|
|
|
|
script = cron_env / "scripts" / "relative.py"
|
|
script.write_text('print("relative works")\n')
|
|
|
|
success, output = _run_job_script("relative.py")
|
|
assert success is True
|
|
assert output == "relative works"
|
|
|
|
def test_script_not_found(self, cron_env):
|
|
from cron.scheduler import _run_job_script
|
|
|
|
success, output = _run_job_script("nonexistent_script.py")
|
|
assert success is False
|
|
assert "not found" in output.lower()
|
|
|
|
def test_script_nonzero_exit(self, cron_env):
|
|
from cron.scheduler import _run_job_script
|
|
|
|
script = cron_env / "scripts" / "fail.py"
|
|
script.write_text(textwrap.dedent("""\
|
|
import sys
|
|
print("partial output")
|
|
print("error info", file=sys.stderr)
|
|
sys.exit(1)
|
|
"""))
|
|
|
|
success, output = _run_job_script(str(script))
|
|
assert success is False
|
|
assert "exited with code 1" in output
|
|
assert "error info" in output
|
|
|
|
def test_script_empty_output(self, cron_env):
|
|
from cron.scheduler import _run_job_script
|
|
|
|
script = cron_env / "scripts" / "empty.py"
|
|
script.write_text("# no output\n")
|
|
|
|
success, output = _run_job_script(str(script))
|
|
assert success is True
|
|
assert output == ""
|
|
|
|
def test_script_timeout(self, cron_env, monkeypatch):
|
|
from cron import scheduler as sched_mod
|
|
from cron.scheduler import _run_job_script
|
|
|
|
# Use a very short timeout
|
|
monkeypatch.setattr(sched_mod, "_SCRIPT_TIMEOUT", 1)
|
|
|
|
script = cron_env / "scripts" / "slow.py"
|
|
script.write_text("import time; time.sleep(30)\n")
|
|
|
|
success, output = _run_job_script(str(script))
|
|
assert success is False
|
|
assert "timed out" in output.lower()
|
|
|
|
def test_script_json_output(self, cron_env):
|
|
"""Scripts can output structured JSON for the LLM to parse."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
script = cron_env / "scripts" / "json_out.py"
|
|
script.write_text(textwrap.dedent("""\
|
|
import json
|
|
data = {"new_prs": [{"number": 42, "title": "Fix bug"}]}
|
|
print(json.dumps(data, indent=2))
|
|
"""))
|
|
|
|
success, output = _run_job_script(str(script))
|
|
assert success is True
|
|
parsed = json.loads(output)
|
|
assert parsed["new_prs"][0]["number"] == 42
|
|
|
|
|
|
class TestBuildJobPromptWithScript:
|
|
"""Test that script output is injected into the prompt."""
|
|
|
|
def test_script_output_injected(self, cron_env):
|
|
from cron.scheduler import _build_job_prompt
|
|
|
|
script = cron_env / "scripts" / "data.py"
|
|
script.write_text('print("new PR: #123 fix typo")\n')
|
|
|
|
job = {
|
|
"prompt": "Report any notable changes.",
|
|
"script": str(script),
|
|
}
|
|
prompt = _build_job_prompt(job)
|
|
assert "## Script Output" in prompt
|
|
assert "new PR: #123 fix typo" in prompt
|
|
assert "Report any notable changes." in prompt
|
|
|
|
def test_script_error_injected(self, cron_env):
|
|
from cron.scheduler import _build_job_prompt
|
|
|
|
job = {
|
|
"prompt": "Report status.",
|
|
"script": "nonexistent_monitor.py",
|
|
}
|
|
prompt = _build_job_prompt(job)
|
|
assert "## Script Error" in prompt
|
|
assert "not found" in prompt.lower()
|
|
assert "Report status." in prompt
|
|
|
|
def test_no_script_unchanged(self, cron_env):
|
|
from cron.scheduler import _build_job_prompt
|
|
|
|
job = {"prompt": "Simple job."}
|
|
prompt = _build_job_prompt(job)
|
|
assert "## Script Output" not in prompt
|
|
assert "Simple job." in prompt
|
|
|
|
def test_script_empty_output_noted(self, cron_env):
|
|
from cron.scheduler import _build_job_prompt
|
|
|
|
script = cron_env / "scripts" / "noop.py"
|
|
script.write_text("# nothing\n")
|
|
|
|
job = {
|
|
"prompt": "Check status.",
|
|
"script": str(script),
|
|
}
|
|
prompt = _build_job_prompt(job)
|
|
assert "no output" in prompt.lower()
|
|
assert "Check status." in prompt
|
|
|
|
|
|
class TestCronjobToolScript:
|
|
"""Test the cronjob tool's script parameter."""
|
|
|
|
def test_create_with_script(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="monitor.py",
|
|
))
|
|
assert result["success"] is True
|
|
assert result["job"]["script"] == "monitor.py"
|
|
|
|
def test_update_script(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
create_result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
))
|
|
job_id = create_result["job_id"]
|
|
|
|
update_result = json.loads(cronjob(
|
|
action="update",
|
|
job_id=job_id,
|
|
script="new_script.py",
|
|
))
|
|
assert update_result["success"] is True
|
|
assert update_result["job"]["script"] == "new_script.py"
|
|
|
|
def test_clear_script(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
create_result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="some_script.py",
|
|
))
|
|
job_id = create_result["job_id"]
|
|
|
|
update_result = json.loads(cronjob(
|
|
action="update",
|
|
job_id=job_id,
|
|
script="",
|
|
))
|
|
assert update_result["success"] is True
|
|
assert "script" not in update_result["job"]
|
|
|
|
def test_list_shows_script(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="data_collector.py",
|
|
)
|
|
|
|
list_result = json.loads(cronjob(action="list"))
|
|
assert list_result["success"] is True
|
|
assert len(list_result["jobs"]) == 1
|
|
assert list_result["jobs"][0]["script"] == "data_collector.py"
|
|
|
|
|
|
class TestScriptPathContainment:
|
|
"""Regression tests for path containment bypass in _run_job_script().
|
|
|
|
Prior to the fix, absolute paths and ~-prefixed paths bypassed the
|
|
scripts_dir containment check entirely, allowing arbitrary script
|
|
execution through the cron system.
|
|
"""
|
|
|
|
def test_absolute_path_outside_scripts_dir_blocked(self, cron_env):
|
|
"""Absolute paths outside ~/.hermes/scripts/ must be rejected."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
# Create a script outside the scripts dir
|
|
outside_script = cron_env / "outside.py"
|
|
outside_script.write_text('print("should not run")\n')
|
|
|
|
success, output = _run_job_script(str(outside_script))
|
|
assert success is False
|
|
assert "blocked" in output.lower() or "outside" in output.lower()
|
|
|
|
def test_absolute_path_tmp_blocked(self, cron_env):
|
|
"""Absolute paths to /tmp must be rejected."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
success, output = _run_job_script("/tmp/evil.py")
|
|
assert success is False
|
|
assert "blocked" in output.lower() or "outside" in output.lower()
|
|
|
|
def test_tilde_path_blocked(self, cron_env):
|
|
"""~ prefixed paths must be rejected (expanduser bypasses check)."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
success, output = _run_job_script("~/evil.py")
|
|
assert success is False
|
|
assert "blocked" in output.lower() or "outside" in output.lower()
|
|
|
|
def test_tilde_traversal_blocked(self, cron_env):
|
|
"""~/../../../tmp/evil.py must be rejected."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
success, output = _run_job_script("~/../../../tmp/evil.py")
|
|
assert success is False
|
|
assert "blocked" in output.lower() or "outside" in output.lower()
|
|
|
|
def test_relative_traversal_still_blocked(self, cron_env):
|
|
"""../../etc/passwd style traversal must still be blocked."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
success, output = _run_job_script("../../etc/passwd")
|
|
assert success is False
|
|
assert "blocked" in output.lower() or "outside" in output.lower()
|
|
|
|
def test_relative_path_inside_scripts_dir_allowed(self, cron_env):
|
|
"""Relative paths within the scripts dir should still work."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
script = cron_env / "scripts" / "good.py"
|
|
script.write_text('print("ok")\n')
|
|
|
|
success, output = _run_job_script("good.py")
|
|
assert success is True
|
|
assert output == "ok"
|
|
|
|
def test_subdirectory_inside_scripts_dir_allowed(self, cron_env):
|
|
"""Relative paths to subdirectories within scripts/ should work."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
subdir = cron_env / "scripts" / "monitors"
|
|
subdir.mkdir()
|
|
script = subdir / "check.py"
|
|
script.write_text('print("sub ok")\n')
|
|
|
|
success, output = _run_job_script("monitors/check.py")
|
|
assert success is True
|
|
assert output == "sub ok"
|
|
|
|
def test_absolute_path_inside_scripts_dir_allowed(self, cron_env):
|
|
"""Absolute paths that resolve WITHIN scripts/ should work."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
script = cron_env / "scripts" / "abs_ok.py"
|
|
script.write_text('print("abs ok")\n')
|
|
|
|
success, output = _run_job_script(str(script))
|
|
assert success is True
|
|
assert output == "abs ok"
|
|
|
|
@pytest.mark.skipif(
|
|
sys.platform == "win32",
|
|
reason="Symlinks require elevated privileges on Windows",
|
|
)
|
|
def test_symlink_escape_blocked(self, cron_env, tmp_path):
|
|
"""Symlinks pointing outside scripts/ must be rejected."""
|
|
from cron.scheduler import _run_job_script
|
|
|
|
# Create a script outside the scripts dir
|
|
outside = tmp_path / "outside_evil.py"
|
|
outside.write_text('print("escaped")\n')
|
|
|
|
# Create a symlink inside scripts/ pointing outside
|
|
link = cron_env / "scripts" / "sneaky.py"
|
|
link.symlink_to(outside)
|
|
|
|
success, output = _run_job_script("sneaky.py")
|
|
assert success is False
|
|
assert "blocked" in output.lower() or "outside" in output.lower()
|
|
|
|
|
|
class TestCronjobToolScriptValidation:
|
|
"""Test API-boundary validation of cron script paths in cronjob_tools."""
|
|
|
|
def test_create_with_absolute_script_rejected(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="/home/user/evil.py",
|
|
))
|
|
assert result["success"] is False
|
|
assert "relative" in result["error"].lower() or "absolute" in result["error"].lower()
|
|
|
|
def test_create_with_tilde_script_rejected(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="~/monitor.py",
|
|
))
|
|
assert result["success"] is False
|
|
assert "relative" in result["error"].lower() or "absolute" in result["error"].lower()
|
|
|
|
def test_create_with_traversal_script_rejected(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="../../etc/passwd",
|
|
))
|
|
assert result["success"] is False
|
|
assert "escapes" in result["error"].lower() or "traversal" in result["error"].lower()
|
|
|
|
def test_create_with_relative_script_allowed(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="monitor.py",
|
|
))
|
|
assert result["success"] is True
|
|
assert result["job"]["script"] == "monitor.py"
|
|
|
|
def test_update_with_absolute_script_rejected(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
create_result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
))
|
|
job_id = create_result["job_id"]
|
|
|
|
update_result = json.loads(cronjob(
|
|
action="update",
|
|
job_id=job_id,
|
|
script="/tmp/evil.py",
|
|
))
|
|
assert update_result["success"] is False
|
|
assert "relative" in update_result["error"].lower() or "absolute" in update_result["error"].lower()
|
|
|
|
def test_update_clear_script_allowed(self, cron_env, monkeypatch):
|
|
"""Clearing a script (empty string) should always be permitted."""
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
create_result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="monitor.py",
|
|
))
|
|
job_id = create_result["job_id"]
|
|
|
|
update_result = json.loads(cronjob(
|
|
action="update",
|
|
job_id=job_id,
|
|
script="",
|
|
))
|
|
assert update_result["success"] is True
|
|
assert "script" not in update_result["job"]
|
|
|
|
def test_windows_absolute_path_rejected(self, cron_env, monkeypatch):
|
|
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
|
from tools.cronjob_tools import cronjob
|
|
|
|
result = json.loads(cronjob(
|
|
action="create",
|
|
schedule="every 1h",
|
|
prompt="Monitor things",
|
|
script="C:\\Users\\evil\\script.py",
|
|
))
|
|
assert result["success"] is False
|
|
|
|
|
|
class TestRunJobEnvVarCleanup:
|
|
"""Test that run_job() env vars are cleaned up even on early failure."""
|
|
|
|
def test_env_vars_cleaned_on_early_error(self, cron_env, monkeypatch):
|
|
"""Origin env vars must be cleaned up even if run_job fails early."""
|
|
# Ensure env vars are clean before test
|
|
for key in (
|
|
"HERMES_SESSION_PLATFORM",
|
|
"HERMES_SESSION_CHAT_ID",
|
|
"HERMES_SESSION_CHAT_NAME",
|
|
):
|
|
monkeypatch.delenv(key, raising=False)
|
|
|
|
# Build a job with origin info that will fail during execution
|
|
# (no valid model, no API key — will raise inside try block)
|
|
job = {
|
|
"id": "test-envleak",
|
|
"name": "env-leak-test",
|
|
"prompt": "test",
|
|
"schedule_display": "every 1h",
|
|
"origin": {
|
|
"platform": "telegram",
|
|
"chat_id": "12345",
|
|
"chat_name": "Test Chat",
|
|
},
|
|
}
|
|
|
|
from cron.scheduler import run_job
|
|
|
|
# Expect it to fail (no model/API key), but env vars must be cleaned
|
|
try:
|
|
run_job(job)
|
|
except Exception:
|
|
pass
|
|
|
|
# Verify env vars were cleaned up by the finally block
|
|
assert os.environ.get("HERMES_SESSION_PLATFORM") is None
|
|
assert os.environ.get("HERMES_SESSION_CHAT_ID") is None
|
|
assert os.environ.get("HERMES_SESSION_CHAT_NAME") is None
|