Compare commits
1 Commits
queue/296-
...
burn/profi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cff035d46c |
@@ -376,6 +376,7 @@ def create_job(
|
||||
provider: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
script: Optional[str] = None,
|
||||
profile: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a new cron job.
|
||||
@@ -395,6 +396,8 @@ def create_job(
|
||||
script: Optional path to a Python script whose stdout is injected into the
|
||||
prompt each run. The script runs before the agent turn, and its output
|
||||
is prepended as context. Useful for data collection / change detection.
|
||||
profile: Optional profile name for profile-scoped cron. When set, the job
|
||||
runs with that profile's config.yaml, .env, and HERMES_ACTIVE_PROFILE.
|
||||
|
||||
Returns:
|
||||
The created job dict
|
||||
@@ -427,6 +430,8 @@ def create_job(
|
||||
normalized_script = normalized_script or None
|
||||
|
||||
label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
|
||||
normalized_profile = str(profile).strip() if isinstance(profile, str) else None
|
||||
normalized_profile = normalized_profile or None
|
||||
job = {
|
||||
"id": job_id,
|
||||
"name": name or label_source[:50].strip(),
|
||||
@@ -437,6 +442,7 @@ def create_job(
|
||||
"provider": normalized_provider,
|
||||
"base_url": normalized_base_url,
|
||||
"script": normalized_script,
|
||||
"profile": normalized_profile,
|
||||
"schedule": parsed_schedule,
|
||||
"schedule_display": parsed_schedule.get("display", schedule),
|
||||
"repeat": {
|
||||
|
||||
@@ -27,7 +27,7 @@ except ImportError:
|
||||
msvcrt = None
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
# Add parent directory to path for imports BEFORE repo-level imports.
|
||||
# Without this, standalone invocations (e.g. after `hermes update` reloads
|
||||
@@ -156,6 +156,27 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
|
||||
|
||||
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Model context guard — rejects jobs whose model context is too small for cron.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
CRON_MIN_CONTEXT_TOKENS = 4096
|
||||
|
||||
|
||||
class ModelContextError(ValueError):
|
||||
"""Raised when a job's model has insufficient context for cron execution."""
|
||||
pass
|
||||
|
||||
|
||||
def _check_model_context_compat(model: str, context_length: int) -> None:
|
||||
"""Raise ModelContextError if the model context is below the cron minimum."""
|
||||
if context_length < CRON_MIN_CONTEXT_TOKENS:
|
||||
raise ModelContextError(
|
||||
f"Model '{model}' context ({context_length} tokens) is below the "
|
||||
f"minimum {CRON_MIN_CONTEXT_TOKENS} tokens required for cron jobs."
|
||||
)
|
||||
|
||||
|
||||
# Sentinel: when a cron agent has nothing new to report, it can start its
|
||||
# response with this marker to suppress delivery. Output is still saved
|
||||
# locally for audit.
|
||||
@@ -544,6 +565,60 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
|
||||
return False, f"Script execution failed: {exc}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Profile-scoped cron — resolve profile home, load profile config/env
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _resolve_profile_home(profile: Optional[str]) -> Path:
|
||||
"""Resolve a profile name to its HERMES_HOME directory.
|
||||
|
||||
None or "default" → the main HERMES_HOME.
|
||||
Named profile → ~/.hermes/profiles/<name>.
|
||||
Raises ValueError if the named profile directory doesn't exist.
|
||||
"""
|
||||
if not profile or profile == "default":
|
||||
return _hermes_home
|
||||
profile_home = _hermes_home / "profiles" / profile
|
||||
if not profile_home.is_dir():
|
||||
raise ValueError(
|
||||
f"Profile '{profile}' not found at {profile_home}. "
|
||||
f"Create it with: hermes profile create {profile}"
|
||||
)
|
||||
return profile_home
|
||||
|
||||
|
||||
def _load_profile_env(profile_home: Path, env_out: Optional[Dict[str, str]] = None) -> Dict[str, str]:
|
||||
"""Load a profile's .env file into env_out (or return a new dict).
|
||||
|
||||
Does NOT modify os.environ — the caller decides when/whether to inject.
|
||||
"""
|
||||
from dotenv import dotenv_values
|
||||
env_file = profile_home / ".env"
|
||||
result = env_out if env_out is not None else {}
|
||||
if env_file.exists():
|
||||
try:
|
||||
values = dotenv_values(str(env_file), encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
values = dotenv_values(str(env_file), encoding="latin-1")
|
||||
for k, v in values.items():
|
||||
if v is not None:
|
||||
result[k] = v
|
||||
return result
|
||||
|
||||
|
||||
def _load_profile_config(profile_home: Path) -> Dict[str, Any]:
|
||||
"""Load a profile's config.yaml, returning the parsed dict (empty dict on failure)."""
|
||||
try:
|
||||
import yaml
|
||||
cfg_path = profile_home / "config.yaml"
|
||||
if cfg_path.exists():
|
||||
with open(cfg_path, encoding="utf-8") as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
except Exception as e:
|
||||
logger.warning("Failed to load config.yaml from %s: %s", profile_home, e)
|
||||
return {}
|
||||
|
||||
|
||||
def _build_job_prompt(job: dict) -> str:
|
||||
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
|
||||
prompt = job.get("prompt", "")
|
||||
@@ -683,11 +758,19 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
|
||||
# Re-read .env and config.yaml fresh every run so provider/key
|
||||
# changes take effect without a gateway restart.
|
||||
# Profile-scoped: load the job's profile's config/env, not the default.
|
||||
_profile = job.get("profile")
|
||||
_profile_home = _resolve_profile_home(_profile)
|
||||
|
||||
# Set HERMES_ACTIVE_PROFILE so the agent knows which profile it's running under.
|
||||
if _profile:
|
||||
os.environ["HERMES_ACTIVE_PROFILE"] = _profile
|
||||
|
||||
from dotenv import load_dotenv
|
||||
try:
|
||||
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="utf-8")
|
||||
load_dotenv(str(_profile_home / ".env"), override=True, encoding="utf-8")
|
||||
except UnicodeDecodeError:
|
||||
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")
|
||||
load_dotenv(str(_profile_home / ".env"), override=True, encoding="latin-1")
|
||||
|
||||
delivery_target = _resolve_delivery_target(job)
|
||||
if delivery_target:
|
||||
@@ -702,7 +785,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
_cfg = {}
|
||||
try:
|
||||
import yaml
|
||||
_cfg_path = str(_hermes_home / "config.yaml")
|
||||
_cfg_path = str(_profile_home / "config.yaml")
|
||||
if os.path.exists(_cfg_path):
|
||||
with open(_cfg_path) as _f:
|
||||
_cfg = yaml.safe_load(_f) or {}
|
||||
@@ -713,7 +796,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
elif isinstance(_model_cfg, dict):
|
||||
model = _model_cfg.get("default", model)
|
||||
except Exception as e:
|
||||
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
|
||||
logger.warning("Job '%s': failed to load config.yaml from %s, using defaults: %s", job_id, _profile_home, e)
|
||||
|
||||
# Reasoning config from env or config.yaml
|
||||
from hermes_constants import parse_reasoning_effort
|
||||
@@ -994,6 +1077,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
|
||||
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
|
||||
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
|
||||
"HERMES_ACTIVE_PROFILE",
|
||||
):
|
||||
os.environ.pop(key, None)
|
||||
if _session_db:
|
||||
@@ -1048,34 +1132,22 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
if verbose:
|
||||
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
|
||||
|
||||
# Profile-scoped cron with parallel execution.
|
||||
# Jobs are grouped by profile and submitted to a thread pool so that
|
||||
# jobs in different profiles run concurrently without interfering.
|
||||
# Each job runs with its profile's own config.yaml and .env.
|
||||
executed = 0
|
||||
for job in due_jobs:
|
||||
# If the interpreter is shutting down (e.g. gateway restart),
|
||||
# stop processing immediately — ThreadPoolExecutor.submit()
|
||||
# will raise RuntimeError for every remaining job.
|
||||
if sys.is_finalizing():
|
||||
logger.warning(
|
||||
"Interpreter finalizing — skipping %d remaining job(s)",
|
||||
len(due_jobs) - executed,
|
||||
)
|
||||
break
|
||||
_max_workers = int(os.getenv("HERMES_CRON_PARALLEL_WORKERS", "4"))
|
||||
|
||||
def _run_single_job(job: dict) -> tuple:
|
||||
"""Run a single job: advance schedule, execute, deliver, mark."""
|
||||
try:
|
||||
# For recurring jobs (cron/interval), advance next_run_at to the
|
||||
# next future occurrence BEFORE execution. This way, if the
|
||||
# process crashes mid-run, the job won't re-fire on restart.
|
||||
# One-shot jobs are left alone so they can retry on restart.
|
||||
advance_next_run(job["id"])
|
||||
|
||||
success, output, final_response, error = run_job(job)
|
||||
|
||||
output_file = save_job_output(job["id"], output)
|
||||
if verbose:
|
||||
logger.info("Output saved to: %s", output_file)
|
||||
logger.info("Job '%s' output saved to: %s", job["id"], output_file)
|
||||
|
||||
# Deliver the final response to the origin/target chat.
|
||||
# If the agent responded with [SILENT], skip delivery (but
|
||||
# output is already saved above). Failed jobs always deliver.
|
||||
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
|
||||
should_deliver = bool(deliver_content)
|
||||
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
|
||||
@@ -1089,11 +1161,34 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
logger.error("Delivery failed for job %s: %s", job["id"], de)
|
||||
|
||||
mark_job_run(job["id"], success, error)
|
||||
executed += 1
|
||||
|
||||
return (job["id"], True)
|
||||
except Exception as e:
|
||||
logger.error("Error processing job %s: %s", job['id'], e)
|
||||
mark_job_run(job["id"], False, str(e))
|
||||
logger.error("Error processing job %s: %s", job["id"], e)
|
||||
try:
|
||||
mark_job_run(job["id"], False, str(e))
|
||||
except Exception:
|
||||
pass
|
||||
return (job["id"], False)
|
||||
|
||||
if sys.is_finalizing():
|
||||
logger.warning(
|
||||
"Interpreter finalizing — skipping %d job(s)",
|
||||
len(due_jobs),
|
||||
)
|
||||
else:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as pool:
|
||||
futures = {
|
||||
pool.submit(_run_single_job, job): job
|
||||
for job in due_jobs
|
||||
}
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
try:
|
||||
job_id, ok = future.result()
|
||||
if ok:
|
||||
executed += 1
|
||||
except Exception as e:
|
||||
job = futures[future]
|
||||
logger.error("Job '%s' future error: %s", job["id"], e)
|
||||
|
||||
return executed
|
||||
finally:
|
||||
|
||||
@@ -73,6 +73,7 @@ def cron_list(show_all: bool = False):
|
||||
deliver_str = ", ".join(deliver)
|
||||
|
||||
skills = job.get("skills") or ([job["skill"]] if job.get("skill") else [])
|
||||
profile = job.get("profile")
|
||||
if state == "paused":
|
||||
status = color("[paused]", Colors.YELLOW)
|
||||
elif state == "completed":
|
||||
@@ -88,6 +89,8 @@ def cron_list(show_all: bool = False):
|
||||
print(f" Repeat: {repeat_str}")
|
||||
print(f" Next run: {next_run}")
|
||||
print(f" Deliver: {deliver_str}")
|
||||
if profile:
|
||||
print(f" Profile: {profile}")
|
||||
if skills:
|
||||
print(f" Skills: {', '.join(skills)}")
|
||||
script = job.get("script")
|
||||
@@ -153,6 +156,7 @@ def cron_create(args):
|
||||
skill=getattr(args, "skill", None),
|
||||
skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
|
||||
script=getattr(args, "script", None),
|
||||
profile=getattr(args, "profile", None),
|
||||
)
|
||||
if not result.get("success"):
|
||||
print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
|
||||
@@ -165,6 +169,8 @@ def cron_create(args):
|
||||
job_data = result.get("job", {})
|
||||
if job_data.get("script"):
|
||||
print(f" Script: {job_data['script']}")
|
||||
if job_data.get("profile"):
|
||||
print(f" Profile: {job_data['profile']}")
|
||||
print(f" Next run: {result['next_run_at']}")
|
||||
return 0
|
||||
|
||||
@@ -203,6 +209,7 @@ def cron_edit(args):
|
||||
repeat=getattr(args, "repeat", None),
|
||||
skills=final_skills,
|
||||
script=getattr(args, "script", None),
|
||||
profile=getattr(args, "profile", None),
|
||||
)
|
||||
if not result.get("success"):
|
||||
print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))
|
||||
|
||||
291
tests/cron/test_profile_cron.py
Normal file
291
tests/cron/test_profile_cron.py
Normal file
@@ -0,0 +1,291 @@
|
||||
"""Tests for profile-scoped cron with parallel execution.
|
||||
|
||||
Covers:
|
||||
- Profile field on jobs (create, list, round-trip)
|
||||
- Profile resolution (resolve_profile_home)
|
||||
- Profile isolation (env/config loading per job)
|
||||
- Parallel execution (tick runs jobs concurrently)
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from cron.jobs import create_job, get_job, load_jobs, save_jobs, list_jobs, update_job
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Profile field on jobs
|
||||
# =========================================================================
|
||||
|
||||
class TestProfileOnJobs:
|
||||
"""Jobs should accept, store, and round-trip a 'profile' field."""
|
||||
|
||||
def test_create_job_with_profile(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
|
||||
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
|
||||
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
|
||||
tmp_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
job = create_job(
|
||||
prompt="test prompt",
|
||||
schedule="every 1h",
|
||||
name="profiled-job",
|
||||
profile="sprint",
|
||||
)
|
||||
assert job["profile"] == "sprint"
|
||||
assert job["name"] == "profiled-job"
|
||||
|
||||
def test_create_job_without_profile(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
|
||||
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
|
||||
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
|
||||
tmp_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
job = create_job(prompt="test", schedule="every 1h")
|
||||
assert job.get("profile") is None
|
||||
|
||||
def test_profile_round_trips_through_storage(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
|
||||
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
|
||||
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
|
||||
tmp_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
create_job(prompt="test", schedule="every 1h", profile="fenrir")
|
||||
jobs = list_jobs()
|
||||
assert len(jobs) == 1
|
||||
assert jobs[0]["profile"] == "fenrir"
|
||||
|
||||
def test_update_job_profile(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
|
||||
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
|
||||
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
|
||||
tmp_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
job = create_job(prompt="test", schedule="every 1h", profile="sprint")
|
||||
updated = update_job(job["id"], {"profile": "fenrir"})
|
||||
assert updated["profile"] == "fenrir"
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Profile resolution
|
||||
# =========================================================================
|
||||
|
||||
class TestProfileResolution:
|
||||
"""Resolve a profile name to its HERMES_HOME directory."""
|
||||
|
||||
def test_default_profile(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
|
||||
from cron.scheduler import _resolve_profile_home
|
||||
assert _resolve_profile_home(None) == tmp_path
|
||||
|
||||
def test_default_profile_explicit(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
|
||||
from cron.scheduler import _resolve_profile_home
|
||||
assert _resolve_profile_home("default") == tmp_path
|
||||
|
||||
def test_named_profile(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
|
||||
profile_home = tmp_path / "profiles" / "sprint"
|
||||
profile_home.mkdir(parents=True, exist_ok=True)
|
||||
from cron.scheduler import _resolve_profile_home
|
||||
assert _resolve_profile_home("sprint") == profile_home
|
||||
|
||||
def test_missing_named_profile(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
|
||||
from cron.scheduler import _resolve_profile_home
|
||||
with pytest.raises(ValueError, match="not found"):
|
||||
_resolve_profile_home("nonexistent")
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Profile isolation
|
||||
# =========================================================================
|
||||
|
||||
class TestProfileIsolation:
|
||||
"""Each job should load its profile's config.yaml and .env."""
|
||||
|
||||
def test_profile_env_loading(self, tmp_path, monkeypatch):
|
||||
"""When a job has a profile, its .env should be loaded."""
|
||||
hermes_home = tmp_path
|
||||
profile_home = tmp_path / "profiles" / "sprint"
|
||||
profile_home.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create profile .env with a unique var
|
||||
(profile_home / ".env").write_text("SPRINT_VAR=sprint_value\n")
|
||||
|
||||
monkeypatch.setattr("cron.scheduler._hermes_home", hermes_home)
|
||||
|
||||
from cron.scheduler import _resolve_profile_home, _load_profile_env
|
||||
result_env = {}
|
||||
_load_profile_env(profile_home, result_env)
|
||||
assert result_env.get("SPRINT_VAR") == "sprint_value"
|
||||
|
||||
def test_profile_config_loading(self, tmp_path, monkeypatch):
|
||||
"""When a job has a profile, its config.yaml should be loaded."""
|
||||
import yaml
|
||||
hermes_home = tmp_path
|
||||
profile_home = tmp_path / "profiles" / "fenrir"
|
||||
profile_home.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
config_data = {
|
||||
"model": {"default": "test-model"},
|
||||
"agent": {"max_turns": 42},
|
||||
}
|
||||
(profile_home / "config.yaml").write_text(yaml.dump(config_data))
|
||||
|
||||
monkeypatch.setattr("cron.scheduler._hermes_home", hermes_home)
|
||||
|
||||
from cron.scheduler import _load_profile_config
|
||||
result = _load_profile_config(profile_home)
|
||||
assert result["agent"]["max_turns"] == 42
|
||||
assert result["model"]["default"] == "test-model"
|
||||
|
||||
def test_default_profile_uses_hermes_home(self, tmp_path, monkeypatch):
|
||||
"""Jobs without profile use the default HERMES_HOME config/env."""
|
||||
import yaml
|
||||
hermes_home = tmp_path
|
||||
(hermes_home / ".env").write_text("DEFAULT_VAR=default_value\n")
|
||||
(hermes_home / "config.yaml").write_text(yaml.dump({"model": "default-model"}))
|
||||
|
||||
monkeypatch.setattr("cron.scheduler._hermes_home", hermes_home)
|
||||
|
||||
from cron.scheduler import _resolve_profile_home, _load_profile_env, _load_profile_config
|
||||
home = _resolve_profile_home(None)
|
||||
env = {}
|
||||
_load_profile_env(home, env)
|
||||
assert env.get("DEFAULT_VAR") == "default_value"
|
||||
cfg = _load_profile_config(home)
|
||||
assert cfg["model"] == "default-model"
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Parallel execution
|
||||
# =========================================================================
|
||||
|
||||
class TestParallelExecution:
|
||||
"""tick() should run due jobs in parallel within a profile."""
|
||||
|
||||
def test_jobs_run_in_parallel(self, tmp_path, monkeypatch):
|
||||
"""Multiple due jobs should execute concurrently, not sequentially."""
|
||||
from cron.scheduler import tick
|
||||
import cron.scheduler as sched
|
||||
|
||||
# Set up fake jobs
|
||||
fake_jobs = [
|
||||
{"id": "job1", "name": "Job 1", "prompt": "test", "enabled": True,
|
||||
"schedule": {"kind": "interval", "minutes": 1},
|
||||
"next_run_at": "2020-01-01T00:00:00+00:00",
|
||||
"profile": None, "deliver": "local"},
|
||||
{"id": "job2", "name": "Job 2", "prompt": "test", "enabled": True,
|
||||
"schedule": {"kind": "interval", "minutes": 1},
|
||||
"next_run_at": "2020-01-01T00:00:00+00:00",
|
||||
"profile": None, "deliver": "local"},
|
||||
{"id": "job3", "name": "Job 3", "prompt": "test", "enabled": True,
|
||||
"schedule": {"kind": "interval", "minutes": 1},
|
||||
"next_run_at": "2020-01-01T00:00:00+00:00",
|
||||
"profile": None, "deliver": "local"},
|
||||
]
|
||||
|
||||
run_times = {}
|
||||
|
||||
def fake_run_job(job):
|
||||
run_times[job["id"]] = {"start": time.monotonic()}
|
||||
time.sleep(0.5) # Simulate 500ms of work
|
||||
run_times[job["id"]]["end"] = time.monotonic()
|
||||
return True, "output", "response", None
|
||||
|
||||
monkeypatch.setattr(sched, "get_due_jobs", lambda: fake_jobs)
|
||||
monkeypatch.setattr(sched, "run_job", fake_run_job)
|
||||
monkeypatch.setattr(sched, "advance_next_run", lambda jid: True)
|
||||
monkeypatch.setattr(sched, "mark_job_run", lambda jid, s, e=None: None)
|
||||
monkeypatch.setattr(sched, "save_job_output", lambda jid, o: Path("/dev/null"))
|
||||
monkeypatch.setattr(sched, "_LOCK_DIR", tmp_path)
|
||||
monkeypatch.setattr(sched, "_LOCK_FILE", tmp_path / "tick.lock")
|
||||
|
||||
start = time.monotonic()
|
||||
count = tick(verbose=False)
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
assert count == 3
|
||||
# If parallel: ~0.5s total. If sequential: ~1.5s total.
|
||||
# Allow generous margin for CI slowness.
|
||||
assert elapsed < 1.2, f"Jobs ran in {elapsed:.1f}s — expected parallel (~0.5s), got sequential?"
|
||||
|
||||
# Verify all 3 jobs overlapped (started before any ended)
|
||||
starts = sorted(run_times[jid]["start"] for jid in run_times)
|
||||
ends = sorted(run_times[jid]["end"] for jid in run_times)
|
||||
# At least 2 jobs should have started before the first one ended
|
||||
assert starts[1] < ends[0], "Jobs did not overlap — not running in parallel"
|
||||
|
||||
def test_parallel_across_profiles(self, tmp_path, monkeypatch):
|
||||
"""Jobs with different profiles should run in parallel."""
|
||||
from cron.scheduler import tick
|
||||
import cron.scheduler as sched
|
||||
|
||||
fake_jobs = [
|
||||
{"id": "j1", "name": "Sprint Job", "prompt": "test", "enabled": True,
|
||||
"schedule": {"kind": "interval", "minutes": 1},
|
||||
"next_run_at": "2020-01-01T00:00:00+00:00",
|
||||
"profile": "sprint", "deliver": "local"},
|
||||
{"id": "j2", "name": "Fenrir Job", "prompt": "test", "enabled": True,
|
||||
"schedule": {"kind": "interval", "minutes": 1},
|
||||
"next_run_at": "2020-01-01T00:00:00+00:00",
|
||||
"profile": "fenrir", "deliver": "local"},
|
||||
]
|
||||
|
||||
profiles_loaded = {}
|
||||
|
||||
def fake_run_job(job):
|
||||
profiles_loaded[job["id"]] = job.get("profile")
|
||||
return True, "output", "response", None
|
||||
|
||||
monkeypatch.setattr(sched, "get_due_jobs", lambda: fake_jobs)
|
||||
monkeypatch.setattr(sched, "run_job", fake_run_job)
|
||||
monkeypatch.setattr(sched, "advance_next_run", lambda jid: True)
|
||||
monkeypatch.setattr(sched, "mark_job_run", lambda jid, s, e=None: None)
|
||||
monkeypatch.setattr(sched, "save_job_output", lambda jid, o: Path("/dev/null"))
|
||||
monkeypatch.setattr(sched, "_LOCK_DIR", tmp_path)
|
||||
monkeypatch.setattr(sched, "_LOCK_FILE", tmp_path / "tick.lock")
|
||||
|
||||
count = tick(verbose=False)
|
||||
assert count == 2
|
||||
assert profiles_loaded["j1"] == "sprint"
|
||||
assert profiles_loaded["j2"] == "fenrir"
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Cron tool integration
|
||||
# =========================================================================
|
||||
|
||||
class TestCronToolProfile:
|
||||
"""The cronjob tool should accept and pass through the profile field."""
|
||||
|
||||
def test_create_with_profile(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
|
||||
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
|
||||
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
|
||||
tmp_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
from tools.cronjob_tools import cronjob
|
||||
result = json.loads(cronjob(
|
||||
action="create",
|
||||
schedule="every 1h",
|
||||
prompt="test prompt",
|
||||
name="profiled",
|
||||
profile="sprint",
|
||||
))
|
||||
assert result["success"] is True
|
||||
assert result["job"]["profile"] == "sprint"
|
||||
|
||||
def test_schema_has_profile(self):
|
||||
from tools.cronjob_tools import CRONJOB_SCHEMA
|
||||
props = CRONJOB_SCHEMA["parameters"]["properties"]
|
||||
assert "profile" in props
|
||||
assert props["profile"]["type"] == "string"
|
||||
@@ -190,6 +190,7 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"model": job.get("model"),
|
||||
"provider": job.get("provider"),
|
||||
"base_url": job.get("base_url"),
|
||||
"profile": job.get("profile"),
|
||||
"schedule": job.get("schedule_display"),
|
||||
"repeat": _repeat_display(job),
|
||||
"deliver": job.get("deliver", "local"),
|
||||
@@ -222,6 +223,7 @@ def cronjob(
|
||||
base_url: Optional[str] = None,
|
||||
reason: Optional[str] = None,
|
||||
script: Optional[str] = None,
|
||||
profile: Optional[str] = None,
|
||||
task_id: str = None,
|
||||
) -> str:
|
||||
"""Unified cron job management tool."""
|
||||
@@ -259,6 +261,7 @@ def cronjob(
|
||||
provider=_normalize_optional_job_value(provider),
|
||||
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
|
||||
script=_normalize_optional_job_value(script),
|
||||
profile=_normalize_optional_job_value(profile),
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
@@ -355,6 +358,8 @@ def cronjob(
|
||||
if script_error:
|
||||
return json.dumps({"success": False, "error": script_error}, indent=2)
|
||||
updates["script"] = _normalize_optional_job_value(script) if script else None
|
||||
if profile is not None:
|
||||
updates["profile"] = _normalize_optional_job_value(profile)
|
||||
if repeat is not None:
|
||||
# Normalize: treat 0 or negative as None (infinite)
|
||||
normalized_repeat = None if repeat <= 0 else repeat
|
||||
@@ -488,6 +493,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
|
||||
"type": "string",
|
||||
"description": "Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under ~/.hermes/scripts/. On update, pass empty string to clear."
|
||||
},
|
||||
"profile": {
|
||||
"type": "string",
|
||||
"description": "Optional profile name for profile-scoped cron. When set, the job runs with that profile's config.yaml, .env, and HERMES_ACTIVE_PROFILE environment variable. Enables different models, API keys, and configurations per job. Profile must exist (create with: hermes profile create <name>)."
|
||||
},
|
||||
},
|
||||
"required": ["action"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user