Implement SMS as a first-class messaging platform following ADDING_A_PLATFORM.md checklist. All 16 integration points covered: - gateway/platforms/sms.py: Core adapter with aiohttp webhook server, Telnyx REST API send, markdown stripping, 1600-char chunking, echo loop prevention, multi-number reply-from tracking - gateway/config.py: Platform.SMS enum + env override block - gateway/run.py: Adapter factory + auth maps (SMS_ALLOWED_USERS, SMS_ALLOW_ALL_USERS) - toolsets.py: hermes-sms toolset + included in hermes-gateway - cron/scheduler.py: SMS in platform_map for cron delivery - tools/send_message_tool.py: SMS routing + _send_sms() standalone sender - tools/cronjob_tools.py: 'sms' in deliver description - gateway/channel_directory.py: SMS in session-based discovery - agent/prompt_builder.py: SMS platform hint (plain text, concise) - hermes_cli/status.py: SMS in platforms status display - hermes_cli/gateway.py: SMS in setup wizard with Telnyx instructions - pyproject.toml: sms optional dependency group (aiohttp>=3.9.0) - tests/gateway/test_sms.py: Unit tests for config, format, truncate, echo prevention, requirements, toolset integration Co-authored-by: sunsakis <teo@sunsakis.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
512 lines
17 KiB
Python
512 lines
17 KiB
Python
"""
Cron job scheduler - executes due jobs.

Provides tick() which checks for due jobs and runs them. The gateway
calls this every 60 seconds from a background thread.

Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick
runs at a time if multiple processes overlap.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import traceback
|
|
|
|
# fcntl is Unix-only; on Windows use msvcrt for file locking
|
|
try:
|
|
import fcntl
|
|
except ImportError:
|
|
fcntl = None
|
|
try:
|
|
import msvcrt
|
|
except ImportError:
|
|
msvcrt = None
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from hermes_time import now as _hermes_now
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from cron.jobs import get_due_jobs, mark_job_run, save_job_output
|
|
|
|
# Hermes home directory: the HERMES_HOME environment variable wins, otherwise ~/.hermes
_hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))

# Lock file shared by every process that may tick (gateway, daemon, systemd timer)
# so that two ticks never run concurrently
_LOCK_DIR = _hermes_home.joinpath("cron")
_LOCK_FILE = _LOCK_DIR.joinpath(".tick.lock")
|
|
|
|
|
|
def _resolve_origin(job: dict) -> Optional[dict]:
    """Return the job's ``origin`` mapping if it is usable for routing.

    The mapping is returned as-is (extra keys are kept) when it carries both a
    truthy ``platform`` and a truthy ``chat_id``; in every other case the
    result is ``None``.
    """
    source = job.get("origin")
    if source and source.get("platform") and source.get("chat_id"):
        return source
    return None
|
|
|
|
|
|
def _resolve_delivery_target(job: dict) -> Optional[dict]:
    """Work out where a cron job's output should be auto-delivered.

    The job's ``deliver`` value (default ``"local"``) is interpreted as:

    * ``"local"`` - no delivery.
    * ``"origin"`` - the chat recorded in the job's origin, if there is one.
    * ``"<platform>:<chat_id>"`` - that explicit chat.
    * ``"<platform>"`` - the origin chat when the origin is on that platform,
      otherwise the chat named by the ``<PLATFORM>_HOME_CHANNEL`` environment
      variable.

    Returns:
        A dict with ``platform``, ``chat_id`` and ``thread_id`` keys, or
        ``None`` when nothing concrete can be resolved.
    """
    mode = job.get("deliver", "local")
    origin = _resolve_origin(job)

    def _target(platform, chat, thread=None):
        return {"platform": platform, "chat_id": chat, "thread_id": thread}

    if mode == "local":
        return None

    if mode == "origin":
        if origin:
            return _target(origin["platform"], str(origin["chat_id"]), origin.get("thread_id"))
        return None

    if ":" in mode:
        # Only the first colon separates platform from chat id.
        name, _, chat = mode.partition(":")
        return _target(name, chat)

    # Bare platform name: prefer the originating chat when it lives there.
    if origin and origin.get("platform") == mode:
        return _target(mode, str(origin["chat_id"]), origin.get("thread_id"))

    home_channel = os.getenv(f"{mode.upper()}_HOME_CHANNEL", "")
    return _target(mode, home_channel) if home_channel else None
|
|
|
|
|
|
def _deliver_result(job: dict, content: str) -> None:
    """
    Deliver job output to the configured target (origin chat, specific platform, etc.).

    Uses the standalone platform send functions from send_message_tool so delivery
    works whether or not the gateway is running.

    Delivery is best-effort: an unresolvable target, an unknown or disabled
    platform, a gateway-config load failure and any error raised while sending
    are logged, after which the function simply returns. After a successful
    send the content is also mirrored into the target's gateway session.

    Args:
        job: Cron job record. ``id`` is required; ``deliver`` and ``origin``
            select the target via ``_resolve_delivery_target``.
        content: Text to send to the target chat.
    """
    target = _resolve_delivery_target(job)
    if not target:
        if job.get("deliver", "local") != "local":
            logger.warning(
                "Job '%s' deliver=%s but no concrete delivery target could be resolved",
                job["id"],
                job.get("deliver", "local"),
            )
        return

    platform_name = target["platform"]
    chat_id = target["chat_id"]
    thread_id = target.get("thread_id")

    from tools.send_message_tool import _send_to_platform
    from gateway.config import load_gateway_config, Platform

    platform_map = {
        "telegram": Platform.TELEGRAM,
        "discord": Platform.DISCORD,
        "slack": Platform.SLACK,
        "whatsapp": Platform.WHATSAPP,
        "signal": Platform.SIGNAL,
        "email": Platform.EMAIL,
        "sms": Platform.SMS,
    }
    platform = platform_map.get(platform_name.lower())
    if not platform:
        logger.warning("Job '%s': unknown platform '%s' for delivery", job["id"], platform_name)
        return

    try:
        config = load_gateway_config()
    except Exception as e:
        logger.error("Job '%s': failed to load gateway config for delivery: %s", job["id"], e)
        return

    pconfig = config.platforms.get(platform)
    if not pconfig or not pconfig.enabled:
        logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name)
        return

    def _send_blocking():
        # The coroutine is created by the thread that runs it, so no attempt
        # can leave an un-awaited coroutine object behind.
        return asyncio.run(
            _send_to_platform(platform, pconfig, chat_id, content, thread_id=thread_id)
        )

    # asyncio.run() cannot be used in a thread that already runs an event loop.
    # Check for that explicitly rather than catching RuntimeError from
    # asyncio.run(): a RuntimeError raised by the send itself must not trigger
    # a second send attempt (and a possible duplicate message).
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False
    else:
        loop_running = True

    try:
        if loop_running:
            import concurrent.futures

            # NOTE: the timeout bounds result(); leaving the ``with`` block
            # still waits for the worker thread to finish.
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                result = pool.submit(_send_blocking).result(timeout=30)
        else:
            result = _send_blocking()
    except Exception as e:
        logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e)
        return

    if result and result.get("error"):
        logger.error("Job '%s': delivery error: %s", job["id"], result["error"])
    else:
        logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
        # Mirror the delivered content into the target's gateway session
        try:
            from gateway.mirror import mirror_to_session
            mirror_to_session(platform_name, chat_id, content, source_label="cron", thread_id=thread_id)
        except Exception as e:
            logger.warning("Job '%s': mirror_to_session failed: %s", job["id"], e)
|
|
|
|
|
|
def _build_job_prompt(job: dict) -> str:
    """Compose the prompt a cron job is run with.

    Skill names come from ``job["skills"]``; when that key is absent or
    ``None`` the legacy single ``job["skill"]`` field is used instead. Names
    are stringified and stripped, and empty ones are dropped. With no skills
    left, the job's own ``prompt`` is returned unchanged. Otherwise each skill
    is loaded through ``skill_view`` and emitted as a system banner followed by
    the skill content, with the job prompt (if any) appended as a trailing
    instruction.

    Raises:
        RuntimeError: if a skill cannot be loaded.
    """
    base_prompt = job.get("prompt", "")

    requested = job.get("skills")
    if requested is None:
        single = job.get("skill")
        requested = [single] if single else []

    skill_names = []
    for raw in requested:
        cleaned = str(raw).strip()
        if cleaned:
            skill_names.append(cleaned)
    if not skill_names:
        return base_prompt

    from tools.skills_tool import skill_view

    sections = []
    for skill_name in skill_names:
        payload = json.loads(skill_view(skill_name))
        if not payload.get("success"):
            raise RuntimeError(payload.get("error") or f"Failed to load skill '{skill_name}'")

        body = str(payload.get("content") or "").strip()
        if sections:
            # Blank line between consecutive skills.
            sections.append("")
        sections += [
            f'[SYSTEM: The user has invoked the "{skill_name}" skill, indicating they want you to follow its instructions. The full skill content is loaded below.]',
            "",
            body,
        ]

    if base_prompt:
        sections += ["", f"The user has provided the following instruction alongside the skill invocation: {base_prompt}"]
    return "\n".join(sections)
|
|
|
|
|
|
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """
    Execute a single cron job.

    Builds the effective prompt, exposes the job's origin and auto-delivery
    target to the agent through environment variables, re-reads ``.env`` and
    ``config.yaml``, resolves the runtime provider and model route, runs the
    agent once and renders a markdown report of the run. The injected
    environment variables are removed and the session store is closed before
    returning.

    Args:
        job: Cron job record. ``id`` and ``name`` are required; ``prompt``,
            ``skills``/``skill``, ``origin``, ``deliver``, ``model``,
            ``provider``, ``base_url`` and ``schedule_display`` are optional.

    Returns:
        Tuple of (success, full_output_doc, final_response, error_message)

    Raises:
        KeyError: if ``id`` or ``name`` is missing from ``job``.
        RuntimeError: if a requested skill cannot be loaded. Failures after
            that point are reported through the returned tuple instead.
    """
    from run_agent import AIAgent

    job_id = job["id"]
    job_name = job["name"]
    prompt = _build_job_prompt(job)
    origin = _resolve_origin(job)

    logger.info("Running job '%s' (ID: %s)", job_name, job_id)
    logger.info("Prompt: %s", prompt[:100])

    # Inject origin context so the agent's send_message tool knows the chat
    if origin:
        os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"]
        os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"])
        if origin.get("chat_name"):
            os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]

    # Initialize SQLite session store so cron job messages are persisted
    # and discoverable via session_search (same pattern as gateway/run.py).
    # It is opened only now, directly before the try/finally that closes it,
    # so that a malformed job record or an unloadable skill (which raise
    # above) cannot leave an open store behind.
    _session_db = None
    try:
        from hermes_state import SessionDB
        _session_db = SessionDB()
    except Exception as e:
        logger.debug("Job '%s': SQLite session store not available: %s", job_id, e)

    try:
        # Re-read .env and config.yaml fresh every run so provider/key
        # changes take effect without a gateway restart.
        from dotenv import load_dotenv
        try:
            load_dotenv(str(_hermes_home / ".env"), override=True, encoding="utf-8")
        except UnicodeDecodeError:
            load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")

        delivery_target = _resolve_delivery_target(job)
        if delivery_target:
            os.environ["HERMES_CRON_AUTO_DELIVER_PLATFORM"] = delivery_target["platform"]
            os.environ["HERMES_CRON_AUTO_DELIVER_CHAT_ID"] = str(delivery_target["chat_id"])
            if delivery_target.get("thread_id") is not None:
                os.environ["HERMES_CRON_AUTO_DELIVER_THREAD_ID"] = str(delivery_target["thread_id"])

        model = job.get("model") or os.getenv("HERMES_MODEL") or "anthropic/claude-opus-4.6"

        # Load config.yaml for model, reasoning, prefill, toolsets, provider routing
        _cfg = {}
        try:
            import yaml
            _cfg_path = str(_hermes_home / "config.yaml")
            if os.path.exists(_cfg_path):
                # Explicit UTF-8, matching how .env and the prefill file are read,
                # instead of the platform's locale encoding.
                with open(_cfg_path, encoding="utf-8") as _f:
                    _loaded = yaml.safe_load(_f) or {}
                if not isinstance(_loaded, dict):
                    # Keep _cfg as {} so the .get() calls further down stay valid.
                    raise ValueError("top-level YAML value is not a mapping")
                _cfg = _loaded
            _model_cfg = _cfg.get("model", {})
            if not job.get("model"):
                if isinstance(_model_cfg, str):
                    model = _model_cfg
                elif isinstance(_model_cfg, dict):
                    model = _model_cfg.get("default", model)
        except Exception as e:
            logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)

        def _cfg_section(name):
            # A key that is present but empty ("agent:") loads as None, and a
            # scalar is equally unusable; treat both as an empty section.
            section = _cfg.get(name)
            return section if isinstance(section, dict) else {}

        agent_cfg = _cfg_section("agent")

        # Reasoning config from env or config.yaml
        reasoning_config = None
        effort = os.getenv("HERMES_REASONING_EFFORT", "")
        if not effort:
            effort = str(agent_cfg.get("reasoning_effort", "")).strip()
        if effort and effort.lower() != "none":
            valid = ("xhigh", "high", "medium", "low", "minimal")
            if effort.lower() in valid:
                reasoning_config = {"enabled": True, "effort": effort.lower()}
        elif effort.lower() == "none":
            reasoning_config = {"enabled": False}

        # Prefill messages from env or config.yaml
        prefill_messages = None
        prefill_file = os.getenv("HERMES_PREFILL_MESSAGES_FILE", "") or _cfg.get("prefill_messages_file", "")
        if prefill_file:
            pfpath = Path(prefill_file).expanduser()
            if not pfpath.is_absolute():
                # Relative paths are resolved against the Hermes home directory.
                pfpath = _hermes_home / pfpath
            if pfpath.exists():
                try:
                    with open(pfpath, "r", encoding="utf-8") as _pf:
                        prefill_messages = json.load(_pf)
                    if not isinstance(prefill_messages, list):
                        prefill_messages = None
                except Exception as e:
                    logger.warning("Job '%s': failed to parse prefill messages file '%s': %s", job_id, pfpath, e)
                    prefill_messages = None

        # Max iterations
        max_iterations = agent_cfg.get("max_turns") or _cfg.get("max_turns") or 90

        # Provider routing
        pr = _cfg_section("provider_routing")
        smart_routing = _cfg.get("smart_model_routing", {}) or {}

        from hermes_cli.runtime_provider import (
            resolve_runtime_provider,
            format_runtime_provider_error,
        )
        try:
            runtime_kwargs = {
                "requested": job.get("provider") or os.getenv("HERMES_INFERENCE_PROVIDER"),
            }
            if job.get("base_url"):
                runtime_kwargs["explicit_base_url"] = job.get("base_url")
            runtime = resolve_runtime_provider(**runtime_kwargs)
        except Exception as exc:
            message = format_runtime_provider_error(exc)
            raise RuntimeError(message) from exc

        from agent.smart_model_routing import resolve_turn_route
        turn_route = resolve_turn_route(
            prompt,
            smart_routing,
            {
                "model": model,
                "api_key": runtime.get("api_key"),
                "base_url": runtime.get("base_url"),
                "provider": runtime.get("provider"),
                "api_mode": runtime.get("api_mode"),
            },
        )

        agent = AIAgent(
            model=turn_route["model"],
            api_key=turn_route["runtime"].get("api_key"),
            base_url=turn_route["runtime"].get("base_url"),
            provider=turn_route["runtime"].get("provider"),
            api_mode=turn_route["runtime"].get("api_mode"),
            max_iterations=max_iterations,
            reasoning_config=reasoning_config,
            prefill_messages=prefill_messages,
            providers_allowed=pr.get("only"),
            providers_ignored=pr.get("ignore"),
            providers_order=pr.get("order"),
            provider_sort=pr.get("sort"),
            disabled_toolsets=["cronjob"],
            quiet_mode=True,
            platform="cron",
            session_id=f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}",
            session_db=_session_db,
        )

        result = agent.run_conversation(prompt)

        final_response = result.get("final_response", "")
        if not final_response:
            final_response = "(No response generated)"

        output = f"""# Cron Job: {job_name}

**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}

## Prompt

{prompt}

## Response

{final_response}
"""

        logger.info("Job '%s' completed successfully", job_name)
        return True, output, final_response, None

    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}"
        logger.error("Job '%s' failed: %s", job_name, error_msg)

        output = f"""# Cron Job: {job_name} (FAILED)

**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}

## Prompt

{prompt}

## Error

```
{error_msg}

{traceback.format_exc()}
```
"""
        return False, output, "", error_msg

    finally:
        # Clean up injected env vars so they don't leak to other jobs
        for key in (
            "HERMES_SESSION_PLATFORM",
            "HERMES_SESSION_CHAT_ID",
            "HERMES_SESSION_CHAT_NAME",
            "HERMES_CRON_AUTO_DELIVER_PLATFORM",
            "HERMES_CRON_AUTO_DELIVER_CHAT_ID",
            "HERMES_CRON_AUTO_DELIVER_THREAD_ID",
        ):
            os.environ.pop(key, None)
        if _session_db:
            try:
                _session_db.close()
            except Exception as e:
                logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
|
|
|
|
|
|
def tick(verbose: bool = True) -> int:
    """
    Check and run all due jobs.

    Uses a file lock so only one tick runs at a time, even if the gateway's
    in-process ticker and a standalone daemon or manual tick overlap.

    Each due job is run, its output saved, its result (or a failure notice)
    delivered and its run recorded. A failure in one job is logged and does
    not stop the remaining jobs.

    Args:
        verbose: Whether to log status messages

    Returns:
        Number of jobs executed (0 if another tick is already running)
    """
    _LOCK_DIR.mkdir(parents=True, exist_ok=True)

    # Cross-platform file locking: fcntl on Unix, msvcrt on Windows
    lock_fd = None
    try:
        lock_fd = open(_LOCK_FILE, "w")
        if fcntl:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        elif msvcrt:
            msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
    except OSError:  # IOError is an alias of OSError
        logger.debug("Tick skipped — another instance holds the lock")
        if lock_fd is not None:
            lock_fd.close()
        return 0

    try:
        due_jobs = get_due_jobs()

        if verbose and not due_jobs:
            logger.info("%s - No jobs due", _hermes_now().strftime('%H:%M:%S'))
            return 0

        if verbose:
            logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))

        executed = 0
        for job in due_jobs:
            try:
                success, output, final_response, error = run_job(job)

                output_file = save_job_output(job["id"], output)
                if verbose:
                    logger.info("Output saved to: %s", output_file)

                # Deliver the final response to the origin/target chat
                deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
                if deliver_content:
                    try:
                        _deliver_result(job, deliver_content)
                    except Exception as de:
                        logger.error("Delivery failed for job %s: %s", job["id"], de)

                mark_job_run(job["id"], success, error)
                executed += 1

            except Exception as e:
                logger.error("Error processing job %s: %s", job['id'], e)
                # Recording the failure may fail as well (it can be the very
                # call that raised above); that must not abort the other jobs.
                try:
                    mark_job_run(job["id"], False, str(e))
                except Exception as mark_err:
                    logger.error("Could not record failure for job %s: %s", job["id"], mark_err)

        return executed
    finally:
        # Always close the lock file, even when the explicit unlock fails, so
        # an unlock error neither leaks the handle nor replaces the result.
        try:
            if fcntl:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
            elif msvcrt:
                msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
        except OSError as unlock_err:
            logger.debug("Tick lock release failed (closing the file anyway): %s", unlock_err)
        finally:
            lock_fd.close()
|
|
|
|
|
|
# Manual invocation: perform a single scheduler pass with status logging on.
if __name__ == "__main__":
    tick(verbose=True)
|