Files
hermes-agent/tests/cron/test_scheduler.py
Teknium 3fad1e7cc1 fix(cron): resolve human-friendly delivery labels via channel directory (#3860)
Cron jobs configured with deliver labels from send_message(action='list')
like 'whatsapp:Alice (dm)' passed the label as a literal chat_id.
WhatsApp bridge failed with jidDecode error since 'Alice (dm)' isn't
a valid JID.

Now _resolve_delivery_target() strips display suffixes like ' (dm)' and
resolves human-friendly names via the channel directory before using
them. Raw IDs pass through unchanged when the directory has no match.

Fixes #1945.
2026-03-29 21:24:17 -07:00

796 lines
32 KiB
Python

"""Tests for cron/scheduler.py — origin resolution, delivery routing, and error logging."""
import json
import logging
import os
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
class TestResolveOrigin:
def test_full_origin(self):
job = {
"origin": {
"platform": "telegram",
"chat_id": "123456",
"chat_name": "Test Chat",
"thread_id": "42",
}
}
result = _resolve_origin(job)
assert isinstance(result, dict)
assert result == job["origin"]
assert result["platform"] == "telegram"
assert result["chat_id"] == "123456"
assert result["chat_name"] == "Test Chat"
assert result["thread_id"] == "42"
def test_no_origin(self):
assert _resolve_origin({}) is None
assert _resolve_origin({"origin": None}) is None
def test_missing_platform(self):
job = {"origin": {"chat_id": "123"}}
assert _resolve_origin(job) is None
def test_missing_chat_id(self):
job = {"origin": {"platform": "telegram"}}
assert _resolve_origin(job) is None
def test_empty_origin(self):
job = {"origin": {}}
assert _resolve_origin(job) is None
class TestResolveDeliveryTarget:
def test_origin_delivery_preserves_thread_id(self):
job = {
"deliver": "origin",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
def test_explicit_telegram_topic_target_with_thread_id(self):
"""deliver: 'telegram:chat_id:thread_id' parses correctly."""
job = {
"deliver": "telegram:-1003724596514:17",
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1003724596514",
"thread_id": "17",
}
def test_explicit_telegram_chat_id_without_thread_id(self):
"""deliver: 'telegram:chat_id' sets thread_id to None."""
job = {
"deliver": "telegram:-1003724596514",
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1003724596514",
"thread_id": None,
}
def test_human_friendly_label_resolved_via_channel_directory(self):
"""deliver: 'whatsapp:Alice (dm)' resolves to the real JID."""
job = {"deliver": "whatsapp:Alice (dm)"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="12345678901234@lid",
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "whatsapp",
"chat_id": "12345678901234@lid",
"thread_id": None,
}
def test_human_friendly_label_without_suffix_resolved(self):
"""deliver: 'telegram:My Group' resolves without display suffix."""
job = {"deliver": "telegram:My Group"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="-1009999",
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "telegram",
"chat_id": "-1009999",
"thread_id": None,
}
def test_raw_id_not_mangled_when_directory_returns_none(self):
"""deliver: 'whatsapp:12345@lid' passes through when directory has no match."""
job = {"deliver": "whatsapp:12345@lid"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value=None,
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "whatsapp",
"chat_id": "12345@lid",
"thread_id": None,
}
def test_bare_platform_uses_matching_origin_chat(self):
job = {
"deliver": "telegram",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
def test_bare_platform_falls_back_to_home_channel(self, monkeypatch):
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "-2002")
job = {
"deliver": "telegram",
"origin": {
"platform": "discord",
"chat_id": "abc",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-2002",
"thread_id": None,
}
class TestDeliverResultWrapping:
"""Verify that cron deliveries are wrapped with header/footer and no longer mirrored."""
def test_delivery_wraps_content_with_header_and_footer(self):
"""Delivered content should include task name header and agent-invisible note."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Here is today's summary.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: daily-report" in sent_content
assert "-------------" in sent_content
assert "Here is today's summary." in sent_content
assert "The agent cannot see this message" in sent_content
def test_delivery_uses_job_id_when_no_name(self):
"""When a job has no name, the wrapper should fall back to job id."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "abc-123",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Output.")
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: abc-123" in sent_content
def test_delivery_skips_wrapping_when_config_disabled(self):
"""When cron.wrap_response is false, deliver raw content without header/footer."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Clean output only.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert sent_content == "Clean output only."
assert "Cronjob Response" not in sent_content
assert "The agent cannot see" not in sent_content
def test_no_mirror_to_session_call(self):
"""Cron deliveries should NOT mirror into the gateway session."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session") as mirror_mock:
job = {
"id": "test-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Hello!")
mirror_mock.assert_not_called()
def test_origin_delivery_preserves_thread_id(self):
"""Origin delivery should forward thread_id to the send helper."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
job = {
"id": "test-job",
"name": "topic-job",
"deliver": "origin",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
_deliver_result(job, "hello")
send_mock.assert_called_once()
assert send_mock.call_args.kwargs["thread_id"] == "17585"
class TestRunJobSessionPersistence:
def test_run_job_passes_session_db_and_cron_platform(self, tmp_path):
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "test-key",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert "ok" in output
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["session_db"] is fake_db
assert kwargs["platform"] == "cron"
assert kwargs["session_id"].startswith("cron_test-job_")
fake_db.end_session.assert_called_once()
call_args = fake_db.end_session.call_args
assert call_args[0][0].startswith("cron_test-job_")
assert call_args[0][1] == "cron_complete"
fake_db.close.assert_called_once()
def test_run_job_empty_response_returns_empty_not_placeholder(self, tmp_path):
"""Empty final_response should stay empty for delivery logic (issue #2234).
The placeholder '(No response generated)' should only appear in the
output log, not in the returned final_response that's used for delivery.
"""
job = {
"id": "silent-job",
"name": "silent test",
"prompt": "do work via tools only",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "test-key",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
# Agent did work via tools but returned no text
mock_agent.run_conversation.return_value = {"final_response": ""}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
# final_response should be empty for delivery logic to skip
assert final_response == ""
# But the output log should show the placeholder
assert "(No response generated)" in output
def test_run_job_sets_auto_delivery_env_from_dotenv_home_channel(self, tmp_path, monkeypatch):
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
"deliver": "telegram",
}
fake_db = MagicMock()
seen = {}
(tmp_path / ".env").write_text("TELEGRAM_HOME_CHANNEL=-2002\n")
monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", raising=False)
class FakeAgent:
def __init__(self, *args, **kwargs):
pass
def run_conversation(self, *args, **kwargs):
seen["platform"] = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM")
seen["chat_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID")
seen["thread_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID")
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent", FakeAgent):
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert "ok" in output
assert seen == {
"platform": "telegram",
"chat_id": "-2002",
"thread_id": None,
}
assert os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM") is None
assert os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID") is None
assert os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID") is None
fake_db.close.assert_called_once()
class TestRunJobConfigLogging:
"""Verify that config.yaml parse failures are logged, not silently swallowed."""
def test_bad_config_yaml_is_logged(self, caplog, tmp_path):
"""When config.yaml is malformed, a warning should be logged."""
bad_yaml = tmp_path / "config.yaml"
bad_yaml.write_text("invalid: yaml: [[[bad")
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
run_job(job)
assert any("failed to load config.yaml" in r.message for r in caplog.records), \
f"Expected 'failed to load config.yaml' warning in logs, got: {[r.message for r in caplog.records]}"
def test_bad_prefill_messages_is_logged(self, caplog, tmp_path):
"""When the prefill messages file contains invalid JSON, a warning should be logged."""
# Valid config.yaml that points to a bad prefill file
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text("prefill_messages_file: prefill.json\n")
bad_prefill = tmp_path / "prefill.json"
bad_prefill.write_text("{not valid json!!!")
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
run_job(job)
assert any("failed to parse prefill messages" in r.message for r in caplog.records), \
f"Expected 'failed to parse prefill messages' warning in logs, got: {[r.message for r in caplog.records]}"
class TestRunJobPerJobOverrides:
def test_job_level_model_provider_and_base_url_overrides_are_used(self, tmp_path):
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text(
"model:\n"
" default: gpt-5.4\n"
" provider: openai-codex\n"
" base_url: https://chatgpt.com/backend-api/codex\n"
)
job = {
"id": "briefing-job",
"name": "briefing",
"prompt": "hello",
"model": "perplexity/sonar-pro",
"provider": "custom",
"base_url": "http://127.0.0.1:4000/v1",
}
fake_db = MagicMock()
fake_runtime = {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": "http://127.0.0.1:4000/v1",
"api_key": "***",
}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch("hermes_cli.runtime_provider.resolve_runtime_provider", return_value=fake_runtime) as runtime_mock, \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert "ok" in output
runtime_mock.assert_called_once_with(
requested="custom",
explicit_base_url="http://127.0.0.1:4000/v1",
)
assert mock_agent_cls.call_args.kwargs["model"] == "perplexity/sonar-pro"
fake_db.close.assert_called_once()
class TestRunJobSkillBacked:
def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path):
job = {
"id": "skill-job",
"name": "skill test",
"prompt": "Check the feeds and summarize anything new.",
"skill": "blogwatcher",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", return_value=json.dumps({"success": True, "content": "# Blogwatcher\nFollow this skill."})), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
kwargs = mock_agent_cls.call_args.kwargs
assert "cronjob" in (kwargs["disabled_toolsets"] or [])
prompt_arg = mock_agent.run_conversation.call_args.args[0]
assert "blogwatcher" in prompt_arg
assert "Follow this skill" in prompt_arg
assert "Check the feeds and summarize anything new." in prompt_arg
def test_run_job_loads_multiple_skills_in_order(self, tmp_path):
job = {
"id": "multi-skill-job",
"name": "multi skill test",
"prompt": "Combine the results.",
"skills": ["blogwatcher", "find-nearby"],
}
fake_db = MagicMock()
def _skill_view(name):
return json.dumps({"success": True, "content": f"# {name}\nInstructions for {name}."})
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view) as skill_view_mock, \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert skill_view_mock.call_count == 2
assert [call.args[0] for call in skill_view_mock.call_args_list] == ["blogwatcher", "find-nearby"]
prompt_arg = mock_agent.run_conversation.call_args.args[0]
assert prompt_arg.index("blogwatcher") < prompt_arg.index("find-nearby")
assert "Instructions for blogwatcher." in prompt_arg
assert "Instructions for find-nearby." in prompt_arg
assert "Combine the results." in prompt_arg
class TestSilentDelivery:
"""Verify that [SILENT] responses suppress delivery while still saving output."""
def _make_job(self):
return {
"id": "monitor-job",
"name": "monitor",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
def test_normal_response_delivers(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "Results here", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_called_once()
def test_silent_response_suppresses_delivery(self, caplog):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT]", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
with caplog.at_level(logging.INFO, logger="cron.scheduler"):
tick(verbose=False)
deliver_mock.assert_not_called()
assert any(SILENT_MARKER in r.message for r in caplog.records)
def test_silent_with_note_suppresses_delivery(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT] No changes detected", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_silent_is_case_insensitive(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[silent] nothing new", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_failed_job_always_delivers(self):
"""Failed jobs deliver regardless of [SILENT] in output."""
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(False, "# output", "", "some error")), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_called_once()
def test_output_saved_even_when_delivery_suppressed(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# full output", "[SILENT]", None)), \
patch("cron.scheduler.save_job_output") as save_mock, \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
save_mock.return_value = "/tmp/out.md"
from cron.scheduler import tick
tick(verbose=False)
save_mock.assert_called_once_with("monitor-job", "# full output")
deliver_mock.assert_not_called()
class TestBuildJobPromptSilentHint:
"""Verify _build_job_prompt always injects [SILENT] guidance."""
def test_hint_always_present(self):
job = {"prompt": "Check for updates"}
result = _build_job_prompt(job)
assert "[SILENT]" in result
assert "Check for updates" in result
def test_hint_present_even_without_prompt(self):
job = {"prompt": ""}
result = _build_job_prompt(job)
assert "[SILENT]" in result
class TestBuildJobPromptMissingSkill:
"""Verify that a missing skill logs a warning and does not crash the job."""
def _missing_skill_view(self, name: str) -> str:
return json.dumps({"success": False, "error": f"Skill '{name}' not found."})
def test_missing_skill_does_not_raise(self):
"""Job should run even when a referenced skill is not installed."""
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill"], "prompt": "do something"})
# prompt is preserved even though skill was skipped
assert "do something" in result
def test_missing_skill_injects_user_notice_into_prompt(self):
"""A system notice about the missing skill is injected into the prompt."""
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill"], "prompt": "do something"})
assert "ghost-skill" in result
assert "not found" in result.lower() or "skipped" in result.lower()
def test_missing_skill_logs_warning(self, caplog):
"""A warning is logged when a skill cannot be found."""
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
_build_job_prompt({"name": "My Job", "skills": ["ghost-skill"], "prompt": "do something"})
assert any("ghost-skill" in record.message for record in caplog.records)
def test_valid_skill_loaded_alongside_missing(self):
"""A valid skill is still loaded when another skill in the list is missing."""
def _mixed_skill_view(name: str) -> str:
if name == "real-skill":
return json.dumps({"success": True, "content": "Real skill content."})
return json.dumps({"success": False, "error": f"Skill '{name}' not found."})
with patch("tools.skills_tool.skill_view", side_effect=_mixed_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill", "real-skill"], "prompt": "go"})
assert "Real skill content." in result
assert "go" in result
class TestTickAdvanceBeforeRun:
"""Verify that tick() calls advance_next_run before run_job for crash safety."""
def test_advance_called_before_run_job(self, tmp_path):
"""advance_next_run must be called before run_job to prevent crash-loop re-fires."""
call_order = []
def fake_advance(job_id):
call_order.append(("advance", job_id))
return True
def fake_run_job(job):
call_order.append(("run", job["id"]))
return True, "output", "response", None
fake_job = {
"id": "test-advance",
"name": "test",
"prompt": "hello",
"enabled": True,
"schedule": {"kind": "cron", "expr": "15 6 * * *"},
}
with patch("cron.scheduler.get_due_jobs", return_value=[fake_job]), \
patch("cron.scheduler.advance_next_run", side_effect=fake_advance) as adv_mock, \
patch("cron.scheduler.run_job", side_effect=fake_run_job), \
patch("cron.scheduler.save_job_output", return_value=tmp_path / "out.md"), \
patch("cron.scheduler.mark_job_run"), \
patch("cron.scheduler._deliver_result"):
from cron.scheduler import tick
executed = tick(verbose=False)
assert executed == 1
adv_mock.assert_called_once_with("test-advance")
# advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]