Files
hermes-agent/tests/cron/test_profile_cron.py
Timmy cff035d46c
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
feat(cron): Profile-scoped cron with parallel execution (#334)
Rescued from blocked PR #307. Clean reimplementation on current main.

## What
- Jobs accept a `profile` field — when set, the scheduler resolves that
  profile's config.yaml + .env and sets HERMES_ACTIVE_PROFILE before running
- Parallel execution: tick() now uses ThreadPoolExecutor (default 4 workers,
  configurable via HERMES_CRON_PARALLEL_WORKERS env var)
- Profile isolation: each job loads only its profile's config/env, preventing
  cross-contamination between concurrent jobs

## Changes
- cron/jobs.py: `profile` field on create_job(), stored on job dict
- cron/scheduler.py:
  - _resolve_profile_home() — resolves profile name → HERMES_HOME dir
  - _load_profile_env() — loads profile .env without touching os.environ
  - _load_profile_config() — loads profile config.yaml
  - run_job() — profile-aware config/env loading + HERMES_ACTIVE_PROFILE
  - tick() — parallel execution via ThreadPoolExecutor
  - Added missing ModelContextError + CRON_MIN_CONTEXT_TOKENS
- tools/cronjob_tools.py: `profile` param on create/update, schema, format
- hermes_cli/cron.py: `--profile` on create/edit, display in list
- tests/cron/test_profile_cron.py: 15 tests covering profile field CRUD,
  resolution, isolation, parallel execution, and tool integration

## Testing
106 tests pass (52 existing cron job tests + 15 new profile tests + 39 tool tests).
2026-04-13 17:57:35 -04:00

292 lines
12 KiB
Python

"""Tests for profile-scoped cron with parallel execution.
Covers:
- Profile field on jobs (create, list, update, round-trip)
- Profile resolution (_resolve_profile_home)
- Profile isolation (env/config loading per job)
- Parallel execution (tick runs jobs concurrently)
- Cron tool integration (cronjob create action and schema)
"""
import json
import os
import tempfile
import threading
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from cron.jobs import create_job, get_job, load_jobs, save_jobs, list_jobs, update_job
# =========================================================================
# Profile field on jobs
# =========================================================================
class TestProfileOnJobs:
    """The ``profile`` field is accepted, persisted, and updatable on jobs."""

    @staticmethod
    def _use_tmp_storage(tmp_path, monkeypatch):
        """Redirect the cron.jobs storage locations into ``tmp_path``."""
        redirects = {
            "cron.jobs.CRON_DIR": tmp_path,
            "cron.jobs.JOBS_FILE": tmp_path / "jobs.json",
            "cron.jobs.OUTPUT_DIR": tmp_path / "output",
        }
        for target, location in redirects.items():
            monkeypatch.setattr(target, location)
        tmp_path.mkdir(parents=True, exist_ok=True)

    def test_create_job_with_profile(self, tmp_path, monkeypatch):
        """An explicit profile is echoed back on the created job."""
        self._use_tmp_storage(tmp_path, monkeypatch)

        created = create_job(
            prompt="test prompt",
            schedule="every 1h",
            name="profiled-job",
            profile="sprint",
        )

        assert created["profile"] == "sprint"
        assert created["name"] == "profiled-job"

    def test_create_job_without_profile(self, tmp_path, monkeypatch):
        """Omitting the profile leaves the field unset (None)."""
        self._use_tmp_storage(tmp_path, monkeypatch)

        created = create_job(prompt="test", schedule="every 1h")

        assert created.get("profile") is None

    def test_profile_round_trips_through_storage(self, tmp_path, monkeypatch):
        """The profile is still present when the job is listed back."""
        self._use_tmp_storage(tmp_path, monkeypatch)

        create_job(prompt="test", schedule="every 1h", profile="fenrir")
        stored = list_jobs()

        assert len(stored) == 1
        assert stored[0]["profile"] == "fenrir"

    def test_update_job_profile(self, tmp_path, monkeypatch):
        """update_job() can switch a job to another profile."""
        self._use_tmp_storage(tmp_path, monkeypatch)

        original = create_job(prompt="test", schedule="every 1h", profile="sprint")
        changed = update_job(original["id"], {"profile": "fenrir"})

        assert changed["profile"] == "fenrir"
# =========================================================================
# Profile resolution
# =========================================================================
class TestProfileResolution:
    """Map a profile name onto the directory that acts as its HERMES_HOME."""

    @staticmethod
    def _resolver(tmp_path, monkeypatch):
        """Patch the scheduler's home to ``tmp_path`` and return the resolver."""
        monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
        from cron.scheduler import _resolve_profile_home

        return _resolve_profile_home

    def test_default_profile(self, tmp_path, monkeypatch):
        """No profile (None) resolves to the base home directory."""
        resolve = self._resolver(tmp_path, monkeypatch)
        assert resolve(None) == tmp_path

    def test_default_profile_explicit(self, tmp_path, monkeypatch):
        """The literal name "default" also resolves to the base home."""
        resolve = self._resolver(tmp_path, monkeypatch)
        assert resolve("default") == tmp_path

    def test_named_profile(self, tmp_path, monkeypatch):
        """A named profile resolves to <home>/profiles/<name> when it exists."""
        resolve = self._resolver(tmp_path, monkeypatch)
        expected_home = tmp_path / "profiles" / "sprint"
        expected_home.mkdir(parents=True, exist_ok=True)

        assert resolve("sprint") == expected_home

    def test_missing_named_profile(self, tmp_path, monkeypatch):
        """An unknown profile name is rejected with a ValueError."""
        resolve = self._resolver(tmp_path, monkeypatch)

        with pytest.raises(ValueError, match="not found"):
            resolve("nonexistent")
# =========================================================================
# Profile isolation
# =========================================================================
class TestProfileIsolation:
    """Each job should load its profile's config.yaml and .env."""

    def test_profile_env_loading(self, tmp_path, monkeypatch):
        """A profile's .env lands in the supplied mapping, not in os.environ."""
        profile_home = tmp_path / "profiles" / "sprint"
        profile_home.mkdir(parents=True, exist_ok=True)
        # Create profile .env with a unique var
        (profile_home / ".env").write_text(
            "SPRINT_VAR=sprint_value\n", encoding="utf-8"
        )
        monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)
        # Start from a clean process environment so the isolation assertion
        # below cannot be satisfied (or broken) by a pre-existing variable.
        monkeypatch.delenv("SPRINT_VAR", raising=False)

        from cron.scheduler import _load_profile_env

        result_env = {}
        _load_profile_env(profile_home, result_env)

        assert result_env.get("SPRINT_VAR") == "sprint_value"
        # Profile variables must stay scoped to the job's own mapping so that
        # concurrently running jobs cannot see each other's settings.
        assert "SPRINT_VAR" not in os.environ

    def test_profile_config_loading(self, tmp_path, monkeypatch):
        """When a job has a profile, its config.yaml should be loaded."""
        import yaml

        profile_home = tmp_path / "profiles" / "fenrir"
        profile_home.mkdir(parents=True, exist_ok=True)
        config_data = {
            "model": {"default": "test-model"},
            "agent": {"max_turns": 42},
        }
        (profile_home / "config.yaml").write_text(
            yaml.dump(config_data), encoding="utf-8"
        )
        monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)

        from cron.scheduler import _load_profile_config

        result = _load_profile_config(profile_home)

        assert result["agent"]["max_turns"] == 42
        assert result["model"]["default"] == "test-model"

    def test_default_profile_uses_hermes_home(self, tmp_path, monkeypatch):
        """Jobs without profile use the default HERMES_HOME config/env."""
        import yaml

        (tmp_path / ".env").write_text(
            "DEFAULT_VAR=default_value\n", encoding="utf-8"
        )
        (tmp_path / "config.yaml").write_text(
            yaml.dump({"model": "default-model"}), encoding="utf-8"
        )
        monkeypatch.setattr("cron.scheduler._hermes_home", tmp_path)

        from cron.scheduler import (
            _load_profile_config,
            _load_profile_env,
            _resolve_profile_home,
        )

        # With no profile the resolver hands back the base home, and the same
        # loaders used for named profiles read its .env / config.yaml.
        home = _resolve_profile_home(None)
        env = {}
        _load_profile_env(home, env)
        assert env.get("DEFAULT_VAR") == "default_value"

        cfg = _load_profile_config(home)
        assert cfg["model"] == "default-model"
# =========================================================================
# Parallel execution
# =========================================================================
class TestParallelExecution:
    """tick() should run due jobs in parallel within a profile."""

    # Upper bound on how long a fake job waits for a sibling to start. Only
    # reached when tick() is NOT running jobs concurrently (the failure path).
    _OVERLAP_TIMEOUT = 2.0

    @staticmethod
    def _due_job(job_id, name, profile=None):
        """Build a minimal job dict that is already overdue."""
        return {
            "id": job_id,
            "name": name,
            "prompt": "test",
            "enabled": True,
            "schedule": {"kind": "interval", "minutes": 1},
            "next_run_at": "2020-01-01T00:00:00+00:00",
            "profile": profile,
            "deliver": "local",
        }

    @staticmethod
    def _stub_scheduler(sched, monkeypatch, tmp_path, due_jobs, run_job):
        """Replace the scheduler's job source, runner, and persistence hooks.

        Only tick()'s own dispatch logic is left real; the lock file is moved
        into ``tmp_path`` so the test does not touch a shared location.
        """
        monkeypatch.setattr(sched, "get_due_jobs", lambda: due_jobs)
        monkeypatch.setattr(sched, "run_job", run_job)
        monkeypatch.setattr(sched, "advance_next_run", lambda jid: True)
        monkeypatch.setattr(sched, "mark_job_run", lambda jid, s, e=None: None)
        monkeypatch.setattr(sched, "save_job_output", lambda jid, o: Path("/dev/null"))
        monkeypatch.setattr(sched, "_LOCK_DIR", tmp_path)
        monkeypatch.setattr(sched, "_LOCK_FILE", tmp_path / "tick.lock")

    def test_jobs_run_in_parallel(self, tmp_path, monkeypatch):
        """Multiple due jobs should execute concurrently, not sequentially.

        Concurrency is proven with a handshake rather than wall-clock timing:
        each fake job stays "running" until it observes a second job running
        alongside it. Under sequential execution no job ever sees a sibling,
        so the event is never set and the assertion fails.
        """
        from cron.scheduler import tick
        import cron.scheduler as sched

        fake_jobs = [
            self._due_job("job1", "Job 1"),
            self._due_job("job2", "Job 2"),
            self._due_job("job3", "Job 3"),
        ]

        state_lock = threading.Lock()
        overlap_seen = threading.Event()
        running = 0
        timeout = self._OVERLAP_TIMEOUT

        def fake_run_job(job):
            nonlocal running
            with state_lock:
                running += 1
                if running >= 2:
                    overlap_seen.set()
            # Hold the job open until a sibling is active (returns at once
            # when the event is already set).
            overlap_seen.wait(timeout=timeout)
            with state_lock:
                running -= 1
            return True, "output", "response", None

        self._stub_scheduler(sched, monkeypatch, tmp_path, fake_jobs, fake_run_job)

        count = tick(verbose=False)

        assert count == 3
        # At least 2 jobs must have been in flight at the same moment.
        assert overlap_seen.is_set(), "Jobs did not overlap — not running in parallel"

    def test_parallel_across_profiles(self, tmp_path, monkeypatch):
        """Jobs with different profiles are all dispatched with their own profile.

        This checks that tick() runs every due job and hands each job dict to
        run_job() with its ``profile`` intact; it does not measure overlap.
        """
        from cron.scheduler import tick
        import cron.scheduler as sched

        fake_jobs = [
            self._due_job("j1", "Sprint Job", profile="sprint"),
            self._due_job("j2", "Fenrir Job", profile="fenrir"),
        ]
        profiles_loaded = {}

        def fake_run_job(job):
            profiles_loaded[job["id"]] = job.get("profile")
            return True, "output", "response", None

        self._stub_scheduler(sched, monkeypatch, tmp_path, fake_jobs, fake_run_job)

        count = tick(verbose=False)

        assert count == 2
        assert profiles_loaded["j1"] == "sprint"
        assert profiles_loaded["j2"] == "fenrir"
# =========================================================================
# Cron tool integration
# =========================================================================
class TestCronToolProfile:
    """The cronjob tool exposes ``profile`` and forwards it when creating jobs."""

    def test_create_with_profile(self, tmp_path, monkeypatch):
        """A create action with a profile returns a job carrying that profile."""
        storage_redirects = (
            ("cron.jobs.CRON_DIR", tmp_path),
            ("cron.jobs.JOBS_FILE", tmp_path / "jobs.json"),
            ("cron.jobs.OUTPUT_DIR", tmp_path / "output"),
        )
        for target, location in storage_redirects:
            monkeypatch.setattr(target, location)
        tmp_path.mkdir(parents=True, exist_ok=True)

        from tools.cronjob_tools import cronjob

        raw_response = cronjob(
            action="create",
            schedule="every 1h",
            prompt="test prompt",
            name="profiled",
            profile="sprint",
        )
        payload = json.loads(raw_response)

        assert payload["success"] is True
        assert payload["job"]["profile"] == "sprint"

    def test_schema_has_profile(self):
        """The tool schema declares ``profile`` as a string parameter."""
        from tools.cronjob_tools import CRONJOB_SCHEMA

        properties = CRONJOB_SCHEMA["parameters"]["properties"]

        assert "profile" in properties
        assert properties["profile"]["type"] == "string"