Compare commits

Comparing burn/herme...fix/982 (1 commit)

| Author | SHA1 | Date |
|---|---|---|
| | a61761d321 | |
@@ -50,6 +50,78 @@ def sanitize_context(text: str) -> str:
    return _FENCE_TAG_RE.sub('', text)


# ---------------------------------------------------------------------------
# Prefetch filtering helpers
# ---------------------------------------------------------------------------

# Meta-instruction debris that memory providers sometimes echo back.
# These are prompts/instructions, not user-generated content.
_META_INSTRUCTION_PATTERNS = [
    re.compile(r"^\s*[\-\*]?\s*>?\s*Focus on:\s*", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*Note:\s*", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*System\s+(note|prompt|instruction):", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*You are\s+", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*Please\s+(provide|respond|answer|write)", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*Do not\s+", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*Always\s+", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*Consider\s+(the following|these|this)\b", re.IGNORECASE),
    re.compile(r"^\s*[\-\*]?\s*>?\s*Here\s+(is|are)\s+(some|the|a few)\b", re.IGNORECASE),
]


def _is_meta_instruction_line(line: str) -> bool:
    """Return True if the line looks like a prompt/template instruction, not memory content."""
    for pat in _META_INSTRUCTION_PATTERNS:
        if pat.search(line):
            return True
    return False


def _is_low_signal_line(line: str) -> bool:
    """Return True for very short or content-free lines."""
    stripped = line.strip()
    # Empty or just punctuation/list marker
    if not stripped or stripped in {"-", "*", ">", "•", "—", "--"}:
        return True
    # Too short to be meaningful (< 15 chars after stripping markers)
    cleaned = re.sub(r"^[\-\*•>\s]+", "", stripped)
    if len(cleaned) < 15:
        return True
    return False


def _filter_prefetch_lines(text: str) -> str:
    """Filter and deduplicate prefetch result lines.

    Removes:
    - exact duplicate lines
    - meta-instruction debris (prompts, templates)
    - very short / content-free lines

    Returns cleaned text, preserving original line grouping.
    """
    if not text or not text.strip():
        return ""

    seen: set = set()
    kept: list = []
    for line in text.splitlines(keepends=False):
        stripped = line.strip()
        # Deduplicate exact lines
        if stripped in seen:
            continue
        # Skip meta-instructions
        if _is_meta_instruction_line(line):
            continue
        # Skip low-signal lines
        if _is_low_signal_line(line):
            continue
        seen.add(stripped)
        kept.append(line)

    return "\n".join(kept)


def build_memory_context_block(raw_context: str) -> str:
    """Wrap prefetched memory in a fenced block with system note.
@@ -180,7 +252,14 @@ class MemoryManager:
                    "Memory provider '%s' prefetch failed (non-fatal): %s",
                    provider.name, e,
                )
        return "\n\n".join(parts)
        raw = "\n\n".join(parts)
        if not raw:
            return ""
        # Apply line-level filtering: dedupe, strip meta-instructions,
        # remove very short fragments. This prevents noisy providers
        # (e.g. MemPalace transcript recall) from bloating context.
        filtered = _filter_prefetch_lines(raw)
        return filtered

    def queue_prefetch_all(self, query: str, *, session_id: str = "") -> None:
        """Queue background prefetch on all providers for the next turn."""
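A minimal sketch of what the new filter does to a noisy prefetch payload. The import path matches the `TestPrefetchFiltering` tests further down; the sample strings are purely illustrative.

```python
# Illustrative only; mirrors the behavior exercised by TestPrefetchFiltering below.
from agent.memory_manager import _filter_prefetch_lines

noisy = (
    "## Fleet Memories\n"
    "- > Focus on: was a non-trivial approach used\n"  # meta-instruction debris, dropped
    "- > Focus on: was a non-trivial approach used\n"  # exact duplicate, also dropped
    "- x\n"                                            # low-signal fragment, dropped
    "- Actual memory content about fleet ops\n"        # kept
)

print(_filter_prefetch_lines(noisy))
# Expected, per the filtering rules above:
# ## Fleet Memories
# - Actual memory content about fleet ops
```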
@@ -378,7 +378,6 @@ def create_job(
    provider: Optional[str] = None,
    base_url: Optional[str] = None,
    script: Optional[str] = None,
    profile: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Create a new cron job.
@@ -428,8 +427,6 @@ def create_job(
    normalized_base_url = normalized_base_url or None
    normalized_script = str(script).strip() if isinstance(script, str) else None
    normalized_script = normalized_script or None
    normalized_profile = str(profile).strip() if isinstance(profile, str) else None
    normalized_profile = normalized_profile or None

    label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
    job = {
@@ -442,7 +439,6 @@ def create_job(
        "provider": normalized_provider,
        "base_url": normalized_base_url,
        "script": normalized_script,
        "profile": normalized_profile,
        "schedule": parsed_schedule,
        "schedule_display": parsed_schedule.get("display", schedule),
        "repeat": {
@@ -15,7 +15,6 @@ import logging
import os
import subprocess
import sys
from contextlib import contextmanager

# fcntl is Unix-only; on Windows use msvcrt for file locking
try:
@@ -27,7 +26,7 @@ except ImportError:
    except ImportError:
        msvcrt = None
from pathlib import Path
from typing import Optional, Dict, List, Any
from typing import Optional

# Add parent directory to path for imports BEFORE repo-level imports.
# Without this, standalone invocations (e.g. after `hermes update` reloads
@@ -40,46 +39,6 @@ from hermes_time import now as _hermes_now

logger = logging.getLogger(__name__)

# Profile-scoped cron execution (#334)
# Each job can specify a profile; the scheduler switches HERMES_HOME to that
# profile's directory before execution, then restores it afterward.

@contextmanager
def _profile_context(profile_name: Optional[str]):
    """
    Temporarily switch HERMES_HOME to a profile's directory.

    If profile_name is None or 'default', this is a no-op.
    """
    if not profile_name or profile_name == "default":
        yield
        return

    from hermes_cli.profiles import get_profile_dir
    profile_dir = get_profile_dir(profile_name)
    if not profile_dir.exists():
        logger.warning("Profile '%s' does not exist at %s — running in default context", profile_name, profile_dir)
        yield
        return

    old_home = os.environ.get("HERMES_HOME")
    old_profile = os.environ.get("HERMES_ACTIVE_PROFILE")
    try:
        os.environ["HERMES_HOME"] = str(profile_dir)
        os.environ["HERMES_ACTIVE_PROFILE"] = profile_name
        logger.info("Switched to profile '%s' (HERMES_HOME=%s)", profile_name, profile_dir)
        yield
    finally:
        if old_home is not None:
            os.environ["HERMES_HOME"] = old_home
        elif "HERMES_HOME" in os.environ:
            del os.environ["HERMES_HOME"]
        if old_profile is not None:
            os.environ["HERMES_ACTIVE_PROFILE"] = old_profile
        elif "HERMES_ACTIVE_PROFILE" in os.environ:
            del os.environ["HERMES_ACTIVE_PROFILE"]


# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
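As a rough usage sketch of the save/restore contract (import path taken from the `TestProfileContext` tests below; the profile name and helper are invented for illustration):

```python
import os
from cron.scheduler import _profile_context  # same import path used by TestProfileContext below


def run_some_job() -> None:
    """Placeholder for real job execution (hypothetical)."""
    print("HERMES_HOME =", os.environ.get("HERMES_HOME"))
    print("HERMES_ACTIVE_PROFILE =", os.environ.get("HERMES_ACTIVE_PROFILE"))


# "alpha" is a hypothetical profile name; a missing profile logs a warning and is a no-op.
with _profile_context("alpha"):
    run_some_job()

# After the with-block, the previous HERMES_HOME / HERMES_ACTIVE_PROFILE values are
# restored by the finally clause, even if run_some_job() had raised.
print(os.environ.get("HERMES_ACTIVE_PROFILE"))
```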
@@ -626,15 +585,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """
    from run_agent import AIAgent

    profile = job.get("profile")
    with _profile_context(profile):
        return _run_job_inner(job)


def _run_job_inner(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """Core job execution (inside profile context)."""
    from run_agent import AIAgent

    # Initialize SQLite session store so cron job messages are persisted
    # and discoverable via session_search (same pattern as gateway/run.py).
    _session_db = None
@@ -989,71 +939,44 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
    if verbose:
        logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))

    # Group jobs by profile. Jobs within the same profile run sequentially
    # to avoid HERMES_HOME/config/env contamination. Different profiles run
    # in parallel via ThreadPoolExecutor.
    profile_groups: Dict[Optional[str], List[Dict[str, Any]]] = {}
    for job in due_jobs:
        profile = job.get("profile")
        profile_groups.setdefault(profile, []).append(job)

    executed = 0
    for job in due_jobs:
        try:
            # For recurring jobs (cron/interval), advance next_run_at to the
            # next future occurrence BEFORE execution. This way, if the
            # process crashes mid-run, the job won't re-fire on restart.
            # One-shot jobs are left alone so they can retry on restart.
            advance_next_run(job["id"])

    def _run_profile_jobs(profile_name: Optional[str], jobs: List[Dict[str, Any]]) -> int:
        """Run all jobs for a single profile sequentially."""
        count = 0
        for job in jobs:
            try:
                advance_next_run(job["id"])
                success, output, final_response, error = run_job(job)
            success, output, final_response, error = run_job(job)

            output_file = save_job_output(job["id"], output)
            if verbose:
                logger.info("Output saved to: %s", output_file)
                output_file = save_job_output(job["id"], output)
                if verbose:
                    logger.info("Output saved to: %s", output_file)

                deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
                should_deliver = bool(deliver_content)
                if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
                    logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
                    should_deliver = False
            # Deliver the final response to the origin/target chat.
            # If the agent responded with [SILENT], skip delivery (but
            # output is already saved above). Failed jobs always deliver.
            deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
            should_deliver = bool(deliver_content)
            if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
                logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
                should_deliver = False

                delivery_error = None
                if should_deliver:
                    try:
                        delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
                    except Exception as de:
                        delivery_error = str(de)
                        logger.error("Delivery failed for job %s: %s", job["id"], de)

                mark_job_run(job["id"], success, error, delivery_error=delivery_error)
                count += 1

            except Exception as e:
                logger.error("Error processing job %s: %s", job['id'], e)
                mark_job_run(job["id"], False, str(e))
        return count

    # Execute profiles in parallel
    if len(profile_groups) == 1:
        # Single profile — no need for thread pool overhead
        profile, jobs = next(iter(profile_groups.items()))
        executed = _run_profile_jobs(profile, jobs)
    else:
        max_workers = min(len(profile_groups), 8)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(_run_profile_jobs, profile, jobs): profile
                for profile, jobs in profile_groups.items()
            }
            for future in concurrent.futures.as_completed(futures):
                profile = futures[future]
            delivery_error = None
            if should_deliver:
                try:
                    count = future.result()
                    executed += count
                    if verbose:
                        logger.info("Profile '%s': %s job(s) executed", profile or "default", count)
                except Exception as e:
                    logger.error("Profile '%s' execution failed: %s", profile or "default", e)
                    delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
                except Exception as de:
                    delivery_error = str(de)
                    logger.error("Delivery failed for job %s: %s", job["id"], de)

            mark_job_run(job["id"], success, error, delivery_error=delivery_error)
            executed += 1

        except Exception as e:
            logger.error("Error processing job %s: %s", job['id'], e)
            mark_job_run(job["id"], False, str(e))

    return executed
    finally:
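The scheduling change above reduces to a group-then-fan-out pattern. Here is a self-contained sketch of that pattern with dummy jobs; the job shapes, names, and the `run_profile_jobs` helper are invented for illustration and are not part of the diff.

```python
import concurrent.futures
from typing import Any, Dict, List, Optional

# Dummy jobs for illustration; real jobs carry prompts, schedules, delivery targets, etc.
due_jobs: List[Dict[str, Any]] = [
    {"id": "j1", "profile": "alpha"},
    {"id": "j2", "profile": "alpha"},
    {"id": "j3", "profile": "beta"},
    {"id": "j4", "profile": None},  # no profile -> default context
]

# 1. Group by profile so jobs sharing a profile never run concurrently
#    (they would otherwise fight over HERMES_HOME and related env vars).
profile_groups: Dict[Optional[str], List[Dict[str, Any]]] = {}
for job in due_jobs:
    profile_groups.setdefault(job.get("profile"), []).append(job)


def run_profile_jobs(profile: Optional[str], jobs: List[Dict[str, Any]]) -> int:
    """Run one profile's jobs sequentially; return how many were executed."""
    count = 0
    for job in jobs:
        print(f"running {job['id']} in profile {profile or 'default'}")
        count += 1
    return count


# 2. Fan out across profiles with a bounded thread pool (the cap mirrors the diff's max of 8).
executed = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(profile_groups), 8)) as pool:
    futures = {pool.submit(run_profile_jobs, p, jobs): p for p, jobs in profile_groups.items()}
    for future in concurrent.futures.as_completed(futures):
        executed += future.result()

print(f"{executed} job(s) executed")
```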
@@ -88,8 +88,6 @@ def cron_list(show_all: bool = False):
        print(f" Repeat: {repeat_str}")
        print(f" Next run: {next_run}")
        print(f" Deliver: {deliver_str}")
        if job.get("profile"):
            print(f" Profile: {job['profile']}")
        if skills:
            print(f" Skills: {', '.join(skills)}")
        script = job.get("script")
@@ -170,7 +168,6 @@ def cron_create(args):
        skill=getattr(args, "skill", None),
        skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
        script=getattr(args, "script", None),
        profile=getattr(args, "profile", None),
    )
    if not result.get("success"):
        print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -221,7 +218,6 @@ def cron_edit(args):
        repeat=getattr(args, "repeat", None),
        skills=final_skills,
        script=getattr(args, "script", None),
        profile=getattr(args, "profile", None),
    )
    if not result.get("success"):
        print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -4959,7 +4959,6 @@ For more help on a command:
    cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
    cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
    cron_create.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run")
    cron_create.add_argument("--profile", help="Profile to run job in (uses profile's config.yaml and .env)")

    # cron edit
    cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
@@ -4974,7 +4973,6 @@ For more help on a command:
    cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
    cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
    cron_edit.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run. Pass empty string to clear.")
    cron_edit.add_argument("--profile", help="Profile to run job in (uses profile's config.yaml and .env)")

    # lifecycle actions
    cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")
@@ -198,14 +198,14 @@ class TestMemoryManager:
    def test_prefetch_skips_empty(self):
        mgr = MemoryManager()
        p1 = FakeMemoryProvider("builtin")
        p1._prefetch_result = "Has memories"
        p1._prefetch_result = "This provider has meaningful memories with enough length"
        p2 = FakeMemoryProvider("external")
        p2._prefetch_result = ""
        mgr.add_provider(p1)
        mgr.add_provider(p2)

        result = mgr.prefetch_all("query")
        assert result == "Has memories"
        assert result == "This provider has meaningful memories with enough length"

    def test_queue_prefetch_all(self):
        mgr = MemoryManager()
@@ -695,3 +695,92 @@ class TestMemoryContextFencing:
        fence_end = combined.index("</memory-context>")
        assert "Alice" in combined[fence_start:fence_end]
        assert combined.index("weather") < fence_start


class TestPrefetchFiltering:
    """Tests for _filter_prefetch_lines and related helpers."""

    def test_deduplicates_exact_lines(self):
        from agent.memory_manager import _filter_prefetch_lines
        raw = "- This is line one with enough characters\n- This is line two with enough characters\n- This is line one with enough characters\n- This is line three with enough characters"
        result = _filter_prefetch_lines(raw)
        lines = [l for l in result.splitlines() if l.strip()]
        assert len(lines) == 3
        assert "- This is line one with enough characters" in result
        assert "- This is line two with enough characters" in result
        assert "- This is line three with enough characters" in result

    def test_removes_meta_instruction_debris(self):
        from agent.memory_manager import _filter_prefetch_lines
        raw = (
            "## Fleet Memories\n"
            "- > Focus on: was a non-trivial approach used\n"
            "- > Focus on: was a non-trivial approach used\n"
            "- Actual memory content about fleet ops\n"
            "- Note: this is just a note\n"
        )
        result = _filter_prefetch_lines(raw)
        assert "Focus on" not in result
        assert "Note:" not in result
        assert "Actual memory content about fleet ops" in result
        assert "Fleet Memories" in result

    def test_removes_low_signal_short_lines(self):
        from agent.memory_manager import _filter_prefetch_lines
        raw = (
            "- \n"
            "- x\n"
            "- This is a meaningful memory entry with enough length\n"
        )
        result = _filter_prefetch_lines(raw)
        assert "- x" not in result
        assert "meaningful memory entry" in result

    def test_preserves_structured_facts(self):
        from agent.memory_manager import _filter_prefetch_lines
        raw = (
            "## Local Facts (Hologram)\n"
            "- ALEXANDER: Prefers Gitea for reports and deliverables.\n"
            "- Telegram home channel is Timmy Time.\n"
        )
        result = _filter_prefetch_lines(raw)
        assert "ALEXANDER" in result
        assert "Gitea" in result
        assert "Telegram" in result

    def test_is_meta_instruction_line(self):
        from agent.memory_manager import _is_meta_instruction_line
        assert _is_meta_instruction_line("- > Focus on: something") is True
        assert _is_meta_instruction_line("- Focus on: something") is True
        assert _is_meta_instruction_line("* Focus on: something") is True
        assert _is_meta_instruction_line("- Actual user memory content") is False
        assert _is_meta_instruction_line("ALEXANDER: Prefers Gitea") is False

    def test_is_low_signal_line(self):
        from agent.memory_manager import _is_low_signal_line
        assert _is_low_signal_line("- ") is True
        assert _is_low_signal_line("*") is True
        assert _is_low_signal_line("- x") is True
        assert _is_low_signal_line("- Short line") is True
        assert _is_low_signal_line("- This is a long meaningful memory entry") is False

    def test_prefetch_all_applies_filtering(self):
        from agent.memory_manager import MemoryManager
        mgr = MemoryManager()
        fake = FakeMemoryProvider(name="test")
        fake._prefetch_result = (
            "- > Focus on: was a non-trivial approach\n"
            "- > Focus on: was a non-trivial approach\n"
            "- Real memory fact\n"
        )
        mgr.add_provider(fake)
        result = mgr.prefetch_all("query")
        assert "Focus on" not in result
        assert "Real memory fact" in result
        assert result.count("Real memory fact") == 1

    def test_empty_prefetch_returns_empty(self):
        from agent.memory_manager import _filter_prefetch_lines
        assert _filter_prefetch_lines("") == ""
        assert _filter_prefetch_lines(" ") == ""
        assert _filter_prefetch_lines("\n\n") == ""
@@ -1217,80 +1217,3 @@ class TestSendMediaViaAdapter:
        self._run_with_loop(adapter, "123", media_files, None, {"id": "j4"})
        adapter.send_voice.assert_called_once()
        adapter.send_image_file.assert_called_once()


class TestProfileContext:
    """Tests for profile-scoped cron execution (#334)."""

    def test_profile_context_noop_for_none(self):
        from cron.scheduler import _profile_context
        with _profile_context(None):
            assert os.environ.get("HERMES_ACTIVE_PROFILE") is None

    def test_profile_context_noop_for_default(self):
        from cron.scheduler import _profile_context
        with _profile_context("default"):
            assert os.environ.get("HERMES_ACTIVE_PROFILE") is None

    def test_profile_context_sets_env(self, tmp_path, monkeypatch):
        from cron.scheduler import _profile_context
        from hermes_cli.profiles import get_profile_dir

        # Mock home to tmp_path
        monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))

        # Create a test profile
        profile_dir = get_profile_dir("testprofile")
        profile_dir.mkdir(parents=True)

        with _profile_context("testprofile"):
            assert os.environ["HERMES_ACTIVE_PROFILE"] == "testprofile"
            assert os.environ["HERMES_HOME"] == str(profile_dir)

        # After exit, env restored
        assert os.environ.get("HERMES_ACTIVE_PROFILE") is None
        assert os.environ["HERMES_HOME"] == str(tmp_path / ".hermes")

    def test_profile_context_missing_profile_warns(self, tmp_path, monkeypatch, caplog):
        from cron.scheduler import _profile_context
        monkeypatch.setattr("pathlib.Path.home", lambda: tmp_path)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))

        with caplog.at_level("WARNING"):
            with _profile_context("nonexistent"):
                pass
        assert "does not exist" in caplog.text


class TestTickParallelExecution:
    """Tests for parallel profile execution in tick()."""

    def test_jobs_grouped_by_profile(self, tmp_path, monkeypatch):
        from cron.scheduler import tick
        from cron.jobs import create_job, update_job, JOBS_FILE
        import datetime

        # Point jobs file to tmp_path so we don't pollute real cron db
        test_jobs_file = tmp_path / "jobs.json"
        monkeypatch.setattr("cron.jobs.JOBS_FILE", test_jobs_file)
        monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))

        job1 = create_job(prompt="test1", schedule="* * * * *", name="j1", profile="alpha")
        job2 = create_job(prompt="test2", schedule="* * * * *", name="j2", profile="beta")
        job3 = create_job(prompt="test3", schedule="* * * * *", name="j3")  # no profile

        # Manually set next_run_at to now so they are due
        now = datetime.datetime.now(datetime.timezone.utc).isoformat()
        for job in [job1, job2, job3]:
            update_job(job["id"], {"next_run_at": now})

        with patch("cron.scheduler.run_job") as mock_run:
            mock_run.return_value = (True, "output", "response", None)
            with patch("cron.scheduler.save_job_output"):
                with patch("cron.scheduler._deliver_result", return_value=None):
                    executed = tick(verbose=False)

        # All 3 should have been executed
        assert executed == 3
        assert mock_run.call_count == 3
@@ -215,8 +215,6 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
    }
    if job.get("script"):
        result["script"] = job["script"]
    if job.get("profile"):
        result["profile"] = job["profile"]
    return result
@@ -236,7 +234,6 @@ def cronjob(
    base_url: Optional[str] = None,
    reason: Optional[str] = None,
    script: Optional[str] = None,
    profile: Optional[str] = None,
    task_id: str = None,
) -> str:
    """Unified cron job management tool."""
@@ -274,7 +271,6 @@ def cronjob(
        provider=_normalize_optional_job_value(provider),
        base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
        script=_normalize_optional_job_value(script),
        profile=_normalize_optional_job_value(profile),
    )
    return json.dumps(
        {
@@ -370,8 +366,6 @@ def cronjob(
        repeat_state = dict(job.get("repeat") or {})
        repeat_state["times"] = normalized_repeat
        updates["repeat"] = repeat_state
        if profile is not None:
            updates["profile"] = _normalize_optional_job_value(profile)
        if schedule is not None:
            parsed_schedule = parse_schedule(schedule)
            updates["schedule"] = parsed_schedule