Implement cron job management system for scheduled tasks (similar to OpenAI's Pulse but the AI can also schedule jobs)
- Introduced a new cron job system allowing users to schedule automated tasks via the CLI, supporting one-time reminders and recurring jobs.
- Added commands for managing cron jobs: `/cron` to list jobs, `/cron add` to create new jobs, and `/cron remove` to delete jobs.
- Implemented job storage in `~/.hermes/cron/jobs.json` with output saved to `~/.hermes/cron/output/{job_id}/{timestamp}.md`.
- Enhanced the CLI and README documentation to include detailed usage instructions and examples for cron job management.
- Integrated cron job tools into the hermes-cli toolset, ensuring they are only available in interactive CLI mode.
- Added support for cron expression parsing with the `croniter` package, enabling flexible scheduling options.
This commit is contained in:
36
cron/__init__.py
Normal file
36
cron/__init__.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""
|
||||
Cron job scheduling system for Hermes Agent.
|
||||
|
||||
This module provides scheduled task execution, allowing the agent to:
|
||||
- Run automated tasks on schedules (cron expressions, intervals, one-shot)
|
||||
- Self-schedule reminders and follow-up tasks
|
||||
- Execute tasks in isolated sessions (no prior context)
|
||||
|
||||
Usage:
|
||||
# Run due jobs (for system cron integration)
|
||||
python -c "from cron import tick; tick()"
|
||||
|
||||
# Or via CLI
|
||||
python cli.py --cron-daemon
|
||||
"""
|
||||
|
||||
from cron.jobs import (
|
||||
create_job,
|
||||
get_job,
|
||||
list_jobs,
|
||||
remove_job,
|
||||
update_job,
|
||||
JOBS_FILE,
|
||||
)
|
||||
from cron.scheduler import tick, run_daemon
|
||||
|
||||
__all__ = [
|
||||
"create_job",
|
||||
"get_job",
|
||||
"list_jobs",
|
||||
"remove_job",
|
||||
"update_job",
|
||||
"tick",
|
||||
"run_daemon",
|
||||
"JOBS_FILE",
|
||||
]
|
||||
372
cron/jobs.py
Normal file
372
cron/jobs.py
Normal file
@@ -0,0 +1,372 @@
|
||||
"""
|
||||
Cron job storage and management.
|
||||
|
||||
Jobs are stored in ~/.hermes/cron/jobs.json
|
||||
Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, List, Any
|
||||
|
||||
try:
|
||||
from croniter import croniter
|
||||
HAS_CRONITER = True
|
||||
except ImportError:
|
||||
HAS_CRONITER = False
|
||||
|
||||
# =============================================================================
|
||||
# Configuration
|
||||
# =============================================================================
|
||||
|
||||
HERMES_DIR = Path.home() / ".hermes"
|
||||
CRON_DIR = HERMES_DIR / "cron"
|
||||
JOBS_FILE = CRON_DIR / "jobs.json"
|
||||
OUTPUT_DIR = CRON_DIR / "output"
|
||||
|
||||
|
||||
def ensure_dirs():
|
||||
"""Ensure cron directories exist."""
|
||||
CRON_DIR.mkdir(parents=True, exist_ok=True)
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Schedule Parsing
|
||||
# =============================================================================
|
||||
|
||||
def parse_duration(s: str) -> int:
|
||||
"""
|
||||
Parse duration string into minutes.
|
||||
|
||||
Examples:
|
||||
"30m" → 30
|
||||
"2h" → 120
|
||||
"1d" → 1440
|
||||
"""
|
||||
s = s.strip().lower()
|
||||
match = re.match(r'^(\d+)\s*(m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)$', s)
|
||||
if not match:
|
||||
raise ValueError(f"Invalid duration: '{s}'. Use format like '30m', '2h', or '1d'")
|
||||
|
||||
value = int(match.group(1))
|
||||
unit = match.group(2)[0] # First char: m, h, or d
|
||||
|
||||
multipliers = {'m': 1, 'h': 60, 'd': 1440}
|
||||
return value * multipliers[unit]
|
||||
|
||||
|
||||
def parse_schedule(schedule: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Parse schedule string into structured format.
|
||||
|
||||
Returns dict with:
|
||||
- kind: "once" | "interval" | "cron"
|
||||
- For "once": "run_at" (ISO timestamp)
|
||||
- For "interval": "minutes" (int)
|
||||
- For "cron": "expr" (cron expression)
|
||||
|
||||
Examples:
|
||||
"30m" → once in 30 minutes
|
||||
"2h" → once in 2 hours
|
||||
"every 30m" → recurring every 30 minutes
|
||||
"every 2h" → recurring every 2 hours
|
||||
"0 9 * * *" → cron expression
|
||||
"2026-02-03T14:00" → once at timestamp
|
||||
"""
|
||||
schedule = schedule.strip()
|
||||
original = schedule
|
||||
schedule_lower = schedule.lower()
|
||||
|
||||
# "every X" pattern → recurring interval
|
||||
if schedule_lower.startswith("every "):
|
||||
duration_str = schedule[6:].strip()
|
||||
minutes = parse_duration(duration_str)
|
||||
return {
|
||||
"kind": "interval",
|
||||
"minutes": minutes,
|
||||
"display": f"every {minutes}m"
|
||||
}
|
||||
|
||||
# Check for cron expression (5 or 6 space-separated fields)
|
||||
# Cron fields: minute hour day month weekday [year]
|
||||
parts = schedule.split()
|
||||
if len(parts) >= 5 and all(
|
||||
re.match(r'^[\d\*\-,/]+$', p) for p in parts[:5]
|
||||
):
|
||||
if not HAS_CRONITER:
|
||||
raise ValueError("Cron expressions require 'croniter' package. Install with: pip install croniter")
|
||||
# Validate cron expression
|
||||
try:
|
||||
croniter(schedule)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid cron expression '{schedule}': {e}")
|
||||
return {
|
||||
"kind": "cron",
|
||||
"expr": schedule,
|
||||
"display": schedule
|
||||
}
|
||||
|
||||
# ISO timestamp (contains T or looks like date)
|
||||
if 'T' in schedule or re.match(r'^\d{4}-\d{2}-\d{2}', schedule):
|
||||
try:
|
||||
# Parse and validate
|
||||
dt = datetime.fromisoformat(schedule.replace('Z', '+00:00'))
|
||||
return {
|
||||
"kind": "once",
|
||||
"run_at": dt.isoformat(),
|
||||
"display": f"once at {dt.strftime('%Y-%m-%d %H:%M')}"
|
||||
}
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid timestamp '{schedule}': {e}")
|
||||
|
||||
# Duration like "30m", "2h", "1d" → one-shot from now
|
||||
try:
|
||||
minutes = parse_duration(schedule)
|
||||
run_at = datetime.now() + timedelta(minutes=minutes)
|
||||
return {
|
||||
"kind": "once",
|
||||
"run_at": run_at.isoformat(),
|
||||
"display": f"once in {original}"
|
||||
}
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
raise ValueError(
|
||||
f"Invalid schedule '{original}'. Use:\n"
|
||||
f" - Duration: '30m', '2h', '1d' (one-shot)\n"
|
||||
f" - Interval: 'every 30m', 'every 2h' (recurring)\n"
|
||||
f" - Cron: '0 9 * * *' (cron expression)\n"
|
||||
f" - Timestamp: '2026-02-03T14:00:00' (one-shot at time)"
|
||||
)
|
||||
|
||||
|
||||
def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
|
||||
"""
|
||||
Compute the next run time for a schedule.
|
||||
|
||||
Returns ISO timestamp string, or None if no more runs.
|
||||
"""
|
||||
now = datetime.now()
|
||||
|
||||
if schedule["kind"] == "once":
|
||||
run_at = datetime.fromisoformat(schedule["run_at"])
|
||||
# If in the future, return it; if in the past, no more runs
|
||||
return schedule["run_at"] if run_at > now else None
|
||||
|
||||
elif schedule["kind"] == "interval":
|
||||
minutes = schedule["minutes"]
|
||||
if last_run_at:
|
||||
# Next run is last_run + interval
|
||||
last = datetime.fromisoformat(last_run_at)
|
||||
next_run = last + timedelta(minutes=minutes)
|
||||
else:
|
||||
# First run is now + interval
|
||||
next_run = now + timedelta(minutes=minutes)
|
||||
return next_run.isoformat()
|
||||
|
||||
elif schedule["kind"] == "cron":
|
||||
if not HAS_CRONITER:
|
||||
return None
|
||||
cron = croniter(schedule["expr"], now)
|
||||
next_run = cron.get_next(datetime)
|
||||
return next_run.isoformat()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Job CRUD Operations
|
||||
# =============================================================================
|
||||
|
||||
def load_jobs() -> List[Dict[str, Any]]:
|
||||
"""Load all jobs from storage."""
|
||||
ensure_dirs()
|
||||
if not JOBS_FILE.exists():
|
||||
return []
|
||||
|
||||
try:
|
||||
with open(JOBS_FILE, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
return data.get("jobs", [])
|
||||
except (json.JSONDecodeError, IOError):
|
||||
return []
|
||||
|
||||
|
||||
def save_jobs(jobs: List[Dict[str, Any]]):
|
||||
"""Save all jobs to storage."""
|
||||
ensure_dirs()
|
||||
with open(JOBS_FILE, 'w', encoding='utf-8') as f:
|
||||
json.dump({"jobs": jobs, "updated_at": datetime.now().isoformat()}, f, indent=2)
|
||||
|
||||
|
||||
def create_job(
|
||||
prompt: str,
|
||||
schedule: str,
|
||||
name: Optional[str] = None,
|
||||
repeat: Optional[int] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a new cron job.
|
||||
|
||||
Args:
|
||||
prompt: The prompt to run (must be self-contained)
|
||||
schedule: Schedule string (see parse_schedule)
|
||||
name: Optional friendly name
|
||||
repeat: How many times to run (None = forever, 1 = once)
|
||||
|
||||
Returns:
|
||||
The created job dict
|
||||
"""
|
||||
parsed_schedule = parse_schedule(schedule)
|
||||
|
||||
# Auto-set repeat=1 for one-shot schedules if not specified
|
||||
if parsed_schedule["kind"] == "once" and repeat is None:
|
||||
repeat = 1
|
||||
|
||||
job_id = uuid.uuid4().hex[:12]
|
||||
now = datetime.now().isoformat()
|
||||
|
||||
job = {
|
||||
"id": job_id,
|
||||
"name": name or prompt[:50].strip(),
|
||||
"prompt": prompt,
|
||||
"schedule": parsed_schedule,
|
||||
"schedule_display": parsed_schedule.get("display", schedule),
|
||||
"repeat": {
|
||||
"times": repeat, # None = forever
|
||||
"completed": 0
|
||||
},
|
||||
"enabled": True,
|
||||
"created_at": now,
|
||||
"next_run_at": compute_next_run(parsed_schedule),
|
||||
"last_run_at": None,
|
||||
"last_status": None,
|
||||
"last_error": None
|
||||
}
|
||||
|
||||
jobs = load_jobs()
|
||||
jobs.append(job)
|
||||
save_jobs(jobs)
|
||||
|
||||
return job
|
||||
|
||||
|
||||
def get_job(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get a job by ID."""
|
||||
jobs = load_jobs()
|
||||
for job in jobs:
|
||||
if job["id"] == job_id:
|
||||
return job
|
||||
return None
|
||||
|
||||
|
||||
def list_jobs(include_disabled: bool = False) -> List[Dict[str, Any]]:
|
||||
"""List all jobs, optionally including disabled ones."""
|
||||
jobs = load_jobs()
|
||||
if not include_disabled:
|
||||
jobs = [j for j in jobs if j.get("enabled", True)]
|
||||
return jobs
|
||||
|
||||
|
||||
def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||||
"""Update a job by ID."""
|
||||
jobs = load_jobs()
|
||||
for i, job in enumerate(jobs):
|
||||
if job["id"] == job_id:
|
||||
jobs[i] = {**job, **updates}
|
||||
save_jobs(jobs)
|
||||
return jobs[i]
|
||||
return None
|
||||
|
||||
|
||||
def remove_job(job_id: str) -> bool:
|
||||
"""Remove a job by ID."""
|
||||
jobs = load_jobs()
|
||||
original_len = len(jobs)
|
||||
jobs = [j for j in jobs if j["id"] != job_id]
|
||||
if len(jobs) < original_len:
|
||||
save_jobs(jobs)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
|
||||
"""
|
||||
Mark a job as having been run.
|
||||
|
||||
Updates last_run_at, last_status, increments completed count,
|
||||
computes next_run_at, and auto-deletes if repeat limit reached.
|
||||
"""
|
||||
jobs = load_jobs()
|
||||
for i, job in enumerate(jobs):
|
||||
if job["id"] == job_id:
|
||||
now = datetime.now().isoformat()
|
||||
job["last_run_at"] = now
|
||||
job["last_status"] = "ok" if success else "error"
|
||||
job["last_error"] = error if not success else None
|
||||
|
||||
# Increment completed count
|
||||
if job.get("repeat"):
|
||||
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1
|
||||
|
||||
# Check if we've hit the repeat limit
|
||||
times = job["repeat"].get("times")
|
||||
completed = job["repeat"]["completed"]
|
||||
if times is not None and completed >= times:
|
||||
# Remove the job (limit reached)
|
||||
jobs.pop(i)
|
||||
save_jobs(jobs)
|
||||
return
|
||||
|
||||
# Compute next run
|
||||
job["next_run_at"] = compute_next_run(job["schedule"], now)
|
||||
|
||||
# If no next run (one-shot completed), disable
|
||||
if job["next_run_at"] is None:
|
||||
job["enabled"] = False
|
||||
|
||||
save_jobs(jobs)
|
||||
return
|
||||
|
||||
save_jobs(jobs)
|
||||
|
||||
|
||||
def get_due_jobs() -> List[Dict[str, Any]]:
|
||||
"""Get all jobs that are due to run now."""
|
||||
now = datetime.now()
|
||||
jobs = load_jobs()
|
||||
due = []
|
||||
|
||||
for job in jobs:
|
||||
if not job.get("enabled", True):
|
||||
continue
|
||||
|
||||
next_run = job.get("next_run_at")
|
||||
if not next_run:
|
||||
continue
|
||||
|
||||
next_run_dt = datetime.fromisoformat(next_run)
|
||||
if next_run_dt <= now:
|
||||
due.append(job)
|
||||
|
||||
return due
|
||||
|
||||
|
||||
def save_job_output(job_id: str, output: str):
|
||||
"""Save job output to file."""
|
||||
ensure_dirs()
|
||||
job_output_dir = OUTPUT_DIR / job_id
|
||||
job_output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
output_file = job_output_dir / f"{timestamp}.md"
|
||||
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
|
||||
return output_file
|
||||
188
cron/scheduler.py
Normal file
188
cron/scheduler.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""
|
||||
Cron job scheduler - executes due jobs.
|
||||
|
||||
This module provides:
|
||||
- tick(): Run all due jobs once (for system cron integration)
|
||||
- run_daemon(): Run continuously, checking every 60 seconds
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from cron.jobs import get_due_jobs, mark_job_run, save_job_output
|
||||
|
||||
|
||||
def run_job(job: dict) -> tuple[bool, str, Optional[str]]:
|
||||
"""
|
||||
Execute a single cron job.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, output, error_message)
|
||||
"""
|
||||
from run_agent import AIAgent
|
||||
|
||||
job_id = job["id"]
|
||||
job_name = job["name"]
|
||||
prompt = job["prompt"]
|
||||
|
||||
print(f"[cron] Running job '{job_name}' (ID: {job_id})")
|
||||
print(f"[cron] Prompt: {prompt[:100]}{'...' if len(prompt) > 100 else ''}")
|
||||
|
||||
try:
|
||||
# Create agent with default settings
|
||||
# Jobs run in isolated sessions (no prior context)
|
||||
agent = AIAgent(
|
||||
model=os.getenv("HERMES_MODEL", "anthropic/claude-sonnet-4"),
|
||||
quiet_mode=True,
|
||||
session_id=f"cron_{job_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
)
|
||||
|
||||
# Run the conversation
|
||||
result = agent.run_conversation(prompt)
|
||||
|
||||
# Extract final response
|
||||
final_response = result.get("final_response", "")
|
||||
if not final_response:
|
||||
final_response = "(No response generated)"
|
||||
|
||||
# Build output document
|
||||
output = f"""# Cron Job: {job_name}
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Prompt
|
||||
|
||||
{prompt}
|
||||
|
||||
## Response
|
||||
|
||||
{final_response}
|
||||
"""
|
||||
|
||||
print(f"[cron] Job '{job_name}' completed successfully")
|
||||
return True, output, None
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {str(e)}"
|
||||
print(f"[cron] Job '{job_name}' failed: {error_msg}")
|
||||
|
||||
# Build error output
|
||||
output = f"""# Cron Job: {job_name} (FAILED)
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Prompt
|
||||
|
||||
{prompt}
|
||||
|
||||
## Error
|
||||
|
||||
```
|
||||
{error_msg}
|
||||
|
||||
{traceback.format_exc()}
|
||||
```
|
||||
"""
|
||||
return False, output, error_msg
|
||||
|
||||
|
||||
def tick(verbose: bool = True) -> int:
|
||||
"""
|
||||
Check and run all due jobs.
|
||||
|
||||
This is designed to be called by system cron every minute:
|
||||
*/1 * * * * cd ~/hermes-agent && python -c "from cron import tick; tick()"
|
||||
|
||||
Args:
|
||||
verbose: Whether to print status messages
|
||||
|
||||
Returns:
|
||||
Number of jobs executed
|
||||
"""
|
||||
due_jobs = get_due_jobs()
|
||||
|
||||
if verbose and not due_jobs:
|
||||
print(f"[cron] {datetime.now().strftime('%H:%M:%S')} - No jobs due")
|
||||
return 0
|
||||
|
||||
if verbose:
|
||||
print(f"[cron] {datetime.now().strftime('%H:%M:%S')} - {len(due_jobs)} job(s) due")
|
||||
|
||||
executed = 0
|
||||
for job in due_jobs:
|
||||
try:
|
||||
success, output, error = run_job(job)
|
||||
|
||||
# Save output to file
|
||||
output_file = save_job_output(job["id"], output)
|
||||
if verbose:
|
||||
print(f"[cron] Output saved to: {output_file}")
|
||||
|
||||
# Mark job as run (handles repeat counting, next_run computation)
|
||||
mark_job_run(job["id"], success, error)
|
||||
executed += 1
|
||||
|
||||
except Exception as e:
|
||||
print(f"[cron] Error processing job {job['id']}: {e}")
|
||||
mark_job_run(job["id"], False, str(e))
|
||||
|
||||
return executed
|
||||
|
||||
|
||||
def run_daemon(check_interval: int = 60, verbose: bool = True):
|
||||
"""
|
||||
Run the cron daemon continuously.
|
||||
|
||||
Checks for due jobs every `check_interval` seconds.
|
||||
|
||||
Args:
|
||||
check_interval: Seconds between checks (default: 60)
|
||||
verbose: Whether to print status messages
|
||||
"""
|
||||
print(f"[cron] Starting daemon (checking every {check_interval}s)")
|
||||
print(f"[cron] Press Ctrl+C to stop")
|
||||
print()
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
tick(verbose=verbose)
|
||||
except Exception as e:
|
||||
print(f"[cron] Tick error: {e}")
|
||||
|
||||
time.sleep(check_interval)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n[cron] Daemon stopped")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Allow running directly: python cron/scheduler.py [daemon|tick]
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Hermes Cron Scheduler")
|
||||
parser.add_argument("mode", choices=["daemon", "tick"], default="tick", nargs="?",
|
||||
help="Mode: 'tick' to run once, 'daemon' to run continuously")
|
||||
parser.add_argument("--interval", type=int, default=60,
|
||||
help="Check interval in seconds for daemon mode")
|
||||
parser.add_argument("--quiet", "-q", action="store_true",
|
||||
help="Suppress status messages")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.mode == "daemon":
|
||||
run_daemon(check_interval=args.interval, verbose=not args.quiet)
|
||||
else:
|
||||
tick(verbose=not args.quiet)
|
||||
Reference in New Issue
Block a user