fix(cron): stop injecting cron outputs into gateway session history (#2313)

Cron deliveries were mirrored into the target gateway session as
assistant-role messages, causing consecutive assistant messages that
violate message alternation (issue #2221).

Instead of fixing the role, remove the mirror injection entirely.
Cron outputs already live in their own cron session and don't belong
in the interactive conversation history.

Delivered messages are now wrapped with a header (task name) and a
footer noting the agent cannot see or respond to the message, so
users have clear context about what they're reading.

Closes #2221
This commit is contained in:
Teknium
2026-03-21 07:18:36 -07:00
committed by GitHub
parent 713f2f73da
commit 37a9979459
2 changed files with 69 additions and 27 deletions

View File

@@ -159,15 +159,25 @@ def _deliver_result(job: dict, content: str) -> None:
logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name)
return
# Wrap the content so the user knows this is a cron delivery and that
# the interactive agent has no visibility into it.
task_name = job.get("name", job["id"])
wrapped = (
f"Cronjob Response: {task_name}\n"
f"-------------\n\n"
f"{content}\n\n"
f"Note: The agent cannot see this message, and therefore cannot respond to it."
)
# Run the async send in a fresh event loop (safe from any thread)
try:
result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, content, thread_id=thread_id))
result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, wrapped, thread_id=thread_id))
except RuntimeError:
# asyncio.run() fails if there's already a running loop in this thread;
# spin up a new thread to avoid that.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, content, thread_id=thread_id))
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, wrapped, thread_id=thread_id))
result = future.result(timeout=30)
except Exception as e:
logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e)
@@ -177,12 +187,6 @@ def _deliver_result(job: dict, content: str) -> None:
logger.error("Job '%s': delivery error: %s", job["id"], result["error"])
else:
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
# Mirror the delivered content into the target's gateway session
try:
from gateway.mirror import mirror_to_session
mirror_to_session(platform_name, chat_id, content, source_label="cron", thread_id=thread_id)
except Exception as e:
logger.warning("Job '%s': mirror_to_session failed: %s", job["id"], e)
def _build_job_prompt(job: dict) -> str:

View File

@@ -95,11 +95,58 @@ class TestResolveDeliveryTarget:
}
class TestDeliverResultMirrorLogging:
"""Verify that mirror_to_session failures are logged, not silently swallowed."""
class TestDeliverResultWrapping:
"""Verify that cron deliveries are wrapped with header/footer and no longer mirrored."""
def test_mirror_failure_is_logged(self, caplog):
"""When mirror_to_session raises, a warning should be logged."""
def test_delivery_wraps_content_with_header_and_footer(self):
"""Delivered content should include task name header and agent-invisible note."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Here is today's summary.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: daily-report" in sent_content
assert "-------------" in sent_content
assert "Here is today's summary." in sent_content
assert "The agent cannot see this message" in sent_content
def test_delivery_uses_job_id_when_no_name(self):
"""When a job has no name, the wrapper should fall back to job id."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "abc-123",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Output.")
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: abc-123" in sent_content
def test_no_mirror_to_session_call(self):
"""Cron deliveries should NOT mirror into the gateway session."""
from gateway.config import Platform
pconfig = MagicMock()
@@ -109,20 +156,18 @@ class TestDeliverResultMirrorLogging:
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session", side_effect=ConnectionError("network down")):
patch("gateway.mirror.mirror_to_session") as mirror_mock:
job = {
"id": "test-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
_deliver_result(job, "Hello!")
_deliver_result(job, "Hello!")
assert any("mirror_to_session failed" in r.message for r in caplog.records), \
f"Expected 'mirror_to_session failed' warning in logs, got: {[r.message for r in caplog.records]}"
mirror_mock.assert_not_called()
def test_origin_delivery_preserves_thread_id(self):
"""Origin delivery should forward thread_id to send/mirror helpers."""
"""Origin delivery should forward thread_id to the send helper."""
from gateway.config import Platform
pconfig = MagicMock()
@@ -132,6 +177,7 @@ class TestDeliverResultMirrorLogging:
job = {
"id": "test-job",
"name": "topic-job",
"deliver": "origin",
"origin": {
"platform": "telegram",
@@ -141,19 +187,11 @@ class TestDeliverResultMirrorLogging:
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("gateway.mirror.mirror_to_session") as mirror_mock:
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
_deliver_result(job, "hello")
send_mock.assert_called_once()
assert send_mock.call_args.kwargs["thread_id"] == "17585"
mirror_mock.assert_called_once_with(
"telegram",
"-1001",
"hello",
source_label="cron",
thread_id="17585",
)
class TestRunJobSessionPersistence: