feat: enhance job delivery mechanism in scheduler

- Introduced a new `_deliver_result` function to handle job output delivery to specified platforms.
- Added origin resolution logic to determine the correct delivery target based on job configuration.
- Updated `run_job` to return the final response along with the output for improved context.
- Integrated delivery of job results to the origin chat or fallback channels, with error handling for delivery failures.
- Cleaned up environment variables after job execution to prevent leakage between jobs.
This commit is contained in:
teknium1
2026-02-22 17:14:44 -08:00
parent 6037b6a5ab
commit 16d0aa7b4d

View File

@@ -8,6 +8,7 @@ Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick
runs at a time if multiple processes overlap.
"""
import asyncio
import fcntl
import logging
import os
@@ -29,40 +30,134 @@ _LOCK_DIR = Path.home() / ".hermes" / "cron"
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
def run_job(job: dict) -> tuple[bool, str, Optional[str]]:
def _resolve_origin(job: dict) -> Optional[dict]:
"""Extract origin info from a job, returning {platform, chat_id, chat_name} or None."""
origin = job.get("origin")
if not origin:
return None
platform = origin.get("platform")
chat_id = origin.get("chat_id")
if platform and chat_id:
return origin
return None
def _deliver_result(job: dict, content: str) -> None:
"""
Deliver job output to the configured target (origin chat, specific platform, etc.).
Uses the standalone platform send functions from send_message_tool so delivery
works whether or not the gateway is running.
"""
deliver = job.get("deliver", "local")
origin = _resolve_origin(job)
if deliver == "local":
return
# Resolve target platform + chat_id
if deliver == "origin":
if not origin:
logger.warning("Job '%s' deliver=origin but no origin stored, skipping delivery", job["id"])
return
platform_name = origin["platform"]
chat_id = origin["chat_id"]
elif ":" in deliver:
platform_name, chat_id = deliver.split(":", 1)
else:
# Bare platform name like "telegram" — need to resolve to origin or home channel
platform_name = deliver
if origin and origin.get("platform") == platform_name:
chat_id = origin["chat_id"]
else:
# Fall back to home channel
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
if not chat_id:
logger.warning("Job '%s' deliver=%s but no chat_id or home channel", job["id"], deliver)
return
from tools.send_message_tool import _send_to_platform
from gateway.config import load_gateway_config, Platform
platform_map = {
"telegram": Platform.TELEGRAM,
"discord": Platform.DISCORD,
"slack": Platform.SLACK,
"whatsapp": Platform.WHATSAPP,
}
platform = platform_map.get(platform_name.lower())
if not platform:
logger.warning("Job '%s': unknown platform '%s' for delivery", job["id"], platform_name)
return
try:
config = load_gateway_config()
except Exception as e:
logger.error("Job '%s': failed to load gateway config for delivery: %s", job["id"], e)
return
pconfig = config.platforms.get(platform)
if not pconfig or not pconfig.enabled:
logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name)
return
# Run the async send in a fresh event loop (safe from any thread)
try:
result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, content))
except RuntimeError:
# asyncio.run() fails if there's already a running loop in this thread;
# spin up a new thread to avoid that.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, content))
result = future.result(timeout=30)
except Exception as e:
logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e)
return
if result and result.get("error"):
logger.error("Job '%s': delivery error: %s", job["id"], result["error"])
else:
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a single cron job.
Returns:
Tuple of (success, output, error_message)
Tuple of (success, full_output_doc, final_response, error_message)
"""
from run_agent import AIAgent
job_id = job["id"]
job_name = job["name"]
prompt = job["prompt"]
origin = _resolve_origin(job)
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
logger.info("Prompt: %s", prompt[:100])
# Inject origin context so the agent's send_message tool knows the chat
if origin:
os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"]
os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"])
if origin.get("chat_name"):
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
try:
# Create agent with default settings
# Jobs run in isolated sessions (no prior context)
agent = AIAgent(
model=os.getenv("HERMES_MODEL", "anthropic/claude-opus-4.6"),
quiet_mode=True,
session_id=f"cron_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
# Run the conversation
result = agent.run_conversation(prompt)
# Extract final response
final_response = result.get("final_response", "")
if not final_response:
final_response = "(No response generated)"
# Build output document
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
@@ -79,13 +174,12 @@ def run_job(job: dict) -> tuple[bool, str, Optional[str]]:
"""
logger.info("Job '%s' completed successfully", job_name)
return True, output, None
return True, output, final_response, None
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)}"
logger.error("Job '%s' failed: %s", job_name, error_msg)
# Build error output
output = f"""# Cron Job: {job_name} (FAILED)
**Job ID:** {job_id}
@@ -104,7 +198,12 @@ def run_job(job: dict) -> tuple[bool, str, Optional[str]]:
{traceback.format_exc()}
```
"""
return False, output, error_msg
return False, output, "", error_msg
finally:
# Clean up injected env vars so they don't leak to other jobs
for key in ("HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME"):
os.environ.pop(key, None)
def tick(verbose: bool = True) -> int:
@@ -126,7 +225,6 @@ def tick(verbose: bool = True) -> int:
lock_fd = open(_LOCK_FILE, "w")
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (OSError, IOError):
# Another tick is already running — skip silently
logger.debug("Tick skipped — another instance holds the lock")
return 0
@@ -143,12 +241,20 @@ def tick(verbose: bool = True) -> int:
executed = 0
for job in due_jobs:
try:
success, output, error = run_job(job)
success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
# Deliver the final response to the origin/target chat
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
if deliver_content:
try:
_deliver_result(job, deliver_content)
except Exception as de:
logger.error("Delivery failed for job %s: %s", job["id"], de)
mark_job_run(job["id"], success, error)
executed += 1