fix: suppress duplicate cron sends to auto-delivery targets
Allow cron runs to keep using send_message for additional destinations, but skip same-target sends when the scheduler will already auto-deliver the final response there. Add prompt/tool guidance, docs, and regression coverage for origin/home-channel resolution and thread-aware comparisons.
This commit is contained in:
@@ -141,6 +141,13 @@ PLATFORM_HINTS = {
|
||||
"is preserved for threading. Do not include greetings or sign-offs unless "
|
||||
"contextually appropriate."
|
||||
),
|
||||
"cron": (
|
||||
"You are running as a scheduled cron job. Your final response is automatically "
|
||||
"delivered to the job's configured destination, so do not use send_message to "
|
||||
"send to that same target again. If you want the user to receive something in "
|
||||
"the scheduled destination, put it directly in your final response. Use "
|
||||
"send_message only for additional or different targets."
|
||||
),
|
||||
"cli": (
|
||||
"You are a CLI AI Agent. Try not to use markdown but simple text "
|
||||
"renderable inside a terminal."
|
||||
|
||||
@@ -56,6 +56,50 @@ def _resolve_origin(job: dict) -> Optional[dict]:
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_delivery_target(job: dict) -> Optional[dict]:
|
||||
"""Resolve the concrete auto-delivery target for a cron job, if any."""
|
||||
deliver = job.get("deliver", "local")
|
||||
origin = _resolve_origin(job)
|
||||
|
||||
if deliver == "local":
|
||||
return None
|
||||
|
||||
if deliver == "origin":
|
||||
if not origin:
|
||||
return None
|
||||
return {
|
||||
"platform": origin["platform"],
|
||||
"chat_id": str(origin["chat_id"]),
|
||||
"thread_id": origin.get("thread_id"),
|
||||
}
|
||||
|
||||
if ":" in deliver:
|
||||
platform_name, chat_id = deliver.split(":", 1)
|
||||
return {
|
||||
"platform": platform_name,
|
||||
"chat_id": chat_id,
|
||||
"thread_id": None,
|
||||
}
|
||||
|
||||
platform_name = deliver
|
||||
if origin and origin.get("platform") == platform_name:
|
||||
return {
|
||||
"platform": platform_name,
|
||||
"chat_id": str(origin["chat_id"]),
|
||||
"thread_id": origin.get("thread_id"),
|
||||
}
|
||||
|
||||
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
|
||||
if not chat_id:
|
||||
return None
|
||||
|
||||
return {
|
||||
"platform": platform_name,
|
||||
"chat_id": chat_id,
|
||||
"thread_id": None,
|
||||
}
|
||||
|
||||
|
||||
def _deliver_result(job: dict, content: str) -> None:
|
||||
"""
|
||||
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
||||
@@ -63,36 +107,19 @@ def _deliver_result(job: dict, content: str) -> None:
|
||||
Uses the standalone platform send functions from send_message_tool so delivery
|
||||
works whether or not the gateway is running.
|
||||
"""
|
||||
deliver = job.get("deliver", "local")
|
||||
origin = _resolve_origin(job)
|
||||
|
||||
if deliver == "local":
|
||||
target = _resolve_delivery_target(job)
|
||||
if not target:
|
||||
if job.get("deliver", "local") != "local":
|
||||
logger.warning(
|
||||
"Job '%s' deliver=%s but no concrete delivery target could be resolved",
|
||||
job["id"],
|
||||
job.get("deliver", "local"),
|
||||
)
|
||||
return
|
||||
|
||||
thread_id = None
|
||||
|
||||
# Resolve target platform + chat_id
|
||||
if deliver == "origin":
|
||||
if not origin:
|
||||
logger.warning("Job '%s' deliver=origin but no origin stored, skipping delivery", job["id"])
|
||||
return
|
||||
platform_name = origin["platform"]
|
||||
chat_id = origin["chat_id"]
|
||||
thread_id = origin.get("thread_id")
|
||||
elif ":" in deliver:
|
||||
platform_name, chat_id = deliver.split(":", 1)
|
||||
else:
|
||||
# Bare platform name like "telegram" — need to resolve to origin or home channel
|
||||
platform_name = deliver
|
||||
if origin and origin.get("platform") == platform_name:
|
||||
chat_id = origin["chat_id"]
|
||||
thread_id = origin.get("thread_id")
|
||||
else:
|
||||
# Fall back to home channel
|
||||
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
|
||||
if not chat_id:
|
||||
logger.warning("Job '%s' deliver=%s but no chat_id or home channel. Set via: hermes config set %s_HOME_CHANNEL <channel_id>", job["id"], deliver, platform_name.upper())
|
||||
return
|
||||
platform_name = target["platform"]
|
||||
chat_id = target["chat_id"]
|
||||
thread_id = target.get("thread_id")
|
||||
|
||||
from tools.send_message_tool import _send_to_platform
|
||||
from gateway.config import load_gateway_config, Platform
|
||||
@@ -169,6 +196,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
job_name = job["name"]
|
||||
prompt = job["prompt"]
|
||||
origin = _resolve_origin(job)
|
||||
delivery_target = _resolve_delivery_target(job)
|
||||
|
||||
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
|
||||
logger.info("Prompt: %s", prompt[:100])
|
||||
@@ -179,6 +207,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"])
|
||||
if origin.get("chat_name"):
|
||||
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
|
||||
if delivery_target:
|
||||
os.environ["HERMES_CRON_AUTO_DELIVER_PLATFORM"] = delivery_target["platform"]
|
||||
os.environ["HERMES_CRON_AUTO_DELIVER_CHAT_ID"] = str(delivery_target["chat_id"])
|
||||
if delivery_target.get("thread_id") is not None:
|
||||
os.environ["HERMES_CRON_AUTO_DELIVER_THREAD_ID"] = str(delivery_target["thread_id"])
|
||||
|
||||
try:
|
||||
# Re-read .env and config.yaml fresh every run so provider/key
|
||||
@@ -324,7 +357,14 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
|
||||
finally:
|
||||
# Clean up injected env vars so they don't leak to other jobs
|
||||
for key in ("HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME"):
|
||||
for key in (
|
||||
"HERMES_SESSION_PLATFORM",
|
||||
"HERMES_SESSION_CHAT_ID",
|
||||
"HERMES_SESSION_CHAT_NAME",
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
|
||||
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
|
||||
):
|
||||
os.environ.pop(key, None)
|
||||
if _session_db:
|
||||
try:
|
||||
|
||||
@@ -455,6 +455,7 @@ class TestPromptBuilderConstants:
|
||||
assert "whatsapp" in PLATFORM_HINTS
|
||||
assert "telegram" in PLATFORM_HINTS
|
||||
assert "discord" in PLATFORM_HINTS
|
||||
assert "cron" in PLATFORM_HINTS
|
||||
assert "cli" in PLATFORM_HINTS
|
||||
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from cron.scheduler import _resolve_origin, _deliver_result, run_job
|
||||
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job
|
||||
|
||||
|
||||
class TestResolveOrigin:
|
||||
@@ -44,6 +44,56 @@ class TestResolveOrigin:
|
||||
assert _resolve_origin(job) is None
|
||||
|
||||
|
||||
class TestResolveDeliveryTarget:
|
||||
def test_origin_delivery_preserves_thread_id(self):
|
||||
job = {
|
||||
"deliver": "origin",
|
||||
"origin": {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
},
|
||||
}
|
||||
|
||||
assert _resolve_delivery_target(job) == {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
}
|
||||
|
||||
def test_bare_platform_uses_matching_origin_chat(self):
|
||||
job = {
|
||||
"deliver": "telegram",
|
||||
"origin": {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
},
|
||||
}
|
||||
|
||||
assert _resolve_delivery_target(job) == {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-1001",
|
||||
"thread_id": "17585",
|
||||
}
|
||||
|
||||
def test_bare_platform_falls_back_to_home_channel(self, monkeypatch):
|
||||
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "-2002")
|
||||
job = {
|
||||
"deliver": "telegram",
|
||||
"origin": {
|
||||
"platform": "discord",
|
||||
"chat_id": "abc",
|
||||
},
|
||||
}
|
||||
|
||||
assert _resolve_delivery_target(job) == {
|
||||
"platform": "telegram",
|
||||
"chat_id": "-2002",
|
||||
"thread_id": None,
|
||||
}
|
||||
|
||||
|
||||
class TestDeliverResultMirrorLogging:
|
||||
"""Verify that mirror_to_session failures are logged, not silently swallowed."""
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
@@ -29,6 +30,118 @@ def _install_telegram_mock(monkeypatch, bot):
|
||||
|
||||
|
||||
class TestSendMessageTool:
|
||||
def test_cron_duplicate_target_is_skipped_and_explained(self):
|
||||
home = SimpleNamespace(chat_id="-1001")
|
||||
config, _telegram_cfg = _make_config()
|
||||
config.get_home_channel = lambda _platform: home
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
|
||||
},
|
||||
clear=False,
|
||||
), \
|
||||
patch("gateway.config.load_gateway_config", return_value=config), \
|
||||
patch("tools.interrupt.is_interrupted", return_value=False), \
|
||||
patch("model_tools._run_async", side_effect=_run_async_immediately), \
|
||||
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
|
||||
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
|
||||
result = json.loads(
|
||||
send_message_tool(
|
||||
{
|
||||
"action": "send",
|
||||
"target": "telegram",
|
||||
"message": "hello",
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["skipped"] is True
|
||||
assert result["reason"] == "cron_auto_delivery_duplicate_target"
|
||||
assert "final response" in result["note"]
|
||||
send_mock.assert_not_awaited()
|
||||
mirror_mock.assert_not_called()
|
||||
|
||||
def test_cron_different_target_still_sends(self):
|
||||
config, telegram_cfg = _make_config()
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
|
||||
},
|
||||
clear=False,
|
||||
), \
|
||||
patch("gateway.config.load_gateway_config", return_value=config), \
|
||||
patch("tools.interrupt.is_interrupted", return_value=False), \
|
||||
patch("model_tools._run_async", side_effect=_run_async_immediately), \
|
||||
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
|
||||
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
|
||||
result = json.loads(
|
||||
send_message_tool(
|
||||
{
|
||||
"action": "send",
|
||||
"target": "telegram:-1002",
|
||||
"message": "hello",
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result.get("skipped") is not True
|
||||
send_mock.assert_awaited_once_with(
|
||||
Platform.TELEGRAM,
|
||||
telegram_cfg,
|
||||
"-1002",
|
||||
"hello",
|
||||
thread_id=None,
|
||||
media_files=[],
|
||||
)
|
||||
mirror_mock.assert_called_once_with("telegram", "-1002", "hello", source_label="cli", thread_id=None)
|
||||
|
||||
def test_cron_same_chat_different_thread_still_sends(self):
|
||||
config, telegram_cfg = _make_config()
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM": "telegram",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": "-1001",
|
||||
"HERMES_CRON_AUTO_DELIVER_THREAD_ID": "17585",
|
||||
},
|
||||
clear=False,
|
||||
), \
|
||||
patch("gateway.config.load_gateway_config", return_value=config), \
|
||||
patch("tools.interrupt.is_interrupted", return_value=False), \
|
||||
patch("model_tools._run_async", side_effect=_run_async_immediately), \
|
||||
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
|
||||
patch("gateway.mirror.mirror_to_session", return_value=True) as mirror_mock:
|
||||
result = json.loads(
|
||||
send_message_tool(
|
||||
{
|
||||
"action": "send",
|
||||
"target": "telegram:-1001:99999",
|
||||
"message": "hello",
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result.get("skipped") is not True
|
||||
send_mock.assert_awaited_once_with(
|
||||
Platform.TELEGRAM,
|
||||
telegram_cfg,
|
||||
"-1001",
|
||||
"hello",
|
||||
thread_id="99999",
|
||||
media_files=[],
|
||||
)
|
||||
mirror_mock.assert_called_once_with("telegram", "-1001", "hello", source_label="cli", thread_id="99999")
|
||||
|
||||
def test_sends_to_explicit_telegram_topic_target(self):
|
||||
config, telegram_cfg = _make_config()
|
||||
|
||||
|
||||
@@ -194,7 +194,10 @@ DELIVERY OPTIONS (where output goes):
|
||||
- "telegram:123456": Send to specific chat (if user provides ID)
|
||||
|
||||
NOTE: The agent's final response is auto-delivered to the target — do NOT use
|
||||
send_message in the prompt. Just have the agent compose its response normally.
|
||||
send_message in the prompt for that same destination. Same-target send_message
|
||||
calls are skipped so the cron doesn't double-message the user. Put the main
|
||||
user-facing content in the final response, and use send_message only for
|
||||
additional or different targets.
|
||||
|
||||
Use for: reminders, periodic checks, scheduled reports, automated maintenance.""",
|
||||
"parameters": {
|
||||
|
||||
@@ -153,6 +153,10 @@ def _handle_send(args):
|
||||
f"or set a home channel via: hermes config set {platform_name.upper()}_HOME_CHANNEL <channel_id>"
|
||||
})
|
||||
|
||||
duplicate_skip = _maybe_skip_cron_duplicate_send(platform_name, chat_id, thread_id)
|
||||
if duplicate_skip:
|
||||
return json.dumps(duplicate_skip)
|
||||
|
||||
try:
|
||||
from model_tools import _run_async
|
||||
result = _run_async(
|
||||
@@ -213,6 +217,51 @@ def _describe_media_for_mirror(media_files):
|
||||
return f"[Sent {len(media_files)} media attachments]"
|
||||
|
||||
|
||||
def _get_cron_auto_delivery_target():
|
||||
"""Return the cron scheduler's auto-delivery target for the current run, if any."""
|
||||
platform = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower()
|
||||
chat_id = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip()
|
||||
if not platform or not chat_id:
|
||||
return None
|
||||
thread_id = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None
|
||||
return {
|
||||
"platform": platform,
|
||||
"chat_id": chat_id,
|
||||
"thread_id": thread_id,
|
||||
}
|
||||
|
||||
|
||||
def _maybe_skip_cron_duplicate_send(platform_name: str, chat_id: str, thread_id: str | None):
|
||||
"""Skip redundant cron send_message calls when the scheduler will auto-deliver there."""
|
||||
auto_target = _get_cron_auto_delivery_target()
|
||||
if not auto_target:
|
||||
return None
|
||||
|
||||
same_target = (
|
||||
auto_target["platform"] == platform_name
|
||||
and str(auto_target["chat_id"]) == str(chat_id)
|
||||
and auto_target.get("thread_id") == thread_id
|
||||
)
|
||||
if not same_target:
|
||||
return None
|
||||
|
||||
target_label = f"{platform_name}:{chat_id}"
|
||||
if thread_id is not None:
|
||||
target_label += f":{thread_id}"
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"skipped": True,
|
||||
"reason": "cron_auto_delivery_duplicate_target",
|
||||
"target": target_label,
|
||||
"note": (
|
||||
f"Skipped send_message to {target_label}. This cron job will already auto-deliver "
|
||||
"its final response to that same target. Put the intended user-facing content in "
|
||||
"your final response instead, or use a different target if you want an additional message."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, media_files=None):
|
||||
"""Route a message to the appropriate platform sender."""
|
||||
from gateway.config import Platform
|
||||
|
||||
@@ -79,7 +79,7 @@ When scheduling jobs, you specify where the output goes:
|
||||
|
||||
**How platform names work:** When you specify a bare platform name like `"telegram"`, Hermes first checks if the job's origin matches that platform and uses the origin chat ID. Otherwise, it falls back to the platform's home channel configured via environment variable (e.g., `TELEGRAM_HOME_CHANNEL`).
|
||||
|
||||
The agent's final response is automatically delivered — you do **not** need to include `send_message` in the cron prompt.
|
||||
The agent's final response is automatically delivered — you do **not** need to include `send_message` in the cron prompt for that same destination. If a cron run calls `send_message` to the exact target the scheduler will already deliver to, Hermes skips that duplicate send and tells the model to put the user-facing content in the final response instead. Use `send_message` only for additional or different targets.
|
||||
|
||||
The agent knows your connected platforms and home channels — it'll choose sensible defaults.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user