From df5c61b37c80513badd3912faca81d83fdaf1208 Mon Sep 17 00:00:00 2001 From: teknium1 Date: Sat, 14 Mar 2026 12:21:50 -0700 Subject: [PATCH] feat: compress cron management into one tool --- agent/display.py | 16 +- cli.py | 49 ++- cron/__init__.py | 6 + cron/jobs.py | 112 +++++- cron/scheduler.py | 31 +- gateway/run.py | 4 +- hermes_cli/tools_config.py | 2 +- model_tools.py | 2 +- run_agent.py | 2 +- tests/cron/test_jobs.py | 28 +- tests/cron/test_scheduler.py | 45 +++ tests/tools/test_cronjob_tools.py | 65 ++++ tools/__init__.py | 10 +- tools/cronjob_tools.py | 593 +++++++++++++----------------- toolsets.py | 6 +- 15 files changed, 574 insertions(+), 397 deletions(-) diff --git a/agent/display.py b/agent/display.py index 72b56318..07d35ea3 100644 --- a/agent/display.py +++ b/agent/display.py @@ -80,7 +80,7 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int = 40) -> str | N "image_generate": "prompt", "text_to_speech": "text", "vision_analyze": "question", "mixture_of_agents": "user_prompt", "skill_view": "name", "skills_list": "category", - "schedule_cronjob": "name", + "cronjob": "action", "execute_code": "code", "delegate_task": "goal", "clarify": "question", "skill_manage": "name", } @@ -513,12 +513,14 @@ def get_cute_tool_message( return _wrap(f"โ”Š ๐Ÿง  reason {_trunc(args.get('user_prompt', ''), 30)} {dur}") if tool_name == "send_message": return _wrap(f"โ”Š ๐Ÿ“จ send {args.get('target', '?')}: \"{_trunc(args.get('message', ''), 25)}\" {dur}") - if tool_name == "schedule_cronjob": - return _wrap(f"โ”Š โฐ schedule {_trunc(args.get('name', args.get('prompt', 'task')), 30)} {dur}") - if tool_name == "list_cronjobs": - return _wrap(f"โ”Š โฐ jobs listing {dur}") - if tool_name == "remove_cronjob": - return _wrap(f"โ”Š โฐ remove job {args.get('job_id', '?')} {dur}") + if tool_name == "cronjob": + action = args.get("action", "?") + if action == "create": + label = args.get("name") or args.get("skill") or args.get("prompt", "task") + return _wrap(f"โ”Š โฐ cron create {_trunc(label, 24)} {dur}") + if action == "list": + return _wrap(f"โ”Š โฐ cron listing {dur}") + return _wrap(f"โ”Š โฐ cron {action} {args.get('job_id', '')} {dur}") if tool_name.startswith("rl_"): rl = { "rl_list_environments": "list envs", "rl_select_environment": f"select {args.get('name', '')}", diff --git a/cli.py b/cli.py index 094be22e..8d07d3b8 100755 --- a/cli.py +++ b/cli.py @@ -428,8 +428,8 @@ from hermes_cli.commands import COMMANDS, SlashCommandCompleter from hermes_cli import callbacks as _callbacks from toolsets import get_all_toolsets, get_toolset_info, resolve_toolset, validate_toolset -# Cron job system for scheduled tasks (CRUD only โ€” execution is handled by the gateway) -from cron import create_job, list_jobs, remove_job, get_job +# Cron job system for scheduled tasks (execution is handled by the gateway) +from cron import create_job, list_jobs, remove_job, get_job, pause_job, resume_job, trigger_job # Resource cleanup imports for safe shutdown (terminal VMs, browser sessions) from tools.terminal_tool import cleanup_all_environments as _cleanup_all_terminals @@ -2601,6 +2601,9 @@ class HermesCLI: print(" /cron - List scheduled jobs") print(" /cron list - List scheduled jobs") print(' /cron add - Add a new job') + print(" /cron pause - Pause a job") + print(" /cron resume - Resume a job") + print(" /cron run - Run a job on the next tick") print(" /cron remove - Remove a job") print() print(" Schedule formats:") @@ -2700,27 +2703,47 @@ class HermesCLI: except Exception as e: print(f"(x_x) Failed to create job: {e}") - elif subcommand == "remove" or subcommand == "rm" or subcommand == "delete": - # /cron remove + elif subcommand in {"pause", "resume", "run", "remove", "rm", "delete"}: if len(parts) < 3: - print("(._.) Usage: /cron remove ") + print(f"(._.) Usage: /cron {subcommand} ") return - + job_id = parts[2].strip() job = get_job(job_id) - + if not job: print(f"(._.) Job not found: {job_id}") return - - if remove_job(job_id): - print(f"(^_^)b Removed job: {job['name']} ({job_id})") + + if subcommand == "pause": + updated = pause_job(job_id, reason="paused from /cron") + if updated: + print(f"(^_^)b Paused job: {updated['name']} ({job_id})") + else: + print(f"(x_x) Failed to pause job: {job_id}") + elif subcommand == "resume": + updated = resume_job(job_id) + if updated: + print(f"(^_^)b Resumed job: {updated['name']} ({job_id})") + print(f" Next run: {updated.get('next_run_at')}") + else: + print(f"(x_x) Failed to resume job: {job_id}") + elif subcommand == "run": + updated = trigger_job(job_id) + if updated: + print(f"(^_^)b Triggered job: {updated['name']} ({job_id})") + print(" It will run on the next scheduler tick.") + else: + print(f"(x_x) Failed to trigger job: {job_id}") else: - print(f"(x_x) Failed to remove job: {job_id}") - + if remove_job(job_id): + print(f"(^_^)b Removed job: {job['name']} ({job_id})") + else: + print(f"(x_x) Failed to remove job: {job_id}") + else: print(f"(._.) Unknown cron command: {subcommand}") - print(" Available: list, add, remove") + print(" Available: list, add, pause, resume, run, remove") def _handle_skills_command(self, cmd: str): """Handle /skills slash command โ€” delegates to hermes_cli.skills_hub.""" diff --git a/cron/__init__.py b/cron/__init__.py index 6a8f3ecb..31d7bf8e 100644 --- a/cron/__init__.py +++ b/cron/__init__.py @@ -20,6 +20,9 @@ from cron.jobs import ( list_jobs, remove_job, update_job, + pause_job, + resume_job, + trigger_job, JOBS_FILE, ) from cron.scheduler import tick @@ -30,6 +33,9 @@ __all__ = [ "list_jobs", "remove_job", "update_job", + "pause_job", + "resume_job", + "trigger_job", "tick", "JOBS_FILE", ] diff --git a/cron/jobs.py b/cron/jobs.py index 186424c6..2fb5c95c 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -263,39 +263,43 @@ def create_job( name: Optional[str] = None, repeat: Optional[int] = None, deliver: Optional[str] = None, - origin: Optional[Dict[str, Any]] = None + origin: Optional[Dict[str, Any]] = None, + skill: Optional[str] = None, ) -> Dict[str, Any]: """ Create a new cron job. - + Args: - prompt: The prompt to run (must be self-contained) + prompt: The prompt to run (must be self-contained, or a task instruction when skill is set) schedule: Schedule string (see parse_schedule) name: Optional friendly name repeat: How many times to run (None = forever, 1 = once) deliver: Where to deliver output ("origin", "local", "telegram", etc.) origin: Source info where job was created (for "origin" delivery) - + skill: Optional skill name to load before running the prompt + Returns: The created job dict """ parsed_schedule = parse_schedule(schedule) - + # Auto-set repeat=1 for one-shot schedules if not specified if parsed_schedule["kind"] == "once" and repeat is None: repeat = 1 - + # Default delivery to origin if available, otherwise local if deliver is None: deliver = "origin" if origin else "local" - + job_id = uuid.uuid4().hex[:12] now = _hermes_now().isoformat() - + + label_source = skill or prompt or "cron job" job = { "id": job_id, - "name": name or prompt[:50].strip(), + "name": name or label_source[:50].strip(), "prompt": prompt, + "skill": skill, "schedule": parsed_schedule, "schedule_display": parsed_schedule.get("display", schedule), "repeat": { @@ -303,6 +307,9 @@ def create_job( "completed": 0 }, "enabled": True, + "state": "scheduled", + "paused_at": None, + "paused_reason": None, "created_at": now, "next_run_at": compute_next_run(parsed_schedule), "last_run_at": None, @@ -312,11 +319,11 @@ def create_job( "deliver": deliver, "origin": origin, # Tracks where job was created for "origin" delivery } - + jobs = load_jobs() jobs.append(job) save_jobs(jobs) - + return job @@ -338,16 +345,82 @@ def list_jobs(include_disabled: bool = False) -> List[Dict[str, Any]]: def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Update a job by ID.""" + """Update a job by ID, refreshing derived schedule fields when needed.""" jobs = load_jobs() for i, job in enumerate(jobs): - if job["id"] == job_id: - jobs[i] = {**job, **updates} - save_jobs(jobs) - return jobs[i] + if job["id"] != job_id: + continue + + updated = {**job, **updates} + schedule_changed = "schedule" in updates + + if schedule_changed: + updated_schedule = updated["schedule"] + updated["schedule_display"] = updates.get( + "schedule_display", + updated_schedule.get("display", updated.get("schedule_display")), + ) + if updated.get("state") != "paused": + updated["next_run_at"] = compute_next_run(updated_schedule) + + if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"): + updated["next_run_at"] = compute_next_run(updated["schedule"]) + + jobs[i] = updated + save_jobs(jobs) + return jobs[i] return None +def pause_job(job_id: str, reason: Optional[str] = None) -> Optional[Dict[str, Any]]: + """Pause a job without deleting it.""" + return update_job( + job_id, + { + "enabled": False, + "state": "paused", + "paused_at": _hermes_now().isoformat(), + "paused_reason": reason, + }, + ) + + +def resume_job(job_id: str) -> Optional[Dict[str, Any]]: + """Resume a paused job and compute the next future run from now.""" + job = get_job(job_id) + if not job: + return None + + next_run_at = compute_next_run(job["schedule"]) + return update_job( + job_id, + { + "enabled": True, + "state": "scheduled", + "paused_at": None, + "paused_reason": None, + "next_run_at": next_run_at, + }, + ) + + +def trigger_job(job_id: str) -> Optional[Dict[str, Any]]: + """Schedule a job to run on the next scheduler tick.""" + job = get_job(job_id) + if not job: + return None + return update_job( + job_id, + { + "enabled": True, + "state": "scheduled", + "paused_at": None, + "paused_reason": None, + "next_run_at": _hermes_now().isoformat(), + }, + ) + + def remove_job(job_id: str) -> bool: """Remove a job by ID.""" jobs = load_jobs() @@ -389,11 +462,14 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None): # Compute next run job["next_run_at"] = compute_next_run(job["schedule"], now) - + # If no next run (one-shot completed), disable if job["next_run_at"] is None: job["enabled"] = False - + job["state"] = "completed" + elif job.get("state") != "paused": + job["state"] = "scheduled" + save_jobs(jobs) return diff --git a/cron/scheduler.py b/cron/scheduler.py index 12d355cd..e65986b2 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -9,6 +9,7 @@ runs at a time if multiple processes overlap. """ import asyncio +import json import logging import os import sys @@ -147,6 +148,31 @@ def _deliver_result(job: dict, content: str) -> None: logger.warning("Job '%s': mirror_to_session failed: %s", job["id"], e) +def _build_job_prompt(job: dict) -> str: + """Build the effective prompt for a cron job, optionally loading a skill first.""" + prompt = job.get("prompt", "") + skill_name = job.get("skill") + if not skill_name: + return prompt + + from tools.skills_tool import skill_view + + loaded = json.loads(skill_view(skill_name)) + if not loaded.get("success"): + error = loaded.get("error") or f"Failed to load skill '{skill_name}'" + raise RuntimeError(error) + + content = str(loaded.get("content") or "").strip() + parts = [ + f'[SYSTEM: The user has invoked the "{skill_name}" skill, indicating they want you to follow its instructions. The full skill content is loaded below.]', + "", + content, + ] + if prompt: + parts.extend(["", f"The user has provided the following instruction alongside the skill invocation: {prompt}"]) + return "\n".join(parts) + + def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: """ Execute a single cron job. @@ -167,9 +193,9 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: job_id = job["id"] job_name = job["name"] - prompt = job["prompt"] + prompt = _build_job_prompt(job) origin = _resolve_origin(job) - + logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) @@ -268,6 +294,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: providers_ignored=pr.get("ignore"), providers_order=pr.get("order"), provider_sort=pr.get("sort"), + disabled_toolsets=["cronjob"], quiet_mode=True, platform="cron", session_id=f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}", diff --git a/gateway/run.py b/gateway/run.py index 5ab74972..5b889501 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3694,9 +3694,7 @@ class GatewayRunner: "memory": "๐Ÿง ", "session_search": "๐Ÿ”", "send_message": "๐Ÿ“จ", - "schedule_cronjob": "โฐ", - "list_cronjobs": "โฐ", - "remove_cronjob": "โฐ", + "cronjob": "โฐ", "execute_code": "๐Ÿ", "delegate_task": "๐Ÿ”€", "clarify": "โ“", diff --git a/hermes_cli/tools_config.py b/hermes_cli/tools_config.py index cb9b9965..3ae86efd 100644 --- a/hermes_cli/tools_config.py +++ b/hermes_cli/tools_config.py @@ -91,7 +91,7 @@ CONFIGURABLE_TOOLSETS = [ ("session_search", "๐Ÿ”Ž Session Search", "search past conversations"), ("clarify", "โ“ Clarifying Questions", "clarify"), ("delegation", "๐Ÿ‘ฅ Task Delegation", "delegate_task"), - ("cronjob", "โฐ Cron Jobs", "schedule, list, remove"), + ("cronjob", "โฐ Cron Jobs", "create, list, update, pause, resume, remove, run"), ("rl", "๐Ÿงช RL Training", "Tinker-Atropos training tools"), ("homeassistant", "๐Ÿ  Home Assistant", "smart home device control"), ] diff --git a/model_tools.py b/model_tools.py index 2139eb08..7ef2df10 100644 --- a/model_tools.py +++ b/model_tools.py @@ -144,7 +144,7 @@ _LEGACY_TOOLSET_MAP = { "browser_press", "browser_close", "browser_get_images", "browser_vision" ], - "cronjob_tools": ["schedule_cronjob", "list_cronjobs", "remove_cronjob"], + "cronjob_tools": ["cronjob"], "rl_tools": [ "rl_list_environments", "rl_select_environment", "rl_get_current_config", "rl_edit_config", diff --git a/run_agent.py b/run_agent.py index bdf04965..ec51c37c 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3804,7 +3804,7 @@ class AIAgent: 'image_generate': '๐ŸŽจ', 'text_to_speech': '๐Ÿ”Š', 'vision_analyze': '๐Ÿ‘๏ธ', 'mixture_of_agents': '๐Ÿง ', 'skills_list': '๐Ÿ“š', 'skill_view': '๐Ÿ“š', - 'schedule_cronjob': 'โฐ', 'list_cronjobs': 'โฐ', 'remove_cronjob': 'โฐ', + 'cronjob': 'โฐ', 'send_message': '๐Ÿ“จ', 'todo': '๐Ÿ“‹', 'memory': '๐Ÿง ', 'session_search': '๐Ÿ”', 'clarify': 'โ“', 'execute_code': '๐Ÿ', 'delegate_task': '๐Ÿ”€', } diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index b39342ce..802a744f 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -16,6 +16,8 @@ from cron.jobs import ( get_job, list_jobs, update_job, + pause_job, + resume_job, remove_job, mark_job_run, get_due_jobs, @@ -233,14 +235,18 @@ class TestUpdateJob: job = create_job(prompt="Daily report", schedule="every 1h") assert job["schedule"]["kind"] == "interval" assert job["schedule"]["minutes"] == 60 + old_next_run = job["next_run_at"] new_schedule = parse_schedule("every 2h") - updated = update_job(job["id"], {"schedule": new_schedule}) + updated = update_job(job["id"], {"schedule": new_schedule, "schedule_display": new_schedule["display"]}) assert updated is not None assert updated["schedule"]["kind"] == "interval" assert updated["schedule"]["minutes"] == 120 + assert updated["schedule_display"] == "every 120m" + assert updated["next_run_at"] != old_next_run # Verify persisted to disk fetched = get_job(job["id"]) assert fetched["schedule"]["minutes"] == 120 + assert fetched["schedule_display"] == "every 120m" def test_update_enable_disable(self, tmp_cron_dir): job = create_job(prompt="Toggle me", schedule="every 1h") @@ -255,6 +261,26 @@ class TestUpdateJob: assert result is None +class TestPauseResumeJob: + def test_pause_sets_state(self, tmp_cron_dir): + job = create_job(prompt="Pause me", schedule="every 1h") + paused = pause_job(job["id"], reason="user paused") + assert paused is not None + assert paused["enabled"] is False + assert paused["state"] == "paused" + assert paused["paused_reason"] == "user paused" + + def test_resume_reenables_job(self, tmp_cron_dir): + job = create_job(prompt="Resume me", schedule="every 1h") + pause_job(job["id"], reason="user paused") + resumed = resume_job(job["id"]) + assert resumed is not None + assert resumed["enabled"] is True + assert resumed["state"] == "scheduled" + assert resumed["paused_at"] is None + assert resumed["paused_reason"] is None + + class TestMarkJobRun: def test_increments_completed(self, tmp_cron_dir): job = create_job(prompt="Test", schedule="every 1h") diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 4314b5ac..0b6a0838 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -203,3 +203,48 @@ class TestRunJobConfigLogging: assert any("failed to parse prefill messages" in r.message for r in caplog.records), \ f"Expected 'failed to parse prefill messages' warning in logs, got: {[r.message for r in caplog.records]}" + + +class TestRunJobSkillBacked: + def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path): + job = { + "id": "skill-job", + "name": "skill test", + "prompt": "Check the feeds and summarize anything new.", + "skill": "blogwatcher", + } + + fake_db = MagicMock() + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("tools.skills_tool.skill_view", return_value=json.dumps({"success": True, "content": "# Blogwatcher\nFollow this skill."})), \ + patch("run_agent.AIAgent") as mock_agent_cls: + mock_agent = MagicMock() + mock_agent.run_conversation.return_value = {"final_response": "ok"} + mock_agent_cls.return_value = mock_agent + + success, output, final_response, error = run_job(job) + + assert success is True + assert error is None + assert final_response == "ok" + + kwargs = mock_agent_cls.call_args.kwargs + assert "cronjob" in (kwargs["disabled_toolsets"] or []) + + prompt_arg = mock_agent.run_conversation.call_args.args[0] + assert "blogwatcher" in prompt_arg + assert "Follow this skill" in prompt_arg + assert "Check the feeds and summarize anything new." in prompt_arg diff --git a/tests/tools/test_cronjob_tools.py b/tests/tools/test_cronjob_tools.py index 500087d5..93b2430e 100644 --- a/tests/tools/test_cronjob_tools.py +++ b/tests/tools/test_cronjob_tools.py @@ -6,6 +6,7 @@ from pathlib import Path from tools.cronjob_tools import ( _scan_cron_prompt, + cronjob, schedule_cronjob, list_cronjobs, remove_cronjob, @@ -180,3 +181,67 @@ class TestRemoveCronjob: result = json.loads(remove_cronjob("nonexistent_id")) assert result["success"] is False assert "not found" in result["error"].lower() + + +class TestUnifiedCronjobTool: + @pytest.fixture(autouse=True) + def _setup_cron_dir(self, tmp_path, monkeypatch): + monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path / "cron") + monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "cron" / "jobs.json") + monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "cron" / "output") + + def test_create_and_list(self): + created = json.loads( + cronjob( + action="create", + prompt="Check server status", + schedule="every 1h", + name="Server Check", + ) + ) + assert created["success"] is True + + listing = json.loads(cronjob(action="list")) + assert listing["success"] is True + assert listing["count"] == 1 + assert listing["jobs"][0]["name"] == "Server Check" + assert listing["jobs"][0]["state"] == "scheduled" + + def test_pause_and_resume(self): + created = json.loads(cronjob(action="create", prompt="Check", schedule="every 1h")) + job_id = created["job_id"] + + paused = json.loads(cronjob(action="pause", job_id=job_id)) + assert paused["success"] is True + assert paused["job"]["state"] == "paused" + + resumed = json.loads(cronjob(action="resume", job_id=job_id)) + assert resumed["success"] is True + assert resumed["job"]["state"] == "scheduled" + + def test_update_schedule_recomputes_display(self): + created = json.loads(cronjob(action="create", prompt="Check", schedule="every 1h")) + job_id = created["job_id"] + + updated = json.loads( + cronjob(action="update", job_id=job_id, schedule="every 2h", name="New Name") + ) + assert updated["success"] is True + assert updated["job"]["name"] == "New Name" + assert updated["job"]["schedule"] == "every 120m" + + def test_create_skill_backed_job(self): + result = json.loads( + cronjob( + action="create", + skill="blogwatcher", + prompt="Check the configured feeds and summarize anything new.", + schedule="every 1h", + name="Morning feeds", + ) + ) + assert result["success"] is True + assert result["skill"] == "blogwatcher" + + listing = json.loads(cronjob(action="list")) + assert listing["jobs"][0]["skill"] == "blogwatcher" diff --git a/tools/__init__.py b/tools/__init__.py index 04eabd02..6c02865d 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -84,14 +84,13 @@ from .browser_tool import ( # Cronjob management tools (CLI-only, hermes-cli toolset) from .cronjob_tools import ( + cronjob, schedule_cronjob, list_cronjobs, remove_cronjob, check_cronjob_requirements, get_cronjob_tool_definitions, - SCHEDULE_CRONJOB_SCHEMA, - LIST_CRONJOBS_SCHEMA, - REMOVE_CRONJOB_SCHEMA + CRONJOB_SCHEMA, ) # RL Training tools (Tinker-Atropos) @@ -211,14 +210,13 @@ __all__ = [ 'check_browser_requirements', 'BROWSER_TOOL_SCHEMAS', # Cronjob management tools (CLI-only) + 'cronjob', 'schedule_cronjob', 'list_cronjobs', 'remove_cronjob', 'check_cronjob_requirements', 'get_cronjob_tool_definitions', - 'SCHEDULE_CRONJOB_SCHEMA', - 'LIST_CRONJOBS_SCHEMA', - 'REMOVE_CRONJOB_SCHEMA', + 'CRONJOB_SCHEMA', # RL Training tools 'rl_list_environments', 'rl_select_environment', diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index bdfa58d6..35ef1e63 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -1,24 +1,31 @@ """ Cron job management tools for Hermes Agent. -These tools allow the agent to schedule, list, and remove automated tasks. -Only available when running via CLI (hermes-cli toolset). - -IMPORTANT: Cronjobs run in isolated sessions with NO prior context. -The prompt must contain ALL necessary information. +Expose a single compressed action-oriented tool to avoid schema/context bloat. +Compatibility wrappers remain for direct Python callers and legacy tests. """ import json import os import re -from typing import Optional - -# Import from cron module (will be available when properly installed) import sys from pathlib import Path +from typing import Any, Dict, Optional + +# Import from cron module (will be available when properly installed) sys.path.insert(0, str(Path(__file__).parent.parent)) -from cron.jobs import create_job, get_job, list_jobs, remove_job +from cron.jobs import ( + create_job, + get_job, + list_jobs, + parse_schedule, + pause_job, + remove_job, + resume_job, + trigger_job, + update_job, +) # --------------------------------------------------------------------------- @@ -56,9 +63,183 @@ def _scan_cron_prompt(prompt: str) -> str: return "" -# ============================================================================= -# Tool: schedule_cronjob -# ============================================================================= +def _origin_from_env() -> Optional[Dict[str, str]]: + origin_platform = os.getenv("HERMES_SESSION_PLATFORM") + origin_chat_id = os.getenv("HERMES_SESSION_CHAT_ID") + if origin_platform and origin_chat_id: + return { + "platform": origin_platform, + "chat_id": origin_chat_id, + "chat_name": os.getenv("HERMES_SESSION_CHAT_NAME"), + } + return None + + +def _repeat_display(job: Dict[str, Any]) -> str: + times = (job.get("repeat") or {}).get("times") + completed = (job.get("repeat") or {}).get("completed", 0) + if times is None: + return "forever" + if times == 1: + return "once" if completed == 0 else "1/1" + return f"{completed}/{times}" if completed else f"{times} times" + + +def _format_job(job: Dict[str, Any]) -> Dict[str, Any]: + prompt = job.get("prompt", "") + return { + "job_id": job["id"], + "name": job["name"], + "skill": job.get("skill"), + "prompt_preview": prompt[:100] + "..." if len(prompt) > 100 else prompt, + "schedule": job.get("schedule_display"), + "repeat": _repeat_display(job), + "deliver": job.get("deliver", "local"), + "next_run_at": job.get("next_run_at"), + "last_run_at": job.get("last_run_at"), + "last_status": job.get("last_status"), + "enabled": job.get("enabled", True), + "state": job.get("state", "scheduled" if job.get("enabled", True) else "paused"), + "paused_at": job.get("paused_at"), + "paused_reason": job.get("paused_reason"), + } + + +def cronjob( + action: str, + job_id: Optional[str] = None, + prompt: Optional[str] = None, + schedule: Optional[str] = None, + name: Optional[str] = None, + repeat: Optional[int] = None, + deliver: Optional[str] = None, + include_disabled: bool = False, + skill: Optional[str] = None, + reason: Optional[str] = None, + task_id: str = None, +) -> str: + """Unified cron job management tool.""" + del task_id # unused but kept for handler signature compatibility + + try: + normalized = (action or "").strip().lower() + + if normalized == "create": + if not schedule: + return json.dumps({"success": False, "error": "schedule is required for create"}, indent=2) + if not prompt and not skill: + return json.dumps({"success": False, "error": "create requires either prompt or skill"}, indent=2) + if prompt: + scan_error = _scan_cron_prompt(prompt) + if scan_error: + return json.dumps({"success": False, "error": scan_error}, indent=2) + + job = create_job( + prompt=prompt or "", + schedule=schedule, + name=name, + repeat=repeat, + deliver=deliver, + origin=_origin_from_env(), + skill=skill, + ) + return json.dumps( + { + "success": True, + "job_id": job["id"], + "name": job["name"], + "skill": job.get("skill"), + "schedule": job["schedule_display"], + "repeat": _repeat_display(job), + "deliver": job.get("deliver", "local"), + "next_run_at": job["next_run_at"], + "job": _format_job(job), + "message": f"Cron job '{job['name']}' created.", + }, + indent=2, + ) + + if normalized == "list": + jobs = [_format_job(job) for job in list_jobs(include_disabled=include_disabled)] + return json.dumps({"success": True, "count": len(jobs), "jobs": jobs}, indent=2) + + if not job_id: + return json.dumps({"success": False, "error": f"job_id is required for action '{normalized}'"}, indent=2) + + job = get_job(job_id) + if not job: + return json.dumps( + {"success": False, "error": f"Job with ID '{job_id}' not found. Use cronjob(action='list') to inspect jobs."}, + indent=2, + ) + + if normalized == "remove": + removed = remove_job(job_id) + if not removed: + return json.dumps({"success": False, "error": f"Failed to remove job '{job_id}'"}, indent=2) + return json.dumps( + { + "success": True, + "message": f"Cron job '{job['name']}' removed.", + "removed_job": { + "id": job_id, + "name": job["name"], + "schedule": job.get("schedule_display"), + }, + }, + indent=2, + ) + + if normalized == "pause": + updated = pause_job(job_id, reason=reason) + return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) + + if normalized == "resume": + updated = resume_job(job_id) + return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) + + if normalized in {"run", "run_now", "trigger"}: + updated = trigger_job(job_id) + return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) + + if normalized == "update": + updates: Dict[str, Any] = {} + if prompt is not None: + scan_error = _scan_cron_prompt(prompt) + if scan_error: + return json.dumps({"success": False, "error": scan_error}, indent=2) + updates["prompt"] = prompt + if name is not None: + updates["name"] = name + if deliver is not None: + updates["deliver"] = deliver + if skill is not None: + updates["skill"] = skill + if repeat is not None: + repeat_state = dict(job.get("repeat") or {}) + repeat_state["times"] = repeat + updates["repeat"] = repeat_state + if schedule is not None: + parsed_schedule = parse_schedule(schedule) + updates["schedule"] = parsed_schedule + updates["schedule_display"] = parsed_schedule.get("display", schedule) + if job.get("state") != "paused": + updates["state"] = "scheduled" + updates["enabled"] = True + if not updates: + return json.dumps({"success": False, "error": "No updates provided."}, indent=2) + updated = update_job(job_id, updates) + return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) + + return json.dumps({"success": False, "error": f"Unknown cron action '{action}'"}, indent=2) + + except Exception as e: + return json.dumps({"success": False, "error": str(e)}, indent=2) + + +# --------------------------------------------------------------------------- +# Compatibility wrappers +# --------------------------------------------------------------------------- def schedule_cronjob( prompt: str, @@ -66,326 +247,92 @@ def schedule_cronjob( name: Optional[str] = None, repeat: Optional[int] = None, deliver: Optional[str] = None, - task_id: str = None + task_id: str = None, ) -> str: - """ - Schedule an automated task to run the agent on a schedule. - - IMPORTANT: When the cronjob runs, it starts a COMPLETELY FRESH session. - The agent will have NO memory of this conversation or any prior context. - Therefore, the prompt MUST contain ALL necessary information: - - Full context of what needs to be done - - Specific file paths, URLs, or identifiers - - Clear success criteria - - Any relevant background information - - BAD prompt: "Check on that server issue" - GOOD prompt: "SSH into server 192.168.1.100 as user 'deploy', check if nginx - is running with 'systemctl status nginx', and verify the site - https://example.com returns HTTP 200. Report any issues found." - - Args: - prompt: Complete, self-contained instructions for the future agent. - Must include ALL context needed - the agent won't remember anything. - schedule: When to run. Either: - - Duration for one-shot: "30m", "2h", "1d" (runs once) - - Interval: "every 30m", "every 2h" (recurring) - - Cron expression: "0 9 * * *" (daily at 9am) - - ISO timestamp: "2026-02-03T14:00:00" (one-shot at specific time) - name: Optional human-friendly name for the job (for listing/management) - repeat: How many times to run. Omit for default behavior: - - One-shot schedules default to repeat=1 (run once) - - Intervals/cron default to forever - - Set repeat=5 to run 5 times then auto-delete - deliver: Where to send the output. Options: - - "origin": Back to where this job was created (default) - - "local": Save to local files only (~/.hermes/cron/output/) - - "telegram": Send to Telegram home channel - - "discord": Send to Discord home channel - - "signal": Send to Signal home channel - - "telegram:123456": Send to specific chat ID - - "signal:+15551234567": Send to specific Signal number - - Returns: - JSON with job_id, next_run time, and confirmation - """ - # Scan prompt for critical threats before scheduling - scan_error = _scan_cron_prompt(prompt) - if scan_error: - return json.dumps({"success": False, "error": scan_error}, indent=2) - - # Get origin info from environment if available - origin = None - origin_platform = os.getenv("HERMES_SESSION_PLATFORM") - origin_chat_id = os.getenv("HERMES_SESSION_CHAT_ID") - if origin_platform and origin_chat_id: - origin = { - "platform": origin_platform, - "chat_id": origin_chat_id, - "chat_name": os.getenv("HERMES_SESSION_CHAT_NAME"), - } - - try: - job = create_job( - prompt=prompt, - schedule=schedule, - name=name, - repeat=repeat, - deliver=deliver, - origin=origin - ) - - # Format repeat info for display - times = job["repeat"].get("times") - if times is None: - repeat_display = "forever" - elif times == 1: - repeat_display = "once" - else: - repeat_display = f"{times} times" - - return json.dumps({ - "success": True, - "job_id": job["id"], - "name": job["name"], - "schedule": job["schedule_display"], - "repeat": repeat_display, - "deliver": job.get("deliver", "local"), - "next_run_at": job["next_run_at"], - "message": f"Cronjob '{job['name']}' created. It will run {repeat_display}, deliver to {job.get('deliver', 'local')}, next at {job['next_run_at']}." - }, indent=2) - - except Exception as e: - return json.dumps({ - "success": False, - "error": str(e) - }, indent=2) + return cronjob( + action="create", + prompt=prompt, + schedule=schedule, + name=name, + repeat=repeat, + deliver=deliver, + task_id=task_id, + ) -SCHEDULE_CRONJOB_SCHEMA = { - "name": "schedule_cronjob", - "description": """Schedule an automated task to run the agent on a schedule. +def list_cronjobs(include_disabled: bool = False, task_id: str = None) -> str: + return cronjob(action="list", include_disabled=include_disabled, task_id=task_id) -โš ๏ธ CRITICAL: The cronjob runs in a FRESH SESSION with NO CONTEXT from this conversation. -The prompt must be COMPLETELY SELF-CONTAINED with ALL necessary information including: -- Full context and background -- Specific file paths, URLs, server addresses -- Clear instructions and success criteria -- Any credentials or configuration details -The future agent will NOT remember anything from the current conversation. +def remove_cronjob(job_id: str, task_id: str = None) -> str: + return cronjob(action="remove", job_id=job_id, task_id=task_id) -SCHEDULE FORMATS: -- One-shot: "30m", "2h", "1d" (runs once after delay) -- Interval: "every 30m", "every 2h" (recurring) -- Cron: "0 9 * * *" (cron expression for precise scheduling) -- Timestamp: "2026-02-03T14:00:00" (specific date/time) -REPEAT BEHAVIOR: -- One-shot schedules: run once by default -- Intervals/cron: run forever by default -- Set repeat=N to run exactly N times then auto-delete +CRONJOB_SCHEMA = { + "name": "cronjob", + "description": """Manage scheduled cron jobs with a single compressed tool. -DELIVERY OPTIONS (where output goes): -- "origin": Back to current chat (default if in messaging platform) -- "local": Save to local files only (default if in CLI) -- "telegram": Send to Telegram home channel -- "discord": Send to Discord home channel -- "telegram:123456": Send to specific chat (if user provides ID) +Use action='create' to schedule a new job from a prompt or a skill. +Use action='list' to inspect jobs. +Use action='update', 'pause', 'resume', 'remove', or 'run' to manage an existing job. -NOTE: The agent's final response is auto-delivered to the target โ€” do NOT use -send_message in the prompt. Just have the agent compose its response normally. +Jobs run in a fresh session with no current-chat context, so prompts must be self-contained. +If skill is provided on create, the future cron run loads that skill first, then follows the prompt as the task instruction. -Use for: reminders, periodic checks, scheduled reports, automated maintenance.""", +Important safety rule: cron-run sessions should not recursively schedule more cron jobs.""", "parameters": { "type": "object", "properties": { + "action": { + "type": "string", + "description": "One of: create, list, update, pause, resume, remove, run" + }, + "job_id": { + "type": "string", + "description": "Required for update/pause/resume/remove/run" + }, "prompt": { "type": "string", - "description": "Complete, self-contained instructions. Must include ALL context - the future agent will have NO memory of this conversation." + "description": "For create: the full self-contained prompt. If skill is also provided, this becomes the task instruction paired with that skill." }, "schedule": { "type": "string", - "description": "When to run: '30m' (once in 30min), 'every 30m' (recurring), '0 9 * * *' (cron), or ISO timestamp" + "description": "For create/update: '30m', 'every 2h', '0 9 * * *', or ISO timestamp" }, "name": { "type": "string", - "description": "Optional human-friendly name for the job" + "description": "Optional human-friendly name" }, "repeat": { "type": "integer", - "description": "How many times to run. Omit for default (once for one-shot, forever for recurring). Set to N for exactly N runs." + "description": "Optional repeat count. Omit for defaults (once for one-shot, forever for recurring)." }, "deliver": { "type": "string", - "description": "Where to send output: 'origin' (back to this chat), 'local' (files only), 'telegram', 'discord', 'signal', or 'platform:chat_id'" - } - }, - "required": ["prompt", "schedule"] - } -} - - -# ============================================================================= -# Tool: list_cronjobs -# ============================================================================= - -def list_cronjobs(include_disabled: bool = False, task_id: str = None) -> str: - """ - List all scheduled cronjobs. - - Returns information about each job including: - - Job ID (needed for removal) - - Name - - Schedule (human-readable) - - Repeat status (completed/total or 'forever') - - Next scheduled run time - - Last run time and status (if any) - - Args: - include_disabled: Whether to include disabled/completed jobs - - Returns: - JSON array of all scheduled jobs - """ - try: - jobs = list_jobs(include_disabled=include_disabled) - - formatted_jobs = [] - for job in jobs: - # Format repeat status - times = job["repeat"].get("times") - completed = job["repeat"].get("completed", 0) - if times is None: - repeat_status = "forever" - else: - repeat_status = f"{completed}/{times}" - - formatted_jobs.append({ - "job_id": job["id"], - "name": job["name"], - "prompt_preview": job["prompt"][:100] + "..." if len(job["prompt"]) > 100 else job["prompt"], - "schedule": job["schedule_display"], - "repeat": repeat_status, - "deliver": job.get("deliver", "local"), - "next_run_at": job.get("next_run_at"), - "last_run_at": job.get("last_run_at"), - "last_status": job.get("last_status"), - "enabled": job.get("enabled", True) - }) - - return json.dumps({ - "success": True, - "count": len(formatted_jobs), - "jobs": formatted_jobs - }, indent=2) - - except Exception as e: - return json.dumps({ - "success": False, - "error": str(e) - }, indent=2) - - -LIST_CRONJOBS_SCHEMA = { - "name": "list_cronjobs", - "description": """List all scheduled cronjobs with their IDs, schedules, and status. - -Use this to: -- See what jobs are currently scheduled -- Find job IDs for removal with remove_cronjob -- Check job status and next run times - -Returns job_id, name, schedule, repeat status, next/last run times.""", - "parameters": { - "type": "object", - "properties": { + "description": "Delivery target: origin, local, telegram, discord, signal, or platform:chat_id" + }, "include_disabled": { "type": "boolean", - "description": "Include disabled/completed jobs in the list (default: false)" - } - }, - "required": [] - } -} - - -# ============================================================================= -# Tool: remove_cronjob -# ============================================================================= - -def remove_cronjob(job_id: str, task_id: str = None) -> str: - """ - Remove a scheduled cronjob by its ID. - - Use list_cronjobs first to find the job_id of the job you want to remove. - - Args: - job_id: The ID of the job to remove (from list_cronjobs output) - - Returns: - JSON confirmation of removal - """ - try: - job = get_job(job_id) - if not job: - return json.dumps({ - "success": False, - "error": f"Job with ID '{job_id}' not found. Use list_cronjobs to see available jobs." - }, indent=2) - - removed = remove_job(job_id) - if removed: - return json.dumps({ - "success": True, - "message": f"Cronjob '{job['name']}' (ID: {job_id}) has been removed.", - "removed_job": { - "id": job_id, - "name": job["name"], - "schedule": job["schedule_display"] - } - }, indent=2) - else: - return json.dumps({ - "success": False, - "error": f"Failed to remove job '{job_id}'" - }, indent=2) - - except Exception as e: - return json.dumps({ - "success": False, - "error": str(e) - }, indent=2) - - -REMOVE_CRONJOB_SCHEMA = { - "name": "remove_cronjob", - "description": """Remove a scheduled cronjob by its ID. - -Use list_cronjobs first to find the job_id of the job you want to remove. -Jobs that have completed their repeat count are auto-removed, but you can -use this to cancel a job before it completes.""", - "parameters": { - "type": "object", - "properties": { - "job_id": { + "description": "For list: include paused/completed jobs" + }, + "skill": { "type": "string", - "description": "The ID of the cronjob to remove (from list_cronjobs output)" + "description": "Optional skill name to load before executing the cron prompt" + }, + "reason": { + "type": "string", + "description": "Optional pause reason" } }, - "required": ["job_id"] + "required": ["action"] } } -# ============================================================================= -# Requirements check -# ============================================================================= - def check_cronjob_requirements() -> bool: """ Check if cronjob tools can be used. - + Available in interactive CLI mode and gateway/messaging platforms. Cronjobs are server-side scheduled tasks so they work from any interface. """ @@ -396,66 +343,30 @@ def check_cronjob_requirements() -> bool: ) -# ============================================================================= -# Exports -# ============================================================================= - def get_cronjob_tool_definitions(): """Return tool definitions for cronjob management.""" - return [ - SCHEDULE_CRONJOB_SCHEMA, - LIST_CRONJOBS_SCHEMA, - REMOVE_CRONJOB_SCHEMA - ] - - -# For direct testing -if __name__ == "__main__": - # Test the tools - print("Testing schedule_cronjob:") - result = schedule_cronjob( - prompt="Test prompt for cron job", - schedule="5m", - name="Test Job" - ) - print(result) - - print("\nTesting list_cronjobs:") - result = list_cronjobs() - print(result) + return [CRONJOB_SCHEMA] # --- Registry --- from tools.registry import registry registry.register( - name="schedule_cronjob", + name="cronjob", toolset="cronjob", - schema=SCHEDULE_CRONJOB_SCHEMA, - handler=lambda args, **kw: schedule_cronjob( - prompt=args.get("prompt", ""), - schedule=args.get("schedule", ""), + schema=CRONJOB_SCHEMA, + handler=lambda args, **kw: cronjob( + action=args.get("action", ""), + job_id=args.get("job_id"), + prompt=args.get("prompt"), + schedule=args.get("schedule"), name=args.get("name"), repeat=args.get("repeat"), deliver=args.get("deliver"), - task_id=kw.get("task_id")), - check_fn=check_cronjob_requirements, -) -registry.register( - name="list_cronjobs", - toolset="cronjob", - schema=LIST_CRONJOBS_SCHEMA, - handler=lambda args, **kw: list_cronjobs( include_disabled=args.get("include_disabled", False), - task_id=kw.get("task_id")), - check_fn=check_cronjob_requirements, -) -registry.register( - name="remove_cronjob", - toolset="cronjob", - schema=REMOVE_CRONJOB_SCHEMA, - handler=lambda args, **kw: remove_cronjob( - job_id=args.get("job_id", ""), - task_id=kw.get("task_id")), + skill=args.get("skill"), + reason=args.get("reason"), + task_id=kw.get("task_id"), + ), check_fn=check_cronjob_requirements, ) diff --git a/toolsets.py b/toolsets.py index 221ff2ca..cd811d37 100644 --- a/toolsets.py +++ b/toolsets.py @@ -57,7 +57,7 @@ _HERMES_CORE_TOOLS = [ # Code execution + delegation "execute_code", "delegate_task", # Cronjob management - "schedule_cronjob", "list_cronjobs", "remove_cronjob", + "cronjob", # Cross-platform messaging (gated on gateway running via check_fn) "send_message", # Honcho memory tools (gated on honcho being active via check_fn) @@ -125,8 +125,8 @@ TOOLSETS = { }, "cronjob": { - "description": "Cronjob management tools - schedule, list, and remove automated tasks", - "tools": ["schedule_cronjob", "list_cronjobs", "remove_cronjob"], + "description": "Cronjob management tool - create, list, update, pause, resume, remove, and trigger scheduled tasks", + "tools": ["cronjob"], "includes": [] },