From 37a9979459ca16bc46fb4e424e031fc2ad93b78a Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sat, 21 Mar 2026 07:18:36 -0700 Subject: [PATCH] fix(cron): stop injecting cron outputs into gateway session history (#2313) Cron deliveries were mirrored into the target gateway session as assistant-role messages, causing consecutive assistant messages that violate message alternation (issue #2221). Instead of fixing the role, remove the mirror injection entirely. Cron outputs already live in their own cron session and don't belong in the interactive conversation history. Delivered messages are now wrapped with a header (task name) and a footer noting the agent cannot see or respond to the message, so users have clear context about what they're reading. Closes #2221 --- cron/scheduler.py | 20 ++++++---- tests/cron/test_scheduler.py | 76 +++++++++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 27 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 15cfb61bf..e08eaa0da 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -159,15 +159,25 @@ def _deliver_result(job: dict, content: str) -> None: logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name) return + # Wrap the content so the user knows this is a cron delivery and that + # the interactive agent has no visibility into it. + task_name = job.get("name", job["id"]) + wrapped = ( + f"Cronjob Response: {task_name}\n" + f"-------------\n\n" + f"{content}\n\n" + f"Note: The agent cannot see this message, and therefore cannot respond to it." + ) + # Run the async send in a fresh event loop (safe from any thread) try: - result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, content, thread_id=thread_id)) + result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, wrapped, thread_id=thread_id)) except RuntimeError: # asyncio.run() fails if there's already a running loop in this thread; # spin up a new thread to avoid that. import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: - future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, content, thread_id=thread_id)) + future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, wrapped, thread_id=thread_id)) result = future.result(timeout=30) except Exception as e: logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e) @@ -177,12 +187,6 @@ def _deliver_result(job: dict, content: str) -> None: logger.error("Job '%s': delivery error: %s", job["id"], result["error"]) else: logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) - # Mirror the delivered content into the target's gateway session - try: - from gateway.mirror import mirror_to_session - mirror_to_session(platform_name, chat_id, content, source_label="cron", thread_id=thread_id) - except Exception as e: - logger.warning("Job '%s': mirror_to_session failed: %s", job["id"], e) def _build_job_prompt(job: dict) -> str: diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 2a1ed7914..970df05c9 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -95,11 +95,58 @@ class TestResolveDeliveryTarget: } -class TestDeliverResultMirrorLogging: - """Verify that mirror_to_session failures are logged, not silently swallowed.""" +class TestDeliverResultWrapping: + """Verify that cron deliveries are wrapped with header/footer and no longer mirrored.""" - def test_mirror_failure_is_logged(self, caplog): - """When mirror_to_session raises, a warning should be logged.""" + def test_delivery_wraps_content_with_header_and_footer(self): + """Delivered content should include task name header and agent-invisible note.""" + from gateway.config import Platform + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock: + job = { + "id": "test-job", + "name": "daily-report", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + _deliver_result(job, "Here is today's summary.") + + send_mock.assert_called_once() + sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1] + assert "Cronjob Response: daily-report" in sent_content + assert "-------------" in sent_content + assert "Here is today's summary." in sent_content + assert "The agent cannot see this message" in sent_content + + def test_delivery_uses_job_id_when_no_name(self): + """When a job has no name, the wrapper should fall back to job id.""" + from gateway.config import Platform + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock: + job = { + "id": "abc-123", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + _deliver_result(job, "Output.") + + sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1] + assert "Cronjob Response: abc-123" in sent_content + + def test_no_mirror_to_session_call(self): + """Cron deliveries should NOT mirror into the gateway session.""" from gateway.config import Platform pconfig = MagicMock() @@ -109,20 +156,18 @@ class TestDeliverResultMirrorLogging: with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \ - patch("gateway.mirror.mirror_to_session", side_effect=ConnectionError("network down")): + patch("gateway.mirror.mirror_to_session") as mirror_mock: job = { "id": "test-job", "deliver": "origin", "origin": {"platform": "telegram", "chat_id": "123"}, } - with caplog.at_level(logging.WARNING, logger="cron.scheduler"): - _deliver_result(job, "Hello!") + _deliver_result(job, "Hello!") - assert any("mirror_to_session failed" in r.message for r in caplog.records), \ - f"Expected 'mirror_to_session failed' warning in logs, got: {[r.message for r in caplog.records]}" + mirror_mock.assert_not_called() def test_origin_delivery_preserves_thread_id(self): - """Origin delivery should forward thread_id to send/mirror helpers.""" + """Origin delivery should forward thread_id to the send helper.""" from gateway.config import Platform pconfig = MagicMock() @@ -132,6 +177,7 @@ class TestDeliverResultMirrorLogging: job = { "id": "test-job", + "name": "topic-job", "deliver": "origin", "origin": { "platform": "telegram", @@ -141,19 +187,11 @@ class TestDeliverResultMirrorLogging: } with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ - patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \ - patch("gateway.mirror.mirror_to_session") as mirror_mock: + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock: _deliver_result(job, "hello") send_mock.assert_called_once() assert send_mock.call_args.kwargs["thread_id"] == "17585" - mirror_mock.assert_called_once_with( - "telegram", - "-1001", - "hello", - source_label="cron", - thread_id="17585", - ) class TestRunJobSessionPersistence: