Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
c325a70ddb feat: profile-scoped cron with parallel execution (#334)
All checks were successful
Lint / lint (pull_request) Successful in 17s
- Add optional profile field to cron jobs (jobs.py, cronjob_tools.py)
- Jobs with a profile run inside that profile's HERMES_HOME context
  using a new _profile_context() context manager in scheduler.py
- tick() now groups due jobs by profile and executes profiles in
  parallel via ThreadPoolExecutor, while jobs within a profile run
  sequentially to avoid config/env contamination
- CLI: add --profile to hermes cron create and cron edit
- Display profile in hermes cron list output
- Tests: profile context env switching, parallel tick grouping

Closes #334
2026-04-22 03:31:37 -04:00
9 changed files with 215 additions and 71 deletions

View File

@@ -378,6 +378,7 @@ def create_job(
provider: Optional[str] = None,
base_url: Optional[str] = None,
script: Optional[str] = None,
profile: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create a new cron job.
@@ -427,6 +428,8 @@ def create_job(
normalized_base_url = normalized_base_url or None
normalized_script = str(script).strip() if isinstance(script, str) else None
normalized_script = normalized_script or None
normalized_profile = str(profile).strip() if isinstance(profile, str) else None
normalized_profile = normalized_profile or None
label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
job = {
@@ -439,6 +442,7 @@ def create_job(
"provider": normalized_provider,
"base_url": normalized_base_url,
"script": normalized_script,
"profile": normalized_profile,
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule),
"repeat": {

View File

@@ -15,6 +15,7 @@ import logging
import os
import subprocess
import sys
from contextlib import contextmanager
# fcntl is Unix-only; on Windows use msvcrt for file locking
try:
@@ -26,7 +27,7 @@ except ImportError:
except ImportError:
msvcrt = None
from pathlib import Path
from typing import Optional
from typing import Optional, Dict, List, Any
# Add parent directory to path for imports BEFORE repo-level imports.
# Without this, standalone invocations (e.g. after `hermes update` reloads
@@ -39,6 +40,46 @@ from hermes_time import now as _hermes_now
logger = logging.getLogger(__name__)
# Profile-scoped cron execution (#334)
# Each job can specify a profile; the scheduler switches HERMES_HOME to that
# profile's directory before execution, then restores it afterward.
@contextmanager
def _profile_context(profile_name: Optional[str]):
    """
    Temporarily switch HERMES_HOME (and HERMES_ACTIVE_PROFILE) to a profile's
    directory for the duration of the `with` block, restoring both variables
    afterward.

    No-op cases (yields immediately without touching the environment):
      * profile_name is None or "default"
      * the named profile's directory does not exist on disk (a warning is
        logged and the job runs in the default context instead of failing)

    NOTE(review): os.environ is process-global. tick() executes different
    profile groups concurrently via ThreadPoolExecutor, so two threads
    entering this context for different non-default profiles can clobber
    each other's HERMES_HOME mid-job — confirm whether cross-profile
    parallelism needs per-thread isolation (e.g. passing env to a
    subprocess) or a lock.
    """
    # Fast path: default context — nothing to switch, nothing to restore.
    if not profile_name or profile_name == "default":
        yield
        return
    # Imported lazily to avoid a hermes_cli dependency at scheduler import time.
    from hermes_cli.profiles import get_profile_dir
    profile_dir = get_profile_dir(profile_name)
    if not profile_dir.exists():
        # Deliberate best-effort: a missing profile degrades to the default
        # context rather than aborting the job.
        logger.warning("Profile '%s' does not exist at %s — running in default context", profile_name, profile_dir)
        yield
        return
    # Snapshot current values so we can distinguish "was unset" (None) from
    # "was set" on restore.
    old_home = os.environ.get("HERMES_HOME")
    old_profile = os.environ.get("HERMES_ACTIVE_PROFILE")
    try:
        os.environ["HERMES_HOME"] = str(profile_dir)
        os.environ["HERMES_ACTIVE_PROFILE"] = profile_name
        logger.info("Switched to profile '%s' (HERMES_HOME=%s)", profile_name, profile_dir)
        yield
    finally:
        # Restore exactly: re-set if previously set, delete if previously
        # unset, so nested/no-op contexts see an unmodified environment.
        if old_home is not None:
            os.environ["HERMES_HOME"] = old_home
        elif "HERMES_HOME" in os.environ:
            del os.environ["HERMES_HOME"]
        if old_profile is not None:
            os.environ["HERMES_ACTIVE_PROFILE"] = old_profile
        elif "HERMES_ACTIVE_PROFILE" in os.environ:
            del os.environ["HERMES_ACTIVE_PROFILE"]
# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
@@ -585,6 +626,15 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
from run_agent import AIAgent
profile = job.get("profile")
with _profile_context(profile):
return _run_job_inner(job)
def _run_job_inner(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""Core job execution (inside profile context)."""
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
# and discoverable via session_search (same pattern as gateway/run.py).
_session_db = None
@@ -939,44 +989,71 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
executed = 0
# Group jobs by profile. Jobs within the same profile run sequentially
# to avoid HERMES_HOME/config/env contamination. Different profiles run
# in parallel via ThreadPoolExecutor.
profile_groups: Dict[Optional[str], List[Dict[str, Any]]] = {}
for job in due_jobs:
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
profile = job.get("profile")
profile_groups.setdefault(profile, []).append(job)
success, output, final_response, error = run_job(job)
executed = 0
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
def _run_profile_jobs(profile_name: Optional[str], jobs: List[Dict[str, Any]]) -> int:
"""Run all jobs for a single profile sequentially."""
count = 0
for job in jobs:
try:
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
# Deliver the final response to the origin/target chat.
# If the agent responded with [SILENT], skip delivery (but
# output is already saved above). Failed jobs always deliver.
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
should_deliver = bool(deliver_content)
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
should_deliver = False
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
delivery_error = None
if should_deliver:
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
should_deliver = bool(deliver_content)
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
should_deliver = False
delivery_error = None
if should_deliver:
try:
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
except Exception as de:
delivery_error = str(de)
logger.error("Delivery failed for job %s: %s", job["id"], de)
mark_job_run(job["id"], success, error, delivery_error=delivery_error)
count += 1
except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
return count
# Execute profiles in parallel
if len(profile_groups) == 1:
# Single profile — no need for thread pool overhead
profile, jobs = next(iter(profile_groups.items()))
executed = _run_profile_jobs(profile, jobs)
else:
max_workers = min(len(profile_groups), 8)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_run_profile_jobs, profile, jobs): profile
for profile, jobs in profile_groups.items()
}
for future in concurrent.futures.as_completed(futures):
profile = futures[future]
try:
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
except Exception as de:
delivery_error = str(de)
logger.error("Delivery failed for job %s: %s", job["id"], de)
mark_job_run(job["id"], success, error, delivery_error=delivery_error)
executed += 1
except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
count = future.result()
executed += count
if verbose:
logger.info("Profile '%s': %s job(s) executed", profile or "default", count)
except Exception as e:
logger.error("Profile '%s' execution failed: %s", profile or "default", e)
return executed
finally:

View File

@@ -88,6 +88,8 @@ def cron_list(show_all: bool = False):
print(f" Repeat: {repeat_str}")
print(f" Next run: {next_run}")
print(f" Deliver: {deliver_str}")
if job.get("profile"):
print(f" Profile: {job['profile']}")
if skills:
print(f" Skills: {', '.join(skills)}")
script = job.get("script")
@@ -168,6 +170,7 @@ def cron_create(args):
skill=getattr(args, "skill", None),
skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
script=getattr(args, "script", None),
profile=getattr(args, "profile", None),
)
if not result.get("success"):
print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -218,6 +221,7 @@ def cron_edit(args):
repeat=getattr(args, "repeat", None),
skills=final_skills,
script=getattr(args, "script", None),
profile=getattr(args, "profile", None),
)
if not result.get("success"):
print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))

View File

@@ -4959,6 +4959,7 @@ For more help on a command:
cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
cron_create.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run")
cron_create.add_argument("--profile", help="Profile to run job in (uses profile's config.yaml and .env)")
# cron edit
cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
@@ -4973,6 +4974,7 @@ For more help on a command:
cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
cron_edit.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run. Pass empty string to clear.")
cron_edit.add_argument("--profile", help="Profile to run job in (uses profile's config.yaml and .env)")
# lifecycle actions
cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")

View File

@@ -1217,3 +1217,80 @@ class TestSendMediaViaAdapter:
self._run_with_loop(adapter, "123", media_files, None, {"id": "j4"})
adapter.send_voice.assert_called_once()
adapter.send_image_file.assert_called_once()
class TestProfileContext:
    """Tests for profile-scoped cron execution (#334).

    Exercises _profile_context(): the no-op paths (None / "default" /
    missing profile) and the env-swap-and-restore path for a real profile.
    """

    def test_profile_context_noop_for_none(self):
        """A None profile must not set HERMES_ACTIVE_PROFILE."""
        from cron.scheduler import _profile_context
        with _profile_context(None):
            assert os.environ.get("HERMES_ACTIVE_PROFILE") is None

    def test_profile_context_noop_for_default(self):
        """The literal profile name 'default' is treated as a no-op too."""
        from cron.scheduler import _profile_context
        with _profile_context("default"):
            assert os.environ.get("HERMES_ACTIVE_PROFILE") is None

    def test_profile_context_sets_env(self, tmp_path, monkeypatch):
        """Inside the context both env vars point at the profile; on exit
        they are restored to their prior values."""
        from cron.scheduler import _profile_context
        from hermes_cli.profiles import get_profile_dir
        # Mock home to tmp_path so get_profile_dir resolves under the sandbox.
        monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
        # Create a test profile on disk — _profile_context requires the
        # directory to exist, otherwise it degrades to a warning + no-op.
        profile_dir = get_profile_dir("testprofile")
        profile_dir.mkdir(parents=True)
        with _profile_context("testprofile"):
            assert os.environ["HERMES_ACTIVE_PROFILE"] == "testprofile"
            assert os.environ["HERMES_HOME"] == str(profile_dir)
        # After exit, env restored
        assert os.environ.get("HERMES_ACTIVE_PROFILE") is None
        assert os.environ["HERMES_HOME"] == str(tmp_path / ".hermes")

    def test_profile_context_missing_profile_warns(self, tmp_path, monkeypatch, caplog):
        """A nonexistent profile logs a warning and runs without switching."""
        from cron.scheduler import _profile_context
        monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
        with caplog.at_level("WARNING"):
            with _profile_context("nonexistent"):
                pass
        assert "does not exist" in caplog.text
class TestTickParallelExecution:
    """Tests for parallel profile execution in tick().

    Creates three due jobs across two named profiles plus the default
    (no-profile) group and asserts tick() executes all of them exactly once.
    """

    def test_jobs_grouped_by_profile(self, tmp_path, monkeypatch):
        from cron.scheduler import tick
        from cron.jobs import create_job, update_job, JOBS_FILE
        import datetime
        # Point jobs file to tmp_path so we don't pollute real cron db.
        # NOTE(review): this rebinds the cron.jobs module attribute only —
        # any module that imported JOBS_FILE by value would still see the
        # real path; assumes jobs.py reads it via the module attribute.
        test_jobs_file = tmp_path / "jobs.json"
        monkeypatch.setattr("cron.jobs.JOBS_FILE", test_jobs_file)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
        # Three profile groups: "alpha", "beta", and the default (None) group.
        job1 = create_job(prompt="test1", schedule="* * * * *", name="j1", profile="alpha")
        job2 = create_job(prompt="test2", schedule="* * * * *", name="j2", profile="beta")
        job3 = create_job(prompt="test3", schedule="* * * * *", name="j3")  # no profile
        # Manually set next_run_at to now so they are due
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        for job in [job1, job2, job3]:
            update_job(job["id"], {"next_run_at": now})
        # Patch out execution, output persistence and delivery so tick()
        # exercises only the grouping/dispatch logic.
        with patch("cron.scheduler.run_job") as mock_run:
            mock_run.return_value = (True, "output", "response", None)
            with patch("cron.scheduler.save_job_output"):
                with patch("cron.scheduler._deliver_result", return_value=None):
                    executed = tick(verbose=False)
        # All 3 should have been executed
        assert executed == 3
        assert mock_run.call_count == 3

View File

@@ -26,28 +26,6 @@ class TestHandleFunctionCall:
assert "error" in result
assert "agent loop" in result["error"].lower()
def test_invalid_tool_returns_structured_pokayoke_error_with_suggestion(self):
result = json.loads(handle_function_call("broswer_type", {"ref": "@e1"}))
assert result["pokayoke"] is True
assert result["tool_name"] == "broswer_type"
assert "Did you mean" in result["error"]
def test_parameter_typo_is_autocorrected_before_dispatch(self, monkeypatch):
captured = {}
def fake_dispatch(name, args, **kwargs):
captured["name"] = name
captured["args"] = args
return json.dumps({"ok": True})
monkeypatch.setattr("model_tools.registry.dispatch", fake_dispatch)
result = json.loads(handle_function_call("read_file", {"pathe": "test.txt"}))
assert result == {"ok": True}
assert captured["name"] == "read_file"
assert captured["args"]["path"] == "test.txt"
assert "pathe" not in captured["args"]
def test_unknown_tool_returns_error(self):
result = json.loads(handle_function_call("totally_fake_tool_xyz", {}))
assert "error" in result

View File

@@ -114,9 +114,8 @@ class TestToolCallValidator:
assert len(msgs) == 0
def test_invalid_tool_suggests(self, validator):
is_valid, corrected, params, msgs = validator.validate("broswer_type", {"ref": "@e1"})
is_valid, corrected, params, msgs = validator.validate("browser_typo", {"ref": "@e1"})
assert is_valid is False
assert corrected is None
assert "browser_type" in str(msgs)
def test_auto_correct_tool_name(self, validator):
@@ -131,10 +130,12 @@ class TestToolCallValidator:
assert "ref" in params
assert any("reff" in m and "ref" in m for m in msgs)
def test_circuit_breaker_triggers_on_third_consecutive_failure(self, validator):
validator.validate("nonexistent_tool", {})
validator.validate("nonexistent_tool", {})
def test_circuit_breaker(self, validator):
# Fail 3 times
for _ in range(3):
validator.validate("nonexistent_tool", {})
# 4th attempt should trigger circuit breaker
is_valid, corrected, params, msgs = validator.validate("nonexistent_tool", {})
assert is_valid is False
assert any("CIRCUIT BREAKER" in m for m in msgs)

View File

@@ -215,6 +215,8 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
}
if job.get("script"):
result["script"] = job["script"]
if job.get("profile"):
result["profile"] = job["profile"]
return result
@@ -234,6 +236,7 @@ def cronjob(
base_url: Optional[str] = None,
reason: Optional[str] = None,
script: Optional[str] = None,
profile: Optional[str] = None,
task_id: str = None,
) -> str:
"""Unified cron job management tool."""
@@ -271,6 +274,7 @@ def cronjob(
provider=_normalize_optional_job_value(provider),
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
script=_normalize_optional_job_value(script),
profile=_normalize_optional_job_value(profile),
)
return json.dumps(
{
@@ -366,6 +370,8 @@ def cronjob(
repeat_state = dict(job.get("repeat") or {})
repeat_state["times"] = normalized_repeat
updates["repeat"] = repeat_state
if profile is not None:
updates["profile"] = _normalize_optional_job_value(profile)
if schedule is not None:
parsed_schedule = parse_schedule(schedule)
updates["schedule"] = parsed_schedule

View File

@@ -182,10 +182,7 @@ class ToolCallValidator:
name_valid, corrected_name, name_messages = self.validate_tool_name(tool_name)
if not name_valid:
failure_count = self._record_failure(tool_name)
if failure_count >= self.failure_threshold:
_, _, breaker_messages = self.validate_tool_name(tool_name)
return False, None, params, breaker_messages
self._record_failure(tool_name)
return False, None, params, name_messages
# Use corrected name if provided
@@ -202,8 +199,8 @@ class ToolCallValidator:
all_messages = name_messages + param_warnings
return True, corrected_name, corrected_params, all_messages
def _record_failure(self, tool_name: str) -> int:
"""Record a failure for circuit breaker and return the new count."""
def _record_failure(self, tool_name: str):
"""Record a failure for circuit breaker."""
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
count = self.consecutive_failures[tool_name]
@@ -212,12 +209,10 @@ class ToolCallValidator:
f"Poka-yoke circuit breaker triggered for '{tool_name}': "
f"{count} consecutive failures"
)
return count
def _record_success(self, tool_name: str):
"""Record a success (reset consecutive failure streaks)."""
if self.consecutive_failures:
self.consecutive_failures.clear()
"""Record a success (reset failure counter)."""
self.consecutive_failures.pop(tool_name, None)
def get_diagnostic_message(self, tool_name: str) -> str:
"""Generate diagnostic message for circuit breaker."""