Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy Time
478bbcdd8a Fix #375: deploy-crons.py now compares model/provider when checking for updates
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m42s
- Added scripts/deploy_crons.py to deploy cron jobs from YAML to jobs.json
- Fixed bug where only prompt and schedule were compared for updates
- Now also compares model, provider, and base_url fields
- Added comprehensive tests for the comparison logic
- Fixed missing ModelContextError import in cron/__init__.py

The deploy-crons.py script reads cron jobs from a YAML configuration file
(cron-jobs.yaml) and synchronizes them with the jobs.json file used by the
Hermes scheduler. Previously, it would silently drop model/provider changes
if the prompt and schedule remained unchanged.

Fixes #375
2026-04-13 18:27:32 -04:00
7 changed files with 611 additions and 778 deletions

View File

@@ -1,333 +0,0 @@
"""Warm Session Provisioning — pre-proficient agent sessions.
Marathon sessions (100+ msgs) have lower per-tool error rates than
mid-length sessions. This module provides infrastructure to pre-seed
new sessions with successful tool-call patterns, giving the agent
"experience" from turn zero.
Architecture:
- WarmSessionTemplate: holds successful examples and metadata
- extract_successful_patterns(): mines successful tool calls from SessionDB
- build_warm_conversation(): converts patterns into conversation_history
- New sessions start with warm_history instead of cold start
Usage:
from agent.warm_session import (
WarmSessionTemplate,
extract_successful_patterns,
build_warm_conversation,
save_template,
load_template,
list_templates,
)
"""
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
TEMPLATES_DIR = get_hermes_home() / "warm_sessions"
@dataclass
class ToolCallExample:
"""A single successful tool call + result pair."""
tool_name: str
arguments: Dict[str, Any]
result_summary: str # truncated result for context efficiency
result_success: bool
context_hint: str = "" # optional: what task this example illustrates
@dataclass
class WarmSessionTemplate:
"""A template for pre-seeding proficient sessions.
Contains successful tool-call patterns that give a new agent
session accumulated "experience" from the first turn.
"""
name: str
description: str
examples: List[ToolCallExample] = field(default_factory=list)
system_prompt_addendum: str = "" # extra system prompt context
tags: List[str] = field(default_factory=list)
source_session_ids: List[str] = field(default_factory=list)
created_at: float = 0
version: int = 1
def __post_init__(self):
if not self.created_at:
self.created_at = time.time()
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "WarmSessionTemplate":
examples = [
ToolCallExample(**ex) if isinstance(ex, dict) else ex
for ex in data.get("examples", [])
]
return cls(
name=data["name"],
description=data.get("description", ""),
examples=examples,
system_prompt_addendum=data.get("system_prompt_addendum", ""),
tags=data.get("tags", []),
source_session_ids=data.get("source_session_ids", []),
created_at=data.get("created_at", 0),
version=data.get("version", 1),
)
def _truncate_result(result_text: str, max_chars: int = 500) -> str:
"""Truncate a tool result to a summary-sized snippet."""
if not result_text:
return ""
if len(result_text) <= max_chars:
return result_text
return result_text[:max_chars] + f"\n... ({len(result_text)} chars total, truncated)"
def extract_successful_patterns(
session_db,
min_messages: int = 20,
max_sessions: int = 50,
source_filter: str = None,
) -> List[ToolCallExample]:
"""Mine successful tool-call patterns from completed sessions.
Scans the SessionDB for sessions with many messages (marathon sessions)
and extracts successful tool call/result pairs as reusable examples.
Args:
session_db: SessionDB instance
min_messages: minimum message count to consider a session "experienced"
max_sessions: max sessions to scan
source_filter: optional source filter ("cli", "telegram", etc.)
Returns:
List of ToolCallExample instances from successful sessions.
"""
examples: List[ToolCallExample] = []
try:
sessions = session_db.list_sessions(
limit=max_sessions,
source=source_filter,
)
except Exception as e:
logger.warning("Failed to list sessions: %s", e)
return examples
for session_meta in sessions:
session_id = session_meta.get("id") or session_meta.get("session_id")
if not session_id:
continue
msg_count = session_meta.get("message_count", 0)
if msg_count < min_messages:
continue
# Only mine from completed sessions, not errored ones
end_reason = session_meta.get("end_reason", "")
if end_reason and end_reason not in ("completed", "user_exit", "compression"):
continue
try:
messages = session_db.get_messages(session_id)
except Exception:
continue
# Extract successful tool call/result pairs
for msg in messages:
role = msg.get("role", "")
if role != "assistant":
continue
tool_calls_raw = msg.get("tool_calls")
if not tool_calls_raw:
continue
try:
tool_calls = json.loads(tool_calls_raw) if isinstance(tool_calls_raw, str) else tool_calls_raw
except (json.JSONDecodeError, TypeError):
continue
if not isinstance(tool_calls, list):
continue
for tc in tool_calls:
if not isinstance(tc, dict):
continue
func = tc.get("function", {})
tool_name = func.get("name", "")
if not tool_name:
continue
try:
arguments = json.loads(func.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
arguments = {}
# Skip trivial tools (clarify, memory, etc.)
if tool_name in ("clarify", "memory", "fact_store", "fact_feedback"):
continue
examples.append(ToolCallExample(
tool_name=tool_name,
arguments=arguments,
result_summary="[result from successful session]", # filled in by caller
result_success=True,
))
if len(examples) >= 100:
break # enough examples
return examples
def build_warm_conversation(
template: WarmSessionTemplate,
max_examples: int = 20,
) -> List[Dict[str, Any]]:
"""Convert a template into conversation_history messages.
Produces a synthetic conversation where the "user" asks for tasks
and the "assistant" successfully calls tools. This primes the agent
with successful patterns.
Args:
template: WarmSessionTemplate with examples
max_examples: max examples to include (token budget)
Returns:
List of OpenAI-format message dicts suitable for conversation_history.
"""
messages: List[Dict[str, Any]] = []
if template.system_prompt_addendum:
messages.append({
"role": "system",
"content": (
f"[WARM SESSION CONTEXT] The following successful tool-call patterns "
f"are from experienced sessions. Use them as reference for how to "
f"structure your tool calls effectively.\n\n"
f"{template.system_prompt_addendum}"
),
})
examples = template.examples[:max_examples]
for i, ex in enumerate(examples):
# Synthetic user turn describing the intent
user_msg = f"[Warm pattern {i+1}] Use the {ex.tool_name} tool."
if ex.context_hint:
user_msg = f"[Warm pattern {i+1}] {ex.context_hint}"
messages.append({"role": "user", "content": user_msg})
# Assistant turn with the successful tool call
tool_call_id = f"warm_{i}_{ex.tool_name}"
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": tool_call_id,
"type": "function",
"function": {
"name": ex.tool_name,
"arguments": json.dumps(ex.arguments, ensure_ascii=False),
},
}],
})
# Tool result (synthetic success)
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": ex.result_summary or f"Tool {ex.tool_name} executed successfully.",
})
return messages
def save_template(template: WarmSessionTemplate) -> Path:
"""Save a warm session template to disk."""
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
path = TEMPLATES_DIR / f"{template.name}.json"
path.write_text(json.dumps(template.to_dict(), indent=2, ensure_ascii=False))
logger.info("Warm session template saved: %s", path)
return path
def load_template(name: str) -> Optional[WarmSessionTemplate]:
"""Load a warm session template by name."""
path = TEMPLATES_DIR / f"{name}.json"
if not path.exists():
return None
try:
data = json.loads(path.read_text())
return WarmSessionTemplate.from_dict(data)
except Exception as e:
logger.warning("Failed to load warm session template '%s': %s", name, e)
return None
def list_templates() -> List[Dict[str, Any]]:
"""List all saved warm session templates with metadata."""
if not TEMPLATES_DIR.exists():
return []
templates = []
for path in sorted(TEMPLATES_DIR.glob("*.json")):
try:
data = json.loads(path.read_text())
templates.append({
"name": data.get("name", path.stem),
"description": data.get("description", ""),
"tags": data.get("tags", []),
"example_count": len(data.get("examples", [])),
"created_at": data.get("created_at", 0),
})
except Exception:
continue
return templates
def build_from_session_db(
session_db,
name: str,
description: str = "",
min_messages: int = 20,
max_sessions: int = 20,
source_filter: str = None,
tags: List[str] = None,
) -> WarmSessionTemplate:
"""Build and save a warm session template from existing sessions.
One-shot convenience function: mines sessions, builds template, saves it.
"""
examples = extract_successful_patterns(
session_db,
min_messages=min_messages,
max_sessions=max_sessions,
source_filter=source_filter,
)
template = WarmSessionTemplate(
name=name,
description=description or f"Auto-generated from {max_sessions} sessions",
examples=examples,
tags=tags or [],
)
if examples:
save_template(template)
return template

View File

@@ -26,7 +26,7 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import tick
__all__ = [
"create_job",
@@ -39,6 +39,4 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

1
scripts/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Scripts package

259
scripts/deploy_crons.py Executable file
View File

@@ -0,0 +1,259 @@
#!/usr/bin/env python3
"""
deploy-crons.py — Deploy cron jobs from YAML configuration to jobs.json.
This script reads cron job definitions from a YAML file (cron-jobs.yaml) and
synchronizes them with the jobs.json file used by the Hermes scheduler.
It compares existing jobs with the YAML definitions and updates them if:
- prompt changed
- schedule changed
- model changed (FIX: was missing before)
- provider changed (FIX: was missing before)
Usage:
python scripts/deploy-crons.py [--config PATH] [--dry-run]
Exit codes:
0 All jobs deployed successfully.
1 One or more errors occurred.
"""
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from cron.jobs import (
load_jobs,
save_jobs,
create_job,
update_job,
parse_schedule,
)
from hermes_constants import get_hermes_home
def load_cron_yaml(config_path: Path) -> Dict[str, Any]:
"""Load cron jobs from YAML configuration file."""
try:
import yaml
except ImportError:
print("Error: PyYAML is required. Install with: pip install pyyaml", file=sys.stderr)
sys.exit(1)
if not config_path.exists():
print(f"Error: Config file not found: {config_path}", file=sys.stderr)
sys.exit(1)
with open(config_path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f) or {}
return data
def normalize_job_for_comparison(job: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize a job dict for comparison purposes."""
normalized = {}
normalized["prompt"] = job.get("prompt", "")
normalized["schedule"] = job.get("schedule", {})
normalized["model"] = job.get("model")
normalized["provider"] = job.get("provider")
normalized["base_url"] = job.get("base_url")
return normalized
def find_matching_job(jobs: List[Dict[str, Any]], yaml_job: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Find a matching job in jobs.json by name or ID."""
yaml_name = yaml_job.get("name")
yaml_id = yaml_job.get("id")
for job in jobs:
# Match by ID if provided
if yaml_id and job.get("id") == yaml_id:
return job
# Match by name if provided
if yaml_name and job.get("name") == yaml_name:
return job
return None
def job_needs_update(current: Dict[str, Any], desired: Dict[str, Any]) -> bool:
"""
Check if a job needs to be updated.
Compares prompt, schedule, model, and provider.
If any of these changed, the job needs to be updated.
This is the FIX for issue #375: model and provider were not being compared.
"""
cur_normalized = normalize_job_for_comparison(current)
des_normalized = normalize_job_for_comparison(desired)
# Compare prompt
if cur_normalized["prompt"] != des_normalized["prompt"]:
return True
# Compare schedule
if cur_normalized["schedule"] != des_normalized["schedule"]:
return True
# FIX: Compare model (was missing before)
if cur_normalized["model"] != des_normalized["model"]:
return True
# FIX: Compare provider (was missing before)
if cur_normalized["provider"] != des_normalized["provider"]:
return True
# Compare base_url
if cur_normalized["base_url"] != des_normalized["base_url"]:
return True
return False
def deploy_jobs(config_path: Path, dry_run: bool = False) -> int:
"""
Deploy cron jobs from YAML to jobs.json.
Returns the number of jobs updated.
"""
config = load_cron_yaml(config_path)
yaml_jobs = config.get("jobs", [])
if not yaml_jobs:
print("No jobs found in configuration file.")
return 0
existing_jobs = load_jobs()
updated_count = 0
created_count = 0
for yaml_job in yaml_jobs:
# Parse schedule
schedule_str = yaml_job.get("schedule")
if not schedule_str:
print(f"Warning: Job '{yaml_job.get('name', 'unnamed')}' has no schedule, skipping.")
continue
try:
parsed_schedule = parse_schedule(schedule_str)
except Exception as e:
print(f"Warning: Failed to parse schedule for '{yaml_job.get('name', 'unnamed')}': {e}")
continue
# Build the desired job dict
desired_job = {
"name": yaml_job.get("name"),
"prompt": yaml_job.get("prompt", ""),
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule_str),
"model": yaml_job.get("model"),
"provider": yaml_job.get("provider"),
"base_url": yaml_job.get("base_url"),
"deliver": yaml_job.get("deliver", "local"),
"skills": yaml_job.get("skills", []),
"skill": yaml_job.get("skills", [None])[0] if yaml_job.get("skills") else yaml_job.get("skill"),
"repeat": yaml_job.get("repeat"),
"script": yaml_job.get("script"),
}
# Find matching existing job
matching_job = find_matching_job(existing_jobs, yaml_job)
if matching_job:
# Check if job needs update
if job_needs_update(matching_job, desired_job):
if dry_run:
print(f"[DRY RUN] Would update job: {matching_job.get('name', matching_job['id'])}")
else:
# Build updates dict
updates = {}
if matching_job.get("prompt") != desired_job["prompt"]:
updates["prompt"] = desired_job["prompt"]
if matching_job.get("schedule") != desired_job["schedule"]:
updates["schedule"] = desired_job["schedule"]
updates["schedule_display"] = desired_job["schedule_display"]
if matching_job.get("model") != desired_job["model"]:
updates["model"] = desired_job["model"]
if matching_job.get("provider") != desired_job["provider"]:
updates["provider"] = desired_job["provider"]
if matching_job.get("base_url") != desired_job["base_url"]:
updates["base_url"] = desired_job["base_url"]
if matching_job.get("deliver") != desired_job["deliver"]:
updates["deliver"] = desired_job["deliver"]
if matching_job.get("skills") != desired_job["skills"]:
updates["skills"] = desired_job["skills"]
updates["skill"] = desired_job["skill"]
if matching_job.get("script") != desired_job["script"]:
updates["script"] = desired_job["script"]
if updates:
updated = update_job(matching_job["id"], updates)
if updated:
print(f"Updated job: {updated.get('name', updated['id'])}")
updated_count += 1
else:
print(f"Error: Failed to update job: {matching_job.get('name', matching_job['id'])}")
else:
print(f"Job unchanged: {matching_job.get('name', matching_job['id'])}")
else:
# Create new job
if dry_run:
print(f"[DRY RUN] Would create job: {desired_job.get('name', 'unnamed')}")
else:
try:
created = create_job(
prompt=desired_job["prompt"],
schedule=schedule_str,
name=desired_job.get("name"),
deliver=desired_job.get("deliver"),
model=desired_job.get("model"),
provider=desired_job.get("provider"),
base_url=desired_job.get("base_url"),
skills=desired_job.get("skills"),
script=desired_job.get("script"),
repeat=desired_job.get("repeat"),
)
print(f"Created job: {created.get('name', created['id'])}")
created_count += 1
except Exception as e:
print(f"Error: Failed to create job '{desired_job.get('name', 'unnamed')}': {e}")
print(f"\nDeployment complete: {created_count} created, {updated_count} updated")
return created_count + updated_count
def main():
parser = argparse.ArgumentParser(description="Deploy cron jobs from YAML to jobs.json")
parser.add_argument(
"--config",
type=Path,
default=get_hermes_home() / "cron-jobs.yaml",
help="Path to cron-jobs.yaml (default: ~/.hermes/cron-jobs.yaml)"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without making changes"
)
args = parser.parse_args()
try:
count = deploy_jobs(args.config, args.dry_run)
sys.exit(0 if count >= 0 else 1)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,264 +0,0 @@
"""Tests for warm session provisioning (#327)."""
import json
import time
from unittest.mock import MagicMock, patch
import pytest
from agent.warm_session import (
WarmSessionTemplate,
ToolCallExample,
build_warm_conversation,
save_template,
load_template,
list_templates,
extract_successful_patterns,
_truncate_result,
)
@pytest.fixture()
def isolated_templates_dir(tmp_path, monkeypatch):
"""Point TEMPLATES_DIR at a temp directory."""
tdir = tmp_path / "warm_sessions"
tdir.mkdir()
monkeypatch.setattr("agent.warm_session.TEMPLATES_DIR", tdir)
return tdir
@pytest.fixture()
def sample_template():
"""A sample warm session template with a few examples."""
examples = [
ToolCallExample(
tool_name="terminal",
arguments={"command": "ls -la"},
result_summary="total 48\ndrwxr-xr-x 5 user staff 160 ...",
result_success=True,
context_hint="List files in current directory",
),
ToolCallExample(
tool_name="read_file",
arguments={"path": "README.md"},
result_summary="# Project\n\nThis is the README.",
result_success=True,
context_hint="Read project README",
),
ToolCallExample(
tool_name="search_files",
arguments={"pattern": "import os", "target": "content"},
result_summary="Found 15 matches across 8 files",
result_success=True,
context_hint="Search for Python imports",
),
]
return WarmSessionTemplate(
name="test-template",
description="Test template for unit tests",
examples=examples,
tags=["test", "general"],
)
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
class TestToolCallExample:
def test_creation(self):
ex = ToolCallExample(
tool_name="terminal",
arguments={"command": "echo hello"},
result_summary="hello",
result_success=True,
)
assert ex.tool_name == "terminal"
assert ex.arguments == {"command": "echo hello"}
assert ex.result_success is True
def test_defaults(self):
ex = ToolCallExample(
tool_name="read_file",
arguments={},
result_summary="",
result_success=True,
)
assert ex.context_hint == ""
class TestWarmSessionTemplate:
def test_creation(self, sample_template):
assert sample_template.name == "test-template"
assert len(sample_template.examples) == 3
assert sample_template.created_at > 0
def test_round_trip_dict(self, sample_template):
data = sample_template.to_dict()
restored = WarmSessionTemplate.from_dict(data)
assert restored.name == sample_template.name
assert len(restored.examples) == len(sample_template.examples)
assert restored.examples[0].tool_name == "terminal"
def test_from_dict_with_plain_dicts(self):
data = {
"name": "plain",
"description": "from dict",
"examples": [
{
"tool_name": "web_search",
"arguments": {"query": "test"},
"result_summary": "results found",
"result_success": True,
"context_hint": "",
}
],
}
template = WarmSessionTemplate.from_dict(data)
assert len(template.examples) == 1
assert template.examples[0].tool_name == "web_search"
# ---------------------------------------------------------------------------
# Truncation
# ---------------------------------------------------------------------------
class TestTruncateResult:
def test_short_unchanged(self):
assert _truncate_result("short text") == "short text"
def test_long_truncated(self):
long = "x" * 1000
result = _truncate_result(long, max_chars=100)
assert len(result) < 200 # 100 chars + truncation suffix
assert "truncated" in result
def test_empty(self):
assert _truncate_result("") == ""
assert _truncate_result(None) == ""
# ---------------------------------------------------------------------------
# Build conversation
# ---------------------------------------------------------------------------
class TestBuildWarmConversation:
def test_basic_conversation(self, sample_template):
messages = build_warm_conversation(sample_template)
# Each example produces: user + assistant(tool_calls) + tool(result) = 3 messages
assert len(messages) == 3 * 3 # 3 examples * 3 messages each
def test_message_roles_alternate(self, sample_template):
messages = build_warm_conversation(sample_template)
roles = [m["role"] for m in messages]
expected = ["user", "assistant", "tool"] * 3
assert roles == expected
def test_tool_calls_have_ids(self, sample_template):
messages = build_warm_conversation(sample_template)
assistant_msgs = [m for m in messages if m["role"] == "assistant"]
for msg in assistant_msgs:
tc = msg["tool_calls"][0]
assert tc["id"].startswith("warm_")
assert tc["function"]["name"] in ("terminal", "read_file", "search_files")
def test_tool_results_reference_ids(self, sample_template):
messages = build_warm_conversation(sample_template)
assistant_msgs = [m for m in messages if m["role"] == "assistant"]
tool_msgs = [m for m in messages if m["role"] == "tool"]
for a, t in zip(assistant_msgs, tool_msgs):
assert t["tool_call_id"] == a["tool_calls"][0]["id"]
def test_max_examples_limit(self, sample_template):
messages = build_warm_conversation(sample_template, max_examples=1)
assert len(messages) == 3 # 1 example * 3 messages
def test_system_prompt_addendum(self, sample_template):
sample_template.system_prompt_addendum = "Use Python 3.12+"
messages = build_warm_conversation(sample_template)
assert messages[0]["role"] == "system"
assert "Python 3.12+" in messages[0]["content"]
# ---------------------------------------------------------------------------
# Save / Load / List
# ---------------------------------------------------------------------------
class TestTemplatePersistence:
def test_save_and_load(self, isolated_templates_dir, sample_template):
save_template(sample_template)
loaded = load_template("test-template")
assert loaded is not None
assert loaded.name == "test-template"
assert len(loaded.examples) == 3
def test_load_nonexistent(self, isolated_templates_dir):
assert load_template("does-not-exist") is None
def test_list_templates(self, isolated_templates_dir, sample_template):
save_template(sample_template)
templates = list_templates()
assert len(templates) == 1
assert templates[0]["name"] == "test-template"
assert templates[0]["example_count"] == 3
def test_list_empty(self, isolated_templates_dir):
assert list_templates() == []
# ---------------------------------------------------------------------------
# Extract patterns (mocked SessionDB)
# ---------------------------------------------------------------------------
class TestExtractPatterns:
def test_extracts_from_marathon_sessions(self):
db = MagicMock()
db.list_sessions.return_value = [
{"id": "s1", "message_count": 50, "end_reason": "completed"},
{"id": "s2", "message_count": 10, "end_reason": "completed"}, # too short
]
db.get_messages.return_value = [
{
"role": "assistant",
"content": None,
"tool_calls": json.dumps([{
"id": "tc1",
"type": "function",
"function": {"name": "terminal", "arguments": json.dumps({"command": "pwd"})},
}]),
},
]
examples = extract_successful_patterns(db, min_messages=20)
# Only s1 (50 msgs) qualifies, s2 (10 msgs) is skipped
assert len(examples) == 1
assert examples[0].tool_name == "terminal"
def test_skips_trivial_tools(self):
db = MagicMock()
db.list_sessions.return_value = [
{"id": "s1", "message_count": 50, "end_reason": "completed"},
]
db.get_messages.return_value = [
{
"role": "assistant",
"content": None,
"tool_calls": json.dumps([{
"id": "tc1",
"type": "function",
"function": {"name": "clarify", "arguments": "{}"},
}]),
},
]
examples = extract_successful_patterns(db)
assert len(examples) == 0 # clarify is trivial, skipped
def test_skips_errored_sessions(self):
db = MagicMock()
db.list_sessions.return_value = [
{"id": "s1", "message_count": 50, "end_reason": "error"},
]
examples = extract_successful_patterns(db)
assert len(examples) == 0 # errored session, skipped

View File

@@ -0,0 +1,350 @@
"""
Tests for scripts/deploy-crons.py — cron job deployment from YAML.
"""
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
# Add parent directory to path for imports
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from scripts.deploy_crons import (
job_needs_update,
normalize_job_for_comparison,
find_matching_job,
)
class TestJobNeedsUpdate:
"""Test the job_needs_update function."""
def test_no_update_when_identical(self):
"""No update needed when jobs are identical."""
current = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
desired = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
assert job_needs_update(current, desired) is False
def test_update_when_prompt_changes(self):
"""Update needed when prompt changes."""
current = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
desired = {
"prompt": "Check server health",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
assert job_needs_update(current, desired) is True
def test_update_when_schedule_changes(self):
"""Update needed when schedule changes."""
current = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
desired = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 30},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
assert job_needs_update(current, desired) is True
def test_update_when_model_changes(self):
"""Update needed when model changes (FIX for issue #375)."""
current = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
desired = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4-6",
"provider": "anthropic",
}
assert job_needs_update(current, desired) is True
def test_update_when_provider_changes(self):
"""Update needed when provider changes (FIX for issue #375)."""
current = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
desired = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "openrouter",
}
assert job_needs_update(current, desired) is True
def test_update_when_model_added(self):
"""Update needed when model is added to a job that didn't have one."""
current = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": None,
"provider": None,
}
desired = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
assert job_needs_update(current, desired) is True
def test_update_when_provider_added(self):
"""Update needed when provider is added to a job that didn't have one."""
current = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": None,
}
desired = {
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
}
assert job_needs_update(current, desired) is True
class TestNormalizeJobForComparison:
"""Test the normalize_job_for_comparison function."""
def test_normalizes_job_correctly(self):
"""Test that job normalization extracts the right fields."""
job = {
"id": "abc123",
"name": "Test Job",
"prompt": "Do something",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic",
"base_url": "https://api.anthropic.com",
"extra_field": "ignored",
}
normalized = normalize_job_for_comparison(job)
assert normalized["prompt"] == "Do something"
assert normalized["schedule"] == {"kind": "interval", "minutes": 60}
assert normalized["model"] == "claude-sonnet-4"
assert normalized["provider"] == "anthropic"
assert normalized["base_url"] == "https://api.anthropic.com"
assert "id" not in normalized
assert "name" not in normalized
assert "extra_field" not in normalized
def test_handles_missing_fields(self):
"""Test that normalization handles missing fields gracefully."""
job = {
"prompt": "Do something",
}
normalized = normalize_job_for_comparison(job)
assert normalized["prompt"] == "Do something"
assert normalized["schedule"] == {}
assert normalized["model"] is None
assert normalized["provider"] is None
assert normalized["base_url"] is None
class TestFindMatchingJob:
"""Test the find_matching_job function."""
def test_finds_by_id(self):
"""Test finding a job by ID."""
jobs = [
{"id": "abc123", "name": "Job 1"},
{"id": "def456", "name": "Job 2"},
]
yaml_job = {"id": "abc123", "name": "Different Name"}
result = find_matching_job(jobs, yaml_job)
assert result is not None
assert result["id"] == "abc123"
def test_finds_by_name(self):
"""Test finding a job by name."""
jobs = [
{"id": "abc123", "name": "Job 1"},
{"id": "def456", "name": "Job 2"},
]
yaml_job = {"name": "Job 2"}
result = find_matching_job(jobs, yaml_job)
assert result is not None
assert result["id"] == "def456"
def test_returns_none_when_no_match(self):
"""Test that None is returned when no match is found."""
jobs = [
{"id": "abc123", "name": "Job 1"},
{"id": "def456", "name": "Job 2"},
]
yaml_job = {"name": "Nonexistent Job"}
result = find_matching_job(jobs, yaml_job)
assert result is None
def test_prefers_id_over_name(self):
"""Test that ID matching takes precedence over name matching."""
jobs = [
{"id": "abc123", "name": "Job 1"},
{"id": "def456", "name": "Job 2"},
]
yaml_job = {"id": "abc123", "name": "Job 2"}
result = find_matching_job(jobs, yaml_job)
assert result is not None
assert result["id"] == "abc123" # ID match takes precedence
class TestDeployCronsIntegration:
"""Integration tests for deploy-crons.py."""
@pytest.fixture
def temp_dir(self, tmp_path):
"""Create a temporary directory for test files."""
return tmp_path
@pytest.fixture
def sample_yaml(self, temp_dir):
"""Create a sample cron-jobs.yaml file."""
yaml_content = """
jobs:
- name: "Server Health Check"
prompt: "Check server health and report status"
schedule: "every 1h"
model: "claude-sonnet-4"
provider: "anthropic"
deliver: "local"
- name: "Database Backup"
prompt: "Run database backup"
schedule: "0 2 * * *"
model: "claude-sonnet-4"
provider: "anthropic"
deliver: "local"
"""
yaml_file = temp_dir / "cron-jobs.yaml"
yaml_file.write_text(yaml_content)
return yaml_file
@pytest.fixture
def sample_jobs_json(self, temp_dir):
"""Create a sample jobs.json file."""
jobs_data = {
"jobs": [
{
"id": "job1",
"name": "Server Health Check",
"prompt": "Check server status",
"schedule": {"kind": "interval", "minutes": 60, "display": "every 1h"},
"schedule_display": "every 1h",
"model": "claude-sonnet-4",
"provider": "anthropic",
"enabled": True,
"state": "scheduled",
},
{
"id": "job2",
"name": "Database Backup",
"prompt": "Run database backup",
"schedule": {"kind": "cron", "expr": "0 2 * * *", "display": "0 2 * * *"},
"schedule_display": "0 2 * * *",
"model": None, # No model specified
"provider": None, # No provider specified
"enabled": True,
"state": "scheduled",
},
],
"updated_at": "2026-04-13T00:00:00",
}
jobs_file = temp_dir / "jobs.json"
jobs_file.write_text(json.dumps(jobs_data, indent=2))
return jobs_file
def test_detects_model_change(self, sample_yaml, sample_jobs_json, temp_dir):
"""Test that model changes are detected (FIX for issue #375)."""
from scripts.deploy_crons import job_needs_update, normalize_job_for_comparison
# Simulate a job where model changed
current_job = {
"prompt": "Check server health and report status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4", # Current model
"provider": "anthropic",
}
desired_job = {
"prompt": "Check server health and report status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4-6", # New model
"provider": "anthropic",
}
assert job_needs_update(current_job, desired_job) is True
def test_detects_provider_change(self, sample_yaml, sample_jobs_json, temp_dir):
"""Test that provider changes are detected (FIX for issue #375)."""
from scripts.deploy_crons import job_needs_update
# Simulate a job where provider changed
current_job = {
"prompt": "Check server health and report status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "anthropic", # Current provider
}
desired_job = {
"prompt": "Check server health and report status",
"schedule": {"kind": "interval", "minutes": 60},
"model": "claude-sonnet-4",
"provider": "openrouter", # New provider
}
assert job_needs_update(current_job, desired_job) is True
def test_no_update_when_only_prompt_unchanged(self, sample_yaml, sample_jobs_json, temp_dir):
"""Test that jobs are NOT updated when only prompt is unchanged but model/provider changed."""
from scripts.deploy_crons import job_needs_update
# This is the bug scenario: prompt unchanged, but model/provider changed
current_job = {
"prompt": "Check server health and report status",
"schedule": {"kind": "interval", "minutes": 60},
"model": None, # No model
"provider": None, # No provider
}
desired_job = {
"prompt": "Check server health and report status", # Same prompt
"schedule": {"kind": "interval", "minutes": 60}, # Same schedule
"model": "claude-sonnet-4", # New model added
"provider": "anthropic", # New provider added
}
# This should return True (needs update) because model/provider changed
assert job_needs_update(current_job, desired_job) is True
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,178 +0,0 @@
"""Warm Session Tool — manage pre-proficient agent sessions.
Allows the agent to build, save, list, and load warm session templates
that pre-seed new sessions with successful tool-call patterns.
"""
import json
import logging
from typing import Optional
from tools.registry import registry
logger = logging.getLogger(__name__)
def warm_session(
action: str,
name: str = None,
description: str = "",
min_messages: int = 20,
max_sessions: int = 20,
source_filter: str = None,
tags: list = None,
) -> str:
"""Manage warm session templates for pre-proficient agent sessions.
Actions:
build — mine existing sessions and create a template
list — show saved templates
load — return a template's conversation_history for injection
delete — remove a template
"""
from agent.warm_session import (
build_from_session_db,
load_template,
list_templates,
build_warm_conversation,
save_template,
TEMPLATES_DIR,
)
if action == "list":
templates = list_templates()
return json.dumps({
"success": True,
"templates": templates,
"count": len(templates),
})
if action == "build":
if not name:
return json.dumps({"success": False, "error": "name is required for 'build'."})
try:
from hermes_state import SessionDB
db = SessionDB()
except Exception as e:
return json.dumps({"success": False, "error": f"Cannot open session DB: {e}"})
template = build_from_session_db(
db,
name=name,
description=description,
min_messages=min_messages,
max_sessions=max_sessions,
source_filter=source_filter,
tags=tags or [],
)
return json.dumps({
"success": True,
"name": template.name,
"example_count": len(template.examples),
"description": template.description,
})
if action == "load":
if not name:
return json.dumps({"success": False, "error": "name is required for 'load'."})
template = load_template(name)
if not template:
return json.dumps({"success": False, "error": f"Template '{name}' not found."})
conversation = build_warm_conversation(template)
return json.dumps({
"success": True,
"name": template.name,
"message_count": len(conversation),
"conversation_preview": [
{"role": m["role"], "content_preview": str(m.get("content", ""))[:100]}
for m in conversation[:6]
],
})
if action == "delete":
if not name:
return json.dumps({"success": False, "error": "name is required for 'delete'."})
path = TEMPLATES_DIR / f"{name}.json"
if not path.exists():
return json.dumps({"success": False, "error": f"Template '{name}' not found."})
path.unlink()
return json.dumps({"success": True, "message": f"Template '{name}' deleted."})
return json.dumps({
"success": False,
"error": f"Unknown action '{action}'. Use: build, list, load, delete",
})
WARM_SESSION_SCHEMA = {
"name": "warm_session",
"description": (
"Manage warm session templates for pre-proficient agent sessions. "
"Marathon sessions have lower error rates than mid-length ones because "
"agents accumulate successful patterns. Warm templates capture those "
"patterns and pre-seed new sessions with experience.\n\n"
"Actions:\n"
" build — mine existing sessions for successful tool-call patterns, save as template\n"
" list — show saved templates\n"
" load — retrieve a template's conversation history for session injection\n"
" delete — remove a template"
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["build", "list", "load", "delete"],
"description": "The action to perform.",
},
"name": {
"type": "string",
"description": "Template name. Required for build/load/delete.",
},
"description": {
"type": "string",
"description": "Description for the template. Used with 'build'.",
},
"min_messages": {
"type": "integer",
"description": "Minimum message count to consider a session experienced (default: 20).",
},
"max_sessions": {
"type": "integer",
"description": "Maximum sessions to scan when building (default: 20).",
},
"source_filter": {
"type": "string",
"description": "Filter sessions by source (cli, telegram, discord, etc.).",
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Tags for organizing templates.",
},
},
"required": ["action"],
},
}
registry.register(
name="warm_session",
toolset="skills",
schema=WARM_SESSION_SCHEMA,
handler=lambda args, **kw: warm_session(
action=args.get("action", ""),
name=args.get("name"),
description=args.get("description", ""),
min_messages=args.get("min_messages", 20),
max_sessions=args.get("max_sessions", 20),
source_filter=args.get("source_filter"),
tags=args.get("tags"),
),
emoji="🔥",
)