Files
hermes-agent/deploy-crons.py
Alexander Whitestone 46fb8892de
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 25s
fix(cron): include model/provider in deploy comparison
Fixes #375

When deploying cron jobs from YAML to jobs.json, the comparison
now includes model and provider fields so that changes to either
are never silently dropped even when the prompt is unchanged.
2026-04-14 00:47:41 +00:00

316 lines
11 KiB
Python

#!/usr/bin/env python3
"""
deploy-crons — deploy and normalize cron jobs from YAML config to jobs.json.
Two modes:
1. --deploy: Sync jobs from cron-jobs.yaml → jobs.json (create/update).
2. --normalize: Ensure model field types are consistent (standalone).
The deploy comparison includes model and provider so that changes to either
field are never silently dropped even when the prompt is unchanged.
Usage:
python deploy-crons.py --deploy [--config PATH] [--jobs-file PATH] [--dry-run]
python deploy-crons.py --normalize [--jobs-file PATH] [--dry-run]
"""
import argparse
import json
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# ---------------------------------------------------------------------------
# Normalization
# ---------------------------------------------------------------------------
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize a job dict to ensure consistent model field types."""
job = dict(job)
model = job.get("model")
provider = job.get("provider")
# Skip if already normalized (model is a dict)
if isinstance(model, dict):
return job
model_dict = {}
if model is not None and isinstance(model, str):
model_dict["model"] = model.strip()
if provider is not None and isinstance(provider, str):
model_dict["provider"] = provider.strip()
if model_dict:
job["model"] = model_dict
else:
job["model"] = None
return job
def _flat_model(job: Dict[str, Any]) -> Optional[str]:
"""Extract a flat model string from either dict or string model field."""
m = job.get("model")
if isinstance(m, dict):
return m.get("model")
return m
def _flat_provider(job: Dict[str, Any]) -> Optional[str]:
"""Extract a flat provider string from either dict or string model field."""
m = job.get("model")
if isinstance(m, dict):
return m.get("provider")
return job.get("provider")
# ---------------------------------------------------------------------------
# Deploy from YAML
# ---------------------------------------------------------------------------
def _jobs_changed(cur: Dict[str, Any], desired: Dict[str, Any]) -> bool:
"""
Return True if the desired job spec differs from the current job.
Compares prompt, schedule, model, and provider so that model/provider
changes are never silently dropped even when the prompt is unchanged.
"""
if cur.get("prompt") != desired.get("prompt"):
return True
if cur.get("schedule") != desired.get("schedule"):
return True
if _flat_model(cur) != _flat_model(desired):
return True
if _flat_provider(cur) != _flat_provider(desired):
return True
return False
def _load_yaml_config(config_path: Path) -> List[Dict[str, Any]]:
"""Load cron job definitions from a YAML config file."""
if not HAS_YAML:
print("Error: PyYAML is required for --deploy. Install with: pip install pyyaml",
file=sys.stderr)
sys.exit(1)
if not config_path.exists():
print(f"Error: Config file not found: {config_path}", file=sys.stderr)
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
return data.get("jobs", [])
def _parse_schedule(schedule: str) -> Dict[str, Any]:
"""Parse a schedule string into the structured format used by jobs.json."""
# Delegate to cron.jobs if available, otherwise do a minimal parse.
try:
from cron.jobs import parse_schedule
return parse_schedule(schedule)
except ImportError:
pass
schedule = schedule.strip()
if schedule.startswith("every "):
dur = schedule[6:].strip()
# rough parse: "30m", "2h"
unit = dur[-1]
val = int(dur[:-1])
minutes = val * {"m": 1, "h": 60, "d": 1440}.get(unit, 1)
return {"kind": "interval", "minutes": minutes, "display": f"every {minutes}m"}
# Fallback: treat as cron expression
return {"kind": "cron", "expr": schedule, "display": schedule}
def deploy_from_yaml(
config_path: Path,
jobs_file: Path,
dry_run: bool = False,
) -> int:
"""Sync jobs from YAML config into jobs.json, creating or updating as needed."""
yaml_jobs = _load_yaml_config(config_path)
if jobs_file.exists():
with open(jobs_file, "r", encoding="utf-8") as f:
data = json.load(f)
else:
data = {"jobs": [], "updated_at": None}
existing: List[Dict[str, Any]] = data.get("jobs", [])
# Index by prompt+schedule for matching
index: Dict[str, int] = {}
for i, j in enumerate(existing):
key = f"{j.get('prompt', '')}||{json.dumps(j.get('schedule', {}), sort_keys=True)}"
index[key] = i
created = 0
updated = 0
skipped = 0
for spec in yaml_jobs:
prompt = spec.get("prompt", "")
schedule_str = spec.get("schedule", "")
name = spec.get("name", "")
model = spec.get("model")
provider = spec.get("provider")
skills = spec.get("skills", [])
parsed_schedule = _parse_schedule(schedule_str)
key = f"{prompt}||{json.dumps(parsed_schedule, sort_keys=True)}"
desired = {
"prompt": prompt,
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule_str),
"model": model,
"provider": provider,
"skills": skills if isinstance(skills, list) else [skills] if skills else [],
"name": name or prompt[:50].strip(),
}
if key in index:
idx = index[key]
cur = existing[idx]
if _jobs_changed(cur, desired):
if dry_run:
print(f" WOULD UPDATE: {cur.get('id', '?')} ({cur.get('name', '?')})")
print(f" model: {_flat_model(cur)!r} -> {model!r}")
print(f" provider: {_flat_provider(cur)!r} -> {provider!r}")
else:
existing[idx].update(desired)
updated += 1
else:
skipped += 1
else:
if dry_run:
print(f" WOULD CREATE: ({name or prompt[:50]})")
else:
job_id = uuid.uuid4().hex[:12]
new_job = {
"id": job_id,
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"created_at": None,
"next_run_at": None,
"last_run_at": None,
"last_status": None,
"last_error": None,
"repeat": {"times": None, "completed": 0},
"deliver": "local",
"origin": None,
"base_url": None,
"script": None,
**desired,
}
existing.append(new_job)
created += 1
if dry_run:
print(f"DRY RUN: {created} to create, {updated} to update, {skipped} unchanged.")
return 0
data["jobs"] = existing
jobs_file.parent.mkdir(parents=True, exist_ok=True)
with open(jobs_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Deployed: {created} created, {updated} updated, {skipped} unchanged.")
return 0
# ---------------------------------------------------------------------------
# Normalize standalone
# ---------------------------------------------------------------------------
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
"""Normalize all jobs in a jobs.json file."""
if not jobs_file.exists():
print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
return 1
with open(jobs_file, "r", encoding="utf-8") as f:
data = json.load(f)
jobs = data.get("jobs", [])
if not jobs:
print("No jobs found in file.")
return 0
modified_count = 0
for i, job in enumerate(jobs):
original_model = job.get("model")
original_provider = job.get("provider")
normalized_job = normalize_job(job)
if (normalized_job.get("model") != original_model or
normalized_job.get("provider") != original_provider):
jobs[i] = normalized_job
modified_count += 1
job_id = job.get("id", "?")
job_name = job.get("name", "(unnamed)")
print(f"Normalized job {job_id} ({job_name}):")
print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")
if modified_count == 0:
print("All jobs already have consistent model field types.")
return 0
if dry_run:
print(f"DRY RUN: Would normalize {modified_count} jobs.")
return 0
data["jobs"] = jobs
with open(jobs_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Normalized {modified_count} jobs in {jobs_file}")
return 0
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Deploy and normalize cron jobs."
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--deploy", action="store_true",
help="Sync jobs from YAML config to jobs.json")
group.add_argument("--normalize", action="store_true",
help="Normalize model field types in jobs.json")
parser.add_argument("--config", type=Path,
default=Path.home() / ".hermes" / "cron-jobs.yaml",
help="Path to cron-jobs.yaml (default: ~/.hermes/cron-jobs.yaml)")
parser.add_argument("--jobs-file", type=Path,
default=Path.home() / ".hermes" / "cron" / "jobs.json",
help="Path to jobs.json (default: ~/.hermes/cron/jobs.json)")
parser.add_argument("--dry-run", action="store_true",
help="Show what would change without modifying files")
args = parser.parse_args()
if args.dry_run:
print("DRY RUN MODE — no changes will be made.")
print()
if args.deploy:
return deploy_from_yaml(args.config, args.jobs_file, args.dry_run)
else:
return normalize_jobs_file(args.jobs_file, args.dry_run)
if __name__ == "__main__":
sys.exit(main())