diff --git a/cron/scheduler.py b/cron/scheduler.py index 51dfbc770..2a6effa23 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -8,6 +8,7 @@ Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick runs at a time if multiple processes overlap. """ +import asyncio import fcntl import logging import os @@ -29,40 +30,134 @@ _LOCK_DIR = Path.home() / ".hermes" / "cron" _LOCK_FILE = _LOCK_DIR / ".tick.lock" -def run_job(job: dict) -> tuple[bool, str, Optional[str]]: +def _resolve_origin(job: dict) -> Optional[dict]: + """Extract origin info from a job, returning {platform, chat_id, chat_name} or None.""" + origin = job.get("origin") + if not origin: + return None + platform = origin.get("platform") + chat_id = origin.get("chat_id") + if platform and chat_id: + return origin + return None + + +def _deliver_result(job: dict, content: str) -> None: + """ + Deliver job output to the configured target (origin chat, specific platform, etc.). + + Uses the standalone platform send functions from send_message_tool so delivery + works whether or not the gateway is running. + """ + deliver = job.get("deliver", "local") + origin = _resolve_origin(job) + + if deliver == "local": + return + + # Resolve target platform + chat_id + if deliver == "origin": + if not origin: + logger.warning("Job '%s' deliver=origin but no origin stored, skipping delivery", job["id"]) + return + platform_name = origin["platform"] + chat_id = origin["chat_id"] + elif ":" in deliver: + platform_name, chat_id = deliver.split(":", 1) + else: + # Bare platform name like "telegram" — need to resolve to origin or home channel + platform_name = deliver + if origin and origin.get("platform") == platform_name: + chat_id = origin["chat_id"] + else: + # Fall back to home channel + chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "") + if not chat_id: + logger.warning("Job '%s' deliver=%s but no chat_id or home channel", job["id"], deliver) + return + + from tools.send_message_tool import _send_to_platform + from gateway.config import load_gateway_config, Platform + + platform_map = { + "telegram": Platform.TELEGRAM, + "discord": Platform.DISCORD, + "slack": Platform.SLACK, + "whatsapp": Platform.WHATSAPP, + } + platform = platform_map.get(platform_name.lower()) + if not platform: + logger.warning("Job '%s': unknown platform '%s' for delivery", job["id"], platform_name) + return + + try: + config = load_gateway_config() + except Exception as e: + logger.error("Job '%s': failed to load gateway config for delivery: %s", job["id"], e) + return + + pconfig = config.platforms.get(platform) + if not pconfig or not pconfig.enabled: + logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name) + return + + # Run the async send in a fresh event loop (safe from any thread) + try: + result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, content)) + except RuntimeError: + # asyncio.run() fails if there's already a running loop in this thread; + # spin up a new thread to avoid that. + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, content)) + result = future.result(timeout=30) + except Exception as e: + logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e) + return + + if result and result.get("error"): + logger.error("Job '%s': delivery error: %s", job["id"], result["error"]) + else: + logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) + + +def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: """ Execute a single cron job. Returns: - Tuple of (success, output, error_message) + Tuple of (success, full_output_doc, final_response, error_message) """ from run_agent import AIAgent job_id = job["id"] job_name = job["name"] prompt = job["prompt"] + origin = _resolve_origin(job) logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) - + + # Inject origin context so the agent's send_message tool knows the chat + if origin: + os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"] + os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"]) + if origin.get("chat_name"): + os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"] + try: - # Create agent with default settings - # Jobs run in isolated sessions (no prior context) agent = AIAgent( model=os.getenv("HERMES_MODEL", "anthropic/claude-opus-4.6"), quiet_mode=True, session_id=f"cron_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" ) - # Run the conversation result = agent.run_conversation(prompt) - # Extract final response final_response = result.get("final_response", "") if not final_response: final_response = "(No response generated)" - # Build output document output = f"""# Cron Job: {job_name} **Job ID:** {job_id} @@ -79,13 +174,12 @@ def run_job(job: dict) -> tuple[bool, str, Optional[str]]: """ logger.info("Job '%s' completed successfully", job_name) - return True, output, None + return True, output, final_response, None except Exception as e: error_msg = f"{type(e).__name__}: {str(e)}" logger.error("Job '%s' failed: %s", job_name, error_msg) - # Build error output output = f"""# Cron Job: {job_name} (FAILED) **Job ID:** {job_id} @@ -104,7 +198,12 @@ def run_job(job: dict) -> tuple[bool, str, Optional[str]]: {traceback.format_exc()} ``` """ - return False, output, error_msg + return False, output, "", error_msg + + finally: + # Clean up injected env vars so they don't leak to other jobs + for key in ("HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME"): + os.environ.pop(key, None) def tick(verbose: bool = True) -> int: @@ -126,7 +225,6 @@ def tick(verbose: bool = True) -> int: lock_fd = open(_LOCK_FILE, "w") fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except (OSError, IOError): - # Another tick is already running — skip silently logger.debug("Tick skipped — another instance holds the lock") return 0 @@ -143,12 +241,20 @@ def tick(verbose: bool = True) -> int: executed = 0 for job in due_jobs: try: - success, output, error = run_job(job) + success, output, final_response, error = run_job(job) output_file = save_job_output(job["id"], output) if verbose: logger.info("Output saved to: %s", output_file) + # Deliver the final response to the origin/target chat + deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}" + if deliver_content: + try: + _deliver_result(job, deliver_content) + except Exception as de: + logger.error("Delivery failed for job %s: %s", job["id"], de) + mark_job_run(job["id"], success, error) executed += 1