Compare commits

...

1 Commits

Author SHA1 Message Date
46fb8892de fix(cron): include model/provider in deploy comparison
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 25s
Fixes #375

When deploying cron jobs from YAML to jobs.json, the comparison
now includes model and provider fields so that changes to either
are never silently dropped even when the prompt is unchanged.
2026-04-14 00:47:41 +00:00

View File

@@ -1,153 +1,314 @@
#!/usr/bin/env python3
"""
deploy-crons — normalize cron job schemas for consistent model field types.
deploy-crons — deploy and normalize cron jobs from YAML config to jobs.json.
This script ensures that the model field in jobs.json is always a dict when
either model or provider is specified, preventing schema inconsistency.
Two modes:
1. --deploy: Sync jobs from cron-jobs.yaml → jobs.json (create/update).
2. --normalize: Ensure model field types are consistent (standalone).
The deploy comparison includes model and provider so that changes to either
field are never silently dropped even when the prompt is unchanged.
Usage:
python deploy-crons.py [--dry-run] [--jobs-file PATH]
python deploy-crons.py --deploy [--config PATH] [--jobs-file PATH] [--dry-run]
python deploy-crons.py --normalize [--jobs-file PATH] [--dry-run]
"""
import argparse
import json
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# ---------------------------------------------------------------------------
# Normalization
# ---------------------------------------------------------------------------
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
"""
Normalize a job dict to ensure consistent model field types.
Before normalization:
- If model AND provider: model = raw string, provider = raw string (inconsistent)
- If only model: model = raw string
- If only provider: provider = raw string at top level
After normalization:
- If model exists: model = {"model": "xxx"}
- If provider exists: model = {"provider": "yyy"}
- If both exist: model = {"model": "xxx", "provider": "yyy"}
- If neither: model = None
"""
job = dict(job) # Create a copy to avoid modifying the original
"""Normalize a job dict to ensure consistent model field types."""
job = dict(job)
model = job.get("model")
provider = job.get("provider")
# Skip if already normalized (model is a dict)
if isinstance(model, dict):
return job
# Build normalized model dict
model_dict = {}
if model is not None and isinstance(model, str):
model_dict["model"] = model.strip()
if provider is not None and isinstance(provider, str):
model_dict["provider"] = provider.strip()
# Set model field
if model_dict:
job["model"] = model_dict
else:
job["model"] = None
# Remove top-level provider field if it was moved into model dict
if provider is not None and "provider" in model_dict:
# Keep provider field for backward compatibility but mark it as deprecated
# This allows existing code that reads job["provider"] to continue working
pass
return job
def _flat_model(job: Dict[str, Any]) -> Optional[str]:
"""Extract a flat model string from either dict or string model field."""
m = job.get("model")
if isinstance(m, dict):
return m.get("model")
return m
def _flat_provider(job: Dict[str, Any]) -> Optional[str]:
"""Extract a flat provider string from either dict or string model field."""
m = job.get("model")
if isinstance(m, dict):
return m.get("provider")
return job.get("provider")
# ---------------------------------------------------------------------------
# Deploy from YAML
# ---------------------------------------------------------------------------
def _jobs_changed(cur: Dict[str, Any], desired: Dict[str, Any]) -> bool:
"""
Return True if the desired job spec differs from the current job.
Compares prompt, schedule, model, and provider so that model/provider
changes are never silently dropped even when the prompt is unchanged.
"""
if cur.get("prompt") != desired.get("prompt"):
return True
if cur.get("schedule") != desired.get("schedule"):
return True
if _flat_model(cur) != _flat_model(desired):
return True
if _flat_provider(cur) != _flat_provider(desired):
return True
return False
def _load_yaml_config(config_path: Path) -> List[Dict[str, Any]]:
"""Load cron job definitions from a YAML config file."""
if not HAS_YAML:
print("Error: PyYAML is required for --deploy. Install with: pip install pyyaml",
file=sys.stderr)
sys.exit(1)
if not config_path.exists():
print(f"Error: Config file not found: {config_path}", file=sys.stderr)
sys.exit(1)
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
return data.get("jobs", [])
def _parse_schedule(schedule: str) -> Dict[str, Any]:
"""Parse a schedule string into the structured format used by jobs.json."""
# Delegate to cron.jobs if available, otherwise do a minimal parse.
try:
from cron.jobs import parse_schedule
return parse_schedule(schedule)
except ImportError:
pass
schedule = schedule.strip()
if schedule.startswith("every "):
dur = schedule[6:].strip()
# rough parse: "30m", "2h"
unit = dur[-1]
val = int(dur[:-1])
minutes = val * {"m": 1, "h": 60, "d": 1440}.get(unit, 1)
return {"kind": "interval", "minutes": minutes, "display": f"every {minutes}m"}
# Fallback: treat as cron expression
return {"kind": "cron", "expr": schedule, "display": schedule}
def deploy_from_yaml(
config_path: Path,
jobs_file: Path,
dry_run: bool = False,
) -> int:
"""Sync jobs from YAML config into jobs.json, creating or updating as needed."""
yaml_jobs = _load_yaml_config(config_path)
if jobs_file.exists():
with open(jobs_file, "r", encoding="utf-8") as f:
data = json.load(f)
else:
data = {"jobs": [], "updated_at": None}
existing: List[Dict[str, Any]] = data.get("jobs", [])
# Index by prompt+schedule for matching
index: Dict[str, int] = {}
for i, j in enumerate(existing):
key = f"{j.get('prompt', '')}||{json.dumps(j.get('schedule', {}), sort_keys=True)}"
index[key] = i
created = 0
updated = 0
skipped = 0
for spec in yaml_jobs:
prompt = spec.get("prompt", "")
schedule_str = spec.get("schedule", "")
name = spec.get("name", "")
model = spec.get("model")
provider = spec.get("provider")
skills = spec.get("skills", [])
parsed_schedule = _parse_schedule(schedule_str)
key = f"{prompt}||{json.dumps(parsed_schedule, sort_keys=True)}"
desired = {
"prompt": prompt,
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule_str),
"model": model,
"provider": provider,
"skills": skills if isinstance(skills, list) else [skills] if skills else [],
"name": name or prompt[:50].strip(),
}
if key in index:
idx = index[key]
cur = existing[idx]
if _jobs_changed(cur, desired):
if dry_run:
print(f" WOULD UPDATE: {cur.get('id', '?')} ({cur.get('name', '?')})")
print(f" model: {_flat_model(cur)!r} -> {model!r}")
print(f" provider: {_flat_provider(cur)!r} -> {provider!r}")
else:
existing[idx].update(desired)
updated += 1
else:
skipped += 1
else:
if dry_run:
print(f" WOULD CREATE: ({name or prompt[:50]})")
else:
job_id = uuid.uuid4().hex[:12]
new_job = {
"id": job_id,
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"created_at": None,
"next_run_at": None,
"last_run_at": None,
"last_status": None,
"last_error": None,
"repeat": {"times": None, "completed": 0},
"deliver": "local",
"origin": None,
"base_url": None,
"script": None,
**desired,
}
existing.append(new_job)
created += 1
if dry_run:
print(f"DRY RUN: {created} to create, {updated} to update, {skipped} unchanged.")
return 0
data["jobs"] = existing
jobs_file.parent.mkdir(parents=True, exist_ok=True)
with open(jobs_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Deployed: {created} created, {updated} updated, {skipped} unchanged.")
return 0
# ---------------------------------------------------------------------------
# Normalize standalone
# ---------------------------------------------------------------------------
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
"""
Normalize all jobs in a jobs.json file.
Returns the number of jobs that were modified.
"""
"""Normalize all jobs in a jobs.json file."""
if not jobs_file.exists():
print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
return 1
try:
with open(jobs_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in {jobs_file}: {e}", file=sys.stderr)
return 1
with open(jobs_file, "r", encoding="utf-8") as f:
data = json.load(f)
jobs = data.get("jobs", [])
if not jobs:
print("No jobs found in file.")
return 0
modified_count = 0
for i, job in enumerate(jobs):
original_model = job.get("model")
original_provider = job.get("provider")
normalized_job = normalize_job(job)
# Check if anything changed
if (normalized_job.get("model") != original_model or
normalized_job.get("provider") != original_provider):
normalized_job.get("provider") != original_provider):
jobs[i] = normalized_job
modified_count += 1
job_id = job.get("id", "?")
job_name = job.get("name", "(unnamed)")
print(f"Normalized job {job_id} ({job_name}):")
print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")
if modified_count == 0:
print("All jobs already have consistent model field types.")
return 0
if dry_run:
print(f"DRY RUN: Would normalize {modified_count} jobs.")
return 0
# Write back to file
data["jobs"] = jobs
try:
with open(jobs_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Normalized {modified_count} jobs in {jobs_file}")
return 0
except Exception as e:
print(f"Error writing to {jobs_file}: {e}", file=sys.stderr)
return 1
data["jobs"] = jobs
with open(jobs_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Normalized {modified_count} jobs in {jobs_file}")
return 0
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Normalize cron job schemas for consistent model field types."
description="Deploy and normalize cron jobs."
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be changed without modifying the file."
)
parser.add_argument(
"--jobs-file",
type=Path,
default=Path.home() / ".hermes" / "cron" / "jobs.json",
help="Path to jobs.json file (default: ~/.hermes/cron/jobs.json)"
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--deploy", action="store_true",
help="Sync jobs from YAML config to jobs.json")
group.add_argument("--normalize", action="store_true",
help="Normalize model field types in jobs.json")
parser.add_argument("--config", type=Path,
default=Path.home() / ".hermes" / "cron-jobs.yaml",
help="Path to cron-jobs.yaml (default: ~/.hermes/cron-jobs.yaml)")
parser.add_argument("--jobs-file", type=Path,
default=Path.home() / ".hermes" / "cron" / "jobs.json",
help="Path to jobs.json (default: ~/.hermes/cron/jobs.json)")
parser.add_argument("--dry-run", action="store_true",
help="Show what would change without modifying files")
args = parser.parse_args()
if args.dry_run:
print("DRY RUN MODE — no changes will be made.")
print()
return normalize_jobs_file(args.jobs_file, args.dry_run)
if args.deploy:
return deploy_from_yaml(args.config, args.jobs_file, args.dry_run)
else:
return normalize_jobs_file(args.jobs_file, args.dry_run)
if __name__ == "__main__":