Compare commits

..

1 Commits

Author SHA1 Message Date
628487f7bd fix(cron): rewrite cloud-incompatible prompt instructions (#378)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m9s
Health Monitor prompts say 'Check Ollama is responding' but run
on cloud models that cannot reach localhost. Instead of just
warning the agent, rewrite the instructions to cloud-compatible
equivalents the agent can actually execute.

Changes:
- Add import re
- Add _CLOUD_INCOMPATIBLE_PATTERNS: regex pairs (pattern, replacement)
- Add _rewrite_cloud_incompatible_prompt(): rewrites localhost/Ollama
  references to 'use available tools to check service health'
- Wire into run_job() after resolve_turn_route()

Closes #378
2026-04-14 01:47:00 +00:00
2 changed files with 175 additions and 145 deletions

View File

@@ -13,6 +13,7 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -643,7 +644,56 @@ def _build_job_prompt(job: dict) -> str:
return "\n".join(parts)
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Regex patterns for local service references that fail on cloud endpoints
_CLOUD_INCOMPATIBLE_PATTERNS = [
(re.compile(r"\b[Cc]heck\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up|available)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\b[Vv]erify\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\bcurl\s+localhost:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bcurl\s+127\.0\.0\.1:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bpoll\s+localhost", re.IGNORECASE),
"check service health via available tools"),
]
def _rewrite_cloud_incompatible_prompt(prompt: str, base_url: str) -> str:
"""Rewrite prompt instructions that assume local service access when running on cloud.
When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, Anthropic),
instructions to "Check Ollama" or "curl localhost:11434" are impossible.
Instead of just warning, this rewrites the instruction to a cloud-compatible
equivalent that the agent can actually execute.
Returns the (possibly rewritten) prompt.
"""
try:
from agent.model_metadata import is_local_endpoint
except ImportError:
return prompt
if is_local_endpoint(base_url or ""):
return prompt # Local — no rewrite needed
rewritten = prompt
for pattern, replacement in _CLOUD_INCOMPATIBLE_PATTERNS:
rewritten = pattern.sub(replacement, rewritten)
if rewritten != prompt:
rewritten = (
"[NOTE: Some instructions were adjusted for cloud execution. "
"Local service checks were rewritten to use available tools.]
"
+ rewritten
)
return rewritten
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a single cron job.

View File

@@ -1,174 +1,154 @@
#!/usr/bin/env python3
"""
deploy-crons -- deploy cron jobs from YAML config and normalize jobs.json.
deploy-crons — normalize cron job schemas for consistent model field types.
Two modes:
--deploy Sync jobs from cron-jobs.yaml into jobs.json (create / update).
--normalize Normalize model field types in existing jobs.json.
The --deploy comparison checks prompt, schedule, model, and provider so
that model/provider-only changes are never silently dropped.
This script ensures that the model field in jobs.json is always a dict when
either model or provider is specified, preventing schema inconsistency.
Usage:
python deploy-crons.py --deploy [--config PATH] [--jobs-file PATH] [--dry-run]
python deploy-crons.py --normalize [--jobs-file PATH] [--dry-run]
python deploy-crons.py [--dry-run] [--jobs-file PATH]
"""
import argparse
import json
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
def _flat_model(job: Dict[str, Any]) -> Optional[str]:
m = job.get("model")
if isinstance(m, dict):
return m.get("model")
return m
def _flat_provider(job: Dict[str, Any]) -> Optional[str]:
m = job.get("model")
if isinstance(m, dict):
return m.get("provider")
return job.get("provider")
from typing import Any, Dict, Optional
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
job = dict(job)
model, provider = job.get("model"), job.get("provider")
"""
Normalize a job dict to ensure consistent model field types.
Before normalization:
- If model AND provider: model = raw string, provider = raw string (inconsistent)
- If only model: model = raw string
- If only provider: provider = raw string at top level
After normalization:
- If model exists: model = {"model": "xxx"}
- If provider exists: model = {"provider": "yyy"}
- If both exist: model = {"model": "xxx", "provider": "yyy"}
- If neither: model = None
"""
job = dict(job) # Create a copy to avoid modifying the original
model = job.get("model")
provider = job.get("provider")
# Skip if already normalized (model is a dict)
if isinstance(model, dict):
return job
d = {}
if isinstance(model, str): d["model"] = model.strip()
if isinstance(provider, str): d["provider"] = provider.strip()
job["model"] = d if d else None
# Build normalized model dict
model_dict = {}
if model is not None and isinstance(model, str):
model_dict["model"] = model.strip()
if provider is not None and isinstance(provider, str):
model_dict["provider"] = provider.strip()
# Set model field
if model_dict:
job["model"] = model_dict
else:
job["model"] = None
# Remove top-level provider field if it was moved into model dict
if provider is not None and "provider" in model_dict:
# Keep provider field for backward compatibility but mark it as deprecated
# This allows existing code that reads job["provider"] to continue working
pass
return job
def _jobs_changed(cur: Dict[str, Any], desired: Dict[str, Any]) -> bool:
if cur.get("prompt") != desired.get("prompt"): return True
if cur.get("schedule") != desired.get("schedule"): return True
if _flat_model(cur) != _flat_model(desired): return True
if _flat_provider(cur) != _flat_provider(desired): return True
return False
def _parse_schedule(schedule: str) -> Dict[str, Any]:
try:
from cron.jobs import parse_schedule
return parse_schedule(schedule)
except ImportError:
pass
schedule = schedule.strip()
if schedule.startswith("every "):
dur = schedule[6:].strip()
minutes = int(dur[:-1]) * {"m": 1, "h": 60, "d": 1440}.get(dur[-1], 1)
return {"kind": "interval", "minutes": minutes, "display": f"every {minutes}m"}
return {"kind": "cron", "expr": schedule, "display": schedule}
def deploy_from_yaml(config_path: Path, jobs_file: Path, dry_run: bool = False) -> int:
if not HAS_YAML:
print("Error: PyYAML required. pip install pyyaml", file=sys.stderr); return 1
if not config_path.exists():
print(f"Error: {config_path}", file=sys.stderr); return 1
with open(config_path, "r", encoding="utf-8") as f:
yaml_jobs = (yaml.safe_load(f) or {}).get("jobs", [])
if jobs_file.exists():
with open(jobs_file, "r", encoding="utf-8") as f:
data = json.load(f)
else:
data = {"jobs": [], "updated_at": None}
existing = data.get("jobs", [])
index = {}
for i, j in enumerate(existing):
key = f"{j.get('prompt','')}||{json.dumps(j.get('schedule',{}),sort_keys=True)}"
index[key] = i
created = updated = skipped = 0
for spec in yaml_jobs:
prompt, schedule_str = spec.get("prompt",""), spec.get("schedule","")
name, model, provider = spec.get("name",""), spec.get("model"), spec.get("provider")
skills = spec.get("skills", [])
parsed = _parse_schedule(schedule_str)
key = f"{prompt}||{json.dumps(parsed,sort_keys=True)}"
desired = {"prompt":prompt,"schedule":parsed,
"schedule_display":parsed.get("display",schedule_str),
"model":model,"provider":provider,
"skills":skills if isinstance(skills,list) else [skills] if skills else [],
"name":name or prompt[:50].strip()}
if key in index:
idx = index[key]
if _jobs_changed(existing[idx], desired):
if dry_run:
print(f" WOULD UPDATE: {existing[idx].get('id','?')} model: {_flat_model(existing[idx])!r} -> {model!r} provider: {_flat_provider(existing[idx])!r} -> {provider!r}")
else:
existing[idx].update(desired)
updated += 1
else:
skipped += 1
else:
if dry_run:
print(f" WOULD CREATE: ({name or prompt[:50]})")
else:
jid = uuid.uuid4().hex[:12]
existing.append({"id":jid,"enabled":True,"state":"scheduled",
"paused_at":None,"paused_reason":None,"created_at":None,
"next_run_at":None,"last_run_at":None,"last_status":None,
"last_error":None,"repeat":{"times":None,"completed":0},
"deliver":"local","origin":None,"base_url":None,"script":None,**desired})
created += 1
if dry_run:
print(f"DRY RUN: {created} create, {updated} update, {skipped} unchanged."); return 0
data["jobs"] = existing
jobs_file.parent.mkdir(parents=True, exist_ok=True)
with open(jobs_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Deployed: {created} created, {updated} updated, {skipped} unchanged."); return 0
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
"""
Normalize all jobs in a jobs.json file.
Returns the number of jobs that were modified.
"""
if not jobs_file.exists():
print(f"Error: {jobs_file}", file=sys.stderr); return 1
with open(jobs_file, "r", encoding="utf-8") as f:
data = json.load(f)
print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
return 1
try:
with open(jobs_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in {jobs_file}: {e}", file=sys.stderr)
return 1
jobs = data.get("jobs", [])
if not jobs: print("No jobs."); return 0
modified = 0
if not jobs:
print("No jobs found in file.")
return 0
modified_count = 0
for i, job in enumerate(jobs):
om, op = job.get("model"), job.get("provider")
n = normalize_job(job)
if n.get("model") != om or n.get("provider") != op:
jobs[i] = n; modified += 1
print(f"Normalized {job.get('id','?')}: model {om!r} -> {n['model']!r} provider {op!r} -> {n['provider']!r}")
if modified == 0: print("All consistent."); return 0
if dry_run: print(f"DRY RUN: {modified}"); return 0
original_model = job.get("model")
original_provider = job.get("provider")
normalized_job = normalize_job(job)
# Check if anything changed
if (normalized_job.get("model") != original_model or
normalized_job.get("provider") != original_provider):
jobs[i] = normalized_job
modified_count += 1
job_id = job.get("id", "?")
job_name = job.get("name", "(unnamed)")
print(f"Normalized job {job_id} ({job_name}):")
print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")
if modified_count == 0:
print("All jobs already have consistent model field types.")
return 0
if dry_run:
print(f"DRY RUN: Would normalize {modified_count} jobs.")
return 0
# Write back to file
data["jobs"] = jobs
with open(jobs_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Normalized {modified} jobs."); return 0
try:
with open(jobs_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Normalized {modified_count} jobs in {jobs_file}")
return 0
except Exception as e:
print(f"Error writing to {jobs_file}: {e}", file=sys.stderr)
return 1
def main():
p = argparse.ArgumentParser(description="Deploy and normalize cron jobs.")
g = p.add_mutually_exclusive_group(required=True)
g.add_argument("--deploy", action="store_true")
g.add_argument("--normalize", action="store_true")
p.add_argument("--config", type=Path, default=Path.home()/".hermes"/"cron-jobs.yaml")
p.add_argument("--jobs-file", type=Path, default=Path.home()/".hermes"/"cron"/"jobs.json")
p.add_argument("--dry-run", action="store_true")
a = p.parse_args()
if a.dry_run: print("DRY RUN."); print()
if a.deploy: return deploy_from_yaml(a.config, a.jobs_file, a.dry_run)
else: return normalize_jobs_file(a.jobs_file, a.dry_run)
parser = argparse.ArgumentParser(
description="Normalize cron job schemas for consistent model field types."
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be changed without modifying the file."
)
parser.add_argument(
"--jobs-file",
type=Path,
default=Path.home() / ".hermes" / "cron" / "jobs.json",
help="Path to jobs.json file (default: ~/.hermes/cron/jobs.json)"
)
args = parser.parse_args()
if args.dry_run:
print("DRY RUN MODE — no changes will be made.")
print()
return normalize_jobs_file(args.jobs_file, args.dry_run)
if __name__ == "__main__":
sys.exit(main())