Compare commits

..

3 Commits

Author SHA1 Message Date
Timmy Time
9919114541 Fix #372: Runtime-aware cron prompts with provider mismatch detection
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m18s
When cron jobs run on cloud providers (Nous, OpenRouter), prompts
written for local Ollama fail because they assume SSH or localhost.

This fix injects runtime context into prompts so agents know what
they can actually do based on the runtime provider.

Changes:
- Added _classify_runtime() to detect local vs cloud providers
- Added _detect_provider_mismatch() to warn about stale prompts
- Updated _build_job_prompt() to inject runtime context block
- Added early model/provider resolution in run_job()
- Added provider mismatch warning logging
- Fixed missing ModelContextError import in cron/__init__.py
- Added 8 tests for runtime classification and prompt building

Runtime context injected:
- LOCAL: 'you have access to local machine, Ollama, SSH keys'
- CLOUD: 'you do NOT have local machine access. Do NOT assume SSH...'

Fixes #372
2026-04-13 21:49:00 -04:00
954fd992eb Merge pull request 'perf: lazy session creation — defer DB write until first message (#314)' (#449) from whip/314-1776127532 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 55s
Forge CI / smoke-and-build (pull_request) Failing after 1m12s
perf: lazy session creation (#314)

Closes #314.
2026-04-14 01:08:13 +00:00
Metatron
f35f56e397 perf: lazy session creation — defer DB write until first message (closes #314)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 56s
Remove eager create_session() call from AIAgent.__init__(). Sessions
are now created lazily on first _flush_messages_to_session_db() call
via ensure_session() which uses INSERT OR IGNORE.

Impact: eliminates 32.4% of sessions (3,564 of 10,985) that were
created at agent init but never received any messages.

The existing ensure_session() fallback in _flush_messages_to_session_db()
already handles this pattern — it was originally designed for recovery
after transient SQLite lock failures. Now it's the primary creation path.

Compression-initiated sessions still use create_session() directly
(line ~5995) since they have messages to write immediately.
2026-04-13 20:52:06 -04:00
7 changed files with 204 additions and 978 deletions

View File

@@ -26,7 +26,7 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import tick
__all__ = [
"create_job",
@@ -39,6 +39,4 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -545,8 +545,75 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
# ---------------------------------------------------------------------------
# Runtime classification & provider mismatch detection
# ---------------------------------------------------------------------------
_PROVIDER_ALIASES: dict[str, set[str]] = {
"ollama": {"ollama", "local ollama", "localhost:11434"},
"anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
"nous": {"nous", "mimo", "nousresearch"},
"openrouter": {"openrouter"},
"kimi": {"kimi", "moonshot"},
"openai": {"openai", "gpt", "codex"},
"gemini": {"gemini", "google"},
}
_CLOUD_PREFIXES = frozenset({"nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"})
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'local' | 'cloud' | 'unknown'."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in _CLOUD_PREFIXES:
return "cloud"
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
"""Return stale provider group referenced in prompt, or None."""
if not active_provider or not prompt:
return None
prompt_lower = prompt.lower()
active_lower = active_provider.lower().strip()
active_group: Optional[str] = None
for group, aliases in _PROVIDER_ALIASES.items():
if active_lower in aliases or active_lower.startswith(group):
active_group = group
break
if not active_group:
return None
for group, aliases in _PROVIDER_ALIASES.items():
if group == active_group:
continue
for alias in aliases:
if alias in prompt_lower:
return group
return None
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def _build_job_prompt(
job: dict,
*,
runtime_model: str = "",
runtime_provider: str = "",
) -> str:
"""Build the effective prompt for a cron job.
Args:
job: The cron job dict.
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
"""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -576,6 +643,33 @@ def _build_job_prompt(job: dict) -> str:
f"{prompt}"
)
# Runtime context injection — tells the agent what it can actually do.
# The block stays empty when neither a model nor a provider was resolved.
_runtime_block = ""
if runtime_model or runtime_provider:
    _kind = _classify_runtime(runtime_provider, runtime_model)
    _notes: list[str] = []
    if runtime_model:
        _notes.append(f"MODEL: {runtime_model}")
    if runtime_provider:
        _notes.append(f"PROVIDER: {runtime_provider}")
    if _kind == "local":
        _notes.append(
            "RUNTIME: local — you have access to the local machine, "
            "local Ollama, SSH keys, and filesystem"
        )
    elif _kind == "cloud":
        # No trailing period here: the closing sentence below supplies the
        # separator, and a period on both sides rendered as "paths.. Adjust".
        _notes.append(
            "RUNTIME: cloud API — you do NOT have local machine access. "
            "Do NOT assume you can SSH into servers, check local Ollama, "
            "or access local filesystem paths"
        )
    # _notes is never empty here: at least one of MODEL / PROVIDER was added.
    # Real newlines terminate the block; the escaped form put the two
    # characters backslash + "n" into the prompt instead of a blank line.
    _runtime_block = (
        "[SYSTEM: RUNTIME CONTEXT — "
        + "; ".join(_notes)
        + ". Adjust your approach based on these capabilities.]\n\n"
    )
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
cron_hint = (
@@ -595,9 +689,9 @@ def _build_job_prompt(job: dict) -> str:
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
"\"[SCRIPT_FAILED]: script exited with code 1\".]\n\n"
)
prompt = cron_hint + prompt
prompt = _runtime_block + cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -667,7 +761,32 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
prompt = _build_job_prompt(job)
# Early model/provider resolution for runtime context injection.
# Precedence for the model: job["model"], then $HERMES_MODEL, then the
# "model" entry of config.yaml (either a plain string or a mapping with a
# "default" key). The provider comes from $HERMES_PROVIDER, falling back to
# the "<provider>/" prefix of the model name.
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_early_provider = os.getenv("HERMES_PROVIDER", "")
if not _early_model:
    try:
        import yaml as _y
        _cfg_path = str(_hermes_home / "config.yaml")
        if os.path.exists(_cfg_path):
            with open(_cfg_path, encoding="utf-8") as _f:
                _cfg_early = _y.safe_load(_f) or {}
            _mc = _cfg_early.get("model", {})
            if isinstance(_mc, str):
                _early_model = _mc
            elif isinstance(_mc, dict):
                # "default: null" yields None; normalise so the prefix
                # check below never sees a non-string.
                _early_model = _mc.get("default") or ""
    except Exception as _exc:
        # Best-effort only: the runtime context block is optional, so a
        # broken or unreadable config must not fail the job — but say why.
        logger.debug("Early model resolution from config.yaml failed: %s", _exc)
if not isinstance(_early_model, str):
    _early_model = ""
if not _early_provider and "/" in _early_model:
    _early_provider = _early_model.split("/")[0]
prompt = _build_job_prompt(
    job,
    runtime_model=_early_model,
    runtime_provider=_early_provider,
)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -779,6 +898,17 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Provider mismatch warning: the raw job prompt (before any runtime context
# or cron hint is prepended) is checked against the resolved provider.
_resolved_provider = runtime.get("provider", "") or ""
_raw_prompt = job.get("prompt", "")
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
if _mismatch:
    # The two literals are concatenated by the parser, so the separator has
    # to be part of the first one; without it the message ran together.
    logger.warning(
        "Job '%s' prompt references '%s' but active provider is '%s' — "
        "agent will adapt via runtime context. Consider updating prompt.",
        job_name, _mismatch, _resolved_provider,
    )
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -63,16 +63,6 @@ def _looks_like_phone(value: str) -> bool:
return bool(_PHONE_RE.match(value.strip()))
from .config import (
# Session template manager for code-first seeding
try:
from tools.session_template_manager import SessionTemplateManager, TaskType
HAS_TEMPLATE_MANAGER = True
except ImportError:
HAS_TEMPLATE_MANAGER = False
SessionTemplateManager = None
TaskType = None
Platform,
GatewayConfig,
SessionResetPolicy, # noqa: F401 — re-exported via gateway/__init__.py
@@ -536,10 +526,6 @@ class SessionStore:
except Exception as e:
print(f"[gateway] Warning: SQLite session store unavailable, falling back to JSONL: {e}")
# Initialize session template manager
self._init_template_manager()
def _ensure_loaded(self) -> None:
"""Load sessions index from disk if not already loaded."""
with self._lock:
@@ -1093,112 +1079,3 @@ def build_session_context(
context.updated_at = session_entry.updated_at
return context
def _init_template_manager(self):
    """Set ``self.template_manager`` to a new SessionTemplateManager, or to None.

    None is stored when the optional import was unavailable
    (``HAS_TEMPLATE_MANAGER`` is false) or when construction raises.
    """
    if HAS_TEMPLATE_MANAGER:
        try:
            self.template_manager = SessionTemplateManager()
            logger.info("Session template manager initialized")
        except Exception as exc:
            logger.warning(f"Failed to initialize template manager: {exc}")
            self.template_manager = None
    else:
        self.template_manager = None
def inject_session_template(self, session_id: str, task_type: Optional[str] = None) -> bool:
    """
    Report whether a session template is available for a new session.

    No messages are modified here; the method only looks a template up and
    logs that it is available.

    Args:
        session_id: Session ID the template is meant for (used in log output)
        task_type: Optional task type value; TaskType.CODE is used when omitted.

    Returns:
        True if a template was found, False otherwise (no manager, invalid
        task type, no matching template, or any error)
    """
    manager = self.template_manager
    if not manager:
        return False

    try:
        if not task_type:
            # Default to CODE since research shows it's most effective
            kind = TaskType.CODE
        else:
            try:
                kind = TaskType(task_type)
            except ValueError:
                logger.warning(f"Invalid task type: {task_type}")
                return False

        found = manager.get_template_for_task(kind)
        if found:
            # Integration point: the actual injection is expected to happen
            # when messages are loaded.
            logger.info(f"Template {found.template_id} available for session {session_id}")
            return True

        logger.debug(f"No template found for task type: {kind.value}")
        return False
    except Exception as exc:
        logger.error(f"Failed to inject template into session {session_id}: {exc}")
        return False
def list_session_templates(self, task_type: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Return the available session templates as dictionaries.

    Args:
        task_type: Optional task type value used as a filter

    Returns:
        One dictionary per template; an empty list when no manager is
        configured or when listing fails
    """
    manager = self.template_manager
    if not manager:
        return []

    try:
        wanted = None
        if task_type:
            wanted = TaskType(task_type)
        return [entry.to_dict() for entry in manager.list_templates(wanted)]
    except Exception as exc:
        logger.error(f"Failed to list templates: {exc}")
        return []
def create_session_template(self, session_id: str,
                            name: Optional[str] = None,
                            description: Optional[str] = None,
                            max_calls: int = 10) -> Optional[str]:
    """
    Ask the template manager to build a template from a completed session.

    Args:
        session_id: Session ID to create the template from
        name: Template name
        description: Template description
        max_calls: Maximum number of tool calls to extract

    Returns:
        The new template's ID, or None when no manager is configured, no
        template was produced, or creation failed
    """
    if not self.template_manager:
        return None

    try:
        created = self.template_manager.create_template_from_session(
            session_id,
            name=name,
            description=description,
            max_calls=max_calls
        )
        return created.template_id if created else None
    except Exception as exc:
        logger.error(f"Failed to create template from session {session_id}: {exc}")
        return None

View File

@@ -1001,30 +1001,10 @@ class AIAgent:
self._session_db = session_db
self._parent_session_id = parent_session_id
self._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes
if self._session_db:
try:
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
model=self.model,
model_config={
"max_iterations": self.max_iterations,
"reasoning_config": reasoning_config,
"max_tokens": max_tokens,
},
user_id=None,
parent_session_id=self._parent_session_id,
)
except Exception as e:
# Transient SQLite lock contention (e.g. CLI and gateway writing
# concurrently) must NOT permanently disable session_search for
# this agent. Keep _session_db alive — subsequent message
# flushes and session_search calls will still work once the
# lock clears. The session row may be missing from the index
# for this run, but that is recoverable (flushes upsert rows).
logger.warning(
"Session DB create_session failed (session_search still available): %s", e
)
# Lazy session creation: defer until first message flush (#314).
# _flush_messages_to_session_db() calls ensure_session() which uses
# INSERT OR IGNORE — creating the row only when messages arrive.
# This eliminates 32% of sessions that are created but never used.
# In-memory todo list for task planning (one per agent/session)
from tools.todo_tool import TodoStore

View File

@@ -0,0 +1,64 @@
"""Tests for cron scheduler: provider mismatch detection, runtime classification."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_scheduler():
import importlib.util
spec = importlib.util.spec_from_file_location(
"cron.scheduler", str(Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"),
)
mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod)
except Exception:
pass
return mod
# Load the scheduler module once at import time and bind the three private
# helpers under test to module-level names used by the test classes below.
_sched = _import_scheduler()
_classify_runtime = _sched._classify_runtime
_detect_provider_mismatch = _sched._detect_provider_mismatch
_build_job_prompt = _sched._build_job_prompt
class TestClassifyRuntime:
    """Expected 'local' / 'cloud' / 'unknown' labels from _classify_runtime."""

    def test_ollama_is_local(self):
        kind = _classify_runtime("ollama", "qwen2.5:7b")
        assert kind == "local"

    def test_prefixed_model_is_cloud(self):
        # No provider given: the "nous/" model prefix alone decides.
        kind = _classify_runtime("", "nous/mimo-v2-pro")
        assert kind == "cloud"

    def test_nous_provider_is_cloud(self):
        kind = _classify_runtime("nous", "mimo-v2-pro")
        assert kind == "cloud"

    def test_empty_both_is_unknown(self):
        kind = _classify_runtime("", "")
        assert kind == "unknown"
class TestDetectProviderMismatch:
    """Prompt text checked against the active provider by _detect_provider_mismatch."""

    def test_detects_ollama_reference_on_cloud(self):
        stale = _detect_provider_mismatch("Check Ollama is responding", "nous")
        assert stale == "ollama"

    def test_no_mismatch_when_prompt_matches(self):
        stale = _detect_provider_mismatch("Check Nous model", "nous")
        assert stale is None
class TestBuildJobPrompt:
    """The built prompt carries the runtime line matching the resolved provider."""

    def test_includes_runtime_context_for_cloud(self):
        built = _build_job_prompt(
            {"prompt": "Check server"},
            runtime_model="nous/mimo-v2-pro",
            runtime_provider="nous",
        )
        assert "RUNTIME: cloud API" in built

    def test_includes_runtime_context_for_local(self):
        built = _build_job_prompt(
            {"prompt": "Check server"},
            runtime_model="qwen2.5:7b",
            runtime_provider="ollama",
        )
        assert "RUNTIME: local" in built
# Allow running this file directly; pytest is imported only on that path.
if __name__ == "__main__":
    import pytest
    pytest.main([__file__, "-v"])

View File

@@ -1,316 +0,0 @@
"""
Test session template manager functionality.
"""
import json
import pytest
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from tools.session_template_manager import (
SessionTemplateManager,
SessionTemplate,
ToolCallTemplate,
TaskType
)
class TestSessionTemplateManager:
    """Test session template manager.

    Every test builds its manager on a temporary directory so that nothing is
    created under the user's home directory.
    """

    @staticmethod
    def _tool_call():
        """Return the single successful ``execute_code`` call used as fixture."""
        return ToolCallTemplate(
            tool_name="execute_code",
            arguments={"code": "print('hello')"},
            result="hello",
            success=True,
            execution_time=0.1,
            turn_number=0,
        )

    @staticmethod
    def _template(template_id, task_type, name, description, tool_calls=(),
                  created_at=1234567890.0, success_rate=1.0, usage_count=0):
        """Build a SessionTemplate fixture with no source session."""
        return SessionTemplate(
            template_id=template_id,
            task_type=task_type,
            name=name,
            description=description,
            tool_calls=list(tool_calls),
            source_session_id=None,
            created_at=created_at,
            success_rate=success_rate,
            usage_count=usage_count,
        )

    @staticmethod
    def _classify(*tool_names):
        """Classify a session made of the given tool names, in order."""
        with tempfile.TemporaryDirectory() as tmpdir:
            manager = SessionTemplateManager(template_dir=Path(tmpdir))
            return manager.classify_task_type(
                [{"tool_name": tool_name} for tool_name in tool_names]
            )

    def test_classify_task_type_code(self):
        """Test task type classification for code-heavy sessions."""
        task_type = self._classify(
            "execute_code", "execute_code", "execute_code", "read_file"
        )
        assert task_type == TaskType.CODE

    def test_classify_task_type_file(self):
        """Test task type classification for file-heavy sessions."""
        task_type = self._classify("read_file", "write_file", "patch", "search_files")
        assert task_type == TaskType.FILE

    def test_classify_task_type_research(self):
        """Test task type classification for research-heavy sessions."""
        task_type = self._classify("web_search", "web_fetch", "browser_navigate")
        assert task_type == TaskType.RESEARCH

    def test_classify_task_type_mixed(self):
        """Test task type classification for mixed sessions."""
        task_type = self._classify("execute_code", "read_file", "web_search")
        assert task_type == TaskType.MIXED

    def test_template_creation(self):
        """Test creating a template."""
        with tempfile.TemporaryDirectory() as tmpdir:
            template_dir = Path(tmpdir)
            manager = SessionTemplateManager(template_dir=template_dir)

            template = self._template(
                "test_template", TaskType.CODE, "Test Template", "A test template",
                tool_calls=[self._tool_call()],
            )
            manager.templates["test_template"] = template
            manager._save_template(template)

            # Verify template was saved
            template_file = template_dir / "test_template.json"
            assert template_file.exists()

            # Verify template can be loaded
            with open(template_file) as f:
                data = json.load(f)
            assert data["template_id"] == "test_template"
            assert data["task_type"] == "code"

    def test_template_injection(self):
        """Test injecting a template into messages."""
        with tempfile.TemporaryDirectory() as tmpdir:
            manager = SessionTemplateManager(template_dir=Path(tmpdir))

            template = self._template(
                "test_template", TaskType.CODE, "Test Template", "A test template",
                tool_calls=[self._tool_call()],
            )
            manager.templates["test_template"] = template

            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Hello"}
            ]
            # Record the size first: the manager may return the very list it
            # was given, in which case comparing the two lists afterwards
            # would compare a list with itself and could never pass.
            original_len = len(messages)

            updated_messages = manager.inject_template_into_messages(template, messages)

            # Verify template was injected
            assert len(updated_messages) > original_len
            assert any("Session template loaded" in str(msg.get("content", ""))
                       for msg in updated_messages)

            # Verify usage count was updated
            assert template.usage_count == 1

    def test_get_template_for_task(self):
        """Test getting template for task type."""
        with tempfile.TemporaryDirectory() as tmpdir:
            manager = SessionTemplateManager(template_dir=Path(tmpdir))

            # Create templates for different task types
            manager.templates["code_template"] = self._template(
                "code_template", TaskType.CODE, "Code Template", "A code template",
            )
            manager.templates["file_template"] = self._template(
                "file_template", TaskType.FILE, "File Template", "A file template",
                created_at=1234567891.0, success_rate=0.9, usage_count=5,
            )

            # Test getting code template
            template = manager.get_template_for_task(TaskType.CODE)
            assert template is not None
            assert template.template_id == "code_template"

            # Test getting file template
            template = manager.get_template_for_task(TaskType.FILE)
            assert template is not None
            assert template.template_id == "file_template"

            # Test getting non-existent template
            template = manager.get_template_for_task(TaskType.RESEARCH)
            assert template is None
class TestToolCallTemplate:
    """Round-trip checks for ToolCallTemplate <-> dict conversion."""

    def test_to_dict(self):
        """A template serialises to a dict carrying its field values."""
        call = ToolCallTemplate(
            tool_name="execute_code",
            arguments={"code": "print('hello')"},
            result="hello",
            success=True,
            execution_time=0.1,
            turn_number=0
        )

        as_dict = call.to_dict()

        assert as_dict["tool_name"] == "execute_code"
        assert as_dict["arguments"] == {"code": "print('hello')"}
        assert as_dict["result"] == "hello"
        assert as_dict["success"] is True

    def test_from_dict(self):
        """A dict with all fields builds a matching template."""
        payload = {
            "tool_name": "execute_code",
            "arguments": {"code": "print('hello')"},
            "result": "hello",
            "success": True,
            "execution_time": 0.1,
            "turn_number": 0
        }

        call = ToolCallTemplate.from_dict(payload)

        assert call.tool_name == "execute_code"
        assert call.arguments == {"code": "print('hello')"}
        assert call.result == "hello"
class TestSessionTemplate:
    """Round-trip checks for SessionTemplate <-> dict conversion."""

    def test_to_dict(self):
        """The task type is serialised by value and tool calls are kept."""
        only_call = ToolCallTemplate(
            tool_name="execute_code",
            arguments={"code": "print('hello')"},
            result="hello",
            success=True,
            execution_time=0.1,
            turn_number=0
        )
        session_template = SessionTemplate(
            template_id="test_template",
            task_type=TaskType.CODE,
            name="Test Template",
            description="A test template",
            tool_calls=[only_call],
            source_session_id=None,
            created_at=1234567890.0,
            success_rate=1.0,
            usage_count=0
        )

        as_dict = session_template.to_dict()

        assert as_dict["template_id"] == "test_template"
        assert as_dict["task_type"] == "code"
        assert len(as_dict["tool_calls"]) == 1

    def test_from_dict(self):
        """The task type string is turned back into the TaskType member."""
        call_payload = {
            "tool_name": "execute_code",
            "arguments": {"code": "print('hello')"},
            "result": "hello",
            "success": True,
            "execution_time": 0.1,
            "turn_number": 0
        }
        payload = {
            "template_id": "test_template",
            "task_type": "code",
            "name": "Test Template",
            "description": "A test template",
            "tool_calls": [call_payload],
            "source_session_id": None,
            "created_at": 1234567890.0,
            "success_rate": 1.0,
            "usage_count": 0
        }

        session_template = SessionTemplate.from_dict(payload)

        assert session_template.template_id == "test_template"
        assert session_template.task_type == TaskType.CODE
        assert len(session_template.tool_calls) == 1
# Allow running this test file directly instead of through the pytest CLI.
if __name__ == "__main__":
    pytest.main([__file__])

View File

@@ -1,507 +0,0 @@
"""
Session Template Manager for Hermes Agent.
Extracts successful tool calls from completed sessions and creates templates
that can be injected into new sessions to establish feedback loops early.
Based on research finding: code-heavy sessions (execute_code dominant in first
30 turns) improve over time. File-heavy sessions degrade. The key is
deterministic feedback loops, not arbitrary context.
"""
import json
import logging
import os
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
logger = logging.getLogger(__name__)
# Default template directory
DEFAULT_TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
class TaskType(Enum):
    """Task type classification for session templates."""

    CODE = "code"
    FILE = "file"
    RESEARCH = "research"
    MIXED = "mixed"


@dataclass
class ToolCallTemplate:
    """A single tool call template extracted from a successful session."""

    tool_name: str
    arguments: Dict[str, Any]
    result: str
    success: bool
    execution_time: float
    turn_number: int

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ToolCallTemplate':
        """Create from a dictionary whose keys are exactly the field names."""
        return cls(**data)


@dataclass
class SessionTemplate:
    """A complete session template with multiple tool calls."""

    template_id: str
    task_type: TaskType
    name: str
    description: str
    tool_calls: List[ToolCallTemplate]
    source_session_id: Optional[str]
    created_at: float
    success_rate: float
    usage_count: int

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Nested tool calls become plain dictionaries (via ``asdict``) and the
        task type is stored by its string value.
        """
        data = asdict(self)
        data['task_type'] = self.task_type.value
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SessionTemplate':
        """Create from a dictionary as produced by :meth:`to_dict`.

        The input mapping is left untouched. ``task_type`` is converted to a
        :class:`TaskType` member and every entry of ``tool_calls`` that is
        still a plain dictionary is converted to a :class:`ToolCallTemplate`,
        so the result can be used exactly like a freshly built template.

        Raises:
            KeyError: if ``task_type`` is missing.
            ValueError: if ``task_type`` is not a valid TaskType value.
            TypeError: if required fields are missing or unknown keys are given.
        """
        fields = dict(data)  # shallow copy: never mutate the caller's dict
        fields['task_type'] = TaskType(fields['task_type'])
        raw_calls = fields.get('tool_calls')
        if raw_calls is not None:
            fields['tool_calls'] = [
                call if isinstance(call, ToolCallTemplate)
                else ToolCallTemplate.from_dict(call)
                for call in raw_calls
            ]
        return cls(**fields)
class SessionTemplateManager:
"""Manages session templates for seeding new sessions."""
def __init__(self, template_dir: Optional[Path] = None, db_path: Optional[Path] = None):
"""
Initialize the session template manager.
Args:
template_dir: Directory to store templates (default: ~/.hermes/session-templates/)
db_path: Path to the session database (default: ~/.hermes/state.db)
"""
self.template_dir = template_dir or DEFAULT_TEMPLATE_DIR
self.db_path = db_path or Path.home() / ".hermes" / "state.db"
# Ensure template directory exists
self.template_dir.mkdir(parents=True, exist_ok=True)
# Load existing templates
self.templates: Dict[str, SessionTemplate] = {}
self._load_templates()
def _load_templates(self):
"""Load all templates from the template directory."""
for template_file in self.template_dir.glob("*.json"):
try:
with open(template_file, 'r') as f:
data = json.load(f)
template = SessionTemplate.from_dict(data)
self.templates[template.template_id] = template
except Exception as e:
logger.warning(f"Failed to load template {template_file}: {e}")
def _save_template(self, template: SessionTemplate):
"""Save a template to disk."""
template_file = self.template_dir / f"{template.template_id}.json"
with open(template_file, 'w') as f:
json.dump(template.to_dict(), f, indent=2)
def classify_task_type(self, tool_calls: List[Dict[str, Any]]) -> TaskType:
"""
Classify the task type based on tool calls.
Args:
tool_calls: List of tool calls from a session
Returns:
TaskType classification
"""
if not tool_calls:
return TaskType.MIXED
# Count tool types
code_tools = {'execute_code', 'code_execution'}
file_tools = {'read_file', 'write_file', 'patch', 'search_files'}
research_tools = {'web_search', 'web_fetch', 'browser_navigate'}
code_count = sum(1 for tc in tool_calls if tc.get('tool_name') in code_tools)
file_count = sum(1 for tc in tool_calls if tc.get('tool_name') in file_tools)
research_count = sum(1 for tc in tool_calls if tc.get('tool_name') in research_tools)
total = len(tool_calls)
if total == 0:
return TaskType.MIXED
# Determine dominant type
code_ratio = code_count / total
file_ratio = file_count / total
research_ratio = research_count / total
if code_ratio > 0.6:
return TaskType.CODE
elif file_ratio > 0.6:
return TaskType.FILE
elif research_ratio > 0.6:
return TaskType.RESEARCH
else:
return TaskType.MIXED
def extract_successful_tool_calls(self, session_id: str, max_calls: int = 10) -> List[ToolCallTemplate]:
"""
Extract successful tool calls from a completed session.
Args:
session_id: Session ID to extract from
max_calls: Maximum number of tool calls to extract
Returns:
List of ToolCallTemplate objects
"""
if not self.db_path.exists():
logger.warning(f"Session database not found: {self.db_path}")
return []
try:
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
# Get messages for the session
cursor = conn.execute("""
SELECT role, content, tool_calls, tool_name, timestamp
FROM messages
WHERE session_id = ?
ORDER BY timestamp
""", (session_id,))
messages = cursor.fetchall()
conn.close()
# Extract tool calls
tool_call_templates = []
turn_number = 0
for msg in messages:
if msg['role'] == 'assistant' and msg['tool_calls']:
try:
tool_calls = json.loads(msg['tool_calls'])
for tc in tool_calls:
if len(tool_call_templates) >= max_calls:
break
tool_name = tc.get('function', {}).get('name')
if not tool_name:
continue
# Parse arguments
try:
arguments = json.loads(tc.get('function', {}).get('arguments', '{}'))
except:
arguments = {}
# Create template (result will be filled from tool response)
template = ToolCallTemplate(
tool_name=tool_name,
arguments=arguments,
result="", # Will be filled from tool response
success=True, # Assume successful if we got a response
execution_time=0.0, # Not tracked in current schema
turn_number=turn_number
)
tool_call_templates.append(template)
turn_number += 1
except json.JSONDecodeError:
continue
elif msg['role'] == 'tool' and tool_call_templates:
# Fill in the result for the last tool call
if tool_call_templates[-1].result == "":
tool_call_templates[-1].result = msg['content'] or ""
return tool_call_templates
except Exception as e:
logger.error(f"Failed to extract tool calls from session {session_id}: {e}")
return []
def create_template_from_session(self, session_id: str,
name: Optional[str] = None,
description: Optional[str] = None,
max_calls: int = 10) -> Optional[SessionTemplate]:
"""
Create a session template from a completed session.
Args:
session_id: Session ID to create template from
name: Template name (auto-generated if None)
description: Template description (auto-generated if None)
max_calls: Maximum number of tool calls to include
Returns:
SessionTemplate object or None if failed
"""
# Extract tool calls
tool_calls = self.extract_successful_tool_calls(session_id, max_calls)
if not tool_calls:
logger.warning(f"No successful tool calls found in session {session_id}")
return None
# Classify task type
task_type = self.classify_task_type([tc.to_dict() for tc in tool_calls])
# Generate template ID
template_id = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
# Auto-generate name and description if not provided
if not name:
name = f"{task_type.value.title()} Template from {session_id[:8]}"
if not description:
tool_names = [tc.tool_name for tc in tool_calls]
description = f"Template with {len(tool_calls)} successful tool calls: {', '.join(tool_names[:3])}"
# Create template
template = SessionTemplate(
template_id=template_id,
task_type=task_type,
name=name,
description=description,
tool_calls=tool_calls,
source_session_id=session_id,
created_at=time.time(),
success_rate=1.0, # All extracted calls were successful
usage_count=0
)
# Save template
self.templates[template_id] = template
self._save_template(template)
logger.info(f"Created template {template_id} from session {session_id}")
return template
def get_template_for_task(self, task_type: TaskType) -> Optional[SessionTemplate]:
"""
Get the best template for a given task type.
Args:
task_type: Type of task
Returns:
Best matching SessionTemplate or None
"""
matching_templates = [
t for t in self.templates.values()
if t.task_type == task_type
]
if not matching_templates:
return None
# Sort by success rate and usage count
matching_templates.sort(
key=lambda t: (t.success_rate, -t.usage_count),
reverse=True
)
return matching_templates[0]
def inject_template_into_messages(self, template: SessionTemplate,
                                  messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Splice a template's recorded tool calls into a message list.

    A system notice describing the template is followed by one
    assistant/tool message pair per recorded tool call. The new messages are
    placed directly after the leading run of system messages in ``messages``.
    ``messages`` is modified in place and the same list object is returned.

    A template without tool calls leaves ``messages`` untouched and does not
    count as a use. Otherwise ``template.usage_count`` is incremented and the
    template is passed to ``self._save_template``.

    Args:
        template: Template to inject
        messages: Existing messages list

    Returns:
        Modified messages list with template injected
    """
    recorded_calls = template.tool_calls
    if not recorded_calls:
        return messages

    # Notice announcing the template, followed by the replayed calls.
    injected = [{
        "role": "system",
        "content": f"Session template loaded: {template.name}\n"
                   f"Task type: {template.task_type.value}\n"
                   f"This template contains {len(recorded_calls)} successful tool calls "
                   f"to establish a feedback loop early."
    }]
    for position, recorded in enumerate(recorded_calls):
        # The assistant request and its tool response share one id.
        call_id = f"template_{template.template_id}_{position}"
        request = {
            "role": "assistant",
            "content": None,
            "tool_calls": [{
                "id": call_id,
                "type": "function",
                "function": {
                    "name": recorded.tool_name,
                    "arguments": json.dumps(recorded.arguments)
                }
            }]
        }
        response = {
            "role": "tool",
            "tool_call_id": call_id,
            "content": recorded.result
        }
        injected.extend((request, response))

    # Skip over the leading system messages to find the insertion point.
    insert_at = 0
    while insert_at < len(messages) and messages[insert_at].get("role") == "system":
        insert_at += 1

    # One slice assignment inserts the whole batch in order.
    messages[insert_at:insert_at] = injected

    # Record the use and persist it.
    template.usage_count += 1
    self._save_template(template)
    return messages
def list_templates(self, task_type: Optional[TaskType] = None) -> List[SessionTemplate]:
    """
    Return the stored templates, newest first.

    Args:
        task_type: Optional task type filter; when given (and truthy), only
            templates whose ``task_type`` equals it are returned

    Returns:
        A new list of SessionTemplate objects ordered by ``created_at``,
        most recent first
    """
    selected = self.templates.values()
    if task_type:
        selected = (tpl for tpl in selected if tpl.task_type == task_type)
    return sorted(selected, key=lambda tpl: tpl.created_at, reverse=True)
def delete_template(self, template_id: str) -> bool:
    """
    Delete a template from disk and from the in-memory registry.

    The on-disk ``<template_id>.json`` file is removed first. If that removal
    fails (for example with a permission error), the exception propagates
    while the in-memory entry is still present, so memory and disk do not
    drift apart. A file that is already missing is not an error.

    Args:
        template_id: ID of template to delete

    Returns:
        True if deleted, False if not found

    Raises:
        OSError: If the template file exists but cannot be removed
    """
    if template_id not in self.templates:
        return False

    # Remove from disk first; try/except avoids an exists()/unlink() race.
    template_file = self.template_dir / f"{template_id}.json"
    try:
        template_file.unlink()
    except (FileNotFoundError, NotADirectoryError):
        # Nothing on disk to remove.
        pass

    # Remove from memory only after the disk state is settled.
    del self.templates[template_id]

    logger.info(f"Deleted template {template_id}")
    return True
# CLI interface for template management
def main():
    """
    Command-line entry point for session template management.

    Subcommands:
        create  Build a template from a session ID (optional --name,
                --description and --max-calls).
        list    Print stored templates, optionally filtered with --type.
        delete  Remove a template by its ID.

    When no subcommand is given the usage help is printed.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Session Template Manager")
    commands = cli.add_subparsers(dest="command", help="Command to execute")

    # "create": build a template from an existing session
    create_cmd = commands.add_parser("create", help="Create template from session")
    create_cmd.add_argument("session_id", help="Session ID to create template from")
    create_cmd.add_argument("--name", help="Template name")
    create_cmd.add_argument("--description", help="Template description")
    create_cmd.add_argument("--max-calls", type=int, default=10, help="Max tool calls to extract")

    # "list": show stored templates
    list_cmd = commands.add_parser("list", help="List templates")
    list_cmd.add_argument("--type", choices=["code", "file", "research", "mixed"],
                          help="Filter by task type")

    # "delete": remove a stored template
    delete_cmd = commands.add_parser("delete", help="Delete template")
    delete_cmd.add_argument("template_id", help="Template ID to delete")

    options = cli.parse_args()

    logging.basicConfig(level=logging.INFO)

    # The manager is constructed before dispatch, whatever the command is.
    manager = SessionTemplateManager()
    command = options.command

    if command == "create":
        created = manager.create_template_from_session(
            options.session_id,
            name=options.name,
            description=options.description,
            max_calls=options.max_calls
        )
        if not created:
            print("Failed to create template")
            return
        print(f"Created template: {created.template_id}")
        print(f" Name: {created.name}")
        print(f" Type: {created.task_type.value}")
        print(f" Tool calls: {len(created.tool_calls)}")
        return

    if command == "list":
        type_filter = None
        if options.type:
            type_filter = TaskType(options.type)
        found = manager.list_templates(type_filter)
        if not found:
            print("No templates found")
            return
        print(f"Found {len(found)} templates:")
        for tpl in found:
            created_on = datetime.fromtimestamp(tpl.created_at).isoformat()
            print(f" {tpl.template_id}: {tpl.name}")
            print(f" Type: {tpl.task_type.value}")
            print(f" Tool calls: {len(tpl.tool_calls)}")
            print(f" Usage: {tpl.usage_count}")
            print(f" Created: {created_on}")
            print()
        return

    if command == "delete":
        target_id = options.template_id
        if manager.delete_template(target_id):
            print(f"Deleted template: {target_id}")
        else:
            print(f"Template not found: {target_id}")
        return

    # No subcommand supplied.
    cli.print_help()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()