Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
cff035d46c feat(cron): Profile-scoped cron with parallel execution (#334)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
Rescued from blocked PR #307. Clean reimplementation on current main.

## What
- Jobs accept a `profile` field — when set, the scheduler resolves that
  profile's config.yaml + .env and sets HERMES_ACTIVE_PROFILE before running
- Parallel execution: tick() now uses ThreadPoolExecutor (default 4 workers,
  configurable via HERMES_CRON_PARALLEL_WORKERS env var)
- Profile isolation: each job loads only its profile's config/env, preventing
  cross-contamination between concurrent jobs

## Changes
- cron/jobs.py: `profile` field on create_job(), stored on job dict
- cron/scheduler.py:
  - _resolve_profile_home() — resolves profile name → HERMES_HOME dir
  - _load_profile_env() — loads profile .env without touching os.environ
  - _load_profile_config() — loads profile config.yaml
  - run_job() — profile-aware config/env loading + HERMES_ACTIVE_PROFILE
  - tick() — parallel execution via ThreadPoolExecutor
  - Added missing ModelContextError + CRON_MIN_CONTEXT_TOKENS
- tools/cronjob_tools.py: `profile` param on create/update, schema, format
- hermes_cli/cron.py: `--profile` on create/edit, display in list
- tests/cron/test_profile_cron.py: 15 tests covering profile field CRUD,
  resolution, isolation, parallel execution, and tool integration

## Testing
106 tests pass (52 existing cron jobs + 15 new profile tests + 39 tool tests).
2026-04-13 17:57:35 -04:00
6 changed files with 406 additions and 199 deletions

View File

@@ -376,6 +376,7 @@ def create_job(
provider: Optional[str] = None,
base_url: Optional[str] = None,
script: Optional[str] = None,
profile: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create a new cron job.
@@ -395,6 +396,8 @@ def create_job(
script: Optional path to a Python script whose stdout is injected into the
prompt each run. The script runs before the agent turn, and its output
is prepended as context. Useful for data collection / change detection.
profile: Optional profile name for profile-scoped cron. When set, the job
runs with that profile's config.yaml, .env, and HERMES_ACTIVE_PROFILE.
Returns:
The created job dict
@@ -427,6 +430,8 @@ def create_job(
normalized_script = normalized_script or None
label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
normalized_profile = str(profile).strip() if isinstance(profile, str) else None
normalized_profile = normalized_profile or None
job = {
"id": job_id,
"name": name or label_source[:50].strip(),
@@ -437,6 +442,7 @@ def create_job(
"provider": normalized_provider,
"base_url": normalized_base_url,
"script": normalized_script,
"profile": normalized_profile,
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule),
"repeat": {

View File

@@ -13,7 +13,6 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -28,7 +27,7 @@ except ImportError:
msvcrt = None
import time
from pathlib import Path
from typing import Optional
from typing import Any, Dict, Optional
# Add parent directory to path for imports BEFORE repo-level imports.
# Without this, standalone invocations (e.g. after `hermes update` reloads
@@ -158,7 +157,7 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Model context guard
# Model context guard — rejects jobs whose model context is too small for cron.
# ---------------------------------------------------------------------------
CRON_MIN_CONTEXT_TOKENS = 4096
@@ -567,52 +566,57 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
# ---------------------------------------------------------------------------
# Cloud context warning — detect local service refs in cloud cron prompts
# Profile-scoped cron — resolve profile home, load profile config/env
# ---------------------------------------------------------------------------
_LOCAL_SERVICE_PATTERNS = [
r'localhost:\d{2,5}',
r'127\.0\.0\.\d{1,3}:\d{2,5}',
r'0\.0\.0\.0:\d{2,5}',
r'\bollama\b',
r'curl\s+.*localhost',
r'wget\s+.*localhost',
r'http://localhost',
r'https?://127\.',
r'https?://0\.0\.0\.0',
r'check.*ollama',
r'connect.*local',
r'hermes.*gateway.*local',
]
def _resolve_profile_home(profile: Optional[str]) -> Path:
"""Resolve a profile name to its HERMES_HOME directory.
_LOCAL_SERVICE_RE = [re.compile(p, re.IGNORECASE) for p in _LOCAL_SERVICE_PATTERNS]
def _detect_local_service_refs(prompt: str) -> list[str]:
"""Scan a prompt for references to local services (Ollama, localhost, etc.).
Returns list of matched patterns for logging.
None or "default" → the main HERMES_HOME.
Named profile → ~/.hermes/profiles/<name>.
Raises ValueError if the named profile directory doesn't exist.
"""
matches = []
for pattern_re in _LOCAL_SERVICE_RE:
if pattern_re.search(prompt):
matches.append(pattern_re.pattern)
return matches
if not profile or profile == "default":
return _hermes_home
profile_home = _hermes_home / "profiles" / profile
if not profile_home.is_dir():
raise ValueError(
f"Profile '{profile}' not found at {profile_home}. "
f"Create it with: hermes profile create {profile}"
)
return profile_home
def _inject_cloud_context(prompt: str, local_refs: list[str]) -> str:
"""Prepend a warning when cron runs on cloud but prompt refs local services.
def _load_profile_env(profile_home: Path, env_out: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""Load a profile's .env file into env_out (or return a new dict).
The agent reports the limitation instead of wasting iterations on doomed connections.
Does NOT modify os.environ — the caller decides when/whether to inject.
"""
warning = (
"[SYSTEM NOTE: You are running on a cloud endpoint, but your prompt references "
"local services (localhost/Ollama). You cannot reach localhost from a cloud "
"endpoint. Report this limitation to the user and suggest running the job on "
"a local endpoint instead. Do NOT attempt to connect to localhost — it will "
"timeout and waste your iteration budget.]\n\n"
)
return warning + prompt
from dotenv import dotenv_values
env_file = profile_home / ".env"
result = env_out if env_out is not None else {}
if env_file.exists():
try:
values = dotenv_values(str(env_file), encoding="utf-8")
except UnicodeDecodeError:
values = dotenv_values(str(env_file), encoding="latin-1")
for k, v in values.items():
if v is not None:
result[k] = v
return result
def _load_profile_config(profile_home: Path) -> Dict[str, Any]:
"""Load a profile's config.yaml, returning the parsed dict (empty dict on failure)."""
try:
import yaml
cfg_path = profile_home / "config.yaml"
if cfg_path.exists():
with open(cfg_path, encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.warning("Failed to load config.yaml from %s: %s", profile_home, e)
return {}
def _build_job_prompt(job: dict) -> str:
@@ -754,11 +758,19 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
# Re-read .env and config.yaml fresh every run so provider/key
# changes take effect without a gateway restart.
# Profile-scoped: load the job's profile's config/env, not the default.
_profile = job.get("profile")
_profile_home = _resolve_profile_home(_profile)
# Set HERMES_ACTIVE_PROFILE so the agent knows which profile it's running under.
if _profile:
os.environ["HERMES_ACTIVE_PROFILE"] = _profile
from dotenv import load_dotenv
try:
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="utf-8")
load_dotenv(str(_profile_home / ".env"), override=True, encoding="utf-8")
except UnicodeDecodeError:
load_dotenv(str(_hermes_home / ".env"), override=True, encoding="latin-1")
load_dotenv(str(_profile_home / ".env"), override=True, encoding="latin-1")
delivery_target = _resolve_delivery_target(job)
if delivery_target:
@@ -773,7 +785,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_cfg = {}
try:
import yaml
_cfg_path = str(_hermes_home / "config.yaml")
_cfg_path = str(_profile_home / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
_cfg = yaml.safe_load(_f) or {}
@@ -784,7 +796,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
except Exception as e:
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
logger.warning("Job '%s': failed to load config.yaml from %s, using defaults: %s", job_id, _profile_home, e)
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
@@ -833,16 +845,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Cloud context warning: if running on cloud but prompt refs local services,
# inject a warning so the agent reports the limitation instead of wasting
# iterations on doomed connections. (Fixes #378, #456)
base_url = runtime.get("base_url") or ""
is_cloud = not any(h in base_url for h in ("localhost", "127.0.0.1", "0.0.0.0", "::1"))
local_refs = _detect_local_service_refs(prompt)
if is_cloud and local_refs:
logger.info("Job '%s': cloud endpoint + local service refs detected, injecting warning", job_name)
prompt = _inject_cloud_context(prompt, local_refs)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,
@@ -1075,6 +1077,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
"HERMES_ACTIVE_PROFILE",
):
os.environ.pop(key, None)
if _session_db:
@@ -1129,34 +1132,22 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Profile-scoped cron with parallel execution.
# Jobs are grouped by profile and submitted to a thread pool so that
# jobs in different profiles run concurrently without interfering.
# Each job runs with its profile's own config.yaml and .env.
executed = 0
for job in due_jobs:
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit()
# will raise RuntimeError for every remaining job.
if sys.is_finalizing():
logger.warning(
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
break
_max_workers = int(os.getenv("HERMES_CRON_PARALLEL_WORKERS", "4"))
def _run_single_job(job: dict) -> tuple:
"""Run a single job: advance schedule, execute, deliver, mark."""
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
logger.info("Job '%s' output saved to: %s", job["id"], output_file)
# Deliver the final response to the origin/target chat.
# If the agent responded with [SILENT], skip delivery (but
# output is already saved above). Failed jobs always deliver.
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
should_deliver = bool(deliver_content)
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
@@ -1170,11 +1161,34 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
logger.error("Delivery failed for job %s: %s", job["id"], de)
mark_job_run(job["id"], success, error)
executed += 1
return (job["id"], True)
except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
logger.error("Error processing job %s: %s", job["id"], e)
try:
mark_job_run(job["id"], False, str(e))
except Exception:
pass
return (job["id"], False)
if sys.is_finalizing():
logger.warning(
"Interpreter finalizing — skipping %d job(s)",
len(due_jobs),
)
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as pool:
futures = {
pool.submit(_run_single_job, job): job
for job in due_jobs
}
for future in concurrent.futures.as_completed(futures):
try:
job_id, ok = future.result()
if ok:
executed += 1
except Exception as e:
job = futures[future]
logger.error("Job '%s' future error: %s", job["id"], e)
return executed
finally:

View File

@@ -73,6 +73,7 @@ def cron_list(show_all: bool = False):
deliver_str = ", ".join(deliver)
skills = job.get("skills") or ([job["skill"]] if job.get("skill") else [])
profile = job.get("profile")
if state == "paused":
status = color("[paused]", Colors.YELLOW)
elif state == "completed":
@@ -88,6 +89,8 @@ def cron_list(show_all: bool = False):
print(f" Repeat: {repeat_str}")
print(f" Next run: {next_run}")
print(f" Deliver: {deliver_str}")
if profile:
print(f" Profile: {profile}")
if skills:
print(f" Skills: {', '.join(skills)}")
script = job.get("script")
@@ -153,6 +156,7 @@ def cron_create(args):
skill=getattr(args, "skill", None),
skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
script=getattr(args, "script", None),
profile=getattr(args, "profile", None),
)
if not result.get("success"):
print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -165,6 +169,8 @@ def cron_create(args):
job_data = result.get("job", {})
if job_data.get("script"):
print(f" Script: {job_data['script']}")
if job_data.get("profile"):
print(f" Profile: {job_data['profile']}")
print(f" Next run: {result['next_run_at']}")
return 0
@@ -203,6 +209,7 @@ def cron_edit(args):
repeat=getattr(args, "repeat", None),
skills=final_skills,
script=getattr(args, "script", None),
profile=getattr(args, "profile", None),
)
if not result.get("success"):
print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))

View File

@@ -1,120 +0,0 @@
"""Tests for cron cloud context warning injection (fix #378, #456).
When a cron job runs on a cloud endpoint but its prompt references local
services (Ollama, localhost, etc.), inject a warning so the agent reports
the limitation instead of wasting iterations on doomed connections.
"""
import pytest
from cron.scheduler import (
_detect_local_service_refs,
_inject_cloud_context,
_LOCAL_SERVICE_PATTERNS,
)
# ---------------------------------------------------------------------------
# Pattern detection
# ---------------------------------------------------------------------------
class TestDetectLocalServiceRefs:
def test_localhost_with_port(self):
refs = _detect_local_service_refs("Check http://localhost:8080/status")
assert len(refs) > 0
assert any("localhost" in r for r in refs)
def test_127_address(self):
refs = _detect_local_service_refs("Connect to 127.0.0.1:11434")
assert len(refs) > 0
def test_ollama_reference(self):
refs = _detect_local_service_refs("Run this on Ollama with gemma3")
assert len(refs) > 0
assert any("ollama" in r.lower() for r in refs)
def test_curl_localhost(self):
refs = _detect_local_service_refs("curl localhost:3000/api/data")
assert len(refs) > 0
def test_wget_localhost(self):
refs = _detect_local_service_refs("wget http://localhost/file.txt")
assert len(refs) > 0
def test_http_localhost(self):
refs = _detect_local_service_refs("http://localhost:8642/health")
assert len(refs) > 0
def test_https_127(self):
refs = _detect_local_service_refs("https://127.0.0.1:443/secure")
assert len(refs) > 0
def test_0000_address(self):
refs = _detect_local_service_refs("Bind to 0.0.0.0:9090")
assert len(refs) > 0
def test_no_match_for_remote(self):
refs = _detect_local_service_refs("Check https://api.openai.com/v1/models")
assert len(refs) == 0
def test_no_match_for_gitea(self):
refs = _detect_local_service_refs("Query forge.alexanderwhitestone.com for issues")
assert len(refs) == 0
def test_no_match_empty(self):
refs = _detect_local_service_refs("")
assert len(refs) == 0
def test_check_ollama_phrase(self):
refs = _detect_local_service_refs("First check Ollama is running")
assert len(refs) > 0
def test_connect_local_phrase(self):
refs = _detect_local_service_refs("Connect to local Ollama server")
assert len(refs) > 0
# ---------------------------------------------------------------------------
# Warning injection
# ---------------------------------------------------------------------------
class TestInjectCloudContext:
def test_prepends_warning(self):
original = "Run a health check on localhost:8080"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "SYSTEM NOTE" in result
assert "cloud endpoint" in result
assert original in result
def test_warning_is_first(self):
original = "Check localhost:11434"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert result.startswith("[SYSTEM NOTE")
def test_preserves_original_prompt(self):
original = "Do something with Ollama and then report results"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "Do something with Ollama" in result
def test_mentions_cannot_reach(self):
original = "curl localhost:8080"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "cannot reach" in result.lower() or "cannot" in result.lower()
# ---------------------------------------------------------------------------
# Pattern coverage
# ---------------------------------------------------------------------------
class TestPatternCoverage:
def test_at_least_10_patterns(self):
assert len(_LOCAL_SERVICE_PATTERNS) >= 10
def test_patterns_are_strings(self):
for p in _LOCAL_SERVICE_PATTERNS:
assert isinstance(p, str)
assert len(p) > 0

View File

@@ -0,0 +1,291 @@
"""Tests for profile-scoped cron with parallel execution.
Covers:
- Profile field on jobs (create, list, round-trip)
- Profile resolution (resolve_profile_home)
- Profile isolation (env/config loading per job)
- Parallel execution (tick runs jobs concurrently)
"""
import json
import os
import tempfile
import threading
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from cron.jobs import create_job, get_job, load_jobs, save_jobs, list_jobs, update_job
# =========================================================================
# Profile field on jobs
# =========================================================================
class TestProfileOnJobs:
"""Jobs should accept, store, and round-trip a 'profile' field."""
def test_create_job_with_profile(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
tmp_path.mkdir(parents=True, exist_ok=True)
job = create_job(
prompt="test prompt",
schedule="every 1h",
name="profiled-job",
profile="sprint",
)
assert job["profile"] == "sprint"
assert job["name"] == "profiled-job"
def test_create_job_without_profile(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
tmp_path.mkdir(parents=True, exist_ok=True)
job = create_job(prompt="test", schedule="every 1h")
assert job.get("profile") is None
def test_profile_round_trips_through_storage(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
tmp_path.mkdir(parents=True, exist_ok=True)
create_job(prompt="test", schedule="every 1h", profile="fenrir")
jobs = list_jobs()
assert len(jobs) == 1
assert jobs[0]["profile"] == "fenrir"
def test_update_job_profile(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
tmp_path.mkdir(parents=True, exist_ok=True)
job = create_job(prompt="test", schedule="every 1h", profile="sprint")
updated = update_job(job["id"], {"profile": "fenrir"})
assert updated["profile"] == "fenrir"
# =========================================================================
# Profile resolution
# =========================================================================
class TestProfileResolution:
"""Resolve a profile name to its HERMES_HOME directory."""
def test_default_profile(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
from cron.scheduler import _resolve_profile_home
assert _resolve_profile_home(None) == tmp_path
def test_default_profile_explicit(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
from cron.scheduler import _resolve_profile_home
assert _resolve_profile_home("default") == tmp_path
def test_named_profile(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
profile_home = tmp_path / "profiles" / "sprint"
profile_home.mkdir(parents=True, exist_ok=True)
from cron.scheduler import _resolve_profile_home
assert _resolve_profile_home("sprint") == profile_home
def test_missing_named_profile(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
from cron.scheduler import _resolve_profile_home
with pytest.raises(ValueError, match="not found"):
_resolve_profile_home("nonexistent")
# =========================================================================
# Profile isolation
# =========================================================================
class TestProfileIsolation:
"""Each job should load its profile's config.yaml and .env."""
def test_profile_env_loading(self, tmp_path, monkeypatch):
"""When a job has a profile, its .env should be loaded."""
hermes_home = tmp_path
profile_home = tmp_path / "profiles" / "sprint"
profile_home.mkdir(parents=True, exist_ok=True)
# Create profile .env with a unique var
(profile_home / ".env").write_text("SPRINT_VAR=sprint_value\n")
monkeypatch.setattr("cron.scheduler._hermes_home", hermes_home)
from cron.scheduler import _resolve_profile_home, _load_profile_env
result_env = {}
_load_profile_env(profile_home, result_env)
assert result_env.get("SPRINT_VAR") == "sprint_value"
def test_profile_config_loading(self, tmp_path, monkeypatch):
"""When a job has a profile, its config.yaml should be loaded."""
import yaml
hermes_home = tmp_path
profile_home = tmp_path / "profiles" / "fenrir"
profile_home.mkdir(parents=True, exist_ok=True)
config_data = {
"model": {"default": "test-model"},
"agent": {"max_turns": 42},
}
(profile_home / "config.yaml").write_text(yaml.dump(config_data))
monkeypatch.setattr("cron.scheduler._hermes_home", hermes_home)
from cron.scheduler import _load_profile_config
result = _load_profile_config(profile_home)
assert result["agent"]["max_turns"] == 42
assert result["model"]["default"] == "test-model"
def test_default_profile_uses_hermes_home(self, tmp_path, monkeypatch):
"""Jobs without profile use the default HERMES_HOME config/env."""
import yaml
hermes_home = tmp_path
(hermes_home / ".env").write_text("DEFAULT_VAR=default_value\n")
(hermes_home / "config.yaml").write_text(yaml.dump({"model": "default-model"}))
monkeypatch.setattr("cron.scheduler._hermes_home", hermes_home)
from cron.scheduler import _resolve_profile_home, _load_profile_env, _load_profile_config
home = _resolve_profile_home(None)
env = {}
_load_profile_env(home, env)
assert env.get("DEFAULT_VAR") == "default_value"
cfg = _load_profile_config(home)
assert cfg["model"] == "default-model"
# =========================================================================
# Parallel execution
# =========================================================================
class TestParallelExecution:
"""tick() should run due jobs in parallel within a profile."""
def test_jobs_run_in_parallel(self, tmp_path, monkeypatch):
"""Multiple due jobs should execute concurrently, not sequentially."""
from cron.scheduler import tick
import cron.scheduler as sched
# Set up fake jobs
fake_jobs = [
{"id": "job1", "name": "Job 1", "prompt": "test", "enabled": True,
"schedule": {"kind": "interval", "minutes": 1},
"next_run_at": "2020-01-01T00:00:00+00:00",
"profile": None, "deliver": "local"},
{"id": "job2", "name": "Job 2", "prompt": "test", "enabled": True,
"schedule": {"kind": "interval", "minutes": 1},
"next_run_at": "2020-01-01T00:00:00+00:00",
"profile": None, "deliver": "local"},
{"id": "job3", "name": "Job 3", "prompt": "test", "enabled": True,
"schedule": {"kind": "interval", "minutes": 1},
"next_run_at": "2020-01-01T00:00:00+00:00",
"profile": None, "deliver": "local"},
]
run_times = {}
def fake_run_job(job):
run_times[job["id"]] = {"start": time.monotonic()}
time.sleep(0.5) # Simulate 500ms of work
run_times[job["id"]]["end"] = time.monotonic()
return True, "output", "response", None
monkeypatch.setattr(sched, "get_due_jobs", lambda: fake_jobs)
monkeypatch.setattr(sched, "run_job", fake_run_job)
monkeypatch.setattr(sched, "advance_next_run", lambda jid: True)
monkeypatch.setattr(sched, "mark_job_run", lambda jid, s, e=None: None)
monkeypatch.setattr(sched, "save_job_output", lambda jid, o: Path("/dev/null"))
monkeypatch.setattr(sched, "_LOCK_DIR", tmp_path)
monkeypatch.setattr(sched, "_LOCK_FILE", tmp_path / "tick.lock")
start = time.monotonic()
count = tick(verbose=False)
elapsed = time.monotonic() - start
assert count == 3
# If parallel: ~0.5s total. If sequential: ~1.5s total.
# Allow generous margin for CI slowness.
assert elapsed < 1.2, f"Jobs ran in {elapsed:.1f}s — expected parallel (~0.5s), got sequential?"
# Verify all 3 jobs overlapped (started before any ended)
starts = sorted(run_times[jid]["start"] for jid in run_times)
ends = sorted(run_times[jid]["end"] for jid in run_times)
# At least 2 jobs should have started before the first one ended
assert starts[1] < ends[0], "Jobs did not overlap — not running in parallel"
def test_parallel_across_profiles(self, tmp_path, monkeypatch):
"""Jobs with different profiles should run in parallel."""
from cron.scheduler import tick
import cron.scheduler as sched
fake_jobs = [
{"id": "j1", "name": "Sprint Job", "prompt": "test", "enabled": True,
"schedule": {"kind": "interval", "minutes": 1},
"next_run_at": "2020-01-01T00:00:00+00:00",
"profile": "sprint", "deliver": "local"},
{"id": "j2", "name": "Fenrir Job", "prompt": "test", "enabled": True,
"schedule": {"kind": "interval", "minutes": 1},
"next_run_at": "2020-01-01T00:00:00+00:00",
"profile": "fenrir", "deliver": "local"},
]
profiles_loaded = {}
def fake_run_job(job):
profiles_loaded[job["id"]] = job.get("profile")
return True, "output", "response", None
monkeypatch.setattr(sched, "get_due_jobs", lambda: fake_jobs)
monkeypatch.setattr(sched, "run_job", fake_run_job)
monkeypatch.setattr(sched, "advance_next_run", lambda jid: True)
monkeypatch.setattr(sched, "mark_job_run", lambda jid, s, e=None: None)
monkeypatch.setattr(sched, "save_job_output", lambda jid, o: Path("/dev/null"))
monkeypatch.setattr(sched, "_LOCK_DIR", tmp_path)
monkeypatch.setattr(sched, "_LOCK_FILE", tmp_path / "tick.lock")
count = tick(verbose=False)
assert count == 2
assert profiles_loaded["j1"] == "sprint"
assert profiles_loaded["j2"] == "fenrir"
# =========================================================================
# Cron tool integration
# =========================================================================
class TestCronToolProfile:
"""The cronjob tool should accept and pass through the profile field."""
def test_create_with_profile(self, tmp_path, monkeypatch):
monkeypatch.setattr("cron.jobs.CRON_DIR", tmp_path)
monkeypatch.setattr("cron.jobs.JOBS_FILE", tmp_path / "jobs.json")
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", tmp_path / "output")
tmp_path.mkdir(parents=True, exist_ok=True)
from tools.cronjob_tools import cronjob
result = json.loads(cronjob(
action="create",
schedule="every 1h",
prompt="test prompt",
name="profiled",
profile="sprint",
))
assert result["success"] is True
assert result["job"]["profile"] == "sprint"
def test_schema_has_profile(self):
from tools.cronjob_tools import CRONJOB_SCHEMA
props = CRONJOB_SCHEMA["parameters"]["properties"]
assert "profile" in props
assert props["profile"]["type"] == "string"

View File

@@ -190,6 +190,7 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"model": job.get("model"),
"provider": job.get("provider"),
"base_url": job.get("base_url"),
"profile": job.get("profile"),
"schedule": job.get("schedule_display"),
"repeat": _repeat_display(job),
"deliver": job.get("deliver", "local"),
@@ -222,6 +223,7 @@ def cronjob(
base_url: Optional[str] = None,
reason: Optional[str] = None,
script: Optional[str] = None,
profile: Optional[str] = None,
task_id: str = None,
) -> str:
"""Unified cron job management tool."""
@@ -259,6 +261,7 @@ def cronjob(
provider=_normalize_optional_job_value(provider),
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
script=_normalize_optional_job_value(script),
profile=_normalize_optional_job_value(profile),
)
return json.dumps(
{
@@ -355,6 +358,8 @@ def cronjob(
if script_error:
return json.dumps({"success": False, "error": script_error}, indent=2)
updates["script"] = _normalize_optional_job_value(script) if script else None
if profile is not None:
updates["profile"] = _normalize_optional_job_value(profile)
if repeat is not None:
# Normalize: treat 0 or negative as None (infinite)
normalized_repeat = None if repeat <= 0 else repeat
@@ -488,6 +493,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"type": "string",
"description": "Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under ~/.hermes/scripts/. On update, pass empty string to clear."
},
"profile": {
"type": "string",
"description": "Optional profile name for profile-scoped cron. When set, the job runs with that profile's config.yaml, .env, and HERMES_ACTIVE_PROFILE environment variable. Enables different models, API keys, and configurations per job. Profile must exist (create with: hermes profile create <name>)."
},
},
"required": ["action"]
}