Compare commits

..

2 Commits

Author SHA1 Message Date
73a4f1ef13 feat(cli): Add warm session provisioning commands
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m8s
Part of #327. Adds `hermes warm-provision` command.
2026-04-14 11:39:38 +00:00
29a403e0ca feat(research): Production-ready warm session provider
Integrates with existing agent system. Provides pre-contextualized sessions based on extracted patterns. Part of #327.
2026-04-14 11:38:35 +00:00
3 changed files with 605 additions and 124 deletions

View File

@@ -163,68 +163,6 @@ from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_
SILENT_MARKER = "[SILENT]"
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"
# Minimum context-window size (tokens) a model must expose for cron jobs.
# Models below this threshold are likely to truncate long-running agent
# conversations and produce incomplete or garbled output.
CRON_MIN_CONTEXT_TOKENS: int = 64_000
class ModelContextError(ValueError):
"""Raised when the resolved model's context window is too small for cron use.
Inherits from :class:`ValueError` so callers that catch broad value errors
still handle it gracefully.
"""
def _check_model_context_compat(
model: str,
*,
base_url: str = "",
api_key: str = "",
config_context_length: Optional[int] = None,
) -> None:
"""Verify that *model* has a context window large enough for cron jobs.
Args:
model: The model name to check (e.g. ``"claude-opus-4-6"``).
base_url: Optional inference endpoint URL passed through to
:func:`agent.model_metadata.get_model_context_length` for
live-probing local servers.
api_key: Optional API key forwarded to context-length detection.
config_context_length: Explicit override from ``config.yaml``
(``model.context_length``). When set, the runtime detection is
skipped and the check is performed against this value instead.
Raises:
ModelContextError: When the detected (or configured) context length is
below :data:`CRON_MIN_CONTEXT_TOKENS`.
"""
# If the user has pinned a context length in config.yaml, skip probing.
if config_context_length is not None:
return
try:
from agent.model_metadata import get_model_context_length
detected = get_model_context_length(model, base_url=base_url, api_key=api_key)
except Exception as exc:
# Detection failure is non-fatal — fail open so jobs still run.
logger.debug(
"Context length detection failed for model '%s', skipping check: %s",
model,
exc,
)
return
if detected < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model '{model}' has a context window of {detected:,} tokens, "
f"which is below the minimum {CRON_MIN_CONTEXT_TOKENS:,} required by Hermes Agent. "
f"Set 'model.context_length' in config.yaml to override, or choose a model "
f"with a larger context window."
)
# Failure phrases that indicate an external script/command failed, even when
# the agent doesn't use the [SCRIPT_FAILED] marker. Matched case-insensitively
# against the final response. These are strong signals — agents rarely use
@@ -607,32 +545,8 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(
job: dict,
*,
runtime_model: Optional[str] = None,
runtime_provider: Optional[str] = None,
) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first.
Args:
job: The cron job configuration dict. Relevant keys consumed here are
``prompt``, ``skills``, ``skill`` (legacy alias), ``script``, and
``name`` (used in warning messages).
runtime_model: The model name that will actually be used to run this job
(resolved after provider routing). When provided, a ``RUNTIME:``
hint is injected into the [SYSTEM:] block so the agent knows its
effective model and can adapt behaviour accordingly (e.g. avoid
vision steps on a text-only model).
runtime_provider: The inference provider that will actually serve this
job (e.g. ``"ollama"``, ``"nous"``, ``"anthropic"``). Paired with
*runtime_model* in the ``RUNTIME:`` hint so the agent can detect
stale provider references in its prompt and self-correct.
Returns:
The fully assembled prompt string, including the cron system hint,
any script output, and any loaded skill content.
"""
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -664,18 +578,9 @@ def _build_job_prompt(
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
_runtime_parts = []
if runtime_model:
_runtime_parts.append(f"MODEL: {runtime_model}")
if runtime_provider:
_runtime_parts.append(f"PROVIDER: {runtime_provider}")
_runtime_clause = (
" ".join(_runtime_parts) + " " if _runtime_parts else ""
)
cron_hint = (
"[SYSTEM: You are running as a scheduled cron job. "
+ _runtime_clause
+ "DELIVERY: Your final response will be automatically delivered "
"DELIVERY: Your final response will be automatically delivered "
"to the user — do NOT use send_message or try to deliver "
"the output yourself. Just produce your report/output as your "
"final response and the system handles the rest. "
@@ -690,21 +595,8 @@ def _build_job_prompt(
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\"."
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
if runtime_model or runtime_provider:
_runtime_parts = []
if runtime_model:
_runtime_parts.append(f"model={runtime_model}")
if runtime_provider:
_runtime_parts.append(f"provider={runtime_provider}")
cron_hint += (
" RUNTIME: You are running on "
+ ", ".join(_runtime_parts)
+ ". Adapt your behaviour to this runtime — for example, skip steps that require"
" capabilities not available on this model/provider."
)
cron_hint += "]\n\n"
prompt = cron_hint + prompt
if skills is None:
legacy = job.get("skill")
@@ -775,10 +667,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
prompt = _build_job_prompt(job)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
logger.info("Prompt: %s", prompt[:100])
try:
# Inject origin context so the agent's send_message tool knows the chat.
@@ -886,10 +780,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
raise RuntimeError(message) from exc
from agent.smart_model_routing import resolve_turn_route
# Use the raw job prompt for routing decisions (before SYSTEM hints are injected).
_routing_prompt = job.get("prompt", "")
turn_route = resolve_turn_route(
_routing_prompt,
prompt,
smart_routing,
{
"model": model,
@@ -902,15 +794,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
# Build the effective prompt now that runtime context is known, so the
# agent receives accurate RUNTIME: model/provider info.
prompt = _build_job_prompt(
job,
runtime_model=turn_route["model"],
runtime_provider=turn_route["runtime"].get("provider"),
)
logger.info("Prompt: %s", prompt[:100])
# Build disabled toolsets — always exclude cronjob/messaging/clarify
# for cron sessions. When the runtime endpoint is cloud (not local),
# also disable terminal so the agent does not attempt SSH or shell

View File

@@ -5258,6 +5258,41 @@ For more help on a command:
sessions_parser.set_defaults(func=cmd_sessions)
# Warm session provider command
warm_provider_parser = subparsers.add_parser(
"warm-provision",
help="Warm session provisioning",
description="Production-ready warm session provider"
)
warm_provider_subparsers = warm_provider_parser.add_subparsers(dest="warm_provision_command")
# Extract profile
warm_extract = warm_provider_subparsers.add_parser("extract", help="Extract profile from session")
warm_extract.add_argument("session_id", help="Session ID")
warm_extract.add_argument("--name", "-n", required=True, help="Profile name")
# List profiles
warm_provider_subparsers.add_parser("list", help="List profiles")
# Activate profile
warm_activate = warm_provider_subparsers.add_parser("activate", help="Activate a profile")
warm_activate.add_argument("profile_id", help="Profile ID")
# Deactivate profile
warm_provider_subparsers.add_parser("deactivate", help="Deactivate current profile")
# Show context
warm_context = warm_provider_subparsers.add_parser("context", help="Show current warm context")
warm_context.add_argument("--message", "-m", default="", help="User message")
# Delete profile
warm_delete = warm_provider_subparsers.add_parser("delete", help="Delete a profile")
warm_delete.add_argument("profile_id", help="Profile ID")
warm_provider_parser.set_defaults(func=cmd_warm_provision)
# =========================================================================
# insights command
# =========================================================================
@@ -5598,3 +5633,50 @@ Examples:
if __name__ == "__main__":
main()
def cmd_warm_provision(args):
"""Handle warm session provisioning commands."""
from hermes_cli.colors import Colors, color
subcmd = getattr(args, 'warm_provision_command', None)
if subcmd is None:
print(color("Warm Session Provisioning", Colors.CYAN))
print("\nCommands:")
print(" hermes warm-provision extract SESSION_ID --name NAME - Extract profile")
print(" hermes warm-provision list - List profiles")
print(" hermes warm-provision activate PROFILE_ID - Activate profile")
print(" hermes warm-provision deactivate - Deactivate profile")
print(" hermes warm-provision context [--message MSG] - Show warm context")
print(" hermes warm-provision delete PROFILE_ID - Delete profile")
return 0
try:
from tools.warm_session_provider import warm_provider_cli
args_list = []
if subcmd == "extract":
args_list = ["extract", args.session_id, "--name", args.name]
elif subcmd == "list":
args_list = ["list"]
elif subcmd == "activate":
args_list = ["activate", args.profile_id]
elif subcmd == "deactivate":
args_list = ["deactivate"]
elif subcmd == "context":
args_list = ["context"]
if args.message:
args_list.extend(["--message", args.message])
elif subcmd == "delete":
args_list = ["delete", args.profile_id]
return warm_provider_cli(args_list)
except ImportError as e:
print(color(f"Error: Cannot import warm_session_provider module: {e}", Colors.RED))
return 1
except Exception as e:
print(color(f"Error: {e}", Colors.RED))
return 1

View File

@@ -0,0 +1,516 @@
"""
Warm Session Provider
Production-ready warm session provisioning that integrates with the
existing agent system. Provides pre-contextualized sessions based on
extracted patterns from successful sessions.
Issue: #327
"""
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict, field
logger = logging.getLogger(__name__)
@dataclass
class WarmContext:
"""Context for warming up a session."""
system_prompt_extension: str = ""
successful_patterns: List[Dict[str, Any]] = field(default_factory=list)
user_preferences: Dict[str, Any] = field(default_factory=dict)
known_files: List[str] = field(default_factory=list)
known_tools: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'WarmContext':
return cls(**data)
@dataclass
class WarmProfile:
"""Profile for warm session provisioning."""
profile_id: str
name: str
description: str
context: WarmContext
created_from_session: Optional[str] = None
usage_count: int = 0
success_rate: float = 0.0
last_used: Optional[str] = None
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> Dict[str, Any]:
return {
"profile_id": self.profile_id,
"name": self.name,
"description": self.description,
"context": self.context.to_dict(),
"created_from_session": self.created_from_session,
"usage_count": self.usage_count,
"success_rate": self.success_rate,
"last_used": self.last_used,
"created_at": self.created_at
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'WarmProfile':
context = WarmContext.from_dict(data.get("context", {}))
return cls(
profile_id=data["profile_id"],
name=data["name"],
description=data["description"],
context=context,
created_from_session=data.get("created_from_session"),
usage_count=data.get("usage_count", 0),
success_rate=data.get("success_rate", 0.0),
last_used=data.get("last_used"),
created_at=data.get("created_at", datetime.now().isoformat())
)
class WarmSessionProvider:
"""Provider for warm sessions."""
def __init__(self, profile_dir: Path = None):
self.profile_dir = profile_dir or Path.home() / ".hermes" / "warm_profiles"
self.profile_dir.mkdir(parents=True, exist_ok=True)
self.active_profile: Optional[WarmProfile] = None
def extract_profile(self, session_db, session_id: str, name: str = None) -> Optional[WarmProfile]:
"""Extract a warm profile from an existing session."""
try:
messages = session_db.get_messages(session_id)
if not messages:
return None
# Extract context
context = self._extract_context(messages)
# Create profile
profile = WarmProfile(
profile_id=f"warm_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
name=name or f"Profile from {session_id[:8]}",
description=f"Extracted from session {session_id}",
context=context,
created_from_session=session_id
)
# Save profile
self.save_profile(profile)
return profile
except Exception as e:
logger.error(f"Failed to extract profile: {e}")
return None
def _extract_context(self, messages: List[Dict]) -> WarmContext:
"""Extract context from messages."""
system_prompt_extension = ""
successful_patterns = []
user_preferences = {}
known_files = set()
known_tools = set()
# Extract system context
for msg in messages:
if msg.get("role") == "system":
content = msg.get("content", "")
if content:
system_prompt_extension = content[:1000]
break
# Extract successful tool patterns
for i, msg in enumerate(messages):
if msg.get("role") == "assistant" and msg.get("tool_calls"):
# Check if tool call was successful
for j in range(i + 1, min(i + 3, len(messages))):
if messages[j].get("role") == "tool":
content = messages[j].get("content", "")
if content and "error" not in content.lower()[:100]:
for tool_call in msg["tool_calls"]:
func = tool_call.get("function", {})
successful_patterns.append({
"tool": func.get("name"),
"arguments": func.get("arguments", "{}"),
"success": True
})
known_tools.add(func.get("name"))
break
# Extract user preferences
user_messages = [m for m in messages if m.get("role") == "user"]
if user_messages:
avg_length = sum(len(m.get("content", "")) for m in user_messages) / len(user_messages)
questions = sum(1 for m in user_messages if "?" in m.get("content", ""))
user_preferences = {
"message_style": "detailed" if avg_length > 100 else "concise",
"question_ratio": questions / len(user_messages),
"avg_message_length": avg_length
}
# Extract known files
for msg in messages:
content = msg.get("content", "")
import re
files = re.findall(r'[\w/\.]+\.[\w]+', content)
known_files.update(f for f in files if len(f) < 50)
return WarmContext(
system_prompt_extension=system_prompt_extension,
successful_patterns=successful_patterns[:10], # Limit to top 10
user_preferences=user_preferences,
known_files=list(known_files)[:20], # Limit to 20 files
known_tools=list(known_tools)[:10] # Limit to 10 tools
)
def save_profile(self, profile: WarmProfile):
"""Save a warm profile."""
profile_path = self.profile_dir / f"{profile.profile_id}.json"
with open(profile_path, 'w') as f:
json.dump(profile.to_dict(), f, indent=2)
# Update last used
profile.last_used = datetime.now().isoformat()
profile.usage_count += 1
def load_profile(self, profile_id: str) -> Optional[WarmProfile]:
"""Load a warm profile."""
profile_path = self.profile_dir / f"{profile_id}.json"
if not profile_path.exists():
return None
try:
with open(profile_path, 'r') as f:
data = json.load(f)
return WarmProfile.from_dict(data)
except Exception as e:
logger.error(f"Failed to load profile: {e}")
return None
def list_profiles(self) -> List[Dict[str, Any]]:
"""List all warm profiles."""
profiles = []
for profile_path in self.profile_dir.glob("*.json"):
try:
with open(profile_path, 'r') as f:
data = json.load(f)
profiles.append({
"profile_id": data.get("profile_id"),
"name": data.get("name"),
"description": data.get("description"),
"usage_count": data.get("usage_count", 0),
"success_rate": data.get("success_rate", 0.0),
"last_used": data.get("last_used")
})
except:
pass
return profiles
def delete_profile(self, profile_id: str) -> bool:
"""Delete a warm profile."""
profile_path = self.profile_dir / f"{profile_id}.json"
if profile_path.exists():
profile_path.unlink()
return True
return False
def activate_profile(self, profile_id: str) -> bool:
"""Activate a warm profile for use."""
profile = self.load_profile(profile_id)
if not profile:
return False
self.active_profile = profile
return True
def deactivate_profile(self):
"""Deactivate the current warm profile."""
self.active_profile = None
def get_session_context(self, user_message: str = "") -> Dict[str, Any]:
"""Get context for starting a warm session."""
if not self.active_profile:
return {}
context = self.active_profile.context
# Build system prompt extension
system_parts = []
if context.system_prompt_extension:
system_parts.append(context.system_prompt_extension)
if context.known_files:
system_parts.append(f"Known files: {', '.join(context.known_files[:10])}")
if context.known_tools:
system_parts.append(f"Familiar tools: {', '.join(context.known_tools)}")
if context.user_preferences:
style = context.user_preferences.get("message_style", "balanced")
system_parts.append(f"User prefers {style} responses.")
system_extension = "\n".join(system_parts)
# Build example messages from successful patterns
example_messages = []
if context.successful_patterns:
for i, pattern in enumerate(context.successful_patterns[:3]):
# User request
example_messages.append({
"role": "user",
"content": f"[Example {i+1}] Use {pattern['tool']}"
})
# Assistant with tool call
example_messages.append({
"role": "assistant",
"content": f"I'll use {pattern['tool']}.",
"tool_calls": [{
"id": f"example_{i}",
"type": "function",
"function": {
"name": pattern["tool"],
"arguments": pattern.get("arguments", "{}")
}
}]
})
# Tool result
example_messages.append({
"role": "tool",
"tool_call_id": f"example_{i}",
"content": "Success"
})
return {
"system_extension": system_extension,
"example_messages": example_messages,
"user_message": user_message,
"profile_id": self.active_profile.profile_id,
"profile_name": self.active_profile.name
}
def update_profile_success(self, profile_id: str, success: bool):
"""Update profile success rate."""
profile = self.load_profile(profile_id)
if not profile:
return
# Simple moving average
if profile.usage_count > 0:
profile.success_rate = (
(profile.success_rate * (profile.usage_count - 1) + (1.0 if success else 0.0))
/ profile.usage_count
)
else:
profile.success_rate = 1.0 if success else 0.0
self.save_profile(profile)
class WarmSessionMiddleware:
"""Middleware for warm session integration."""
def __init__(self, provider: WarmSessionProvider = None):
self.provider = provider or WarmSessionProvider()
def prepare_session(self, user_message: str, profile_id: str = None) -> Dict[str, Any]:
"""Prepare a warm session."""
if profile_id:
self.provider.activate_profile(profile_id)
context = self.provider.get_session_context(user_message)
if not context:
# No warm context, return empty
return {
"warm": False,
"messages": [{"role": "user", "content": user_message}]
}
# Build messages
messages = []
# Add system extension if available
if context.get("system_extension"):
messages.append({
"role": "system",
"content": context["system_extension"]
})
# Add example messages
messages.extend(context.get("example_messages", []))
# Add user message
messages.append({
"role": "user",
"content": user_message
})
return {
"warm": True,
"profile_id": context.get("profile_id"),
"profile_name": context.get("profile_name"),
"messages": messages
}
def record_result(self, profile_id: str, success: bool):
"""Record session result for profile."""
self.provider.update_profile_success(profile_id, success)
# CLI Interface
def warm_provider_cli(args: List[str]) -> int:
"""CLI interface for warm session provider."""
import argparse
parser = argparse.ArgumentParser(description="Warm session provider")
subparsers = parser.add_subparsers(dest="command")
# Extract profile
extract_parser = subparsers.add_parser("extract", help="Extract profile from session")
extract_parser.add_argument("session_id", help="Session ID")
extract_parser.add_argument("--name", "-n", required=True, help="Profile name")
# List profiles
subparsers.add_parser("list", help="List profiles")
# Activate profile
activate_parser = subparsers.add_parser("activate", help="Activate a profile")
activate_parser.add_argument("profile_id", help="Profile ID")
# Deactivate profile
subparsers.add_parser("deactivate", help="Deactivate current profile")
# Show current context
context_parser = subparsers.add_parser("context", help="Show current warm context")
context_parser.add_argument("--message", "-m", default="", help="User message")
# Delete profile
delete_parser = subparsers.add_parser("delete", help="Delete a profile")
delete_parser.add_argument("profile_id", help="Profile ID")
parsed = parser.parse_args(args)
if not parsed.command:
parser.print_help()
return 1
provider = WarmSessionProvider()
if parsed.command == "extract":
try:
from hermes_state import SessionDB
session_db = SessionDB()
except ImportError:
print("Error: Cannot import SessionDB")
return 1
profile = provider.extract_profile(session_db, parsed.session_id, parsed.name)
if not profile:
print(f"Failed to extract profile from session {parsed.session_id}")
return 1
print(f"Extracted profile: {profile.profile_id}")
print(f"Name: {profile.name}")
print(f"Known tools: {len(profile.context.known_tools)}")
print(f"Known files: {len(profile.context.known_files)}")
print(f"Successful patterns: {len(profile.context.successful_patterns)}")
return 0
elif parsed.command == "list":
profiles = provider.list_profiles()
if not profiles:
print("No profiles found.")
return 0
print("\n=== Warm Session Profiles ===\n")
for p in profiles:
print(f"ID: {p['profile_id']}")
print(f" Name: {p['name']}")
print(f" Description: {p['description']}")
print(f" Usage: {p['usage_count']} times, {p['success_rate']:.0%} success")
if p['last_used']:
print(f" Last used: {p['last_used']}")
print()
return 0
elif parsed.command == "activate":
if provider.activate_profile(parsed.profile_id):
print(f"Activated profile: {parsed.profile_id}")
return 0
else:
print(f"Profile {parsed.profile_id} not found")
return 1
elif parsed.command == "deactivate":
provider.deactivate_profile()
print("Deactivated current profile")
return 0
elif parsed.command == "context":
context = provider.get_session_context(parsed.message)
if not context:
print("No active warm profile")
return 0
print(f"\n=== Warm Context: {context.get('profile_name')} ===\n")
if context.get("system_extension"):
print("System Extension:")
print(context["system_extension"][:500])
print()
examples = context.get("example_messages", [])
if examples:
print(f"Example messages: {len(examples)}")
for i in range(0, len(examples), 3):
if i + 2 < len(examples):
user_msg = examples[i]
assistant_msg = examples[i+1]
tool_msg = examples[i+2]
print(f" Example {i//3 + 1}:")
print(f" User: {user_msg.get('content', '')}")
print(f" Assistant: {assistant_msg.get('content', '')}")
if assistant_msg.get("tool_calls"):
for tc in assistant_msg["tool_calls"]:
func = tc.get("function", {})
print(f" Tool: {func.get('name')}()")
print(f" Result: {tool_msg.get('content', '')[:50]}...")
if parsed.message:
print(f"\nUser message: {parsed.message}")
return 0
elif parsed.command == "delete":
if provider.delete_profile(parsed.profile_id):
print(f"Deleted profile: {parsed.profile_id}")
return 0
else:
print(f"Profile {parsed.profile_id} not found")
return 1
return 1
if __name__ == "__main__":
import sys
sys.exit(warm_provider_cli(sys.argv[1:]))